
Kafka Stream Input

Before you start...

Before using this node, make sure you are familiar with Spark Structured Streaming concepts.

The kafka_stream_input node makes your job act as a Kafka consumer. Records are continuously read from Kafka and sent as a Dataset. You can then plug an sql node into your job to perform arbitrary continuous queries.

Input records are automatically mapped to rows with the following columns:

root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
important to know

Due to Spark Structured Streaming concepts, the terminating node of any structured streaming pipeline is expected to be a structured streaming node itself. To keep things simple, make sure the last node in your graph is a structured streaming output (sink) node.

Databricks documentation

The returned DataFrame contains all the familiar fields of a Kafka record and its associated metadata. We can now use all of the familiar DataFrame or Dataset operations to transform the result. Typically, however, we’ll start by parsing the binary values present in the key and value columns. How to interpret these blobs is application specific. Fortunately, Spark SQL contains many built-in transformations for common types of serialization as we’ll show below.
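
For instance, a sql node can decode the binary key and value columns directly. This is only a sketch: the component name parse is illustrative, and the input_data dataset name assumes the kafka_stream_input component is called input and publishes a stream named data, as in the example below.

{
    type: sql
    component: parse
    settings: {
        # Decode the binary key and value columns into readable strings,
        # and keep two of the Kafka metadata columns alongside them.
        statement: SELECT CAST(key AS STRING) AS key, CAST(value AS STRING) AS value, topic, timestamp FROM input_data
    }
    subscribe: [
        {
            component: input
            stream: data
        }
    ]
    publish: [
        {
            stream: data
        }
    ]
}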


Examples

Use-cases

Our "hello world" punchline configuration.

beginner_use_case.punchline

{
    type: punchline
    version: "6.0"
    runtime: spark
    tenant: default
    dag: [
        {
            type: kafka_stream_input
            component: input
            settings: {
                topic: mytenant_apache_httpd_archiving
                brokers: [
                    {
                        host : localhost
                        port : 9092
                    }
                ]
                start_offset_strategy : latest
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: sql
            component: sql
            settings: {
                statement: SELECT CAST(value AS STRING) AS valuestring FROM input_data
            }
            subscribe: [
                {
                    component: input
                    stream: data
                }
            ]
            publish: [
                { 
                    stream: data
                }
            ]
        }
        {
            type: console_stream_output
            component: console
            settings: {
                mode: Append
            }
            subscribe: [
                {
                    stream: data
                    component: sql
                }
            ]
        }
    ]
}

Run beginner_use_case.punchline using the command below:

CONF=beginner_use_case.punchline
punchlinectl start -p $CONF
What did we achieve?

We cast the binary record value to a string.
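
Assuming the console_stream_output node relies on Spark's built-in console sink (an assumption, not something stated on this page), each micro-batch is printed roughly like this, with your decoded records in place of the placeholder row:

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+
|         valuestring|
+--------------------+
|     <decoded value>|
+--------------------+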

tips

If you need to convert and/or select only some of the columns, use a sql node.
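
As a sketch reusing the input_data dataset from the example above, the statement below keeps only the record timestamp and the decoded value:

settings: {
    statement: SELECT timestamp, CAST(value AS STRING) AS value FROM input_data
}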


Parameters

Common Settings

Name    | Type         | Mandatory | Default value | Description
--------|--------------|-----------|---------------|------------
brokers | List of Json | true      | NONE          | A list of json objects, each with two fields: host (String) and port (Integer). This list is used to open a connection to your Kafka cluster.
topic   | String       | true      | NONE          | Name of the topic from which you want to read data.
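
As a sketch, a kafka_stream_input settings block pointing at a two-broker cluster looks like this (the hostnames are placeholders; the other values are taken from the example above):

settings: {
    topic: mytenant_apache_httpd_archiving
    brokers: [
        {
            host: kafka-broker-1
            port: 9092
        }
        {
            host: kafka-broker-2
            port: 9092
        }
    ]
    start_offset_strategy: latest
}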

Advanced Settings

No advanced settings