
HOWTO flush a Kafka backlog

Why do that?


Be aware that this procedure will make you lose any data that has not been processed yet!

Let's say you have a huge backlog that would take a very long time to process, and you consider that processing in real time is more important than processing all the data. Flushing the Kafka queue then positions your consumer at the most recent logs, so that it skips all the remaining documents.

What to do

Note first that this procedure requires restarting your consumer topology twice.

1. First, configure your topology to start from the latest Kafka offset. To do so, find the start_offset_strategy setting of the topology's Kafka spout and set it to latest.

"spouts" : [
    {
      "type" : "kafka_input",
      "settings" : {
 -       "start_offset_strategy" : "last_committed",
 +       "start_offset_strategy" : "latest",
        "brokers" : "local",
        ...
      },
      "storm_settings" : {
        "executors": 1,
        "component" : "kafka_input",
        "publish" : [ { "stream" : "logs", "fields" : ["log", "local_host", "local_port", "remote_host", "remote_port", "local_uuid", "local_timestamp" ] } ]
      }
    }
]
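The same edit can be scripted. The following is only a minimal sketch, not part of the platform tooling: `set_offset_strategy` is a hypothetical helper, and it assumes that the topology file holds the `start_offset_strategy` key in the form shown above. It rewrites every line that carries the key, so use it with care if several spouts in the same file must keep different strategies.

```shell
# Hypothetical helper (not a PunchPlatform command): rewrite the value of
# the "start_offset_strategy" key on every line of a topology file that has it.
# Usage: set_offset_strategy <topology.json> <strategy>
set_offset_strategy() {
  file="$1"
  strategy="$2"
  # -i.bak edits the file in place and keeps a backup copy as <file>.bak;
  # this form is accepted by both GNU and BSD sed.
  sed -i.bak -E \
    "s/(\"start_offset_strategy\"[[:space:]]*:[[:space:]]*)\"[^\"]*\"/\\1\"${strategy}\"/" \
    "$file"
}

# Example (the path uses the same placeholders as the rest of this page):
# set_offset_strategy "$PUNCHPLATFORM_CONF_DIR/tenants/$TENANT/channels/$MYCHANNEL/$MY_TOPOLOGY.json" latest
```

The backup file is left next to the original so the change can be compared or undone by hand; it is not meant to be committed.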

2. Commit your change. It is important that this kind of operation is logged somewhere, and Git versioning is the way to do it:

$ git add %PUNCHPLATFORM_CONF_DIR%/tenants/%TENANT%/channels/%MYCHANNEL%/%MY_TOPOLOGY%.json ; git commit -m "empty Kafka buffer for techno XXX"

Note the commit ID somewhere. You will need it later.
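One way to keep track of that ID is to save it right after committing. This is a minimal sketch, assuming the commit you just made is the current `HEAD` of the configuration repository; `record_flush_commit` and the output file are hypothetical names, not part of the platform tooling.

```shell
# Hypothetical helper: store the ID of the current HEAD commit of a Git
# repository in a file, so that it can be given to `git revert` later.
# Usage: record_flush_commit <repo_dir> <output_file>
record_flush_commit() {
  repo_dir="$1"
  out_file="$2"
  # `git rev-parse HEAD` prints the full hash of the commit currently checked out.
  git -C "$repo_dir" rev-parse HEAD > "$out_file"
}

# Example (assumes the flush commit was just created in this repository):
# record_flush_commit "$PUNCHPLATFORM_CONF_DIR" /tmp/flush_commit_id
# git revert "$(cat /tmp/flush_commit_id)"
```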

3. Restart the channel:

channelctl stop %MYCHANNEL%
channelctl start %MYCHANNEL%

The Kafka backlog is now flushed. You must however restore the previous configuration afterwards. Otherwise, if the topology fails and restarts, it will start again with this very configuration and flush the backlog once more.

4. Roll back the configuration:

$ git revert <commit_number>
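Before restarting, it may be worth checking that the revert really restored the original strategy, since a topology left on `latest` would flush the backlog again on its next restart. This is a minimal sketch; `assert_not_latest` is a hypothetical helper, and it only inspects the text of the file.

```shell
# Hypothetical helper: fail if a topology file still sets
# "start_offset_strategy" to "latest".
# Usage: assert_not_latest <topology.json>
assert_not_latest() {
  file="$1"
  if grep -Eq '"start_offset_strategy"[[:space:]]*:[[:space:]]*"latest"' "$file"; then
    echo "ERROR: $file still uses start_offset_strategy=latest" >&2
    return 1
  fi
  echo "OK: $file does not use start_offset_strategy=latest"
}

# Example (the path uses the same placeholders as the rest of this page):
# assert_not_latest "$PUNCHPLATFORM_CONF_DIR/tenants/$TENANT/channels/$MYCHANNEL/$MY_TOPOLOGY.json"
```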

5. Restart the channel:

channelctl stop %MYCHANNEL%
channelctl start %MYCHANNEL%

6. Push the changes:

$ git push