# Kafka Stream Output

## Overview

Compatible Spark/PySpark.
The `kafka_stream_output` node writes records to a Kafka topic.
The input dataset must have a column named `value` and may have an optional column named `key`.
These columns are used as the value and key, respectively, of the written records.
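The same column contract is easy to express in PySpark terms. The sketch below is only an illustration of the expected dataset shape, assuming a hypothetical streaming dataframe `logs` with string-compatible columns `raw` and `host` (neither of which comes from this documentation):

```python
# Hypothetical streaming dataframe `logs` with columns `raw` and `host`.
# The Kafka writer expects the record payload in a column named `value`
# and, optionally, the record key in a column named `key`.
records = logs.selectExpr(
    "CAST(raw AS STRING) AS value",   # becomes the Kafka record value
    "CAST(host AS STRING) AS key",    # optional: becomes the Kafka record key
)
```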
## Example(s)

In the following example, the received dataset has a column `parsed_log` that we want to write to Kafka. We let the Kafka writer map `parsed_log` to the record value by setting it as the `column_value`.
```hocon
{
  job: [
    {
      type: kafka_stream_input
      component: input
      settings: {
        topic: mytenant_apache_httpd_archiving
        brokers: [
          {
            host: localhost
            port: 9092
          }
        ]
        start_offset_strategy: latest
      }
      publish: [
        {
          stream: data
        }
      ]
    }
    {
      type: sql
      component: sql
      settings: {
        statement: SELECT CAST(value AS STRING) AS parsed_log FROM input_data
      }
      subscribe: [
        {
          component: input
          stream: data
        }
      ]
      publish: [
        {
          stream: data
        }
      ]
    }
    {
      type: kafka_stream_output
      component: output
      settings: {
        topic: new_topic_for_test
        column_value: parsed_log
        mode: Append
        checkpoint_location: /tmp/kafkatest
        brokers: [
          {
            host: localhost
            port: 9092
          }
        ]
      }
      subscribe: [
        {
          stream: data
          component: sql
        }
      ]
    }
  ]
}
```
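For readers more familiar with Spark, the job above behaves roughly like the following structured-streaming sketch. This is not the node's actual implementation, only an assumed equivalent built on Spark's documented Kafka source and sink; the broker address, topics, and checkpoint path are taken from the example:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka_stream_output_example").getOrCreate()

# kafka_stream_input: read the source topic from the latest offsets
input_data = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "mytenant_apache_httpd_archiving")
    .option("startingOffsets", "latest")
    .load()
)

# sql node: cast the raw Kafka value to a string column named parsed_log
parsed = input_data.selectExpr("CAST(value AS STRING) AS parsed_log")

# kafka_stream_output: write parsed_log as the record value of the target topic
query = (
    parsed.selectExpr("parsed_log AS value")
    .writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "new_topic_for_test")
    .option("checkpointLocation", "/tmp/kafkatest")
    .outputMode("append")
    .start()
)
query.awaitTermination()
```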
## Configuration(s)

- `brokers`: List
  Description: [REQUIRED] A list of JSON objects with two fields, `host` (String) and `port` (Integer), used to open the connection to your Kafka cluster.

- `column_value`: String
  Description: [NOT_REQUIRED] The column whose data should be written to your Kafka cluster. By default, the node expects the incoming dataframe to have a `value` column.

- `topic`: String
  Description: [REQUIRED] Name of the topic to which data is pushed.

- `checkpoint_location`: String
  Description: [REQUIRED] Path where checkpoint data is stored for failure recovery.