public class KafkaInput extends AbstractKafkaNode
"dag" : [
{
"type" : "kafka_input",
"settings" : {
"brokers" : "front",
"topic" : "sampleTenant.apache1",
"start_offset_strategy": "last_committed"
},
"executors": 1,
"publish" : [ { "stream" : "default", "fields" : ["default", "time"] } ]
}
]
The KafkaInput node can be configured to decode plain strings, lumberjack records, or avro records.
By default it expects lumberjack records (as generated by the punchplatform KafkaOutput node).
Whatever variant you use, the input data always consists of maps of key-value pairs. You must configure your node to emit tuples containing all or a subset of these values in the punchline, as part of a stream of your choice. An example makes this easy to understand. Say you read the following Json string from your Kafka topic:
{
"id" : "ejKld9iwoo",
"data" : "hello world",
"something_else" : "you do not need this one"
}
You can configure the KafkaInput to emit tuples with (say) the "id" and "data" fields, but not the "something_else" field. In addition, you want these two-value tuples to be emitted in a stream of your choice, for example "logs". To do that, add the following publish settings to your node:
{
"type" : "kafka_input",
"settings" : { ... },
"publish" : [ { "stream" : "logs", "fields" : ["id", "data"] } ]
}
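The field selection described above can be sketched in a few lines of Java. This is only an illustration of the idea, not the node's actual code: the class name `PublishFieldSelector` and the method `select` are hypothetical, and returning `null` for a field missing from the record is an assumption, since this page does not say how the node handles that case.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the punch API: it only illustrates
// how a "publish" declaration narrows a decoded record to a tuple.
final class PublishFieldSelector {

    // Returns the values of the published fields, in the declared order.
    // Assumption: a field absent from the record yields null.
    static List<Object> select(Map<String, Object> record, List<String> publishedFields) {
        List<Object> tuple = new ArrayList<>(publishedFields.size());
        for (String field : publishedFields) {
            tuple.add(record.get(field));
        }
        return tuple;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", "ejKld9iwoo");
        record.put("data", "hello world");
        record.put("something_else", "you do not need this one");

        // Only "id" and "data" are published; "something_else" is dropped.
        System.out.println(select(record, List.of("id", "data")));
        // prints [ejKld9iwoo, hello world]
    }
}
```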
The KafkaInput node is implemented on top of the punch kafka consumer. It automatically takes responsibility for reading one or several partitions of the topic, depending on the runtime presence of other KafkaInput nodes from the same punchline.
Pay attention to the strategy you choose to read records from the topic. The most common strategy is to start reading data from the last saved offset. The KafkaInput node saves its offset regularly. Check the configuration properties below. In most cases the default configuration is what you want.
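To make the link between acknowledgements and saved offsets more concrete, here is a minimal sketch of one way to compute an offset that is safe to commit. It is not the punch kafka consumer's implementation: the class `AckedOffsetTracker` and its methods are hypothetical. It relies only on the Kafka convention that a committed offset designates the next record to read.

```java
import java.util.TreeSet;

// Hypothetical illustration: tracks emitted but not yet acked offsets
// for a single partition.
final class AckedOffsetTracker {
    private final TreeSet<Long> pending = new TreeSet<>();
    private long highestEmitted;

    AckedOffsetTracker(long startOffset) {
        // Nothing emitted yet: the committable offset is the start offset.
        this.highestEmitted = startOffset - 1;
    }

    void emitted(long offset) {
        pending.add(offset);
        highestEmitted = Math.max(highestEmitted, offset);
    }

    void acked(long offset) {
        pending.remove(offset);
    }

    // The first offset that has not been acknowledged yet. Committing it
    // means a restart re-reads every record that was still in flight.
    long committableOffset() {
        return pending.isEmpty() ? highestEmitted + 1 : pending.first();
    }

    public static void main(String[] args) {
        AckedOffsetTracker tracker = new AckedOffsetTracker(10);
        tracker.emitted(10);
        tracker.emitted(11);
        tracker.emitted(12);
        tracker.acked(11);
        System.out.println(tracker.committableOffset()); // prints 10
        tracker.acked(10);
        System.out.println(tracker.committableOffset()); // prints 12
    }
}
```

With such a scheme a record acked out of order never moves the committable offset past an older record that is still pending, which is why re-read records should be handled idempotently.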
You can configure the node to read several topics at once. Simply use an array of topics, as illustrated next:
"dag" : [
{
"type" : "kafka_input",
"settings" : {
"brokers" : "front",
"topics" : ["mytenant_apache", "mytenant_sourcefire"],
"start_offset_strategy": "last_committed"
},
"publish" : [ { "stream" : "logs", "fields" : ["id", "data"] } ]
}
]
When reading several topics, it may be useful to publish each record onto a different Storm stream. To do that easily, you can declare your published stream as follows:
"dag" : [
{
"type" : "kafka_input",
"settings" : {
...
"topics" : ["mytenant_apache", "mytenant_sourcefire"]
},
"publish" : [ { "stream" : "%{topic}", "fields" : ["default", "time"] } ]
}
]
With this setting, Kafka records read from the "mytenant_apache" topic are emitted on the "mytenant_apache" stream, and records read from the "mytenant_sourcefire" topic are emitted on the "mytenant_sourcefire" stream.
You can then arrange for downstream bolts to subscribe to the specific stream they need.
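The effect of the "%{topic}" placeholder can be pictured as a plain string substitution. The sketch below is an assumption about the behavior described above, not the node's code, and the class `StreamNameTemplate` is hypothetical.

```java
// Hypothetical illustration of how a "%{topic}" stream declaration
// maps each source topic to its own stream name.
final class StreamNameTemplate {

    // Literal replacement of the placeholder by the record's topic.
    static String resolve(String template, String topic) {
        return template.replace("%{topic}", topic);
    }

    public static void main(String[] args) {
        System.out.println(resolve("%{topic}", "mytenant_apache"));
        // prints mytenant_apache
        System.out.println(resolve("logs", "mytenant_sourcefire"));
        // prints logs (no placeholder, so the stream name is fixed)
    }
}
```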
property | mandatory | type | default | comment |
---|---|---|---|---|
"topic" | yes | String | - | The Kafka topic. |
"partition" | no | int | - | The Kafka partition to listen to. If this property is not defined, the Kafka input reads all partitions. If several nodes are running (i.e. the number of executors is greater than one), the nodes automatically share the partitions. If, in contrast, you define a partition number, the node only consumes that partition. You must then set up one node per partition. |
"brokers" | yes | String | - | The Kafka brokers to work with. This MUST be an identifier defined in your punchplatform.properties file. |
"fail_action" | no | String | "none" | If you use the Kafka node for archiving, you must set this parameter to "exit" to make the node exit the whole JVM in case of tuple failure. This behavior is by design: it makes the whole punchline restart and reconsume the records from the last saved batch offset. By default the Kafka node automatically reconsumes records from the last saved offset and makes a best effort to reemit only the failed ones. This default behavior is, however, not the one expected by archiving file bolts. |
"group.id" | no | String | - | The group identifier shared among the consumers. By default a group id is generated from the tenant, channel and punchline names, separated by dots. |
"auto.commit.interval.ms" | no | long | 10000 | The interval in milliseconds between offset commits. Usually there is no reason to commit often: in case of failure and restart, you read data again from the last saved offset. This is not an issue as long as re-read records are handled in an idempotent way. Note that this property does not mean the offset is auto committed. It is only committed if tuples have been acked. |
"start_offset_strategy" | yes | String | - | Possible values are "earliest", "latest", "last_committed", "from_offset" or "from_datetime". "earliest" makes the Kafka node start reading from the oldest offset found in a topic partition. Watch out: in that mode there is no auto commit. "latest" makes it start from the latest offset, hence skipping a potentially large amount of past data. Watch out: in that mode there is no auto commit. "last_committed" makes it start from the latest committed offset, hence not replaying old data already processed and acknowledged. This mode activates the auto commit. "from_offset" makes it start from a specific offset. Watch out: in that mode there is no auto commit. "from_datetime" makes it start from the first message received after this date. The message date is the Kafka internal partition one. See also the "from_datetime" and "from_datetime_format" parameters. Watch out: in that mode there is no auto commit. |
"from_datetime" | no | string | the current datetime | The datetime from which messages will be read. Its syntax depends on the chosen "from_datetime_format". Mandatory when "start_offset_strategy" is set to "from_datetime". |
"from_datetime_format" | no | string | iso | The datetime format. Accepted values are "iso" and "epoch_ms". Examples: iso: "2018-06-25T07:57:42Z" or "2018-06-25T09:57:42.761+02:00" (ISO-8601); epoch_ms: "1529913462761". |
"offset_watchdog_timeout" | no | Long | null | The watchdog timeout in milliseconds. If too much time goes by without a partition's committed offset going up, the JVM exits. Works only if metrics are enabled. |
"offset" | no | long | 0 | The offset to start from. Mandatory when 'start_offset_strategy' is set to 'from_offset'. |
"load_control" | no | string | "none" | One of "none" or "rate". |
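The two "from_datetime_format" values can be illustrated with a small conversion to epoch milliseconds, using the example values from the table. This is a sketch under assumptions: the class `FromDatetimeParser` is hypothetical, and parsing the "iso" variant with `java.time.OffsetDateTime` is a choice made here, not necessarily what the node does internally.

```java
import java.time.OffsetDateTime;

// Hypothetical helper: converts a "from_datetime" setting to epoch
// milliseconds according to "from_datetime_format".
final class FromDatetimeParser {

    static long toEpochMillis(String value, String format) {
        switch (format) {
            case "iso":
                // Accepts ISO-8601 with an offset or a trailing "Z".
                return OffsetDateTime.parse(value).toInstant().toEpochMilli();
            case "epoch_ms":
                return Long.parseLong(value);
            default:
                throw new IllegalArgumentException(
                        "unsupported from_datetime_format: " + format);
        }
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2018-06-25T09:57:42.761+02:00", "iso"));
        // prints 1529913462761
        System.out.println(toEpochMillis("1529913462761", "epoch_ms"));
        // prints 1529913462761
    }
}
```

The "iso" example with the "+02:00" offset and the "epoch_ms" example from the table designate the same instant, which the sketch reproduces.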
errorStream, failSleep, failStop, failuresDelayMs
Constructor and Description |
---|
KafkaInput(org.thales.punch.libraries.storm.api.NodeSettings settings, String kafkaClusterId) Create a new Kafka node |
Modifier and Type | Method and Description |
---|---|
void | ack(Object o) |
void | fail(Object o) |
void | nextTuple() |
void | onReceive(org.thales.punch.kafka.api.IPartition partition, org.apache.kafka.clients.consumer.ConsumerRecord<byte[],byte[]> record) |
void | open(Map conf, org.apache.storm.task.TopologyContext topologyContext, org.apache.storm.spout.SpoutOutputCollector collector) |
getTupleId, process
close, deactivate, declareOutputFields, getPublishedStreams, regulate, sendLatencyRecord
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
onPartitionAssigned, onPartitionRevoked, onTick
public KafkaInput(org.thales.punch.libraries.storm.api.NodeSettings settings, String kafkaClusterId)
settings - the spout configuration
kafkaClusterId - id of the used kafka cluster. This is used for naming metrics.
public void nextTuple()
Specified by: nextTuple in interface org.apache.storm.spout.ISpout
Overrides: nextTuple in class AbstractKafkaNode
public void open(Map conf, org.apache.storm.task.TopologyContext topologyContext, org.apache.storm.spout.SpoutOutputCollector collector)
Specified by: open in interface org.apache.storm.spout.ISpout
Overrides: open in class AbstractKafkaNode
public void ack(Object o)
Specified by: ack in interface org.apache.storm.spout.ISpout
Overrides: ack in class org.thales.punch.libraries.storm.api.BaseInputNode
public void fail(Object o)
Specified by: fail in interface org.apache.storm.spout.ISpout
Overrides: fail in class org.thales.punch.libraries.storm.api.BaseInputNode
public void onReceive(org.thales.punch.kafka.api.IPartition partition, org.apache.kafka.clients.consumer.ConsumerRecord<byte[],byte[]> record)
Copyright © 2023. All rights reserved.