# Kafka Stream Input

## Overview

The `kafka_stream_input` node makes your job act as a Kafka consumer. Records are continuously read from Kafka and sent downstream as a Dataset. You can then plug an sql node into your job to perform arbitrary continuous queries. Make sure you are familiar with Spark Structured Streaming concepts.
Input records are automatically mapped to rows with the following columns:
```
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
```
!!! important
    As the Databricks documentation puts it:

    "The returned DataFrame contains all the familiar fields of a Kafka record and its associated metadata. We can now use all of the familiar DataFrame or Dataset operations to transform the result. Typically, however, we’ll start by parsing the binary values present in the key and value columns. How to interpret these blobs is application specific. Fortunately, Spark SQL contains many built-in transformations for common types of serialization as we’ll show below."
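For instance, if your records carry JSON payloads, Spark's built-in `from_json` function can parse them directly inside an sql node. The sketch below is illustrative only: the `name STRING, age INT` schema is a hypothetical example, and the `input_data` table name follows the `component_stream` naming convention used in the example further down.

```yaml
# Illustrative sql node: decodes the binary value then parses it as JSON.
# The field schema 'name STRING, age INT' is a hypothetical example.
- type: sql
  component: parse
  settings:
    statement: >
      SELECT from_json(CAST(value AS STRING), 'name STRING, age INT') AS record
      FROM input_data
  subscribe:
    - component: input
      stream: data
  publish:
    - stream: data
```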
## Runtime Compatibility

- PySpark : ✅
- Spark : ✅
## Example

```yaml
---
type: punchline
version: '6.0'
runtime: spark
dag:
  - type: kafka_stream_input
    component: input
    settings:
      topic: mytenant_apache_httpd_archiving
      start_offset_strategy: latest
    publish:
      - stream: data
  - type: sql
    component: sql
    settings:
      statement: SELECT CAST(key AS STRING) AS key, CAST(value AS STRING) AS value FROM input_data
    subscribe:
      - component: input
        stream: data
    publish:
      - stream: data
  - type: console_stream_output
    component: console
    settings:
      mode: Append
    subscribe:
      - stream: data
        component: sql
```
What did we achieve? We cast the record key and value to the String type. If you need to convert and/or select only some of the columns, use an sql node, as in the sketch below.
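For instance, a sketch of sql node settings that keep only some of the Kafka metadata columns alongside the decoded value (reusing the `input_data` table from the example above):

```yaml
# Illustrative sql node settings: select a subset of the Kafka columns.
settings:
  statement: >
    SELECT topic, partition, offset, CAST(value AS STRING) AS value
    FROM input_data
```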
## Parameters

| Name | Type | Mandatory | Default value | Description |
|---|---|---|---|---|
| brokers | List of Json | true | NONE | A list of json objects with two fields: host (String) and port (Integer). This list is used to open a connection to your Kafka cluster. |
| topic | String | true | NONE | Name of the topic from which you want to read data. |
| kafka_settings | Map K-V | false | NONE | Valid Spark Kafka settings. See the Kafka 0.10 documentation. |
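As a sketch, assuming a two-broker Kafka cluster reachable on the default port 9092 (the hostnames are placeholders), the brokers and topic settings could look like this:

```yaml
settings:
  # Placeholder hostnames: replace with your actual Kafka brokers.
  brokers:
    - host: kafka1.example.com
      port: 9092
    - host: kafka2.example.com
      port: 9092
  topic: mytenant_apache_httpd_archiving
```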
## Kafka Settings

| Name | Type | Mandatory | Default value | Description |
|---|---|---|---|---|
| kafka.security.protocol | String | true with Kafka SSL | NONE | Security protocol used to contact Kafka. |
| kafka.ssl.truststore.location | String | true with Kafka SSL | NONE | Path to the client truststore used to check the Kafka SSL certificate. |
| kafka.ssl.truststore.password | String | true with Kafka SSL | NONE | Password to open the client truststore. |
| kafka.ssl.keystore.location | String | true with Kafka SSL and client authentication | NONE | Path to the client keystore presented to Kafka for SSL client authentication. |
| kafka.ssl.keystore.password | String | true with Kafka SSL and client authentication | NONE | Password to open the client keystore. |
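For example, a kafka_settings block enabling SSL with client authentication might look like the sketch below; the store paths and passwords are placeholders to adapt to your deployment:

```yaml
settings:
  kafka_settings:
    kafka.security.protocol: SSL
    # Placeholder paths and passwords: adapt to your deployment.
    kafka.ssl.truststore.location: /opt/keys/kafka-truststore.jks
    kafka.ssl.truststore.password: truststore_password
    kafka.ssl.keystore.location: /opt/keys/kafka-keystore.jks
    kafka.ssl.keystore.password: keystore_password
```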