Kafka Input

The KafkaInput reads data from a Kafka topic. It automatically takes responsibility for reading one or several partitions of the topic, depending on the presence at runtime of other Kafka inputs from the same topology, or from identically named topologies belonging to the same configured group.

Note

Also refer to the Kafka input Javadoc documentation.

Pay attention to the strategy you choose for reading data from the topic. The most common strategy is to start reading data from the last saved offset. Check the example configuration properties below. In most cases you want the default configuration.

type: kafka_input
settings:
  # Fetch data from the Kafka cluster identified by "front".
  # You must have a corresponding entry in your
  # punchplatform.properties kafka cluster section.
  brokers: front
  # the topic to read data from
  topic: mytenant.apache
  # Possible values are "earliest", "latest" or "last_committed".
  start_offset_strategy: last_committed

Mandatory Parameters

  • topic : String

    The topic to read data from.

  • brokers : String

    The Kafka cluster id, chosen from the keys in the kafka.clusters dictionary of your $PUNCHPLATFORM_CONF_DIR/punchplatform.properties configuration file. If bootstrap.servers is set, this parameter is only used to define a metric id identifying your cluster in Punchplatform metrics.

  • start_offset_strategy : String

    Check the next section for more details.

Using only the mandatory parameters, the Kafka input expects to read Kafka records containing a lumberjack-encoded payload (otherwise see the codec settings in the Kafka Record Format section).

Offset Strategy

Here are the various values you can use for the start_offset_strategy:

  • earliest makes the KafkaInput start reading from the oldest offset found in a topic partition EACH TIME the punchline JVM starts. Watch out : in case of later restart of the topology (due to any problem), the input will start again at the current beginning of the topic, re-processing massive amounts of data, and causing potential duplicates.

    In addition, the offset is not committed in the usual consumer group (i.e. if you go back to the normal 'last_committed' strategy, the processing done during the 'earliest' executions will not be taken into account).
    
    So, for resetting ONCE the processing of a production consumer to the beginning of the topic, prefer using [punchplatform-kafka-consumers](../../../Manual_Pages/punchplatform-kafka-consumers.sh.md) '--reset-to-earliest' command while the consumer is temporarily stopped.
    
  • latest makes it start from the latest offset EACH TIME the punchline JVM starts/restarts, hence skipping a potentially large amount of past data. Watch out : using that mode, there will be no auto commit. In addition, in case of later restart of the topology (due to any problem), the input will start again at the current end of the topic, skipping any records arrived in the topic during the stop/restart phase.

    In addition, the offset is not committed in the usual consumer group (i.e. if you go back to the normal 'last_committed' strategy, the processing done during the 'latest' executions will not be taken into account).
    
    So, for resetting ONCE the processing of a production consumer to the end of the topic, prefer using [punchplatform-kafka-consumers](../../../Manual_Pages/punchplatform-kafka-consumers.sh.md) '--reset-to-latest' command while the consumer is temporarily stopped.
    
  • last_committed makes it start from the latest committed offset, hence not replaying old data already processed and acknowledged. In this mode, the processed offset will be stored in Kafka (the __consumer_offsets topic) to be able to start again from the same point in case of a JVM or application restart, providing an AT_LEAST_ONCE processing guarantee.

    This is therefore the value that should ALWAYS be used in production. For shifting the committed offset
    in production, please refer to [punchplatform-kafka-consumers](../../../Manual_Pages/punchplatform-kafka-consumers.sh.md)
    
  • from_offset makes it start from a specific offset. Watch out : using that mode, there will be no persistent offset commit and processing will restart from the same point each time the punchline/JVM restarts. So, see punchplatform-kafka-consumers capability to reset processing to a specified offset of each partition, while the consumer is temporarily stopped.
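As a sketch, a from_offset configuration (topic and values are illustrative) combines this strategy with the partition and offset parameters documented in the Advanced Parameters section below:

```yaml
type: kafka_input
settings:
  brokers: front
  topic: mytenant.apache
  # read a single partition, starting at a fixed offset
  partition: 0
  start_offset_strategy: from_offset
  # mandatory with from_offset: the offset to start from
  offset: 12345
```

Remember that in this mode no offset is committed: the input will restart from offset 12345 each time the punchline restarts.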

  • from_datetime makes it start from the first message received after this date. The message date is the Kafka internal record timestamp. See also the optional parameters. Watch out : using that mode, there will be no persistent offset commit and processing will restart from the same point each time the punchline/JVM restarts. So, see punchplatform-kafka-consumers capability to reset processing to a specified message date of each partition, while the consumer is temporarily stopped.

  • from_datetime_time_ms : Integer : 10000

    If you use from_datetime, this is the Kafka request timeout, expressed in milliseconds, before giving up. The request referred to here is the one used to fetch the starting offset.

Failures and Record Replay

The default mode of the Kafka input in case a tuple failure occurs is to rewind the affected partition reading to the failed tuple. That basically means that all Kafka records from that failed offset will be re-read and re-submitted to the topology.

Two important behaviors can be configured with respect to that strategy:

  1. The input can emit only the failed tuple, and not resend already acked tuples. This behavior is controlled by the smart_replay configuration property, which is true by default. If you set it to false, all tuples from the failed one onward will be resent, which increases the number of replayed tuples.
  2. Some downstream bolts expect to never receive repeated records. This is the case of the File Bolt in charge of archiving batches of tuples. If you use the Kafka input for archiving use cases, you must configure it to fail altogether should a tuple failure occur. As a result, the topology will restart shortly and resume sending records from the last committed offset. You control this using the fail_action property.
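For an archiving punchline, a fail-stop configuration sketch using the documented fail_action property could look like this (topic name is illustrative):

```yaml
type: kafka_input
settings:
  brokers: front
  topic: mytenant.apache
  start_offset_strategy: last_committed
  # fail the whole topology on any tuple failure,
  # so records are never replayed mid-stream to the archiving bolt
  fail_action: exit
```

With fail_action set to "exit", the smart_replay property is irrelevant: the topology restarts and resumes from the last committed offset instead of replaying tuples in place.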

Important

Replaying tuples is only easy to handle for stateless applications where idempotency is achieved using a per-record identifier or field. For example, a topology that reads Kafka to insert records into Elasticsearch can use the default configuration. In the end, a unique identifier associated with the records will avoid storing duplicates in Elasticsearch.

In contrast, archiving applications are not designed to cope with tuple replays. The (so-called) fail-stop behavior is exactly what you want: if at some point your topology cannot write to the (S3|Ceph|Filesystem) archive anymore, the topology will keep failing and being restarted until the archive service becomes up and ready again.

Kafka Record Format

In the general case, Kafka records contain a key and a value, the value being encoded (often in lumberjack format). Sometimes the message key is not used (null). Let us go through the several options you have.

Lumberjack

This is the default setting. The Kafka input expects to receive Kafka records containing no key and a lumberjack-encoded value. Lumberjack is a key-value binary format used by Elastic components and supported by the punch. For example, a lumberjack record could contain the following map of key-value pairs:

{
  "id": "ejKld9iwoo",
  "data": "hello world",
  "something_else": "you do not need this one"
}

In order to emit in your topology some or all of the lumberjack record fields, you must define them as part of the publishing stream settings. For example, to emit only the "id", and the "data" fields, use:

type: kafka_input
settings: { }
publish:
  - stream: logs
    fields:
      - id
      - data

Arbitrary Key and Value format

If you receive records using other encoding schemes, possibly with a record key, here is how to proceed. First define the (optional) key codec, and the value codec as follows:

type: kafka_input
settings:
  # if you define a key_codec the input will expect a key in incoming records.
  # Its type can be "string", "bytes" or "epoch"
  key_codec:
    type: string
  # if you define a value_codec the input will read the record value using the specified
  # encoding. Accepted types are "string", "bytes" or "epoch"
  value_codec:
    type: string
publish:
  - stream: logs
    fields:
      - key
      - value

If you provide a string codec type, you can also set the encoding to use. Default is UTF-8.

type: kafka_input
settings:
  key_codec:
    type: string
    encoding: utf-8
  value_codec:
    type: string
    encoding: utf-8
publish:
  - stream: logs
    fields:
      - key
      - value

Warning

If you set a key_codec but receive a Kafka record without a key, a null value will be emitted.

Reserved fields

You can add some reserved fields to your published fields. These make the input node insert special metadata values rather than the read data. For example, if you want to emit the topic name as part of the published data, include the '_ppf_topic_name' reserved field. The possible reserved fields are:

  • _ppf_partition_id contains the corresponding Kafka partition identifier
  • _ppf_partition_offset contains the current Kafka partition offset
  • _ppf_topic_name contains the current Kafka topic name
  • _ppf_kafka_timestamp contains the record timestamp in epoch format in milliseconds. Its value depends on the settings of Kafka’s server-side log.message.timestamp.type broker and message.timestamp.type topic parameters. Check Kafka documentation.
  • _ppf_batch_play_id contains a unique runtime identifier to identify batches of messages. This field is deprecated.

Here is an example:

publish:
  - stream: logs
    fields:
      - log
      - _ppf_partition_id
      - _ppf_partition_offset
      - _ppf_topic_name
      - _ppf_kafka_timestamp

Custom headers

The Kafka input lets you extract headers from Kafka records and inject their values into storm fields.

The supported header types are:

  • string : UTF_8 is the default encoding
  • integer : big endian and little endian byte ordering are allowed
  • long : big endian and little endian byte ordering are allowed
  • bytes : keeps the raw byte array from the Kafka header's value

The custom header configuration is set inside storm settings.

Important

For each configured custom Kafka header, the field name MUST be included in the published fields if you want the value to be injected into the resulting tuple of the output stream.

Example of configuration :

component: kafka_input
executors: 1
headers_fields:
  - header: myheader1
    field: field1
    type: string
  - header: myheader2
    field: field2
    type: integer
    ordering: big_endian
publish:
  - stream: logs
    fields:
      - field1
      - field2
      - _ppf_topic_name

Parameters :

  • headers_fields.header : String, Mandatory

    Key of the kafka record's header to catch from the consumed topic

  • headers_fields.field : String, Mandatory

    Stream field name where the header's value will be injected

  • headers_fields.type : String, Mandatory

    Value type. Allowed types are string, integer, long and bytes

  • headers_fields.encoding : String, Optional

    Default is UTF_8. Encoding type for string values. This configuration is ignored for numbers.

  • headers_fields.ordering : String, Optional

    Default is big endian. This configuration is ignored for strings.

  • headers_fields.on_decoding_error : String, Optional

    Default is CRASH. Allowed behaviors are CRASH, WARN, or PASS. In CRASH mode, a decoding error is logged, and the system exits. In WARN mode, a simple warn log is displayed, and a null value is generated for the concerned header. In PASS mode, a debug log is displayed, and a null value is generated for the concerned header.

Batch Mode

Danger

Starting at punch v5.7 this mode is DEPRECATED (it used to be needed to work with legacy File Bolt archiving). Instead, the batching for archiving is controlled directly at the level of the File Bolt.

Consumer Groups

Several Kafka input nodes can cooperate to consume partitions of a single topic. You control which input nodes cooperate on a given topic by assigning the same group.id setting to all cooperating nodes/executors. There is a sensible default value that usually suits the needs of simple topologies. If you need different punchline instances (with different application ids) to cooperate, you must explicitly provide the same value for this setting to all these punchlines.

It is easier to understand the various possibilities through typical use cases. For the sake of clarity, assume you have two tenants (tenant1 and tenant2), each with two channels (channel1 and channel2). In each channel you have two punchlines, punchline1 and punchline2. Here is the configuration layout:

└── tenant1
    ├── channel1
    │   ├── channel_structure.json
    │   ├── punchline1.json
    │   └── punchline2.json
    └── channel2
        ├── punchline1.json
        └── punchline2.json

In each punchline you have a KafkaInput node named (say) input. Let us go through the configuration options you have.

Per Punchline Topic Consuming

This is the most likely setup. You want a given punchline to consume a topic (say mytopic) independently from other punchlines, whether from the same or from different channels. This mode is the default one and requires no settings. The group name is then automatically generated to achieve that behavior. For example, the punchlines will use the following group identifiers:

  • tenant1-channel1-punchline1-input
  • tenant1-channel1-punchline2-input
  • tenant2-channel1-punchline1-input
  • tenant2-channel1-punchline2-input

With such settings, each punchline KafkaInput node will consume all the mytopic topic partitions and manage each partition's offset saving on its own. There are no side effects from one punchline to the other.

Explicit Punchline Cooperation

In some advanced use cases you actually want several punchlines (from the same or from different channels) to cooperate and consume the same topic. To do that, simply set group.id to a common value. For example:

type: kafka_input
component: input
settings:
  topic: mytopic
  group.id: mygroup

What happens then? The four punchlines will compete to read the same topic. Each topic partition will be assigned to a consumer randomly chosen among these four punchlines. You have no control over which punchline reads which partition; partition assignment and rebalancing are orchestrated by Kafka itself.

Tip

When is this useful? In situations where you want resilient yet active-active topic consuming from distinct single-process punchlines. For example, you have two servers, each running a punchline. The two punchlines will share the load. Should one server crash, the surviving punchline will start consuming the orphaned partitions and sustain the total input traffic. When the crashed server resumes, the traffic will rebalance automatically.

Earliest or Latest Offset Consuming

What we just explained only makes sense with the last_committed offset strategy, i.e. when your punchlines save their offsets so as to consume records from the latest saved state. If you use the earliest or latest strategy, several punchlines cannot cooperate: each will use an ephemeral group identifier. Such punchlines are only used for development or for testing backup procedures.

Explicit Partition Assignment

Another advanced but possible use case is to deploy several KafkaInput nodes in one or several punchlines, and make each consume a given topic partition. For example:

type: kafka_input
component: input
settings:
  topic: mytopic
  group.id: mygroup
  partition: 3

With these settings, that node will only consume partition 3 of the mytopic topic.

Tip

When is this useful? In situations where you want fine-grained control over partition consuming. Note that with this setting, if a punchline fails, it must be restarted so that the given partition gets consumed again. This mode is thus typically used in conjunction with Kubernetes or Shiva schedulers to ensure that each failed punchline is resumed on some backup server.

Security Parameters

  • security.protocol : String

    Security protocol to contact Kafka. Only SSL is supported.

  • ssl.truststore.location : String

    Path to client truststore to check Kafka SSL certificate.

  • ssl.truststore.password : String

    Password to open client truststore.

  • ssl.keystore.location : String

    Path to client keystore to send client certificate to Kafka SSL. Mandatory if client authentication is required in Kafka SSL configuration.

  • ssl.keystore.password : String

    Password to open client keystore. Mandatory if client authentication is required in Kafka SSL configuration.
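As an illustrative sketch combining the parameters above (truststore/keystore paths and passwords are placeholders), an SSL-enabled input could be configured as:

```yaml
type: kafka_input
settings:
  brokers: front
  topic: mytenant.apache
  start_offset_strategy: last_committed
  # SSL is the only supported security protocol
  security.protocol: SSL
  ssl.truststore.location: /path/to/truststore.jks
  ssl.truststore.password: truststore-password
  # keystore entries are only needed if the Kafka brokers
  # require client authentication
  ssl.keystore.location: /path/to/keystore.jks
  ssl.keystore.password: keystore-password
```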

Advanced Parameters

Warning

Use with care.

  • bootstrap.servers : String

    Supersedes the values computed from the brokers parameter. If present, it overrides the default broker addresses/ports from punchplatform.properties used to resolve your Kafka cluster. Example: "localhost:9092,otherhost:9092". This can be useful when the Kafka cluster is not addressed with the same names/addresses depending on the source machine (here, the one running the punchline), or in case of port routing.
  • fail_action : String

    Use "exit" to achieve a fail stop behavior. The topology will fail should a tuple fail. By default, "none" in which case you can control the tuple failure handling using the smart_replay parameter.

  • smart_replay : Boolean

    If true, failed tuples are only replayed (i.e. re-emitted) if they were not previously acked and are not still pending in the topology. This reduces the amount of replayed data. Of course, it is irrelevant if you set fail_action to "exit".

  • group.id : String

    The Kafka consumer group name, used to make different input node instances cooperate and consume the same topic, whether they are in the same punchline instance or not. The default group id is automatically generated (see the Consumer Groups section above), so that only multiple executors of the same component, in the same punchline, will cooperate.

  • partition : Integer

    The Kafka partition to read. If this property is not defined, the Kafka input will read all partitions. If several inputs are running (i.e. the number of executors is greater than one), the inputs will automatically share the partitions. If, in contrast, you define a partition number, the input will only consume that partition.

  • offset : Long

    The offset to start from. Mandatory when start_offset_strategy is set to from_offset.

  • auto_commit_interval : String : 10000

    The delay in milliseconds between offset commits. There is usually no reason to commit often: in case of failure and restart, you will restart reading data from the last saved offset. This is not an issue assuming idempotent data handling.

  • load_control : String

    If set to rate, the input will limit the rate of the emitted data. If set to adaptative, the input will perform automatic backpressure rate limiting. If set to none, no load control is performed.

  • auto.offset.reset : String

    What to do when there is no initial offset in Kafka, or if the current offset no longer exists on the server (e.g. because that data has been deleted):

      • earliest (default): automatically reset the offset to the earliest offset. With this option, some data will be replayed if the current offset is lost.
      • latest: automatically reset the offset to the latest offset. With this option, some data will be skipped if the current offset is lost.
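A sketch combining some of these advanced parameters (host names and values are illustrative, not recommendations):

```yaml
type: kafka_input
settings:
  brokers: front
  # explicit broker addresses, overriding punchplatform.properties resolution
  bootstrap.servers: localhost:9092,otherhost:9092
  topic: mytenant.apache
  start_offset_strategy: last_committed
  # make cooperating punchlines share this consumer group
  group.id: mygroup
  # commit offsets every 10 seconds
  auto_commit_interval: 10000
  load_control: none
  # replay from the beginning if the committed offset has been deleted
  auto.offset.reset: earliest
```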


Backward Compatibility

The group_name property is deprecated but still supported for backward compatibility with 5.x releases.

Metrics

See KafkaInputMetrics