Kafka Stream Output
Overview
The kafka_stream_output node writes records to a Kafka topic.
The input dataset must have a column named value and may have an optional column named key.
These columns are used as the value and the key of each record written to Kafka. To use other column names,
set the column_value and column_key parameters.
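As a sketch of the custom column mapping, the output node below assumes an upstream sql component publishing a data stream with two hypothetical columns, payload and id. column_value and column_key tell the node to write them as the record value and key. The topic, checkpoint path and column names are placeholders, and connection settings are left out to keep the focus on the column mapping.

```yaml
- type: kafka_stream_output
  component: output
  settings:
    topic: my_output_topic                     # placeholder topic name
    column_value: payload                      # hypothetical column written as the record value
    column_key: id                             # hypothetical column written as the record key
    checkpoint_location: "/tmp/my_checkpoint"  # placeholder checkpoint path
  subscribe:
  - component: sql
    stream: data
```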
Runtime Compatibility
- PySpark : ✅
- Spark : ✅
Example
---
type: punchline
version: '6.0'
runtime: spark
dag:
- type: kafka_stream_input
  component: input
  settings:
    topic: mytenant_apache_httpd_archiving
    start_offset_strategy: latest
  publish:
  - stream: data
- type: sql
  component: sql
  settings:
    statement: SELECT CAST(value AS STRING) AS parsed_log FROM input_data
  subscribe:
  - component: input
    stream: data
  publish:
  - stream: data
- type: kafka_stream_output
  component: output
  settings:
    topic: new_topic_for_test
    column_value: parsed_log
    mode: Append
    checkpoint_location: "/tmp/kafkatest"
  subscribe:
  - stream: data
    component: sql
What did we achieve? The dataset received by the output node has a parsed_log column that we want to write to Kafka, so we use the column_value parameter to tell the Kafka writer to map parsed_log to the record value.
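Since the node reads the value column by default, another option is to alias the SQL output column to value and drop column_value entirely. The sketch below shows only the two affected components and assumes the rest of the punchline stays as in the example.

```yaml
- type: sql
  component: sql
  settings:
    # alias the decoded payload to "value", the column name the output node reads by default
    statement: SELECT CAST(value AS STRING) AS value FROM input_data
  subscribe:
  - component: input
    stream: data
  publish:
  - stream: data
- type: kafka_stream_output
  component: output
  settings:
    topic: new_topic_for_test
    mode: Append
    checkpoint_location: "/tmp/kafkatest"
    # no column_value here: the default "value" column is written as the record value
  subscribe:
  - stream: data
    component: sql
```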
Parameters
| Name | Type | Mandatory | Default value | Description |
|---|---|---|---|---|
| brokers | List of JSON objects | true | NONE | A list of JSON objects with two fields, host (String) and port (Integer), used to open the connection to your Kafka cluster. |
| column_value | String | false | value | Name of the input column whose data is written as the Kafka record value. By default, the incoming dataframe is expected to have a value column. |
| column_key | String | false | key | Name of the input column whose data is written as the Kafka record key. By default, the incoming dataframe is expected to have a key column. |
| topic | String | true | NONE | Name of the topic to which the data is written. |
| checkpoint_location | String | true | NONE | Path where checkpoint data is stored for failure recovery. |
| kafka_settings | Map (key/value) | false | NONE | Valid Spark Kafka settings. See the Spark Kafka 0.10 integration documentation. |
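The table marks brokers, topic and checkpoint_location as mandatory. A sketch of a settings block that sets all three, assuming a single hypothetical broker at localhost:9092 and placeholder topic and checkpoint values:

```yaml
settings:
  brokers:
  - host: localhost       # hypothetical broker host (String)
    port: 9092            # hypothetical broker port (Integer)
  topic: my_output_topic  # placeholder topic name
  checkpoint_location: "/tmp/my_checkpoint"  # placeholder checkpoint path
```

Each entry of brokers is one host/port object, so a multi-broker cluster is described by adding more list items.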
Kafka Settings
| Name | Type | Mandatory | Default value | Description |
|---|---|---|---|---|
| kafka.security.protocol | String | true with Kafka SSL | NONE | Security protocol used to contact Kafka. |
| kafka.ssl.truststore.location | String | true with Kafka SSL | NONE | Path to the client truststore used to verify the Kafka SSL certificate. |
| kafka.ssl.truststore.password | String | true with Kafka SSL | NONE | Password to open the client truststore. |
| kafka.ssl.keystore.location | String | true with Kafka SSL and client authentication | NONE | Path to the client keystore holding the certificate presented to Kafka over SSL. |
| kafka.ssl.keystore.password | String | true with Kafka SSL and client authentication | NONE | Password to open the client keystore. |
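A sketch of an SSL configuration with client authentication, assuming the keys above are set inside the kafka_settings map. SSL is one of the standard values of Kafka's security.protocol; all paths and passwords below are placeholders.

```yaml
settings:
  kafka_settings:
    kafka.security.protocol: SSL
    kafka.ssl.truststore.location: "/path/to/client-truststore.jks"  # placeholder path
    kafka.ssl.truststore.password: "<truststore-password>"           # placeholder password
    # the two keystore settings are only needed when Kafka requires client authentication
    kafka.ssl.keystore.location: "/path/to/client-keystore.jks"      # placeholder path
    kafka.ssl.keystore.password: "<keystore-password>"               # placeholder password
```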