Kafka Stream Input
The kafka_stream_input node makes your job act as a Kafka consumer.
Records are continuously read from Kafka and sent as a Dataset.
Make sure you are familiar with Spark Structured Streaming concepts.
Input records are automatically mapped to rows with the following columns:
```
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
```
As explained in the Databricks documentation:
The returned DataFrame contains all the familiar fields of a Kafka record and its associated metadata. We can now use all of the familiar DataFrame or Dataset operations to transform the result. Typically, however, we’ll start by parsing the binary values present in the key and value columns. How to interpret these blobs is application specific. Fortunately, Spark SQL contains many built-in transformations for common types of serialization as we’ll show below.
If you need to convert and/or select only some of the columns, use a SQL expressions node. Here is a simple example that keeps only the key and the record value as strings:
```
{
  type: kafka_stream_input
  component: input
  settings: {
    topic: raw_apache_httpd
    brokers: [
      {
        host: localhost
        port: 9092
      }
    ]
    start_offset_strategy: latest
  }
  publish: [
    {
      stream: data
    }
  ]
}
{
  type: sql_select_expressions
  component: sql_select_expressions
  settings: {
    select_expressions: [
      CAST(key AS STRING)
      CAST(value AS STRING)
    ]
  }
  subscribe: [
    {
      component: input
      stream: data
    }
  ]
  publish: [
    {
      stream: data
    }
  ]
}
```
Because you are likely to need some SQL expressions, the Kafka input node
can be configured with them directly. The previous example is equivalent to:
```
{
  type: kafka_structured_stream_input
  component: input
  settings: {
    topic: raw_apache_httpd
    brokers: [
      {
        host: localhost
        port: 9092
      }
    ]
    select_expressions: [
      CAST(key AS STRING)
      CAST(value AS STRING)
    ]
    start_offset_strategy: latest
  }
  publish: [
    {
      stream: data
    }
  ]
}
```
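If your record values are JSON documents, the same select_expressions mechanism can extract individual fields using Spark SQL functions such as get_json_object. Here is a minimal sketch, assuming values that carry a hypothetical JSON field named status (adapt the field name and JSON path to your data):

```
{
  type: kafka_structured_stream_input
  component: input
  settings: {
    topic: raw_apache_httpd
    brokers: [
      {
        host: localhost
        port: 9092
      }
    ]
    select_expressions: [
      CAST(key AS STRING)
      # extract a single field from the JSON record value
      # ('status' is a hypothetical field used for illustration)
      get_json_object(CAST(value AS STRING), '$.status') AS status
    ]
    start_offset_strategy: latest
  }
  publish: [
    {
      stream: data
    }
  ]
}
```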
Important
With Spark Structured Streaming you must define an output for data consumption to actually start. That is, you must terminate your job with a Kafka or console structured streaming output node.
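For example, a console node terminating the job above might look like the following. This is only a sketch: the console_structured_stream_output type name and its settings are assumptions here, so refer to the corresponding output node documentation for the exact names:

```
{
  # hypothetical console output node: check the output node
  # documentation for the exact type name and settings
  type: console_structured_stream_output
  component: output
  subscribe: [
    {
      component: input
      stream: data
    }
  ]
}
```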