
HOWTO flush Kafka backlog

Why do that

Warning

Be aware that this procedure will make you lose data that hasn't been processed yet!

Let's say you have a huge backlog that will take a very long time to process. If you consider processing in real time more important than processing all the data, flushing the Kafka queue will position your consumer at the latest offset and skip all remaining documents.
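To judge whether the backlog is really too large, you can estimate the catch-up time. The backlog only shrinks at the difference between the consumer throughput and the producer throughput. The helper below is a hypothetical illustration of that arithmetic; the function name and the example figures are assumptions, not PunchPlatform tooling.

```shell
# Hypothetical helper (not part of PunchPlatform): estimate the seconds a
# consumer needs to absorb a backlog.
# $1 = backlog size (messages), $2 = consumer rate, $3 = producer rate
# (both in messages per second, integers).
catch_up_seconds() {
  local backlog="$1" consume_rate="$2" produce_rate="$3"
  # The backlog only shrinks at the net rate.
  local net=$(( consume_rate - produce_rate ))
  if [ "$net" -le 0 ]; then
    echo "never"   # the consumer cannot keep up: the backlog keeps growing
    return 1
  fi
  # Ceiling division, so a partial last second still counts.
  echo $(( (backlog + net - 1) / net ))
}
```

With illustrative figures, `catch_up_seconds 36000000 5000 4000` prints `36000`, which is ten hours of lag before the consumer is back to real time.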

What to do

Note first that this procedure requires two restarts of your consumer topology.

1. First, configure your topology to start from the latest Kafka offset. To do so, find the topology's Kafka spout and change its "start_offset_strategy" setting from "last_committed" to "latest":

"spouts" : [
    {
      "type" : "kafka_spout",
      "spout_settings" : {
 -       "start_offset_strategy" : "last_committed",
 +       "start_offset_strategy" : "latest",
        "brokers" : "local",
        ...
      },
      "storm_settings" : {
        "executors": 1,
        "component" : "kafka_spout",
        "publish" : [ { "stream" : "logs", "fields" : ["log", "local_host", "local_port", "remote_host", "remote_port", "local_uuid", "local_timestamp" ] } ] 
      }
    }
],
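If you prefer scripting the edit to changing the file by hand, a minimal sketch follows. The function name is hypothetical. It assumes the key and its value sit on one line, as in the excerpt above, and it rewrites every "start_offset_strategy" in the file. Check the result before committing.

```shell
# Hypothetical helper: set the "start_offset_strategy" value in a topology
# JSON file. Assumes key and value are on the same line; every occurrence
# in the file is changed.
set_start_offset_strategy() {
  local topology_file="$1"   # path to the topology .json file
  local strategy="$2"        # e.g. latest or last_committed
  local tmp
  tmp="$(mktemp)"
  # Replace the quoted value that follows the key, keeping the key and spacing.
  sed -E "s/(\"start_offset_strategy\"[[:space:]]*:[[:space:]]*)\"[^\"]*\"/\1\"${strategy}\"/" \
    "$topology_file" > "$tmp" &&
    cat "$tmp" > "$topology_file"   # overwrite in place, keeping file permissions
  rm -f "$tmp"
}
```

For example, `set_start_offset_strategy my_topology.json latest` performs step 1, and running it with `last_committed` restores the original setting.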

2. Commit your change. It is important that this kind of operation is recorded somewhere, and Git versioning is the right place for it:

$ git add %PUNCHPLATFORM_CONF_DIR%/tenants/%TENANT%/channels/%MYCHANNEL%/%MY_TOPOLOGY%.json ; git commit -m "empty Kafka buffer for techno XXX"

Write down the commit ID somewhere. You will need it later.
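Instead of copying the ID by hand, you can read it from Git right after committing, because the new commit is HEAD. A minimal sketch follows. The function name is hypothetical, and it assumes your configuration directory is a Git working tree.

```shell
# Hypothetical helper: print the full ID of the latest commit (HEAD) in the
# given Git working tree, e.g. the configuration directory committed to above.
flush_commit_id() {
  local repo="${1:-.}"   # defaults to the current directory
  git -C "$repo" rev-parse HEAD
}
```

For example, `FLUSH_COMMIT="$(flush_commit_id "$PUNCHPLATFORM_CONF_DIR")"` keeps the ID in a shell variable, assuming the configuration path is available in a variable of that name.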

3. Restart the channel:

$ punchplatform-channel.sh --reload %TENANT%/%MYCHANNEL% # or --stop/--start, depending on your channel_structure configuration

The Kafka backlog is now flushed. However, you must restore the previous configuration afterwards. Otherwise, if the topology fails and restarts, it will start from the latest offset again and skip whatever backlog has accumulated in the meantime:

4. Roll back the configuration:

$ git revert <commit_id>
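The revert can also be run non-interactively, followed by a check that the spout is back to "last_committed" before you restart. In the sketch below, the function name and its arguments are hypothetical. `git revert --no-edit` keeps Git's default revert message. The topology path is assumed to be relative to the repository root.

```shell
# Hypothetical helper: revert the flush commit and confirm the restored
# start_offset_strategy. Returns non-zero if either step fails.
revert_flush_commit() {
  local repo="$1" commit_id="$2" topology_file="$3"
  # Create the revert commit without opening an editor.
  git -C "$repo" revert --no-edit "$commit_id" || return 1
  # The spout must read from the last committed offset again.
  grep -q '"start_offset_strategy"[[:space:]]*:[[:space:]]*"last_committed"' \
    "$repo/$topology_file"
}
```

Both the flush commit and its revert remain in the history, which keeps a trace of the operation once you push.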

5. Restart the channel:

$ punchplatform-channel.sh --reload %TENANT%/%MYCHANNEL% # or --stop/--start, depending on your channel_structure configuration

6. Push the changes:

$ git push