HOW TO Alter an existing Kafka topic (partitions, retention, replicas)

Why do that

In case of non-nominal cluster status, or during/after an incident, you may want:

  • To increase the partition count of a topic (so that the process consuming the topic can then be scaled out, using additional Kafka spout executors)

  • To activate/deactivate replication of a topic, or relocate a replica

  • To free some disk space on a Kafka data disk, or to keep topic data for longer than usual, by changing the retention settings of a topic

Adding a partition to an existing topic

This can only be used to increase the partition count of a topic. This is useful when the consumer group needs to be scaled out for load sharing, and the consumer group would then be bigger than the current number of partitions (remember: a partition can be consumed by only one consumer of a load-sharing group at a given time).

Use the kafka-topics.sh tool (or the punchplatform-kafka-topics.sh wrapper) with the --alter command on an existing topic:

From a PunchPlatform admin account on a station where the PunchPlatform distribution is deployed, run:

```sh
> punchplatform-kafka-topics.sh --kafkaCluster front --topic myTopic --alter --partitions 4  
```
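To check that the change has been taken into account, you can describe the topic again; the output should now list 4 partitions (the cluster name front and the topic name myTopic are the same placeholders as in the example above):

```sh
# Verify the new partition count of the topic
punchplatform-kafka-topics.sh --kafkaCluster front --topic myTopic --describe
```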

Changing replicas count or location of an existing topic

This is useful to increase failure-resilience of a Kafka topic, to rebalance the replicas location, or to rebuild replicas somewhere else after a broker node has been removed.

The procedure has three steps:

  • 1) determine existing replicas and associated nodes, using the kafka-topics.sh tool, or the punchplatform-kafka-topics.sh wrapper command:

    punchplatform-kafka-topics.sh --kafkaCluster front --topic myTopic --describe 
    
  • 2) create a new replica assignment file, including new replicas assigned to nodes that did not already have a replica for this topic. For instance:

    #/tmp/toto.json
    {"partitions":
      [{"topic": "mytenant_arkoon-output","partition":0,"replicas":[1,2]},
      {"topic": "mytenant_arkoon-output","partition":1,"replicas":[2,3]},
      {"topic": "mytenant_arkoon-output","partition":2,"replicas":[3,1]}],
      "version":1
    }
    
  • 3) use the Kafka reassignment tool to request the desired partition assignment from the Kafka cluster (the verification sketch below shows how to check completion afterwards):

    $ /data/opt/kafka_2.10-0.10.0.1/bin/kafka-reassign-partitions.sh --execute --reassignment-json-file /tmp/toto.json --zookeeper LMCSOCKAF01I:2181/punchplatform/kafka-local
    

Please refer to this wiki for details on the assignment file format.
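Once the reassignment has been requested, the same Kafka tool can report whether it has completed. A minimal verification sketch, reusing the installation path, assignment file and ZooKeeper connect string from the example above (adapt them to your own platform):

```sh
# Check the status of the reassignment requested with --execute above.
# For each partition, the tool reports whether the reassignment completed
# successfully, is still in progress, or failed.
/data/opt/kafka_2.10-0.10.0.1/bin/kafka-reassign-partitions.sh --verify \
    --reassignment-json-file /tmp/toto.json \
    --zookeeper LMCSOCKAF01I:2181/punchplatform/kafka-local
```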

Changing retention settings for a specific topic

This procedure allows overriding, for a specific topic, how long and how much topic partition data is kept before Kafka erases it.

When choosing the appropriate values for your new settings, you must remember the following facts:

  • Kafka does not know whether your messages have been processed or not. For the sake of speed, Kafka just keeps all data until a retention limit is reached for the topic partition, and then removes the oldest data in the partition.
  • Kafka destroys the oldest data of a topic partition as soon as ONE of the retention limits is reached:
    • the oldest partition data is older than the given retention time setting
    • the total partition data size is bigger than the given retention size setting
  • The retention settings apply to each partition within a topic. This means that with a default setting common to all topics, topics with more partitions are allowed to use more disk space! By default, the retention settings applicable to a topic are the common broker settings found in /data/opt/xxxx-server.properties:
    • log.retention.hours
    • log.retention.bytes

Then, for each topic, a specific retention can be specified. You can see the specific retention settings of a topic using punchplatform-kafka-topics.sh --describe --kafkaCluster <cluster>. If no specific retention setting is displayed for a topic, then the default from the broker settings applies (see above).

Retention settings can be changed using the following command lines, run as the application administrator (ppadmin) in the PunchPlatform administration environment (no need to stop Kafka, no service interruption):

```sh
punchplatform-kafka-topics.sh --kafkaCluster <cluster> --topic <topic_name> --add-config retention.bytes=NNNNNNNNNNN
punchplatform-kafka-topics.sh --kafkaCluster <cluster> --topic <topic_name> --add-config retention.ms=NNNNNNNNNNN
```
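As an illustration, here is what those commands could look like with concrete values. The cluster name front and the topic name mytenant_arkoon-output are reused from the earlier examples, and the sizing figures are only assumptions for the sake of the example; remember that both limits apply to each partition of the topic.

```sh
# Keep at most 7 days of data per partition: 7 * 24 * 3600 * 1000 = 604800000 ms
punchplatform-kafka-topics.sh --kafkaCluster front --topic mytenant_arkoon-output --add-config retention.ms=604800000

# Keep at most 10 GiB of data per partition: 10 * 1024^3 = 10737418240 bytes.
# With 8 partitions, such a topic may therefore use up to about 80 GiB of disk per replica.
punchplatform-kafka-topics.sh --kafkaCluster front --topic mytenant_arkoon-output --add-config retention.bytes=10737418240
```

You can then check the applied settings with punchplatform-kafka-topics.sh --describe --kafkaCluster front --topic mytenant_arkoon-output.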
Kafka documentation: topic-level configuration