
Troubleshooting Kafka errors in production

Kafka alerts

Why do that

Your monitoring system (Nagios, for example) or the punch admin dashboards report a red or yellow Kafka status.

What to do

Use the platformctl and kafkactl commands to get precise status and health information about the Kafka cluster, its topics and its consumer groups.
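
If you want to script the same checks without the punch tooling, the stock Kafka AdminClient exposes equivalent information. The following is only a minimal sketch: the bootstrap address is an assumption, and the group id is borrowed from the log excerpt below; adapt both to your platform.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ConsumerGroupDescription;

    public class KafkaHealthCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Assumption: replace with your platform's Kafka bootstrap address.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkaback:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // Cluster health: the list of live brokers.
                System.out.println("brokers: " + admin.describeCluster().nodes().get());
                // Topic health: all topic names known to the cluster.
                System.out.println("topics: " + admin.listTopics().names().get());
                // Consumer group health: state and members of one group.
                String group = "tenant.output-channel.output-topology.kafka_input";
                ConsumerGroupDescription desc = admin
                        .describeConsumerGroups(Collections.singletonList(group))
                        .all().get().get(group);
                System.out.println("state: " + desc.state() + " members: " + desc.members());
            }
        }
    }

The command-line scripts shipped with Kafka (kafka-topics.sh, kafka-consumer-groups.sh) report the same information if you prefer a shell session.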

Kafka input fails to commit after rebalancing

In Storm topologies, the kafka_input might fail with the following error:

o.t.p.k.i.KafkaConsumerImpl [INFO] message="on partitions assigned" partitions=[topicA-2, topicA-3]
o.a.k.c.c.i.AbstractCoordinator [INFO] [Consumer clientId=consumer-tenant.output-channel.output-topology.kafka_input-3, groupId=tenant.output-channel.output-topology.kafka_input] Discovered group coordinator <coordinator_kafkaback> (id: 2147483645 rack: null)
o.a.k.c.c.i.ConsumerCoordinator [ERROR] [Consumer clientId=consumer-tenant.output-channel.output-topology.kafka_input-3, groupId=tenant.output-channel.output-topology.kafka_input] Offset commit failed on partition gcs-channelB-backtopic-logs-1 at offset 12121212: The coordinator is not aware of this member. 
o.a.k.c.c.i.ConsumerCoordinator [INFO] [Consumer clientId=consumer-tenant.output-channel.output-topology.kafka_input-3, groupId=tenant.output-channel.output-topology.kafka_input] OffsetCommit failed with Generation{generationId=23, protocol='range'}: The coordinator is not aware of this member. 
o.t.p.k.i.OffsetTreePartitionImpl [FATAL] message="failed to commit kafka offset" 
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1256) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1163) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1164) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1139) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1005) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1495) ~[stormjar.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1443) ~[stormjar.jar:?]
        at org.thales.punch.kafka.impl.OffsetTreePartitionImpl.commit(OffsetTreePartitionImpl.java:142) [stormjar.jar:?]
        at org.thales.punch.kafka.impl.KafkaConsumerImpl.commit(KafkaConsumerImpl.java:333) [stormjar.jar:?]
        at org.thales.punch.libraries.storm.spout.KafkaInput.nextTuple(KafkaInput.java:280) [stormjar.jar:?]
        at org.apache.storm.executor.spout.SpoutExecutor$2.call(SpoutExecutor.java:193) [storm-client-2.3.0.jar:2.3.0]
        at org.apache.storm.executor.spout.SpoutExecutor$2.call(SpoutExecutor.java:160) [storm-client-2.3.0.jar:2.3.0]
        at org.apache.storm.utils.Utils$1.run(Utils.java:394) [storm-client-2.3.0.jar:2.3.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
o.a.s.u.Utils [INFO] Halting after 3 seconds 

This means that the kafka_input node did not acknowledge the messages polled from Kafka within max.poll.interval.ms.

As a result, Kafka excludes the kafka_input from the consumer group and performs a rebalance. If the kafka_input eventually acknowledges the tuples and commits its offsets to Kafka, Kafka raises an exception because this consumer is no longer part of the consumer group.

Usually, this is because your output node has an issue and cannot acknowledge tuples. For instance, if you have a final elasticsearch_output that fails to index documents, the tuples are replayed until they succeed. If that success only comes after max.poll.interval.ms, Kafka raises the exception above.
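
The deadline is enforced at the Kafka client level, between two consecutive poll() calls. Below is a minimal sketch of that consume/commit pattern, not the actual kafka_input code; the broker address, group id and topic are placeholders.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SlowConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkaback:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("topicA"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        // If handling the whole batch (processing plus downstream
                        // acknowledgment) takes longer than max.poll.interval.ms,
                        // the broker considers this consumer dead, evicts it from
                        // the group and rebalances its partitions.
                        handle(record);
                    }
                    // After such an eviction, this call throws the
                    // CommitFailedException shown in the log above.
                    consumer.commitSync();
                }
            }
        }

        private static void handle(ConsumerRecord<String, String> record) {
            // Placeholder for the real processing path (e.g. indexing into Elasticsearch).
        }
    }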

In kafka_input, max.poll.interval.ms is set to 300 seconds by default. This means that your tuples have 300 seconds to be acknowledged by the output node. For most production platforms, this is enough. However, if you need more headroom for your end-to-end processing, you can tune the following parameters in your kafka_input (see the sketch after this list):

  • max.poll.interval.ms
  • max.poll.records
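
At the raw Kafka client level, these two settings look as follows. The values are illustrative only, and how kafka_input exposes them depends on your punch configuration; the lines slot into the Properties setup of the previous sketch.

    // Allow up to 10 minutes between two poll() calls (the default is 5 minutes).
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
    // Cap each poll() at 100 records (the default is 500), so every batch is
    // processed and acknowledged well before the deadline.
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

Keep the trade-off in mind: raising max.poll.interval.ms makes genuinely dead consumers slower to detect, while lowering max.poll.records gives each batch less work to acknowledge before the deadline.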

More about these parameters in the official Kafka documentation.

More about the Kafka rebalance protocol in this blog post.