
HOWTO alter existing kafka topics

Why do that

When the cluster is in a non-nominal state, or during or after an incident, you may want to:

  • increase the partition count of a topic, so that the process consuming the topic can then be scaled out with additional Kafka spout executors
  • activate or deactivate replication of a topic
  • free some disk space on a Kafka data disk by changing the retention settings of a topic

Adding a partition to an existing topic

This operation can only increase the partition count of a topic. It is useful when a consumer group must be scaled out for load sharing and would otherwise have more consumers than the topic has partitions (remember: at any given time, a partition can be consumed by only one consumer of a load-sharing group).

Use the kafka-topics.sh tool (or the punchplatform-kafka-topics.sh wrapper) with the --alter command on an existing topic.

From a PunchPlatform admin account, on a station where the PunchPlatform distribution is deployed, run:

```sh
punchplatform-kafka-topics.sh --kafkaCluster front --topic myTopic --alter --partitions 4
```
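
The sizing rule above (a partition feeds at most one consumer of a group, and the partition count can only grow) can be sketched as a small shell helper. The function name `target_partitions` and the sample numbers are illustrative assumptions, not part of Kafka or PunchPlatform; the printed value is what would be passed to --partitions.

```sh
#!/bin/sh
# Hypothetical helper (an assumption for illustration, not a PunchPlatform tool).
# Prints the partition count to request so that each consumer of a
# load-sharing group can own at least one partition.
# The result never goes below the current count, because partitions can only be added.
target_partitions() {
  current=$1
  consumers=$2
  if [ "$consumers" -gt "$current" ]; then
    echo "$consumers"
  else
    echo "$current"
  fi
}

# Example: a topic with 2 partitions and a consumer group scaled to 4 members.
target_partitions 2 4
# The printed value would then be used as:
#   punchplatform-kafka-topics.sh --kafkaCluster front --topic myTopic --alter --partitions <value>
```

If the group is not larger than the current partition count, the helper returns the current count and no alteration is needed.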

Changing replicas count of an existing topic

Use this procedure to increase the failure resilience of a Kafka topic.

The procedure has three steps:

  • Determine the existing replicas and the nodes that host them, using the kafka-topics.sh tool or the punchplatform-kafka-topics.sh wrapper command:

```sh
punchplatform-kafka-topics.sh --kafkaCluster front --topic myTopic --describe
```

  • Create a new replica assignment file that adds replicas on nodes that do not yet host a replica for this topic. For instance, in /tmp/toto.json:

```json
{"partitions":
  [{"topic": "mytenant_arkoon-output","partition":0,"replicas":[1,2]},
  {"topic": "mytenant_arkoon-output","partition":1,"replicas":[2,3]},
  {"topic": "mytenant_arkoon-output","partition":2,"replicas":[3,1]}],
  "version":1
}
```

  • Use the Kafka reassignment tool to ask the Kafka cluster to apply the desired partition assignment:

```sh
/data/opt/kafka_2.10-0.10.0.1/bin/kafka-reassign-partitions.sh --execute --reassignment-json-file /tmp/toto.json --zookeeper LMCSOCKAF01I:2181/punchplatform/kafka-local
```

Please refer to this wiki for details on assignment file format.
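
Writing the assignment file by hand gets tedious for topics with many partitions. As a minimal sketch, the helper below prints a file in the same layout as the example above. The function name `make_reassignment` and the round-robin placement of replicas are assumptions made for illustration; they are not a Kafka or PunchPlatform feature, and the placement should be reviewed against the output of --describe before use.

```sh
#!/bin/sh
# Hypothetical helper (an assumption for illustration, not a Kafka tool).
# Usage: make_reassignment <topic> <partition_count> <replica_count> <broker_id>...
# Prints a reassignment JSON document where partition p gets replica_count
# replicas, taken round-robin from the broker list starting at position p.
# The body runs in a subshell "( ... )" so its variables do not leak out.
make_reassignment() (
  topic=$1
  partitions=$2
  replicas=$3
  shift 3
  n=$#                      # number of broker ids left in "$@"
  printf '{"partitions":\n  ['
  p=0
  while [ "$p" -lt "$partitions" ]; do
    list=''
    r=0
    while [ "$r" -lt "$replicas" ]; do
      idx=$(( (p + r) % n + 1 ))      # 1-based position in the broker list
      eval "broker=\${$idx}"          # fetch the idx-th broker id
      list="${list}${list:+,}${broker}"
      r=$((r + 1))
    done
    if [ "$p" -gt 0 ]; then
      printf ',\n  '
    fi
    printf '{"topic": "%s","partition":%d,"replicas":[%s]}' "$topic" "$p" "$list"
    p=$((p + 1))
  done
  printf '],\n  "version":1\n}\n'
)

# Example: 3 partitions, 2 replicas each, brokers 1, 2 and 3.
# Redirect the output to a file (for instance /tmp/toto.json) to use it.
make_reassignment mytenant_arkoon-output 3 2 1 2 3
```

With these sample arguments the output assigns replicas [1,2], [2,3] and [3,1] to partitions 0, 1 and 2.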

Changing retention settings for a specific topic

This procedure lets you override, for a specific topic, the minimum age or size that a topic partition's data must reach before Kafka erases it.

When choosing appropriate values for your new settings, remember the following facts:

  • Kafka does not know whether your messages have been processed. For the sake of speed, Kafka simply keeps all data until the retention limit is reached for the topic partition, then removes the oldest data in the partition.
  • Kafka destroys the oldest data of a topic partition as soon as ONE of the retention limits is reached:
    • the oldest partition data is older than the retention time setting
    • the total partition data size is bigger than the retention size setting
  • The retention settings apply to each partition within a topic. This means that, with a default setting common to all topics, topics with more partitions are allowed to use more disk space!

By default, the retention settings applicable to a topic are the common settings found in /data/opt/xxxx-server.properties:

  • log.retention.hours
  • log.retention.bytes

Then, for each topic, a specific retention can be specified. You can see the specific retention settings of a topic with punchplatform-kafka-topics.sh --describe --kafkaCluster <cluster>. If no specific retention setting is displayed for a topic, the default from the broker settings applies (see above).

Retention settings can be changed with the following command lines, run as the application administrator (ppadmin) on the PunchPlatform administration environment (no need to stop Kafka, no interruption to the service):

```sh
punchplatform-kafka-topics.sh --kafkaCluster <cluster> --topic <topic_name> --alter --config retention.bytes=NNNNNNNNNNN
punchplatform-kafka-topics.sh --kafkaCluster <cluster> --topic <topic_name> --alter --config retention.ms=NNNNNNNNNNN
```
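
Both settings take raw numbers (milliseconds and bytes), so it helps to compute them rather than type them. The sketch below shows the arithmetic, together with a rough upper bound on the disk space a topic may retain, given that the size limit applies per partition. The helper names and sample values are assumptions for illustration only; the bound is approximate, since Kafka removes data in whole log segments.

```sh
#!/bin/sh
# Hypothetical helpers (assumptions for illustration, not PunchPlatform tools).

# Convert a retention expressed in days to milliseconds (for retention.ms).
days_to_ms() {
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

# Convert a size in GiB to bytes (for retention.bytes).
gib_to_bytes() {
  echo $(( $1 * 1024 * 1024 * 1024 ))
}

# Rough upper bound of the data a topic may retain across the cluster:
# retention.bytes applies to each partition, and each replica stores a full copy.
# Usage: topic_max_bytes <partitions> <replicas> <retention_bytes>
topic_max_bytes() {
  echo $(( $1 * $2 * $3 ))
}

days_to_ms 7                               # prints 604800000
gib_to_bytes 10                            # prints 10737418240
topic_max_bytes 4 2 "$(gib_to_bytes 10)"   # prints 85899345920
```

For instance, the first two values could replace NNNNNNNNNNN in the retention.ms and retention.bytes commands to keep at most 7 days or 10 GiB of data per partition.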

Kafka documentation: topic-level configuration