Kafka Stream Output
Overview
The kafka_stream_output node writes records to a Kafka topic. The input dataset must have a column named value, and may have an optional column named key. These columns are used as the value (respectively, the key) of each written record. You can use other column names by setting the column_value and column_key parameters.
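As a minimal sketch of relying on the default names, assume an upstream sql node that reads the input_data table and has two hypothetical columns, id and message. Aliasing them to key and value means no column_value or column_key setting is needed. The casts are there because Spark's Kafka sink expects key and value to be string or binary.

```yaml
- type: sql
  component: sql
  settings:
    # "id" and "message" are hypothetical column names. Aliasing them to
    # "key" and "value" matches the defaults expected by kafka_stream_output.
    statement: SELECT CAST(id AS STRING) AS key, CAST(message AS STRING) AS value FROM input_data
  subscribe:
    - component: input
      stream: data
  publish:
    - stream: data
```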
Runtime Compatibility
- PySpark : ✅
- Spark : ✅
Example
```yaml
---
type: punchline
version: '6.0'
runtime: spark
dag:
  - type: kafka_stream_input
    component: input
    settings:
      topic: mytenant_apache_httpd_archiving
      start_offset_strategy: latest
    publish:
      - stream: data
  - type: sql
    component: sql
    settings:
      statement: SELECT CAST(value AS STRING) AS parsed_log FROM input_data
    subscribe:
      - component: input
        stream: data
    publish:
      - stream: data
  - type: kafka_stream_output
    component: output
    settings:
      topic: new_topic_for_test
      column_value: parsed_log
      mode: Append
      checkpoint_location: "/tmp/kafkatest"
    subscribe:
      - stream: data
        component: sql
```
What did we achieve? The received dataset has a column named parsed_log that we want to write to Kafka. The column_value parameter tells the Kafka writer to use parsed_log as the record value column.
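If records should also carry a key, a variant of the output node could name both columns explicitly. This sketch assumes the sql statement is extended to also select a string column named host (a hypothetical name used only for illustration).

```yaml
- type: kafka_stream_output
  component: output
  settings:
    topic: new_topic_for_test
    # Record value taken from "parsed_log"
    column_value: parsed_log
    # Record key taken from the hypothetical "host" column
    column_key: host
    mode: Append
    checkpoint_location: "/tmp/kafkatest"
  subscribe:
    - stream: data
      component: sql
```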
Parameters
Name | Type | Mandatory | Default value | Description |
---|---|---|---|---|
brokers | List of JSON objects | true | NONE | List of JSON objects, each with two fields: host (String) and port (Integer). Used to open the connection to your Kafka cluster. |
column_value | String | false | value | Name of the column written as the record value. By default, the incoming dataframe is expected to have a value column. |
column_key | String | false | key | Name of the column written as the record key. By default, a key column is used if the incoming dataframe has one. |
topic | String | true | NONE | Name of the topic to which records are written. |
checkpoint_location | String | true | NONE | Path where checkpoint data is stored for failure recovery. |
kafka_settings | Map (key-value) | false | NONE | Additional valid Spark Kafka settings. See the Kafka 0.10 documentation. |
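A minimal sketch of an output node that sets every mandatory parameter, including brokers as a list of host/port objects. The broker host names are hypothetical placeholders; 9092 is only Kafka's conventional default port, so adjust both to your cluster.

```yaml
- type: kafka_stream_output
  component: output
  settings:
    # Hypothetical broker addresses; replace with your own cluster
    brokers:
      - host: kafka-broker-1
        port: 9092
      - host: kafka-broker-2
        port: 9092
    topic: new_topic_for_test
    checkpoint_location: "/tmp/kafkatest"
  subscribe:
    - stream: data
      component: sql
```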
Kafka Settings
Name | Type | Mandatory | Default value | Description |
---|---|---|---|---|
kafka.security.protocol | String | true with Kafka SSL | NONE | Security protocol used to connect to Kafka. |
kafka.ssl.truststore.location | String | true with Kafka SSL | NONE | Path to the client truststore used to verify the Kafka SSL certificate. |
kafka.ssl.truststore.password | String | true with Kafka SSL | NONE | Password of the client truststore. |
kafka.ssl.keystore.location | String | true with Kafka SSL and client authentication | NONE | Path to the client keystore whose certificate is presented to Kafka. |
kafka.ssl.keystore.password | String | true with Kafka SSL and client authentication | NONE | Password of the client keystore. |
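A sketch of an SSL-enabled output node, assuming the settings above are passed through the kafka_settings map. The file paths and passwords are hypothetical placeholders, and the keystore entries are only needed when Kafka requires client authentication.

```yaml
- type: kafka_stream_output
  component: output
  settings:
    topic: new_topic_for_test
    checkpoint_location: "/tmp/kafkatest"
    # Assumption: SSL options are given through kafka_settings
    kafka_settings:
      kafka.security.protocol: SSL
      # Hypothetical truststore path and password
      kafka.ssl.truststore.location: /path/to/client.truststore.jks
      kafka.ssl.truststore.password: changeit
      # Only needed with client authentication (hypothetical values)
      kafka.ssl.keystore.location: /path/to/client.keystore.jks
      kafka.ssl.keystore.password: changeit
  subscribe:
    - stream: data
      component: sql
```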