
Kafka Batch Input

Overview

The kafka_batch_input node makes your job act as a Kafka consumer. Records are read from Kafka and returned as a Dataset. You can then plug a sql node into your job to perform arbitrary batch queries.

Input records are automatically mapped to rows with the following columns:

root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
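
As a minimal sketch, a kafka_batch_input node configured with only the mandatory parameters could look as follows; the topic name and broker address are placeholders, and the published dataset carries the columns listed above. See the Examples section for a complete punchline.

{
    type: kafka_batch_input
    component: input
    settings: {
        // placeholder topic and broker: adapt them to your platform
        topic: hello_world
        brokers: [
            {
                host: localhost
                port: 9092
            }
        ]
    }
    publish: [
        {
            stream: data
        }
    ]
}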

We provide a custom Kafka offset manager to read from Kafka in at-least-once mode; the example below shows a complete punchline where the last committed offset is fetched from Elasticsearch before reading and the consumed offsets are written back afterwards.

Important

In batch mode, you will have to read from a starting offset to an ending offset. By default, they are set respectively to earliest and latest.

It is also highly recommended not to use Spark checkpointing, since checkpoints are not suited to production use cases and cannot be reused across Spark version upgrades.
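
Assuming the node forwards kafka_settings to Spark's Kafka batch source (the parameters section describes kafka_settings as valid Spark Kafka settings), the default earliest/latest range mentioned above could be overridden along these lines; the per-partition values are purely illustrative:

kafka_settings: {
    // "earliest" and "latest" are Spark's defaults for batch reads; a JSON map
    // pins explicit per-partition offsets, here offset 42 of partition 0.
    startingOffsets: "{\"hello_world\": {\"0\": 42}}"
    endingOffsets: latest
}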

Databricks documentation

The returned DataFrame contains all the familiar fields of a Kafka record and its associated metadata. We can now use all of the familiar DataFrame or Dataset operations to transform the result. Typically, however, we’ll start by parsing the binary values present in the key and value columns. How to interpret these blobs is application specific. Fortunately, Spark SQL contains many built-in transformations for common types of serialization as we’ll show below.
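
For instance, a sql node subscribed to the kafka_batch_input can decode the binary key and value columns, assuming the records were produced as UTF-8 text; this is the same technique used in the example below:

{
    type: sql
    component: cast
    settings: {
        statement: '''
            SELECT CAST(key AS STRING) AS key,
                CAST(value AS STRING) AS value,
                topic, partition, offset
            FROM input_data
        '''
    }
    subscribe: [
        {
            stream: data
            component: input
        }
    ]
    publish: [
        {
            stream: data
        }
    ]
}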

Runtime Compatibility

  • PySpark :
  • Spark :

Examples

{
    type: punchline
    runtime: spark
    version: "6.0"
    dag: [
        {
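            // fetch the last committed offset for this application from Elasticsearch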
            type: elastic_input
            component: kafka_offset
            settings: {
                index: kafka-offset-*
                hits_hits: true
                query: {
                    size: 1
                    sort: [
                        {
                            processed_date: {
                                order: desc
                            }
                        }
                    ]
                    query: {
                        bool: {
                            must: [
                                {
                                    constant_score: {
                                        filter: {
                                            match: {
                                                application_offset_id: test_index_offset
                                            }
                                        }
                                    }
                                }
                            ]
                        }
                    }
                }
                nodes: [
                    localhost
                ]
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
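            // consume the topic, resuming from the offset published by the kafka_offset node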
            type: kafka_batch_input
            component: input
            settings: {
                topic: hello_world
                // offset manager will +1 on received offset
                punch_offset_manager: true
                application_offset_id: test_index_offset
                kafka_settings: {
                    failOnDataLoss: false
                }
                brokers: [
                    {
                        host: localhost
                        port: 9092
                    }
                ]
            }
            publish: [
                {
                    stream: data
                }
            ]
            subscribe: [
                {
                    stream: data
                    component: kafka_offset
                    alias: last_committed_meta
                }
            ]
        }
        {
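            // decode the binary value column while keeping the offset metadata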
            type: sql
            component: cast
            settings: {
                statement: '''
                    SELECT key, 
                        CAST(value AS STRING) AS value, 
                        topic, 
                        timestamp, 
                        timestampType, 
                        offset, 
                        partition,
                        processed_date,
                        application_offset_id
                    FROM input_data
                '''
            }
            publish: [
                {
                    stream: data
                }
            ]
            subscribe: [
                {
                    stream: data
                    component: input
                }
            ]
        }
        {
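            // persist the consumed offsets to Elasticsearch so the next run resumes from them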
            type: elastic_batch_output
            component: commit_offset
            settings: {
                index: "kafka-offset-2020.06.08"
                output_mode: append
                nodes: [
                    localhost
                ]
            }
            subscribe: [
                {
                    stream: data
                    component: input
                }
            ]
        }
    ]
}

Parameters

Name                   Type          Mandatory  Default value  Description
brokers                List of Json  true       NONE           A list of json objects with two fields, host (String) and port (Integer), used to connect to your Kafka cluster.
topic                  String        true       NONE           Name of the topic from which data is read.
punch_offset_manager   Boolean       false      NONE           Activate the punch custom offset manager for reading.
kafka_settings         Map K-V       false      NONE           Valid Spark Kafka settings.
application_offset_id  String        false      default        An id added to the published dataframe, used to identify this application's committed offsets.