
Kafka Spout

The KafkaSpout reads data from a Kafka topic. It automatically takes responsibility for reading one or several partition(s) of the topic, depending on which other Kafka spouts are running at the same time, whether in the same topology or in identically named topologies belonging to other Storm clusters.

Pay attention to the strategy you choose for reading data from the topic. The most common strategy is to start reading from the last saved offset. Check the example configuration properties below; in most cases the default configuration is what you want.

{
    "type" : "kafka_spout",
    "spout_settings" : {

        # Fetch data from the Kafka cluster identified by "front".
        # You must have a corresponding entry in your
        # punchplatform.properties kafka cluster section.
        "brokers" : "front",

        # the topic to read data from
        "topic" : "mytenant.apache",

        # Possible values are "earliest", "latest", "last_committed",
        # "from_offset" or "from_datetime".
        "start_offset_strategy" : "last_committed"
    }
}

With these parameters the Kafka spout expects to read Kafka records containing a Lumberjack-encoded payload.

Offset Strategy

Here are the various values you can use for the start_offset_strategy:

  • earliest makes the KafkaSpout start reading from the oldest offset found in a topic partition. Watch out: in this mode there is no auto commit.
  • latest makes it start from the latest offset, hence skipping a potentially large amount of past data. Watch out: in this mode there is no auto commit.
  • last_committed makes it start from the latest committed offset, hence not replaying old data that was already processed and acknowledged. This mode activates the auto commit.
  • from_offset makes it start from a specific offset (see the sketch after this list). Watch out: in this mode there is no auto commit.
  • from_datetime makes it start from the first message received after the configured date. The message date is the Kafka internal record timestamp. Watch out: in this mode there is no auto commit. It accepts one optional parameter:

      • from_datetime_timeout_ms : Integer : 10000

        If you use from_datetime, this is the Kafka request timeout, expressed in milliseconds, before giving up. The request referred to here is the one used to fetch the starting offset.
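
To make this concrete, here is a minimal sketch of a spout starting from a fixed offset. It combines the offset and partition parameters described in the Advanced Parameters section below; the offset and partition values shown are arbitrary:

{
    "type" : "kafka_spout",
    "spout_settings" : {
        "brokers" : "front",
        "topic" : "mytenant.apache",

        # start reading at a fixed offset rather than at the last committed one
        "start_offset_strategy" : "from_offset",

        # the offset parameter is mandatory with "from_offset"
        "offset" : 12000,

        # restrict this spout to a single partition, so that the offset
        # refers unambiguously to one partition
        "partition" : 0
    }
}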

Kafka Record Format

In the general case, Kafka records contain a key and a value. Sometimes the key is not used. Let us go through the options you have.

Lumberjack

This is the default setting. The Kafka spout expects to receive Kafka records containing no key and a Lumberjack-encoded value. Lumberjack is a key-value binary format used by Elastic components and supported by the punch. For example, a Lumberjack record could contain the following key-value map:

{
    "id" : "ejKld9iwoo",
    "data" : "hello world",
    "something_else" : "you do not need this one"
}

In order to emit some or all of the Lumberjack record fields in your topology, you must define them as part of the publish stream settings. For example, to emit only the "id" and "data" fields, use:

{
    "type" : "kafka_spout",
    "spout_settings" : { ... },
    "storm_settings" : {
        "publish" : [ { "stream" : "logs", "fields" : ["id", "data"] } ]
    }
}

Arbitrary Key and Value format

If you receive records using other encoding schemes, possibly with a record key, here is how to proceed. First define the (optional) key codec, then the value codec, as follows:

{
    "type" : "kafka_spout",
    "spout_settings" : {

        # If you define a key_codec, the spout will expect a key in incoming records.
        # Its type can be "string", "bytes" or "epoch".
        "key_codec": {
            "type": "string"
        },
        # If you define a value_codec, the spout will read the record value using
        # the specified encoding. Its type can also be "string", "bytes" or "epoch".
        "value_codec": {
            "type": "string"
        }
    },
    "storm_settings" : {
        "publish" : [ { "stream" : "logs", "fields" : ["key", "value"] } ]
    }
}

Warning

If you set a key_codec but receive a Kafka record without a key, a null value will be emitted.
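
If your records carry no key, a reasonable sketch (assuming the spout accepts a value_codec without a key_codec, and that you want the raw binary payload) is to define only the value codec and publish the value field alone:

{
    "type" : "kafka_spout",
    "spout_settings" : {
        # no key_codec: no key is expected in the incoming records
        "value_codec": {
            # "bytes" hands you the raw record payload
            "type": "bytes"
        }
    },
    "storm_settings" : {
        "publish" : [ { "stream" : "logs", "fields" : ["value"] } ]
    }
}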

Batch Mode

It is sometimes required to consume records using a batch strategy. For example, to archive records to a third-party storage, the punch file bolt expects to receive batches of (say) 50000 records in order to write compressed files to the archive. Here is how you configure the Kafka spout to work in batch mode; a configuration sketch follows the parameter list.

  • batch_size : Integer

    This parameter activates the batch strategy. It defines the number of messages handled in one batch.

  • batch_interval : String

    When using the batch strategy, this makes the current batch be flushed after some time if no message is received. Use the time interval notation.
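
As an illustration, here is a minimal batch-mode sketch. The batch_size and batch_interval values are arbitrary, and the "10s" value assumes the punch time interval notation:

{
    "type" : "kafka_spout",
    "spout_settings" : {
        "brokers" : "front",
        "topic" : "mytenant.apache",
        "start_offset_strategy" : "last_committed",

        # accumulate records in batches of 50000 ...
        "batch_size" : 50000,

        # ... and flush a partial batch after 10 seconds without new records
        "batch_interval" : "10s"
    }
}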

Advanced Parameters

Warning

Use with care.

  • group_name : String

The consumer group name, used to make different spout instances cooperate and consume the same topic. By default, all spouts from the same topology share the incoming load.

  • partition : Integer

The Kafka partition to read. If this property is not defined, the Kafka spout will read all partitions. If several spouts are running (i.e. the number of executors is greater than one), the spouts will automatically share the partitions. If, in contrast, you define a partition number, the spout will only consume that partition.

  • offset : long

    The offset to start from. Mandatory when 'start_offset_strategy' is set to 'from_offset'.

  • auto_commit_interval : String : 10000

The delay in milliseconds between offset commits. There is usually no reason to commit often: in case of failure and restart, you will resume reading the data from the last saved offset, which is not an issue assuming idempotent data handling.

  • load_control : String

If set to rate, the spout will limit the rate of the emitted data. If set to adaptative, the spout will perform automatic backpressure rate limiting. If set to none, no load control is performed.
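
To make these parameters concrete, here is a sketch combining several of them. The group name is hypothetical and the values are illustrative only:

{
    "type" : "kafka_spout",
    "spout_settings" : {
        "brokers" : "front",
        "topic" : "mytenant.apache",
        "start_offset_strategy" : "last_committed",

        # hypothetical dedicated consumer group, so that this spout does
        # not cooperate with spouts of identically named topologies
        "group_name" : "mytenant.apache.archiving",

        # only consume partition 0 instead of sharing all partitions
        "partition" : 0,

        # commit offsets every 10 seconds
        "auto_commit_interval" : "10000",

        # apply automatic backpressure rate limiting
        "load_control" : "adaptative"
    }
}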

Refer to the batch Kafka spout javadoc documentation for more details.

Metrics

See KafkaSpoutMetrics