Kafka Input¶
The KafkaInput reads data from a Kafka topic. It automatically takes responsibility for reading one or several partition(s) of the topic, depending on the runtime presence of other Kafka input(s) from the same topology or from topologies identically named but belonging to the same configured group.
Note
also refer to the kafka input javadoc documentation.
Pay attention to the strategy you choose to read data from the topic. The most common strategy is to start reading data from the last saved offset. Check the example configuration properties below. In most case you want the default configuration.
{
"type" : "kafka_input",
"settings" : {
# Fetch data from the Kafka cluster identified by "front".
# You must have a corresponding entry in your
# punchplatform.properties kafka cluster section.
"brokers" : "front",
# the topic to read data from
"topic" : "mytenant.apache",
# Possible values are "earliest", "latest" or "last_committed".
"start_offset_strategy" : "last_committed"
}
}
Mandatory Parameters¶
topic
: String
- The topic to send data to
brokers
: String
- the kafka cluster id, chosen from the keys in your kafka.clusters dictionary in $PUNCHPLATFORM_CONF_DIR/punchplatform.properties configuration file. If
bootstrap.servers
is set, this parameter is only used to define a metric id to identify your cluster in Punchplatform metrics
start_offset_strategy
: String
- Check next section for more details
Using only the mandatory parameters, the kafka bolt will expect to read Kafka record containing lumberjack encoded payload (otherwise see coddec settings in Kafka Record Format section)
Offset Strategy¶
Here are the various values you can use for the start_offset_strategy
:
-
earliest
makes the KafkaInput start reading from the oldest offset found in a topic partition EACH TIME the punchline JVM starts. Watch out : in case of later restart of the topology (due to any problem), the input will start again at the current beginning of the topic, re-processing massive amounts of data, and causing potential duplicates.1 2 3
In addition, the offset is not committed in the usual consumer group (i.e. if you go back to the normal 'last_committed' strategy, the processings done during the 'earliest' executions will not be taken into account). So, for resetting ONCE the processing of a production consumer to the beginning of the topic, prefer using [punchplatform-kafka-consumers](../../../Manual_Pages/punchplatform-kafka-consumers.sh.md) '--reset-to-earliest' command while the consumer is temporarily stopped.
-
latest
makes it start from the latest offset EACH TIME the punchline JVM starts/restarts, hence skipping potentially large amount of past data. Watch out : using that mode, there will be no auto commit. In addition, in case of later restart of the topology (due to any problem), the input will start again at the current end of the topic, skipping any records arrived in the topic during the stop/restart phase.1 2 3
In addition, the offset is not committed in the usual consumer group (i.e. if you go back to the normal 'last_committed' strategy, the processings done during the 'earliest' executions will not be taken into account). So, for resetting ONCE the processing of a production consumer to the end of the topic, prefer using [punchplatform-kafka-consumers](../../../Manual_Pages/punchplatform-kafka-consumers.sh.md) '--reset-to-latest' command while the consumer is temporarily stopped.
-
last_committed
makes it start from the latest committed offset, hence not replaying old data already processed and acknowledged. In this mode, the proccessed offset will be stored in Kafka (CONSUMER_OFFSETS topic) to be able to start again from the same point in case of JVM or application restart, providing AT_LEAST_ONCE processing guarantee.1 2
This is therefore the value that should ALWAYS be used in production. For shifting the committed offset in production, please refer to [punchplatform-kafka-consumers](../../../Manual_Pages/punchplatform-kafka-consumers.sh.md)
-
from_offset
makes it start from a specific offset. Watch out : using that mode, there will be no persistent offset commit and processing will restart from the same point each time the punchline/JVM restarts. So, see punchplatform-kafka-consumers capability to reset processing to a specified offset of each partition, while the consumer is temporarily stopped. -
from_datetime
makes it start from the first message received after this date. The message date is the Kafka internal record timestamp. See also the optional parameters. Watch out : using that mode, there will be no persistent offset commit and processing will restart from the same point each time the punchline/JVM restarts. So, see punchplatform-kafka-consumers capability to reset processing to a specified message date of each partition, while the consumer is temporarily stopped.from_datetime_time_ms
: Integer : 10000 If you usefrom_datetime
, this is the kafka request timeout expressed in milliseconds before giving up. The request referred to here is the one used to fetch the starting offset.
Failures and Record Replay¶
The default mode of the Kafka input in case a tuple failure occurs is to rewind the affected partition reading to the failed tuple. That basically means that all Kafka records from that failed offset will be re-read and re-submitted to the topology.
Two important behaviors can be configured with respect to that strategy :
- the input can emit only the failed tuple, and not resend already acked tuples. This behavior is controlled by the
smart_replay
configuration property. It is true by default. If you set it to false, all tuples will be repeated from the failed one which will increase the number of replayed tuples. - some downstream bolts expect to never receive any repeated records. This is the case of the file bolt in charge of archiving batches of tuples. If you use the Kafka spout for archiving use cases you must configure the Kafka spout to fail altogether should a tuple failure occur. As a result, the topology will restart shortly and will resume sending records from the last committed offset. You can control that using the
fail_action
property.
Important
Replaying tuple is only easy to process for stateless applications where idempotency is achieved using per record identifier or field. For example a topology that reads Kafka to insert records into elasticsearch can have the default configuration. At the end a unique identifier associated to the records will avoid storing duplicates into Elasticsearch.
In contrast archiving applications are not designed to cope with tuple replays. The (so-called) fail stop behavior is exactly what you want. If at some point your topology cannot write anymore to the (S3|Ceph|Filesystem) archive, the topology will keep fail and be restarted until the archive service becomes up and ready again.
Kafka Record Format¶
In the general case, Kafka records contain a key and value, the value being encoded (often in lumberjack format) Sometimes the message key is not used (null) Let us go through the several options you have.
Lumberjack¶
This is the default settings. The Kafka input expects to receive Kafka record containing no key and a lumberjack encoded value. Lumberjack is a key value binary format used by elastic components and supported by the punch. For example a lumberjack record could contain the following map of key-value:
{
"id" : "ejKld9iwoo",
"data" : "hello world",
"something_else" : "you do not need this one"
}
In order to emit in your topology some or all of the lumberjack record fields, you must define them as part of the publish stream settings. For example, to emit only the "id" and the "data" fields, use:
{
"type" : "kafka_input",
"settings" : { ... }
"publish" : [ { "stream" : "logs", "fields" : ["id", "data"] } ]
}
Arbitrary Key and Value format¶
If you receive records using other encoding schemes, possibly with a record key, here is how to proceed. First define the (optional) key codec, the the value codec as follows:
{
"type" : "kafka_input",
"settings" : {
# if you define a key_codex the input will expect a key in incoming records.
# Its type can be "string", "bytes" or "epoch"
"key_codec": {
"type": "string"
},
# if you define a value_codec the input will read the record value using the specified
# encoding. Accepted type can be "string", "bytes" or "epoch"
"value_codec": {
"type": "string"
}
}
"publish" : [ { "stream" : "logs", "fields" : ["key", "value"] } ]
}
Warning
if you set a key_codec
but receive kafka record without a key, a null value will be
emitted.
If you set a key_codec
but receive kafka record without a key, a null value will be emitted.
Reserved fields¶
You can add some reserved fields as part of your published fields. These make the input node to insert special meta data values rather than using the read data. For example if you want to emit the topic name as part of the published data, you can include the '_ppf_topic_name' reserved field. The possible reserved fields are:
- _ppf_partition_id contains the corresponding Kafka partition identifier
- _ppf_partition_offset contains the current Kafka partition offset
- _ppf_topic_name contains the current Kafka topic name
- _ppf_kafka_timestamp contains the record timestamp in
epoch
format in milliseconds. Its value depends on the settings of Kafka’s server-sidelog.message.timestamp.type
broker andmessage.timestamp.type
topic parameters. Check Kafka documentation. - _ppf_batch_play_id contains a unique runtime identifier to identify batches of messages. This field is deprecated.
Here is an example:
{
"publish" : [
{
"stream" : "logs",
"fields" : [
"log"
"_ppf_partition_id",
"_ppf_partition_offset",
"_ppf_topic_name",
"_ppf_kafka_timestamp"
]
}
]
}
Custom headers¶
Kafka input allows a user to define headers contained in kafka records to inject it inside storm field.
The supported header types are :
string
: UTF_8 default encodinginteger
: Big endian and little endian allowed for byte orderinglong
: Big endian and little endian allowed for byte orderingbytes
: keep the byte array from the kafka header's value
The custom header configuration is set inside storm settings.
Important
For each configured custom kafka header, the field name MUST be included in the published fields if the user wants the value to be injected inside the resulting tuple from the stream output
Example of configuration :
{
"executors": 1,
"component": "kafka_input",
"headers_fields": [
{
"header": "myheader1",
"field": "field1",
"type": "string"
},
{
"header": "myheader2",
"field": "field2",
"type": "integer",
"ordering": "big_endian"
}
],
"publish": [
{
"stream": "logs",
"fields": [
"field1",
"field2",
"_ppf_topic_name"
]
}
]
}
}
Parameters :
-
headers_fields.header
: String, MandatoryKey of the kafka record's header to catch from the consumed topic
-
headers_fields.field
: String, MandatoryStream field name where the header's value will be injected
-
headers_fields.type
: String, MandatoryValue type. Allowed types are
string
,integer
,long
andbytes
-
headers_fields.encoding
: String, OptionalDefault is
UTF_8
. Encoding type for string values. This configuration is ignored for numbers. -
headers_fields.ordering
: String, OptionalDefault is big endian. This configuration is ignored for strings.
-
headers_fields.on_decoding_error
: String, OptionalDefault is
CRASH
. Allowed behaviors areCRASH
,WARN
, orPASS
. InCRASH
mode, a decoding error is logged and the system exits. InWARN
mode, a simple warn log is displayed and a null value is generated for the concerned header. InPASS
mode, a debug log is displayed and a null value is generated for the concerned header.
Batch Mode¶
Danger
Starting at punch v5.7 this mode is DEPRECATED (it used to be needed to work with legacy File Bolt archiving). Instead the batching for archiving is controlled directly at the level of the File Bolt.
Consumer Groups¶
Several Kafka input nodes can cooperate to consume partitions of a single topic. You can control what spouts are cooperating when processing a given topic, by assigning an identical group.id
settings to all cooperating spouts/executors. There is a sensible default value that usually suits the need of simple topologies. If you need different punchline instances (with different application id) to cooperate, you will have to provide explicetely the same value for this settings to all these punchlines.
It is easier to understand the various possibilities using the typical uses cases you can achieve.
For the sake of clarity we assume you have two tenants (tenant1
and tenant2
), with each two channels (channel1
and channel2
). In each channel you have two punchline punchline1
and punchline2
. Here is the configuration layout.
└── tenant1
├── channel1
│ ├── channel_structure.json
│ ├── punchline1.json
│ └── punchline2.json
└── channel2
├── punchline1.json
└── punchline2.json
In each punchline you have a KafkaInput node named(say)input
. Let us go through the configuration options you have.
Per Punchline Topic Consuming¶
This is a likely setup. You want a given punchline to consume a topic (say mytopic
) independently from other punchlines
fromn the same of from different channels. This mode is the default one and requires no settings. The group name is then automatically generated to achieve that behavior. For example the channel1/punchline1
will use the following group identifier:
tenant1-channel1-punchline1-input
tenant1-channel1-punchline2-input
tenant2-channel1-punchline1-input
tenant2-channel1-punchline2-input
With such settings each punchline KafkaInput node will consume all the mytopic
topic partitions and manage each partition offset saving on its own. There are no side effect from one punchline to the other.
Explicit Punchline Cooperation¶
In some advanced use cses you actually want several punchlines (from the same or from different channels) to cooperate and consume the same topic. To do that simply name the group.id
with a common value. For example:
{
"type" : "kafka_input",
"type" : "input",
"settings" : {
"topic" : "mytopic",
"group.id" : "mygroup"
}
}
What happens then ? The four punchlines will compete to read the same topic. The topic partitions will be assigned to a consumer randomly chosen from these four punchline. You will have no control over which punchline reads wchich partition. The partition assignement and rebalancing will be orchestrated by Kafka itself.
Tip
When is this useful ? In situation where you want a resilient yet active-active topic consuming from distinct single-process punchlines. For example you have two servers, each has a running punchline. The two punchline will share the load. Should one server crash, the surviving punchline will start consuming the orphan partitions and sustain the total input traffic. When the crashed server resumes, the traffic will rebalance qutomatically.
Earliest or Latest Offset Consuming¶
What we just explained makes only sense with the last_committed
offset strategy. i.e. whenever your punchline save their offsets so as to consume records from the latest saved state. If you use the earliest
or latest strategy
, several punchlines cannot cooperate. Each will use an ephemeral group identifier. Such punchline are only used for development, testing ofbackup procedures.
Explicit Partition Assignement¶
Another advanced but possible use case is to deploy several KafkaInput nodes in one or several punchlines, and make each consume a given ptopic partition. For example:
{
"type" : "kafka_input",
"type" : "input",
"settings" : {
"topic" : "mytopic",
"group.id" : "mygroup",
"partition" : 3
}
}
With such a settings, that node will only consume the third partition from the mytopic
topic.
Tip
When is this useful ? In situation you want fine grain control over your partition consuming. Note that with this settings, if a punchline fails, it will have to be restarted so that the given partition gets consumed again. This mode is thus typiclly used in conjunction with Kubernetes or Shiva schedulers to ensure that each failed punchline is resumed on some backup server.
Security Parameters¶
-
security.protocol
: StringSecurity protocol to contact Kafka. Only
SSL
is supported. -
ssl.truststore.location
: StringPath to client truststore to check Kafka SSL certificate.
-
ssl.truststore.password
: StringPassword to open client truststore.
-
ssl.keystore.location
: StringPath to client keystore to send client certificate to Kafka SSL. Mandatory if client authentication is required in Kafka SSL configuration.
-
ssl.keystore.password
: StringPassword to open client keystore. Mandatory if client authentication is required in Kafka SSL configuration.
Advanced Parameters¶
Warning
Use with care.
bootstrap.servers
: String- Supersedes the values computed based on the
brokers
parameter. If present, it will override the default brokers addresses/ports from punchplatform.properties to resolve your kafka cluster. Ex : "localhost:9092,otherhost:9092". This can be useful in some context where the kafka cluster is not addressed with the same names/addresses depending on the source machine (here, the one running the punchline), or in case of ports routing.
- Supersedes the values computed based on the
fail_action
: StringPossible values: "exit", "sleep", "none". Default value: "none". Use "exit" to achieve a fail stop behavior. The topology will fail when a tuple fail. Use "sleep" to slow down the topology when a tuple fail. Sleep time can be adjusted with
fail_sleep_ms
. Use "none" to replay tuple. Replay can be adjusted withsmart_replay
.fail_sleep_ms
: LongOnly relevant with
fail_action
set tosleep
.
Default value: "50". Make the node sleep when it receives a tuple failure. This can be useful to slow down a topology when the tuple traffic completely fails.smart_replay
: BooleanIf true, failed tuple are only replayed (i.e. re-emitted) if they were not previously acked nor are still pending in the topology. This reduces the amount of replayed data. Of course it is irrelevant if you set the
fail_action
to "exit".group.id
: Stringthe kafka consumer group name, used to make different input node instances cooperate and consume the same topic, whether they are in a same punchline instance or not. The default group id is
. . . so only multiple executors of the same component, in the same punchline will cooperate. partition
: IntegerThe Kafka partition to read. If this property is not defined the Kafka input will read all partitions. If several inputs are running (i.e. the number of executors is greater than one), the inputs will automatically share the partitions. If in contrast you define a partition number the input will only consume that partition.
offset
: longThe offset to start from. Mandatory when 'start_offset_strategy' is set to 'from_offset'.
auto_commit_interval
: String : 10000the delay in milliseconds between offset commits. Usually there is no reason to do that often, in case of failure and restart, you will restart reading the data from the last saved offset. This is not an issue assuming idempotent data handling.
load_control
: StringIf set to
rate
the input will limit the rate of the emitted data. If set toadaptative
, the spout will perform an automatic backpressure rate limiting. If set tonone
no load control is performed.auto.offset.reset
: StringWhat to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
- earliest (default): automatically reset the offset to the earliest offset. With this option, some data will be replayed if the current offset is lost.
- latest: automatically reset the offset to the latest offset. With this option, some data will be ignored if the current offset is lost.
Backward Compatibility¶
The group_name
property is deprecated but still supported for bacward compatibility with 5.x releases.
Metrics¶
See KafkaInputMetrics