
Kafka Batch Input

Overview

The kafka_batch_input node makes your job act as a Kafka consumer. Records are read from Kafka and published as a Dataset. You can then plug a sql node into your job to perform arbitrary batch queries.

Input records are automatically mapped to rows with the following columns:

root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)

We provide a custom Kafka offset manager to read from Kafka in at-least-once mode.

Important

In batch mode, you read from a starting offset to an ending offset. By default, these are set to earliest and latest respectively; they can be overridden as sketched after this note.

It is also highly recommended not to use Spark checkpointing: checkpoints are not suited for production use cases and cannot be reused across Spark version upgrades.
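
The default boundaries can be overridden through kafka_settings, which forwards valid Spark Kafka options to the underlying source. A minimal sketch, assuming the standard Spark startingOffsets and endingOffsets batch options are passed through untouched (topic name and values are illustrative):

- type: kafka_batch_input
  component: input
  settings:
    topic: hello_world
    kafka_settings:
      # Spark Kafka batch options; earliest/latest shown here are the defaults
      startingOffsets: earliest
      endingOffsets: latest
  publish:
  - stream: data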

Databricks documentation

The returned DataFrame contains all the familiar fields of a Kafka record and its associated metadata. We can now use all of the familiar DataFrame or Dataset operations to transform the result. Typically, however, we’ll start by parsing the binary values present in the key and value columns. How to interpret these blobs is application specific. Fortunately, Spark SQL contains many built-in transformations for common types of serialization as we’ll show below.
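
For instance, if the value column carries UTF-8 encoded JSON, a sql node can decode it with Spark's built-in from_json function. A minimal sketch, assuming a hypothetical payload with name and age fields and the same input component as in the example below:

- type: sql
  component: parse
  settings:
    statement: |
      -- decode the binary value, then parse it as JSON
      SELECT from_json(CAST(value AS STRING), 'name STRING, age INT') AS data,
             topic,
             timestamp
      FROM input_data
  subscribe:
  - stream: data
    component: input
  publish:
  - stream: data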

Runtime Compatibility

  • PySpark :
  • Spark :

Examples
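
The punchline below reads the last committed offset from Elasticsearch (subscribed as last_committed_meta), resumes the Kafka read from there, casts the binary value column to a string with a sql node, and writes the consumed offset metadata back to the kafka-offset index so the next run can start where this one stopped.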

---
type: punchline
runtime: spark
version: '6.0'
dag:
- type: elastic_input
  component: kafka_offset
  settings:
    index: kafka-offset-*
    hits_hits: true
    query:
      size: 1
      sort:
      - processed_date:
          order: desc
      query:
        bool:
          must:
          - constant_score:
              filter:
                match:
                  application_offset_id: test_index_offset
  publish:
  - stream: data
- type: kafka_batch_input
  component: input
  settings:
    topic: hello_world
    punch_offset_manager: true
    application_offset_id: test_index_offset
    kafka_settings:
      failOnDataLoss: false
  publish:
  - stream: data
  subscribe:
  - stream: data
    component: kafka_offset
    alias: last_committed_meta
- type: sql
  component: cast
  settings:
    statement: |
      SELECT 
        key, 
        CAST(value AS STRING) AS value, 
        topic, 
        timestamp, 
        timestampType, 
        offset, 
        partition,
        processed_date,
        application_offset_id FROM input_data
  publish:
  - stream: data
  subscribe:
  - stream: data
    component: input
- type: elastic_output
  component: commit_offset
  settings:
    index: 
      type: constant
      value: kafka-offset-2020.06.08
    output_mode: append
  subscribe:
  - stream: data
    component: input

Parameters

| Name | Type | Mandatory | Default value | Description |
|------|------|-----------|---------------|-------------|
| brokers | List of Json | true | NONE | A list of JSON objects with two fields, host (String) and port (Integer), used to open the connection to your Kafka cluster. |
| topic | String | true | NONE | Name of the topic from which data is read. |
| punch_offset_manager | Boolean | false | NONE | Activate the punch custom offset manager for reading. |
| kafka_settings | Map K-V | false | NONE | Valid Spark Kafka settings. See the Kafka 0.10 documentation. |
| application_offset_id | String | false | default | An id added to the published dataframe. |
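
The brokers setting is a list of host/port objects. A minimal sketch with placeholder values to adapt to your own Kafka cluster:

- type: kafka_batch_input
  component: input
  settings:
    # placeholder broker address: point this at your own Kafka cluster
    brokers:
    - host: localhost
      port: 9092
    topic: hello_world
    punch_offset_manager: true
  publish:
  - stream: data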

Kafka Settings

| Name | Type | Mandatory | Default value | Description |
|------|------|-----------|---------------|-------------|
| kafka.security.protocol | String | true with Kafka SSL | NONE | Security protocol used to contact Kafka. |
| kafka.ssl.truststore.location | String | true with Kafka SSL | NONE | Path to the client truststore used to check the Kafka SSL certificate. |
| kafka.ssl.truststore.password | String | true with Kafka SSL | NONE | Password to open the client truststore. |
| kafka.ssl.keystore.location | String | true with Kafka SSL and client authentication | NONE | Path to the client keystore presented for Kafka SSL client authentication. |
| kafka.ssl.keystore.password | String | true with Kafka SSL and client authentication | NONE | Password to open the client keystore. |
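
When Kafka is protected by SSL, these properties go into kafka_settings. A minimal sketch with placeholder paths and passwords; the keystore entries are only needed when client authentication is enabled:

- type: kafka_batch_input
  component: input
  settings:
    topic: hello_world
    kafka_settings:
      kafka.security.protocol: SSL
      # placeholder truststore used to verify the broker certificate
      kafka.ssl.truststore.location: /path/to/truststore.jks
      kafka.ssl.truststore.password: truststore_password
      # keystore, only required for client authentication
      kafka.ssl.keystore.location: /path/to/keystore.jks
      kafka.ssl.keystore.password: keystore_password
  publish:
  - stream: data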