Skip to content

Kafka Input

The KafkaInput reads data from a Kafka topic. It automatically takes responsibility for reading one or several partition(s) of the topic, depending on the runtime presence of other Kafka input(s) from the same topology or from topologies identically named but belonging to other Storm clusters.

Pay attention to the strategy you choose to read data from the topic. The most common strategy is to start reading data from the last saved offset. Check the example configuration properties below. In most case you want the default configuration.

    "type" : "kafka_input",
    "settings" : {

       # Fetch data from the Kafka cluster identified by "front".
       # You must have a corresponding entry in your
       # kafka cluster section.
       "brokers" : "front",

       # the topic to read data from
       "topic" : "mytenant.apache",

       # Possible values are "earliest", "latest" or "last_committed".
       "start_offset_strategy" : "last_committed"

Mandatory Parameters

  • topic : String
  • The topic to send data to
  • brokers : String
  • the kafka brokers, as defined in your $PUNCHPLATFORM_CONF_DIR/ configuration file
  • bootstrap.servers : String
  • Alternative to brokers parameter. If don't want to use to resolve your kafka cluster. Ex : "localhost:9092"
  • metric_cluster_id : String
  • When bootstrap.servers is set, you must provide this parameter to identify your cluster in Punchplatform metrics

Using these parameters the kafka bolt will expect to read Kafka record containing lumberjack encoded payload.

Offset Strategy

Here are the various values you can use for the start_offset_strategy:

  • earliest makes the KafkaInput start reading from the oldest offset found in a topic partition. Watch out : in case of later restart of the topology (due to any problem), the input will start again at the current beginning of the topic, re-processing massive amounts of data, and causing potential duplicates. For resetting the processing to the beginning of the topic, prefer using punchplatform-kafka-consumers '--reset-to-earliest' command.

  • latest makes it start from the latest offset, hence skipping potentially large amount of past data. Watch out : using that mode, there will be no auto commit. In addition, in case of later restart of the topology (due to any problem), the input will start again at the current end of the topic, skipping any records arrived in the topic during the stop/restart phase. For resetting the processing to the end of the topic, prefer using punchplatform-kafka-consumers '--reset-to-latest' command.

  • last_committed makes it start from the latest committed offset, hence not replaying old data already processed and acknowledged. This mode activates the auto commit. This is the value that should ALWAYS be used in production. For shifting the committed offset in production, please refer to punchplatform-kafka-consumers

  • from_offset makes it start from a specific offset. Watch out : using that mode, there will be no auto commit.

  • from_datetime makes it start from the first message received after this date. The message date is the Kafka internal record timestamp. See also the optional parameters. Watch out : using that mode, there will be no auto commit. > from_datetime_timeout_ms : Integer : 10000 > If you use from_datetime, this is the kafka request timeout expressed in milliseconds before giving up. The request referred to here is the one used to fetch the starting offset.

Failures and Record Replay

The default mode of the Kafka input in case a tuple failure occurs is to rewind the affected partition reading to the failed tuple. That basically means that all Kafka records from that failed offset will be re-read and re-submitted to the topology.

Two important behaviors can be configured with respect to that strategy :

  1. the input can emit only the failed tuple, and not resend already acked tuples. TThis behavior is controlled by the smart_replay configuration property. It is true by default. If you set it to false, all tuples will be repeated from the failed one.
  2. some downstream bolts expect to never receive any repeated records. This is the case of the file bolt in charge of archiving batches of tuples. If you use the Kafka spout for archiving use cases you must configure the Kafka spout to fail altogether should a tuple failure occur. As a result, the topology will restart shortly and will resume sending records from the last committed offset. You can control that using the fail_action property.


Replaying tuple is only acceptable for stateless applications where idempotency is achieved using per record identifier or field. For example a topology that reads Kafka to insert records into elasticsearch can have the default configuration. At the end a unique identifier associated to the records will avoid storing duplicates into Elasticsearch.

In contrast archiving applications are not designed to cope with tuple replays. The (so-called) fail stop behavior is exactly what you want. If at some point your topology cannot write anymore to the (S3|Ceph|Filesystem) archive, the topology will keep fail and be restarted until the archive service becomes up and ready again.

Kafka Record Format

In the general case, Kafka records contain a key and a value. Sometimes the key is not used. Let us go through the several options you have.


This is the default settings. The Kafka input expects to receive Kafka record containing no key and a lumberjack encoded value. Lumberjack is a key value binary format used by elastic components and supported by the punch. For example a lumberjack record could contain the following map of key-value:

     "id" : "ejKld9iwoo",
     "data" : "hello world",
     "something_else" : "you do not need this one"

In order to emit in your topology some or all of the lumberjack record fields, you must define them as part of the publish stream settings. For example, to emit only the "id" and the "data" fields, use:

             "type" : "kafka_input",
             "settings" : { ... }
             "storm_settings" : {
                  "publish" : [ { "stream" : "logs", "fields" : ["id", "data"] } ]

Arbitrary Key and Value format

If you receive records using other encoding schemes, possibly with a record key, here is how to proceed. First define the (optional) key codec, the the value codec as follows:

    "type" : "kafka_input",
    "settings" : {

        # if you define a key_codex the input will expect a key in incoming records.
        # Its type can be "string", "bytes" or "epoch"
        "key_codec": {
            "type": "string"
        # if you define a value_codec the input will read the record value using the specified
        # encoding. Accepted type can be  "string", "bytes" or "epoch"
        "value_codec": {
            "type": "string"
    "storm_settings" : {
        "publish" : [ { "stream" : "logs", "fields" : ["key", "value"] } ]


if you set a key_codec but receive kafka record without a key, a null value will be emitted. if you set a key_codec but receive kafka record without a key, a null value will be emitted.

Reserved fields

Kafka input can configure reserved published fields insides its storm settings :

  • _ppf_partition_id contains the corresponding Kafka partition identifier
  • _ppf_partition_offset contains the current Kafka partition offset
  • _ppf_topic_name contains the current Kafka topic name
  • _ppf_kafka_timestamp contains the record timestamp in epoch format in milliseconds. Its value depends on the settings of Kafka’s server-side log.message.timestamp.type broker and message.timestamp.type topic parameters. Check Kafka documentation.
  • _ppf_batch_play_id contains unique runtime identifier for this batch. You can safely use that id as unique key in a hash table. You are guaranteed that in case of data replay, the batch play id will change.
  • _ppf_batch_id contains the persisted batch. This id is an increasing counter, that will be replayed in case a topology is restarted before a batch was fully processed and acknowledged. It is the key identifier used to perform idempotent/exactly-once processing in case of replay. This field is part of both the user tuples and the end-of-batch marker tuple.
    "storm_settings" : {
      "publish" : [ 
          "stream" : "logs", 
          "fields" : [

Custom headers

Kafka input allows a user to define headers contained in kafka records to inject it inside storm field.

The supported header types are :

  • string : UTF_8 default encoding
  • integer : Big endian and little endian allowed for byte ordering
  • long : Big endian and little endian allowed for byte ordering
  • bytes : keep the byte array from the kafka header's value

The custom header configuration is set inside storm settings.


For each configured custom kafka header, the field name MUST be included in published storm fields if the user wants the value to be injected inside the resulting tuple from the stream output

Example of configuration :

  "storm_settings": {
    "executors": 1,
    "component": "kafka_input",
    "headers_fields": [
        "header": "myheader1",
        "field": "field1",
        "type": "string"
        "header": "myheader2",
        "field": "field2",
        "type": "integer",
        "ordering": "big_endian"
    "publish": [
        "stream": "logs",
        "fields": [

Parameters :

  • headers_fields.header : String, Mandatory

    Key of the kafka record's header to catch from the consumed topic

  • headers_fields.field : String, Mandatory

    Stream field name where the header's value will be injected

  • headers_fields.type : String, Mandatory

    Value type. Allowed types are string, integer, long and bytes

  • headers_fields.encoding : String, Optional

    Default is UTF_8. Encoding type for string values. This configuration is ignored for numbers.

  • headers_fields.ordering : String, Optional

    Default is big endian. This configuration is ignored for strings.

  • headers_fields.on_decoding_error : String, Optional

    Default is CRASH. Allowed behaviors are CRASH, WARN, or PASS. In CRASH mode, a decoding error is logged and the system exits. In WARN mode, a simple warn log is displayed and a null value is generated for the concerned header. In PASS mode, a debug log is displayed and a null value is generated for the concerned header.

Batch Mode

It it sometimes required to consume records using a batch strategy. For example to archive records to a third party storage, the punch file bolt expect to receive batch of (say) 50000 records in order to write compressed files to the archive. Here is how you configure the Kafka input to work using a batch mode:

  • batch_size : Integer

    This parameter activates the batch strategy. It defines the number of message handled in one batch.

  • batch_interval : String

    Using the batch strategy, if no message is received, this will make the batch flushed after some time. Use the time interval notation.

!!! important Starting at punch v5.6 this mode is deprecated. Instead the batch size is controlled directly at the level of the File Bolt.

Advanced Parameters


Use with care.

  • fail_action : String

    Use "exit" to achieve a fail stop behavior. The topology will fail should a tuple fail. By default "none" in which case you can control the tuple failure handling using the smart_replay parameter.

  • smart_replay : Boolean

    If true, failed tuple are only replayed (i.e. re-emitted) if they were not previously acked nor are still pending in the topology. This reduces the amount of replayed data. Of course it is irrelevant if you set the fail_action to "exit".

  • group_name : String

    the consumer group name, used to make different input instances cooperate and consume the same topic. By default all spouts from the same topology share the incoming load.

  • partition : Integer

    The Kafka partition to read. If this property is not defined the Kafka input will read all partitions. If several inputs are running (i.e. the number of executors is greater than one), the inputs will automatically share the partitions. If in contrast you define a partition number the input will only consume that partition.

  • offset : long

    The offset to start from. Mandatory when 'start_offset_strategy' is set to 'from_offset'.

  • auto_commit_interval : String : 10000

    the delay in milliseconds between offset commits. Usually there is no reason to do that often, in case of failure and restart, you will restart reading the data from the last saved offset. This is not an issue assuming idempotent data handling.

  • load_control : String

    If set to rate the input will limit the rate of the emitted data. If set to adaptative, the spout will perform an automatic backpressure rate limiting. If set to none no load control is performed.

Refer to the batch kafka spout javadoc documentation


See KafkaInputMetrics