
Kafka Stream Input

Overview

Compatible Spark/Pyspark

The kafka_stream_input node makes your job act as a Kafka consumer. Records are continuously read from Kafka and delivered as a Dataset. You can then plug in your job's sql operator to perform arbitrary continuous queries.

Make sure you are familiar with Spark Structured Streaming concepts.

Input records are automatically mapped to rows with the following columns:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
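
Under the hood this corresponds to the standard Spark Kafka source. As a minimal, hedged PySpark sketch (assuming the node wraps that source; the broker, topic and offset values are placeholders, and their mapping to the node's brokers, topic and start_offset_strategy settings is an assumption noted in the comments), the equivalent code producing this schema looks like:

from pyspark.sql import SparkSession

# Minimal sketch of the standard Spark Kafka source that yields the
# schema shown above. Broker and topic names are placeholders.
spark = SparkSession.builder.appName("kafka-stream-input-sketch").getOrCreate()

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")     # assumed: brokers setting
    .option("subscribe", "mytenant_apache_httpd_archiving")  # assumed: topic setting
    .option("startingOffsets", "latest")                     # assumed: start_offset_strategy setting
    .load()
)

df.printSchema()  # prints the key/value/topic/partition/offset/timestamp columns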

Notice

Important

Due to Spark Structured Streaming concepts, the terminating node of any structured streaming pipeline is expected to be a structured streaming node itself. To keep it simple, make sure that the last node in your graph is a structured streaming output (sink).
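
In PySpark terms this means the streaming Dataset must be materialized through writeStream. A minimal sketch, reusing the df from the sketch above; the console sink here plays the same role as the console_stream_output node used in the example below:

# A streaming DataFrame cannot be consumed with batch actions such as
# df.show(); it must terminate in a streaming sink started through
# writeStream. A console sink is used here for illustration.
query = (
    df.writeStream
    .format("console")
    .outputMode("append")
    .start()
)
query.awaitTermination()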

From the Databricks documentation:

The returned DataFrame contains all the familiar fields of a Kafka record and its associated metadata. We can now use all of the familiar DataFrame or Dataset operations to transform the result. Typically, however, we’ll start by parsing the binary values present in the key and value columns. How to interpret these blobs is application specific. Fortunately, Spark SQL contains many built-in transformations for common types of serialization as we’ll show below.
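
As a hedged PySpark illustration of that last point (again reusing df from the sketch above), the binary key and value columns are typically cast to strings before any further processing. This is exactly what the sql node in the example below does with its CAST(value AS STRING) statement:

# Cast the raw binary Kafka key and value columns to strings.
parsed = df.selectExpr(
    "CAST(key AS STRING) AS key",
    "CAST(value AS STRING) AS value",
)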

Example(s)

If you need to convert and/or select only some of the columns, use a sql node. Here is a simple example that keeps only the record value, cast to a string:

{
    job: [
        {
            type: kafka_stream_input
            component: input
            settings: {
                topic: mytenant_apache_httpd_archiving
                brokers: [
                    {
                        host : localhost
                        port : 9092
                    }
                ]
                start_offset_strategy : latest
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: sql
            component: sql
            settings: {
                statement: SELECT CAST(value AS STRING) AS valuestring FROM input_data
            }
            subscribe: [
                {
                    component: input
                    stream: data
                }
            ]
            publish: [
                { 
                    stream: data
                }
            ]
        }
        {
            type: console_stream_output
            component: console
            settings: {
                mode: Append
            }
            subscribe: [
                {
                    stream: data
                    component: sql
                }
            ]
        }
    ]
}

Configuration(s)

  • brokers: List

    Description: [REQUIRED] A list of JSON objects with two fields: host (String) and port (Integer). This list is used to establish the connection with your Kafka cluster.

  • topic: String

    Description: [REQUIRED] Name of the topic from which you want to read data.