Kafka Stream Output¶
The `kafka_stream_output` node writes records to a Kafka topic.
The input dataset must have a column named `value`, and may have an optional column named `key`.
These columns are used as the written record value and record key, respectively.
In the following example the received dataset has a column `parsed_log` that we want to write to Kafka. A select expression is used to rename it to `value`.
```hocon
{
  type: kafka_stream_output
  component: output
  settings: {
    select_expressions: [
      parsed_log AS value
    ]
    topic: parsed_apache_httpd
    brokers: [
      {
        host: localhost
        port: 9092
      }
    ]
    checkpoint_location: /tmp/checkpoint_location
    mode: append
  }
  subscribe: [
    {
      component: punch
      stream: data
    }
  ]
}
```
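Under the hood this node relies on Spark Structured Streaming's Kafka sink. For reference only, here is a minimal standalone PySpark sketch of what the example above amounts to; the column name, topic, brokers and checkpoint path are taken from the example, while the reading side is a placeholder assumption (a dummy `rate` source standing in for the upstream `punch` node). Running it requires the `spark-sql-kafka` package on the classpath.

```python
# Minimal PySpark sketch of the Kafka write performed with the settings above.
# `streaming_df` is a placeholder: in a real punchline it is the dataset
# published by the upstream component.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-output-sketch").getOrCreate()

# Placeholder source: any streaming dataframe with a `parsed_log` column.
streaming_df = (
    spark.readStream
    .format("rate")  # dummy source, stands in for the upstream node
    .load()
    .selectExpr("CAST(value AS STRING) AS parsed_log")
)

query = (
    streaming_df
    # select_expressions: rename the column to `value`, as the Kafka sink expects
    .selectExpr("parsed_log AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")   # brokers
    .option("topic", "parsed_apache_httpd")                 # topic
    .option("checkpointLocation", "/tmp/checkpoint_location")
    .outputMode("append")                                    # mode
    .start()
)
query.awaitTermination()
```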
Note
For now, the Python version of this PML node does not support the checkpoint_location parameter. On the other hand, it lets you configure the request_timeout option of the Kafka producer (not available in the Java version).
PYTHON
```json
{
  "type": "kafka_structured_stream_output",
  "component": "output",
  "settings": {
    "brokers": [{"host": "localhost", "port": 9092}],
    "column_value": "myKeyValue",
    "request_timeout": 30,
    "topic": "a_topic_name"
  },
  "subscribe": [{"component": "input", "stream": "data"}],
  "publish": [{"stream": "data"}]
}
```
Below is a full PML version (PYTHON) you can try:
```hocon
{
  job = [
    {
      "type": "batch_input",
      "component": "input",
      "publish": [{"stream": "data"}],
      "settings": {
        "input_data": [
          {
            "age": 21,
            "name": "phil"
          },
          {
            "age": 22,
            "name": "dimi"
          }
        ]
      }
    },
    {
      "type": "kafka_structured_stream_output",
      "component": "output",
      "settings": {
        "brokers": [{"host": "localhost", "port": 9092}],
        "column_value": "myKeyValue",
        "request_timeout": 30
      },
      "subscribe": [{"component": "input", "stream": "data"}],
      "publish": [{"stream": "data"}]
    },
    {
      "type": "show",
      "component": "show",
      "subscribe": [{"component": "output", "stream": "data"}]
    }
  ]
}
```
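After running such a punchline, you may want to check what actually landed in Kafka. One way, outside of PML, is a throwaway Python consumer. The sketch below assumes the `kafka-python` client is installed and that the topic is `a_topic_name` as in the PYTHON snippet above.

```python
# Throwaway consumer to inspect what the punchline wrote.
# Assumes `pip install kafka-python` and the topic name used above.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "a_topic_name",                      # topic set in the output node
    bootstrap_servers="localhost:9092",  # same brokers as the punchline
    auto_offset_reset="earliest",        # read from the beginning
    consumer_timeout_ms=10000,           # stop after 10 s of inactivity
)

for record in consumer:
    print(record.key, record.value)
```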
Configuration(s)¶
- `brokers`: List

  Description: a list of JSON objects with two fields, `host` (String) and `port` (Integer). This list is used to establish the connection to your Kafka cluster.

- `column_value`: String

  Description: each row of your Spark dataframe stored into your Kafka cluster will be referenced by the key given in `column_value`.

- `request_timeout`: Integer (seconds)

  Description: PYTHON only. Because the Kafka producer API works asynchronously, setting this parameter is a must. Depending on the size of your input dataframe, you may want to tweak it. If not set, it defaults to 30 seconds. See the producer sketch after this list.

- `topic`: String

  Description: name of the topic to which you want to push data.
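To make these settings concrete, here is how the same four parameters translate into a plain `kafka-python` producer. This is only an illustration of the Kafka-level semantics (bootstrap servers, keyed records, producer timeout), not the node's actual implementation; the broker, key and topic values are taken from the examples above.

```python
# Plain kafka-python illustration of the settings described above.
# This is NOT the node's implementation, only the Kafka-level semantics.
from kafka import KafkaProducer

brokers = [{"host": "localhost", "port": 9092}]                  # `brokers`
bootstrap = ",".join(f"{b['host']}:{b['port']}" for b in brokers)

producer = KafkaProducer(
    bootstrap_servers=bootstrap,
    request_timeout_ms=30 * 1000,   # `request_timeout` (seconds -> milliseconds)
)

# One record per dataframe row; the key comes from the `column_value`
# setting (here the literal taken from the example above).
producer.send(
    "a_topic_name",                 # `topic`
    key=b"myKeyValue",
    value=b'{"age": 21, "name": "phil"}',
)

# send() is asynchronous: flush() blocks until the records are acknowledged,
# which is why a sensible producer timeout matters.
producer.flush()
```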