Kafka Spout

The KafkaSpout reads data from a Kafka topic. It automatically takes responsibility for reading one or several partitions of the topic, depending on the runtime presence of other Kafka spouts from the same topology, or from identically named topologies belonging to other Storm clusters.
Note
Also refer to the Kafka spout javadoc documentation and the batch Kafka spout javadoc documentation.
Pay attention to the strategy you choose to read data from the topic. The most common strategy is to start reading data from the last saved offset. Check the example configuration properties below. In most cases you want the default configuration.
```
{
  "type" : "kafka_spout",
  "spout_settings" : {
    # Fetch data from the Kafka cluster identified by "front".
    # You must have a corresponding entry in your
    # punchplatform.properties kafka cluster section.
    "brokers" : "front",

    # the topic to read data from
    "topic" : "mytenant.apache",

    # Possible values are "earliest", "latest" or "last_committed".
    "start_offset_strategy" : "last_committed"
  }
}
```
Using these parameters, the Kafka spout expects to read Kafka records containing a lumberjack-encoded payload.
Offset Strategy

Here are the various values you can use for the `start_offset_strategy`:
- `earliest` makes the KafkaSpout start reading from the oldest offset found in a topic partition. Watch out: in case of a later restart of the topology (due to any problem), the input will start again at the current beginning of the topic, re-processing massive amounts of data and causing potential duplicates. To reset the processing to the beginning of the topic, prefer the `punchplatform-kafka-consumers --reset-to-earliest` command.
- `latest` makes it start from the latest offset, hence skipping a potentially large amount of past data. Watch out: in that mode there is no auto commit. In addition, in case of a later restart of the topology (due to any problem), the input will start again at the current end of the topic, skipping any records that arrived in the topic during the stop/restart phase. To reset the processing to the end of the topic, prefer the `punchplatform-kafka-consumers --reset-to-latest` command.
- `last_committed` makes it start from the latest committed offset, hence not replaying old data already processed and acknowledged. This mode activates auto commit. This is the value that should ALWAYS be used in production. To shift the committed offset in production, refer to `punchplatform-kafka-consumers`.
- `from_offset` makes it start from a specific offset (see the example after this list). Watch out: in that mode there is no auto commit.
- `from_datetime` makes it start from the first message received after this date. The message date is the Kafka internal record timestamp. See also the optional parameters. Watch out: in that mode there is no auto commit.
    - `from_datetime_timeout_ms`: Integer, default 10000. If you use `from_datetime`, this is the Kafka request timeout expressed in milliseconds before giving up. The request referred to here is the one used to fetch the starting offset.
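For illustration, here is a minimal sketch of a spout starting from an explicit offset. The `offset` and `partition` parameters are described in the Advanced Parameters section below; the broker and topic values are placeholders reused from the first example.

```
{
  "type" : "kafka_spout",
  "spout_settings" : {
    "brokers" : "front",
    "topic" : "mytenant.apache",

    # start reading partition 0 at offset 12345;
    # remember: there is no auto commit in this mode
    "start_offset_strategy" : "from_offset",
    "offset" : 12345,
    "partition" : 0
  }
}
```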
Failures and Record Replay

The default behavior of the Kafka spout, in case a tuple failure occurs, is to rewind the reading of the affected partition to the failed tuple. That basically means that all Kafka records from the failed offset onwards will be re-read and re-submitted to the topology.

Two important behaviors can be configured with respect to that strategy:
- The spout can emit only the failed tuple, and not resend already acked tuples. This behavior is controlled by the `smart_replay` configuration property. It is true by default. If you set it to false, all tuples will be repeated starting from the failed one.
- Some downstream bolts expect to never receive any repeated records. This is the case of the file bolt in charge of archiving batches of tuples. If you use the Kafka spout for archiving use cases, you must configure the Kafka spout to fail altogether should a tuple failure occur. As a result, the topology will restart shortly and resume sending records from the last committed offset. You can control that using the `fail_action` property (see the example below).
Important
Replaying tuples is only acceptable for stateless applications where idempotency is achieved using a per-record identifier or field. For example, a topology that reads Kafka to insert records into Elasticsearch can keep the default configuration. In the end, a unique identifier associated with each record will avoid storing duplicates into Elasticsearch.

In contrast, archiving applications are not designed to cope with tuple replays. The (so-called) fail stop behavior is exactly what you want: if at some point your topology cannot write anymore to the (S3|Ceph|Filesystem) archive, the topology will keep failing and being restarted until the archive service becomes up and ready again.
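The following sketch illustrates the two typical setups; refer to the Advanced Parameters section below for the exact semantics of `smart_replay` and `fail_action` (the broker and topic values are placeholders).

```
# stateless indexing use case: replay only the failed tuple (default behavior)
{
  "type" : "kafka_spout",
  "spout_settings" : {
    "brokers" : "front",
    "topic" : "mytenant.apache",
    "start_offset_strategy" : "last_committed",
    "smart_replay" : true
  }
}

# archiving use case: fail stop, the restarted topology resumes
# from the last committed offset
{
  "type" : "kafka_spout",
  "spout_settings" : {
    "brokers" : "front",
    "topic" : "mytenant.apache",
    "start_offset_strategy" : "last_committed",
    "fail_action" : "exit"
  }
}
```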
Kafka Record Format

In the general case, Kafka records contain a key and a value. Sometimes the key is not used. Let us go through the options you have.
Lumberjack

This is the default setting. The Kafka spout expects to receive Kafka records containing no key and a lumberjack-encoded value. Lumberjack is a key-value binary format used by Elastic components and supported by the punch. For example, a lumberjack record could contain the following key-value map:
```
{
  "id" : "ejKld9iwoo",
  "data" : "hello world",
  "something_else" : "you do not need this one"
}
```
In order to emit some or all of the lumberjack record fields in your topology, you must define them as part of the publish stream settings. For example, to emit only the "id" and "data" fields, use:
```
{
  "type" : "kafka_spout",
  "spout_settings" : {
    ...
  },
  "storm_settings" : {
    "publish" : [
      { "stream" : "logs", "fields" : ["id", "data"] }
    ]
  }
}
```
Arbitrary Key and Value format

If you receive records using other encoding schemes, possibly with a record key, here is how to proceed. First define the (optional) key codec, then the value codec, as follows:
```
{
  "type" : "kafka_spout",
  "spout_settings" : {
    # if you define a key_codec the spout will expect a key in incoming records.
    # Its type can be "string", "bytes" or "epoch"
    "key_codec": {
      "type": "string"
    },

    # if you define a value_codec the spout will read the record value using the
    # specified encoding. Accepted type can be "string", "bytes" or "epoch"
    "value_codec": {
      "type": "string"
    }
  },
  "storm_settings" : {
    "publish" : [
      { "stream" : "logs", "fields" : ["key", "value"] }
    ]
  }
}
```
Warning
If you set a `key_codec` but receive a Kafka record without a key, a null value will be emitted.
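If your records carry no key at all, a reasonable sketch is to define only a `value_codec` and publish the value field alone (the stream and field names below are illustrative):

```
{
  "type" : "kafka_spout",
  "spout_settings" : {
    "brokers" : "front",
    "topic" : "mytenant.apache",

    # no key_codec defined: the spout does not expect a record key
    "value_codec": { "type": "string" }
  },
  "storm_settings" : {
    "publish" : [
      { "stream" : "logs", "fields" : ["value"] }
    ]
  }
}
```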
Reserved fields

The Kafka spout can publish several reserved fields as part of its storm settings:
- `_ppf_partition_id` contains the corresponding Kafka partition identifier
- `_ppf_partition_offset` contains the current Kafka partition offset
- `_ppf_topic_name` contains the current Kafka topic name
- `_ppf_kafka_timestamp` contains the record timestamp in epoch format in milliseconds. Its value depends on the settings of Kafka's server-side `log.message.timestamp.type` broker and `message.timestamp.type` topic parameters. Check the Kafka documentation.
- `_ppf_batch_play_id` contains a unique runtime identifier for this batch. You can safely use that id as a unique key in a hash table. You are guaranteed that in case of data replay, the batch play id will change.
- `_ppf_batch_id` contains the persisted batch id. This id is an increasing counter that will be replayed in case a topology is restarted before a batch was fully processed and acknowledged. It is the key identifier used to perform idempotent/exactly-once processing in case of replay. This field is part of both the user tuples and the end-of-batch marker tuple.
```
{
  "storm_settings" : {
    "publish" : [
      {
        "stream" : "logs",
        "fields" : [
          "_ppf_partition_id",
          "_ppf_partition_offset",
          "_ppf_topic_name",
          "_ppf_kafka_timestamp"
        ]
      }
    ]
  }
}
```
Custom headers

The Kafka spout allows a user to extract headers contained in Kafka records and inject their values into storm fields.
The supported header types are:

- `string`: UTF_8 default encoding
- `integer`: big endian and little endian allowed for byte ordering
- `long`: big endian and little endian allowed for byte ordering
- `bytes`: keeps the byte array from the Kafka header's value
The custom header configuration is set inside storm settings.
Important
For each configured custom Kafka header, the field name MUST be included in the published storm fields if you want the value to be injected into the resulting tuple of the output stream.

Example configuration:
```
{
  "storm_settings": {
    "executors": 1,
    "component": "kafka_spout",
    "headers_fields": [
      {
        "header": "myheader1",
        "field": "field1",
        "type": "string"
      },
      {
        "header": "myheader2",
        "field": "field2",
        "type": "integer",
        "ordering": "big_endian"
      }
    ],
    "publish": [
      {
        "stream": "logs",
        "fields": [
          "field1",
          "field2",
          "_ppf_topic_name"
        ]
      }
    ]
  }
}
```
Parameters:

- `headers_fields.header`: String, Mandatory. Key of the Kafka record's header to catch from the consumed topic.
- `headers_fields.field`: String, Mandatory. Stream field name where the header's value will be injected.
- `headers_fields.type`: String, Mandatory. Value type. Allowed types are `string`, `integer`, `long` and `bytes`.
- `headers_fields.encoding`: String, Optional. Default is `UTF_8`. Encoding type for string values. This configuration is ignored for numbers.
- `headers_fields.ordering`: String, Optional. Default is big endian. This configuration is ignored for strings.
- `headers_fields.on_decoding_error`: String, Optional. Default is `CRASH`. Allowed behaviors are `CRASH`, `WARN`, or `PASS`. In `CRASH` mode, a decoding error is logged and the system exits. In `WARN` mode, a simple warn log is displayed and a null value is generated for the concerned header. In `PASS` mode, a debug log is displayed and a null value is generated for the concerned header.
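To make the optional settings concrete, here is a hedged variant of the previous example. The header and field names are illustrative, and the `little_endian` value is assumed by symmetry with the `big_endian` value shown above.

```
{
  "storm_settings": {
    "executors": 1,
    "component": "kafka_spout",
    "headers_fields": [
      {
        # a string header decoded with an explicit charset
        "header": "myheader1",
        "field": "field1",
        "type": "string",
        "encoding": "UTF_8"
      },
      {
        # a long header read as little endian; a decoding error only
        # logs a warning and yields a null field value
        "header": "myheader2",
        "field": "field2",
        "type": "long",
        "ordering": "little_endian",
        "on_decoding_error": "WARN"
      }
    ],
    "publish": [
      { "stream": "logs", "fields": ["field1", "field2"] }
    ]
  }
}
```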
Batch Mode

It is sometimes required to consume records using a batch strategy. For example, to archive records to a third-party storage, the punch file bolt expects to receive batches of (say) 50000 records in order to write compressed files to the archive. Here is how you configure the Kafka spout to work in batch mode:
- `batch_size`: Integer. This parameter activates the batch strategy. It defines the number of messages handled in one batch.
- `batch_interval`: String. Using the batch strategy, if no message is received, this makes the batch flush after some time. Use the time interval notation.
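For example, a minimal sketch (the values are illustrative, and the "10s" time interval notation is an assumption):

```
{
  "type" : "kafka_spout",
  "spout_settings" : {
    "brokers" : "front",
    "topic" : "mytenant.apache",
    "start_offset_strategy" : "last_committed",

    # emit batches of 50000 records, or flush what has been
    # received after 10 seconds without new messages
    "batch_size" : 50000,
    "batch_interval" : "10s"
  }
}
```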
Important

Starting at punch v5.6, this mode is deprecated. Instead, the batch size is controlled directly at the level of the File Bolt.
Advanced Parameters
Warning
Use with care.
- `fail_action`: String. Use "exit" to achieve a fail stop behavior: the topology will fail should a tuple fail. The default is "none", in which case you control the tuple failure handling using the `smart_replay` parameter.
- `smart_replay`: Boolean. If true, failed tuples are only replayed (i.e. re-emitted) if they were not previously acked and are not still pending in the topology. This reduces the amount of replayed data. Of course it is irrelevant if you set `fail_action` to "exit".
- `group_name`: String. The consumer group name, used to make different spout instances cooperate and consume the same topic. By default all spouts from the same topology share the incoming load.
- `partition`: Integer. The Kafka partition to read. If this property is not defined, the Kafka spout will read all partitions. If several spouts are running (i.e. the number of executors is greater than one), the spouts will automatically share the partitions. If in contrast you define a partition number, the spout will only consume that partition.
- `offset`: Long. The offset to start from. Mandatory when `start_offset_strategy` is set to `from_offset`.
- `auto_commit_interval`: String, default 10000. The delay in milliseconds between offset commits. Usually there is no reason to commit often: in case of failure and restart, you will restart reading the data from the last saved offset. This is not an issue assuming idempotent data handling.
- `load_control`: String. If set to `rate`, the spout limits the rate of the emitted data. If set to `adaptative`, the spout performs automatic backpressure rate limiting. If set to `none`, no load control is performed.
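As an illustration, here is a hedged sketch combining some of these parameters; the values are placeholders, not recommendations.

```
{
  "type" : "kafka_spout",
  "spout_settings" : {
    "brokers" : "front",
    "topic" : "mytenant.apache",
    "start_offset_strategy" : "last_committed",

    # spouts sharing this group name cooperate on the topic partitions
    "group_name" : "mytenant_apache_group",

    # commit offsets every 10 seconds
    "auto_commit_interval" : "10000",

    # adapt the emit rate to downstream backpressure
    "load_control" : "adaptative"
  }
}
```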
Refer to the batch Kafka spout javadoc documentation.
Metrics

See KafkaSpoutMetrics.