
Kafka Stream Input

The kafka_stream_input node makes your job act as a Kafka consumer. Records are continuously read from Kafka and published as a Dataset. You can then plug in your job's SQL operators to perform arbitrary continuous queries.

Make sure you are familiar with Spark Structured Streaming concepts.

Input records are automatically mapped to rows with the following columns:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

As the Databricks documentation puts it:

The returned DataFrame contains all the familiar fields of a Kafka record and its associated metadata. We can now use all of the familiar DataFrame or Dataset operations to transform the result. Typically, however, we’ll start by parsing the binary values present in the key and value columns. How to interpret these blobs is application specific. Fortunately, Spark SQL contains many built-in transformations for common types of serialization as we’ll show below.

If you need to convert and/or select only some of the columns, use an SQL expressions node. Here is a simple example that keeps only the key and the record value as strings:

{
    type: kafka_stream_input
    component: input
    settings: {
        topic: raw_apache_httpd
        brokers: [
            {
                host: localhost
                port: 9092
            }
        ]
        start_offset_strategy: latest
    }
    publish: [
        {
            stream: data
        }
    ]
}
{
    type: sql_select_expressions
    component: sql_select_expressions
    settings: {
        select_expressions: [
            CAST(key AS STRING)
            CAST(value AS STRING)
        ]
    }
    subscribe: [
        {
            component: input
            stream: data
        }
    ]
    publish: [
        {
            stream: data
        }
    ]
}
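
Any Spark SQL built-in function can be used in these expressions, not just CAST. As a minimal sketch, assuming your record values carry JSON payloads (the verb and status fields below are hypothetical and must be adapted to your actual data), a from_json expression can turn the value into a structured column:

{
    type: sql_select_expressions
    component: sql_select_expressions
    settings: {
        select_expressions: [
            CAST(key AS STRING)
            # hypothetical schema: replace verb/status with your actual JSON fields
            "from_json(CAST(value AS STRING), 'verb STRING, status INT') AS data"
        ]
    }
    subscribe: [
        {
            component: input
            stream: data
        }
    ]
    publish: [
        {
            stream: data
        }
    ]
}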

Because you are likely to need some SQL expressions, the Kafka input node can be configured with them directly. The previous example is equivalent to:

{
    type: kafka_stream_input
    component: input
    settings: {
        topic: raw_apache_httpd
        brokers: [
            {
                host: localhost
                port: 9092
            }
        ]
        select_expressions: [
            CAST(key AS STRING)
            CAST(value AS STRING)
        ]
        start_offset_strategy: latest
    }
    publish: [
        {
            stream: data
        }
    ]
}
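
The select expressions are not limited to the key and value columns; any column from the schema listed at the top of this page can be kept. As a sketch, assuming your downstream nodes expect the Kafka record timestamp for event-time processing, you could add it to the selection:

{
    type: kafka_stream_input
    component: input
    settings: {
        topic: raw_apache_httpd
        brokers: [
            {
                host: localhost
                port: 9092
            }
        ]
        select_expressions: [
            CAST(key AS STRING)
            CAST(value AS STRING)
            # keep the Kafka record timestamp for downstream event-time logic
            timestamp
        ]
        start_offset_strategy: latest
    }
    publish: [
        {
            stream: data
        }
    ]
}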

Important

With Spark Structured Streaming you must define an output for the streaming query to start consuming data. That is, you must terminate your job with a Kafka or console structured streaming output node.
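
As a minimal sketch, a console sink terminating the job could look like the following. The console_stream_output node type is an assumption used for illustration only; check your platform documentation for the exact output node names available.

{
    # assumed node type, for illustration only: use the output node provided by your platform
    type: console_stream_output
    component: output
    subscribe: [
        {
            component: input
            stream: data
        }
    ]
}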