Kafka Batch Input

Overview

The kafka_batch_input node makes your job act as a Kafka consumer. Records are read from Kafka and delivered as a Dataset. You can then plug a sql operator into your job to perform arbitrary batch queries.

Input records are automatically mapped to rows with the following columns:

root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
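
Once exposed as a Dataset, these columns can be queried by any downstream sql operator. A minimal statement, assuming the input node is named input and publishes a stream named data (hence the input_data table name used later in this page):

SELECT topic, partition, offset, CAST(value AS STRING) AS value
FROM input_data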

We provide a custom Kafka offset manager to read from Kafka in at-least-once mode.
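
Activating it only requires two settings on the node; a minimal sketch, where the offset id is a placeholder:

punch_offset_manager: true
application_offset_id: my_application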

Important

In batch mode, you read from a starting offset to an ending offset. By default, these are set to earliest and latest respectively.

It is also highly recommended not to rely on Spark checkpointing: checkpoints are not suited for production use cases and cannot be reused across Spark version upgrades.
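
If you do not use the punch offset manager, the bounds can presumably be overridden through kafka_settings using the standard Spark Kafka source options; a sketch, to be checked against your setup:

kafka_settings:
{
  startingOffsets: earliest
  endingOffsets: latest
}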

Databricks documentation

The returned DataFrame contains all the familiar fields of a Kafka record and its associated metadata. We can now use all of the familiar DataFrame or Dataset operations to transform the result. Typically, however, we’ll start by parsing the binary values present in the key and value columns. How to interpret these blobs is application specific. Fortunately, Spark SQL contains many built-in transformations for common types of serialization as we’ll show below.

Runtime Compatibility

  • PySpark :
  • Spark :

Examples
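
The following punchline first fetches the last committed offset from Elasticsearch, then reads the corresponding Kafka records, casts the binary value column to a readable string with a sql operator, and finally commits the new offsets back to Elasticsearch so that the next run resumes where this one stopped.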

{
  type: punchline
  runtime: spark
  version: "6.0"
  dag:
  [
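    # Fetch the last committed offset for application_offset_id test_index_offset from Elasticsearch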
    {
      type: elastic_input
      component: kafka_offset
      settings:
      {
        index: kafka-offset-*
        hits_hits: true
        query:
        {
          size: 1
          sort:
          [
            {
              processed_date:
              {
                order: desc
              }
            }
          ]
          query:
          {
            bool:
            {
              must:
              [
                {
                  constant_score:
                  {
                    filter:
                    {
                      match:
                      {
                        application_offset_id: test_index_offset
                      }
                    }
                  }
                }
              ]
            }
          }
        }
      }
      publish:
      [
        {
          stream: data
        }
      ]
    }
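    # Consume the hello_world topic, resuming from the offset published by the kafka_offset component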
    {
      type: kafka_batch_input
      component: input
      settings:
      {
        topic: hello_world
        punch_offset_manager: true
        application_offset_id: test_index_offset
        kafka_settings:
        {
          failOnDataLoss: false
        }
      }
      publish:
      [
        {
          stream: data
        }
      ]
      subscribe:
      [
        {
          stream: data
          component: kafka_offset
          alias: last_committed_meta
        }
      ]
    }
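    # Cast the binary value column to a string and keep the offset metadata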
    {
      type: sql
      component: cast
      settings:
      {
        statement:
          '''
          SELECT 
            key, 
            CAST(value AS STRING) AS value, 
            topic, 
            timestamp, 
            timestampType, 
            offset, 
            partition,
            processed_date,
            application_offset_id
          FROM input_data
          '''
      }
      publish:
      [
        {
          stream: data
        }
      ]
      subscribe:
      [
        {
          stream: data
          component: input
        }
      ]
    }
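    # Append the consumed offsets to the kafka-offset index so they can be looked up by the next run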
    {
      type: elastic_batch_output
      component: commit_offset
      settings:
      {
        index: kafka-offset-2020.06.08
        output_mode: append
      }
      subscribe:
      [
        {
          stream: data
          component: input
        }
      ]
    }
  ]
}

Parameters

Name                  | Type         | Mandatory | Default value | Description
brokers               | List of Json | true      | NONE          | A list of json objects with two fields, host (String) and port (Integer), used to connect to your Kafka cluster.
topic                 | String       | true      | NONE          | Name of the topic from which records are read.
punch_offset_manager  | Boolean      | false     | NONE          | Activate the punch custom offset manager for reading.
kafka_settings        | Map K-V      | false     | NONE          | Valid Spark Kafka settings.
application_offset_id | String       | false     | default       | An id that will be added to the published dataframe.
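
A minimal settings block combining these parameters might look as follows; a sketch in which the broker address, topic and offset id are placeholders:

{
  type: kafka_batch_input
  component: input
  settings:
  {
    brokers:
    [
      {
        host: localhost
        port: 9092
      }
    ]
    topic: hello_world
    punch_offset_manager: true
    application_offset_id: my_application
  }
  publish:
  [
    {
      stream: data
    }
  ]
}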