Kafka Spout¶
The KafkaSpout reads data from a Kafka topic. It automatically takes responsibility for reading one or several partitions of the topic, depending on the presence at runtime of other Kafka spouts from the same topology, or from identically named topologies running on other Storm clusters.
Note
Also refer to the kafka spout javadoc documentation and the batch kafka spout javadoc documentation.
Pay attention to the strategy you choose to read data from the topic. The most common strategy is to start reading data from the last saved offset. Check the example configuration properties below. In most cases you want the default configuration.
```
{
  "type" : "kafka_spout",
  "spout_settings" : {

    # Fetch data from the Kafka cluster identified by "front".
    # You must have a corresponding entry in your
    # punchplatform.properties kafka cluster section.
    "brokers" : "front",

    # the topic to read data from
    "topic" : "mytenant.apache",

    # Possible values are "earliest", "latest" or "last_committed".
    "start_offset_strategy" : "last_committed"
  }
}
```
Using these parameters the Kafka spout will expect to read Kafka records containing a lumberjack-encoded payload.
Offset Strategy¶
Here are the various values you can use for the start_offset_strategy:

earliest
: makes the KafkaSpout start reading from the oldest offset found in a topic partition. Watch out: using that mode, there will be no auto commit.

latest
: makes it start from the latest offset, hence skipping a potentially large amount of past data. Watch out: using that mode, there will be no auto commit.

last_committed
: makes it start from the latest committed offset, hence not replaying old data already processed and acknowledged. This mode activates the auto commit.

from_offset
: makes it start from a specific offset (see the sketch after this list). Watch out: using that mode, there will be no auto commit.

from_datetime
: makes it start from the first message received after this date. The message date is the Kafka internal record timestamp. See also the optional parameters. Watch out: using that mode, there will be no auto commit.

    from_datetime_timeout_ms : Integer : 10000
    If you use from_datetime, this is the Kafka request timeout, expressed in milliseconds, before giving up. The request referred to here is the one used to fetch the starting offset.
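For instance, a minimal sketch of a configuration using the from_offset strategy could look like the one below. The broker, topic and offset values are placeholders to adapt to your own platform; the offset parameter itself is described under Advanced Parameters further down.

```
{
  "type" : "kafka_spout",
  "spout_settings" : {
    "brokers" : "front",
    "topic" : "mytenant.apache",

    # replay from an explicit offset; remember that this mode does no auto commit
    "start_offset_strategy" : "from_offset",

    # illustrative value only: the partition offset you want to start from
    "offset" : 123456
  }
}
```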
Kafka Record Format¶
In the general case, Kafka records contain a key and a value. Sometimes the key is not used. Let us go through the several options you have.
Lumberjack¶
This is the default setting. The Kafka spout expects to receive Kafka records containing no key and a lumberjack-encoded value. Lumberjack is a key-value binary format used by elastic components and supported by the punch. For example a lumberjack record could contain the following map of key-values:
```
{
  "id" : "ejKld9iwoo",
  "data" : "hello world",
  "something_else" : "you do not need this one"
}
```
In order to emit in your topology some or all of the lumberjack record fields, you must define them as part of the publish stream settings. For example, to emit only the "id" and the "data" fields, use:
```
{
  "type" : "kafka_spout",
  "spout_settings" : {
    ...
  },
  "storm_settings" : {
    "publish" : [ { "stream" : "logs", "fields" : ["id", "data"] } ]
  }
}
```
Arbitrary Key and Value Format¶
If you receive records using other encoding schemes, possibly with a record key, here is how to proceed. First define the (optional) key codec, then the value codec, as follows:
```
{
  "type" : "kafka_spout",
  "spout_settings" : {

    # if you define a key_codec the spout will expect a key in incoming records.
    # Its type can be "string", "bytes" or "epoch"
    "key_codec": {
      "type": "string"
    },

    # if you define a value_codec the spout will read the record value using the specified
    # encoding. Accepted types are "string", "bytes" or "epoch"
    "value_codec": {
      "type": "string"
    }
  },
  "storm_settings" : {
    "publish" : [ { "stream" : "logs", "fields" : ["key", "value"] } ]
  }
}
```
Warning
If you set a key_codec but receive a Kafka record without a key, a null value will be emitted.
Batch Mode¶
It is sometimes required to consume records using a batch strategy. For example, to archive records to a third-party storage, the punch file bolt expects to receive batches of (say) 50000 records in order to write compressed files to the archive. Here is how you configure the Kafka spout to work using a batch mode; a configuration sketch follows the parameter descriptions.
batch_size
: Integer. This parameter activates the batch strategy. It defines the number of messages handled in one batch.

batch_interval
: String. Using the batch strategy, if no message is received, this makes the batch flush after some time. Use the time interval notation.
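As an illustration, a batch-mode configuration could look like the following sketch. The batch size and interval values are arbitrary examples, not recommended settings:

```
{
  "type" : "kafka_spout",
  "spout_settings" : {
    "brokers" : "front",
    "topic" : "mytenant.apache",
    "start_offset_strategy" : "last_committed",

    # activate the batch strategy: emit records grouped in batches of 50000
    "batch_size" : 50000,

    # illustrative value: flush a pending batch if no message arrives for 10 seconds
    "batch_interval" : "10s"
  }
}
```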
Advanced Parameters¶
Warning
Use with care.
group_name
: String. The consumer group name, used to make different spout instances cooperate and consume the same topic. By default all spouts from the same topology share the incoming load.

partition
: Integer. The Kafka partition to read. If this property is not defined the Kafka spout will read all partitions. If several spouts are running (i.e. the number of executors is greater than one), the spouts automatically share the partitions. If in contrast you define a partition number, the spout will only consume that partition.

offset
: Long. The offset to start from. Mandatory when 'start_offset_strategy' is set to 'from_offset'.

auto_commit_interval
: String : 10000. The delay in milliseconds between offset commits. Usually there is no reason to commit often: in case of failure and restart, you will restart reading the data from the last saved offset. This is not an issue assuming idempotent data handling.

load_control
: String. If set to rate, the spout will limit the rate of the emitted data. If set to adaptative, the spout will perform automatic backpressure rate limiting. If set to none, no load control is performed. A configuration sketch combining some of these parameters is shown below.
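To make this concrete, here is a hedged sketch combining some of these advanced parameters. The group name, partition number and commit interval are arbitrary illustrations, not defaults to copy:

```
{
  "type" : "kafka_spout",
  "spout_settings" : {
    "brokers" : "front",
    "topic" : "mytenant.apache",
    "start_offset_strategy" : "last_committed",

    # illustrative consumer group name shared by the spouts of this topology
    "group_name" : "mytenant.apache.readers",

    # consume only partition 0 instead of sharing all partitions
    "partition" : 0,

    # commit offsets every 10 seconds
    "auto_commit_interval" : 10000,

    # "rate", "adaptative" or "none"
    "load_control" : "none"
  }
}
```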
Refer to the batch kafka spout javadoc documentation.
Metrics¶
See KafkaSpoutMetrics