
Kafka Input

The KafkaInput reads data from a Kafka topic. It automatically takes responsibility for reading one or several partitions of the topic, depending on the presence at runtime of other Kafka inputs from the same topology, or from identically named topologies belonging to other Storm clusters.

Pay attention to the strategy you choose to read data from the topic. The most common strategy is to start reading data from the last saved offset. Check the example configuration properties below. In most cases, you want the default configuration.

{
    "type" : "kafka_input",
    "settings" : {

       # Fetch data from the Kafka cluster identified by "front".
       # You must have a corresponding entry in your
       # punchplatform.properties kafka cluster section.
       "brokers" : "front",

       # the topic to read data from
       "topic" : "mytenant.apache",

       # Possible values are "earliest", "latest", "last_committed",
       # "from_offset" or "from_datetime".
       "start_offset_strategy" : "last_committed"
     }
 }

Mandatory Parameters

  • topic : String
  • The topic to read data from
  • brokers : String
  • The Kafka cluster id, chosen from the keys of the kafka.clusters dictionary in your $PUNCHPLATFORM_CONF_DIR/punchplatform.properties configuration file
  • bootstrap.servers : String (optional)
  • Supersedes the values computed from the brokers parameter. If present, it overrides the default broker addresses/ports from punchplatform.properties used to resolve your Kafka cluster. Example : "localhost:9092,otherhost:9092". This can be useful when the Kafka cluster is not reachable under the same names/addresses from every machine (here, the one running the punchline), or in case of port routing. Otherwise, leave it blank and the nodes/ports from the deployment settings will apply.

Using only the mandatory parameters, the Kafka input will expect to read Kafka records containing a lumberjack-encoded payload (otherwise, see the codec settings in the Kafka Record Format section).

Offset Strategy

Here are the various values you can use for the start_offset_strategy:

  • earliest makes the KafkaInput start reading from the oldest offset found in a topic partition EACH TIME the punchline JVM starts. Watch out : in case of later restart of the topology (due to any problem), the input will start again at the current beginning of the topic, re-processing massive amounts of data, and causing potential duplicates.

    In addition, the offset is not committed in the usual consumer group (i.e. if you go back to the normal 'last_committed' strategy, the processings done during the 'earliest' executions will not be taken into account).
    
    So, to reset ONCE the processing of a production consumer to the beginning of the topic, prefer using the [punchplatform-kafka-consumers](../../../Manual_Pages/punchplatform-kafka-consumers.sh.md) '--reset-to-earliest' command while the consumer is temporarily stopped.
    
  • latest makes it start from the latest offset EACH TIME the punchline JVM starts/restarts, hence skipping potentially large amount of past data. Watch out : using that mode, there will be no auto commit. In addition, in case of later restart of the topology (due to any problem), the input will start again at the current end of the topic, skipping any records arrived in the topic during the stop/restart phase.

    In addition, the offset is not committed in the usual consumer group (i.e. if you go back to the normal 'last_committed' strategy, the processings done during the 'latest' executions will not be taken into account).
    
    So, to reset ONCE the processing of a production consumer to the end of the topic, prefer using the [punchplatform-kafka-consumers](../../../Manual_Pages/punchplatform-kafka-consumers.sh.md) '--reset-to-latest' command while the consumer is temporarily stopped.
    
  • last_committed makes it start from the latest committed offset, hence not replaying old data already processed and acknowledged. In this mode, the processed offsets will be stored in Kafka (the __consumer_offsets topic) to be able to start again from the same point in case of JVM or application restart, providing an AT_LEAST_ONCE processing guarantee.

    This is therefore the value that should ALWAYS be used in production. For shifting the committed offset
    in production, please refer to [punchplatform-kafka-consumers](../../../Manual_Pages/punchplatform-kafka-consumers.sh.md)
    
  • from_offset makes it start from a specific offset (see the offset parameter). Watch out : in that mode there is no persistent offset commit, and processing will restart from the same point each time the punchline/JVM restarts. To reset the processing of a production consumer to a specified offset of each partition, prefer the punchplatform-kafka-consumers tool, used while the consumer is temporarily stopped.

  • from_datetime makes it start from the first message received after this date. The message date is the Kafka internal record timestamp. See also the optional parameters. Watch out : in that mode there is no persistent offset commit, and processing will restart from the same point each time the punchline/JVM restarts. To reset the processing of a production consumer to a specified message date of each partition, prefer the punchplatform-kafka-consumers tool, used while the consumer is temporarily stopped.

    from_datetime_time_ms : Integer : 10000

    If you use from_datetime, this is the Kafka request timeout expressed in milliseconds before giving up. The request referred to here is the one used to fetch the starting offset.
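As an illustration, here is a sketch of a from_offset configuration combining the documented offset parameter with the basic settings shown earlier (the offset value is an arbitrary example):

```json
{
    "type" : "kafka_input",
    "settings" : {
       "brokers" : "front",
       "topic" : "mytenant.apache",
       "start_offset_strategy" : "from_offset",
       "offset" : 12500
    }
}
```

Remember that in this mode no offset is committed : the node will restart from offset 12500 on every punchline/JVM restart.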

Failures and Record Replay

The default mode of the Kafka input in case a tuple failure occurs is to rewind the affected partition reading to the failed tuple. That basically means that all Kafka records from that failed offset will be re-read and re-submitted to the topology.

Two important behaviors can be configured with respect to that strategy :

  1. The input can emit only the failed tuple, and not resend already acked tuples. This behavior is controlled by the smart_replay configuration property, which is true by default. If you set it to false, all tuples will be replayed from the failed one onwards, which increases the number of replayed tuples.
  2. Some downstream nodes expect to never receive repeated records. This is the case of the file bolt in charge of archiving batches of tuples. If you use the Kafka input for archiving use cases, you must configure it to fail the whole topology should a tuple failure occur. As a result, the topology will restart shortly and resume sending records from the last committed offset. You can control that using the fail_action property.
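For archiving use cases, the fail-stop behavior just described could be configured as follows (a sketch combining the documented fail_action property with the basic settings shown earlier):

```json
{
    "type" : "kafka_input",
    "settings" : {
       "brokers" : "front",
       "topic" : "mytenant.apache",
       "start_offset_strategy" : "last_committed",
       "fail_action" : "exit"
    }
}
```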

Important

Replaying tuples is only easy to process for stateless applications where idempotency is achieved using a per-record identifier or field. For example, a topology that reads Kafka to insert records into Elasticsearch can keep the default configuration. In the end, a unique identifier associated to the records will avoid storing duplicates into Elasticsearch.
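To see why a unique per-record identifier makes replays harmless, here is a minimal Python sketch. The dict stands in for an Elasticsearch index keyed by document id; all names are illustrative, not punch APIs:

```python
# "index" simulates an Elasticsearch index keyed by document id.
index = {}

def upsert(record):
    # Writing by id is idempotent: a replayed record overwrites itself
    # instead of creating a duplicate document.
    index[record["id"]] = record

batch = [
    {"id": "ejKld9iwoo", "data": "hello world"},
    {"id": "aBcD123456", "data": "second log"},
]

for record in batch:
    upsert(record)

# Simulate a replay after a tuple failure: the same records arrive again.
for record in batch:
    upsert(record)

print(len(index))  # 2: no duplicates despite the replay
```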

In contrast, archiving applications are not designed to cope with tuple replays. There, the (so-called) fail-stop behavior is exactly what you want. If at some point your topology cannot write anymore to the (S3|Ceph|Filesystem) archive, the topology will keep failing and being restarted until the archive service becomes up and ready again.

Kafka Record Format

In the general case, Kafka records contain a key and a value, the value being encoded (often in lumberjack format). Sometimes the message key is not used (null). Let us go through the several options you have.

Lumberjack

This is the default setting. The Kafka input expects to receive Kafka records containing no key and a lumberjack-encoded value. Lumberjack is a key-value binary format used by Elastic components and supported by the punch. For example, a lumberjack record could contain the following map of key-values:

{
    "id" : "ejKld9iwoo",
    "data" : "hello world",
    "something_else" : "you do not need this one"
}

In order to emit in your topology some or all of the lumberjack record fields, you must define them as part of the publish stream settings. For example, to emit only the "id" and the "data" fields, use:

{
    "type" : "kafka_input",
    "settings" : { ... },
    "publish" : [ { "stream" : "logs", "fields" : ["id", "data"] } ]
}

Arbitrary Key and Value format

If you receive records using other encoding schemes, possibly with a record key, here is how to proceed. First define the (optional) key codec, then the value codec, as follows:

{
    "type" : "kafka_input",
    "settings" : {

        # if you define a key_codec, the input will expect a key in incoming records.
        # Its type can be "string", "bytes" or "epoch"
        "key_codec": {
            "type": "string"
        },
        # if you define a value_codec, the input will read the record value using
        # the specified encoding. Accepted types are "string", "bytes" or "epoch"
        "value_codec": {
            "type": "string"
        }
    },
    "publish" : [ { "stream" : "logs", "fields" : ["key", "value"] } ]
}

Warning

If you set a key_codec but receive a Kafka record without a key, a null value will be emitted.

Reserved fields

The Kafka input can publish reserved fields, configured in its storm settings:

  • _ppf_partition_id contains the corresponding Kafka partition identifier
  • _ppf_partition_offset contains the current Kafka partition offset
  • _ppf_topic_name contains the current Kafka topic name
  • _ppf_kafka_timestamp contains the record timestamp in epoch format in milliseconds. Its value depends on the settings of Kafka’s server-side log.message.timestamp.type broker and message.timestamp.type topic parameters. Check Kafka documentation.
  • _ppf_batch_play_id contains a unique runtime identifier for this batch. You can safely use that id as a unique key in a hash table. You are guaranteed that in case of data replay, the batch play id will change. This field is only produced in the deprecated "batch" mode of the Kafka input node.
  • _ppf_batch_id contains the persisted batch id. This id is an increasing counter that will be replayed in case a topology is restarted before a batch was fully processed and acknowledged. It is the key identifier used to perform idempotent/exactly-once processing in case of replay. This field is part of both the user tuples and the end-of-batch marker tuple. This field is only produced in the deprecated "batch" mode of the Kafka input node.
{
      "publish" : [ 
        { 
          "stream" : "logs", 
          "fields" : [
            "_ppf_partition_id",
            "_ppf_partition_offset",
            "_ppf_topic_name",
            "_ppf_kafka_timestamp"
          ] 
        } 
      ]
}

Custom headers

The Kafka input lets you extract headers from the consumed Kafka records and inject their values into storm fields.

The supported header types are :

  • string : UTF_8 default encoding
  • integer : Big endian and little endian allowed for byte ordering
  • long : Big endian and little endian allowed for byte ordering
  • bytes : keep the byte array from the kafka header's value

The custom header configuration is set inside storm settings.

Important

For each configured custom Kafka header, the field name MUST be included in the published storm fields if you want the value to be injected into the resulting tuple of the output stream.

Example of configuration :

{
    "executors": 1,
    "component": "kafka_input",
    "headers_fields": [
      {
        "header": "myheader1",
        "field": "field1",
        "type": "string"
      },
      {
        "header": "myheader2",
        "field": "field2",
        "type": "integer",
        "ordering": "big_endian"
      }
    ],
    "publish": [
      {
        "stream": "logs",
        "fields": [
          "field1",
          "field2",
          "_ppf_topic_name"
        ]
      }
    ]
}

Parameters :

  • headers_fields.header : String, Mandatory

    Key of the Kafka record's header to read from the consumed topic

  • headers_fields.field : String, Mandatory

    Stream field name where the header's value will be injected

  • headers_fields.type : String, Mandatory

    Value type. Allowed types are string, integer, long and bytes

  • headers_fields.encoding : String, Optional

    Default is UTF_8. Encoding type for string values. This configuration is ignored for numbers.

  • headers_fields.ordering : String, Optional

    Default is big endian. This configuration is ignored for strings.

  • headers_fields.on_decoding_error : String, Optional

    Default is CRASH. Allowed behaviors are CRASH, WARN, or PASS. In CRASH mode, a decoding error is logged and the system exits. In WARN mode, a simple warn log is displayed and a null value is generated for the concerned header. In PASS mode, a debug log is displayed and a null value is generated for the concerned header.
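To make the type, encoding and ordering settings concrete, here is a minimal Python sketch of how a header value could be decoded according to these parameters. The function name and logic are illustrative only, not the punch implementation:

```python
# Illustrative decoder for a Kafka header value, following the
# type/encoding/ordering parameters documented above.
def decode_header(raw: bytes, type_: str,
                  ordering: str = "big_endian",
                  encoding: str = "utf-8"):
    byteorder = "big" if ordering == "big_endian" else "little"
    if type_ == "string":
        # encoding applies to strings only
        return raw.decode(encoding)
    if type_ in ("integer", "long"):
        # ordering applies to numbers only
        return int.from_bytes(raw, byteorder)
    if type_ == "bytes":
        # keep the raw byte array as-is
        return raw
    raise ValueError(f"unsupported header type: {type_}")

# A 4-byte big-endian integer header holding the value 42:
print(decode_header(b"\x00\x00\x00\x2a", "integer"))  # 42
print(decode_header(b"myvalue", "string"))            # myvalue
```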

Batch Mode

Danger

Starting at punch v5.7 this mode is DEPRECATED (it used to be needed to work with the legacy File Bolt archiving). Instead, batching for archiving is controlled directly at the level of the File Bolt.

Consumer Groups

Several Kafka input nodes can cooperate to consume partitions of a single topic. You control which inputs cooperate on a given topic by assigning the same group.id setting to all cooperating nodes/executors. There is a sensible default value that usually suits simple topologies. If you need different punchline instances (with different application ids) to cooperate, you will have to explicitly provide the same value for this setting to all these punchlines.

It is easier to understand the various possibilities through the typical use cases you can achieve. For the sake of clarity, we assume you have two tenants (tenant1 and tenant2), each with two channels (channel1 and channel2). In each channel you have two punchlines, punchline1 and punchline2. Here is the configuration layout.

└── tenant1
    ├── channel1
    │   ├── channel_structure.json
    │   ├── punchline1.json
    │   └── punchline2.json
    └── channel2
        ├── punchline1.json
        └── punchline2.json

In each punchline you have a KafkaInput node named (say) input. Let us go through the configuration options you have.

Per Punchline Topic Consuming

This is the most likely setup. You want a given punchline to consume a topic (say mytopic) independently from other punchlines, from the same or from different channels. This mode is the default one and requires no settings. The group name is then automatically generated to achieve that behavior. For example, the punchlines will use the following group identifiers:

  • tenant1-channel1-punchline1-input
  • tenant1-channel1-punchline2-input
  • tenant2-channel1-punchline1-input
  • tenant2-channel1-punchline2-input

With such settings, each punchline's KafkaInput node will consume all the mytopic topic partitions and manage each partition's offset saving on its own. There are no side effects from one punchline to the other.
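The generated group identifiers follow the tenant-channel-punchline-node layout listed above. A minimal Python sketch of that layout (the actual generation logic is internal to the punch):

```python
# Illustrative sketch of the default group id layout:
# tenant-channel-punchline-node.
def default_group_id(tenant: str, channel: str,
                     punchline: str, node: str) -> str:
    return f"{tenant}-{channel}-{punchline}-{node}"

print(default_group_id("tenant1", "channel1", "punchline1", "input"))
# tenant1-channel1-punchline1-input
```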

Explicit Punchline Cooperation

In some advanced use cases, you actually want several punchlines (from the same or from different channels) to cooperate and consume the same topic. To do that, simply set group.id to a common value. For example:

{
    "type" : "kafka_input",
    "component" : "input",
    "settings" : {
       "topic" : "mytopic",
       "group.id" : "mygroup"
     }
 }

What happens then? The four punchlines will compete to read the same topic. Each topic partition will be assigned to a consumer chosen among these four punchlines. You will have no control over which punchline reads which partition. The partition assignment and rebalancing will be orchestrated by Kafka itself.

Tip

When is this useful? In situations where you want resilient yet active-active topic consuming from distinct single-process punchlines. For example, you have two servers, each running a punchline. The two punchlines will share the load. Should one server crash, the surviving punchline will start consuming the orphan partitions and sustain the total input traffic. When the crashed server resumes, the traffic will rebalance automatically.

Earliest or Latest Offset Consuming

What we just explained only makes sense with the last_committed offset strategy, i.e. whenever your punchlines save their offsets so as to consume records from the latest saved state. If you use the earliest or latest strategy, several punchlines cannot cooperate: each will use an ephemeral group identifier. Such punchlines are only used for development, testing or backup procedures.

Explicit Partition Assignment

Another advanced but possible use case is to deploy several KafkaInput nodes in one or several punchlines, and make each consume a given topic partition. For example:

{
    "type" : "kafka_input",
    "component" : "input",
    "settings" : {
       "topic" : "mytopic",
       "group.id" : "mygroup",
       "partition" : 3
     }
 }

With such settings, that node will only consume partition 3 of the mytopic topic.

Tip

When is this useful? In situations where you want fine-grained control over partition consuming. Note that with this setting, if a punchline fails, it will have to be restarted so that the given partition gets consumed again. This mode is thus typically used in conjunction with Kubernetes or Shiva schedulers, to ensure that each failed punchline is resumed on some backup server.

Advanced Parameters

Warning

Use with care.

  • fail_action : String

    Use "exit" to achieve a fail-stop behavior: the topology will exit should a tuple fail. The default is "none", in which case you can control tuple failure handling using the smart_replay parameter.

  • smart_replay : Boolean

    If true, failed tuples are only replayed (i.e. re-emitted) if they were not previously acked and are not still pending in the topology. This reduces the amount of replayed data. Of course it is irrelevant if you set fail_action to "exit".

  • group.id : String

    The Kafka consumer group name, used to make different input node instances cooperate and consume the same topic. By default, all nodes from the same punchline share the incoming load.

  • partition : Integer

    The Kafka partition to read. If this property is not defined the Kafka input will read all partitions. If several inputs are running (i.e. the number of executors is greater than one), the inputs will automatically share the partitions. If in contrast you define a partition number the input will only consume that partition.

  • offset : long

    The offset to start from. Mandatory when 'start_offset_strategy' is set to 'from_offset'.

  • auto_commit_interval : String : 10000

    The delay in milliseconds between offset commits. Usually there is no reason to commit more often: in case of failure and restart, you will restart reading the data from the last saved offset. This is not an issue assuming idempotent data handling.

  • load_control : String

    If set to "rate", the input will limit the rate of the emitted data. If set to "adaptative", the input will perform automatic backpressure rate limiting. If set to "none", no load control is performed.

Backward Compatibility

The group_name property is deprecated but still supported for backward compatibility with 5.x releases.

Metrics

See KafkaInputMetrics