Kafka Stream Output

Overview

The kafka_stream_output node writes records to a Kafka topic. The input dataset must have a column named value and may have an optional column named key; these columns are used as the value and the key of each written record. To read them from columns with other names, set the column_value and column_key parameters.
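As an illustration, suppose an upstream sql node publishes columns named host and message (both names are hypothetical). A minimal sketch of an output node that writes them as the record key and value could look like the following; the broker address localhost:9092 is a placeholder assumption, and the other settings mirror the example further down.

```yaml
# Sketch only: "host" and "message" are hypothetical column names,
# and the broker host/port are placeholder assumptions.
- type: kafka_stream_output
  component: output
  settings:
    topic: new_topic_for_test
    brokers:
    - host: localhost       # assumed broker host
      port: 9092            # assumed broker port
    column_key: host        # column written as the record key
    column_value: message   # column written as the record value
    mode: Append
    checkpoint_location: "/tmp/kafkatest"
  subscribe:
  - component: sql
    stream: data
```

Spark's Kafka sink expects the key and value columns to be of string or binary type, so other column types generally need a CAST in an upstream node.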

Runtime Compatibility

  • PySpark :
  • Spark :

Example

---
type: punchline
version: '6.0'
runtime: spark
dag:
- type: kafka_stream_input
  component: input
  settings:
    topic: mytenant_apache_httpd_archiving
    start_offset_strategy: latest
  publish:
  - stream: data
- type: sql
  component: sql
  settings:
    statement: SELECT CAST(value AS STRING) AS parsed_log FROM input_data
  subscribe:
  - component: input
    stream: data
  publish:
  - stream: data
- type: kafka_stream_output
  component: output
  settings:
    topic: new_topic_for_test
    column_value: parsed_log
    mode: Append
    checkpoint_location: "/tmp/kafkatest"
  subscribe:
  - stream: data
    component: sql

What does this achieve? The dataset received by the output node has a parsed_log column that we want to write to Kafka, so we tell the Kafka writer, through the column_value parameter, to use parsed_log as the record value.
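Alternatively, assuming the input node exposes the raw Kafka payload as a column named value, the sql node could alias the cast column directly to value so that the output node's default column name applies and column_value is not needed. A sketch of the last two nodes written that way:

```yaml
# Sketch: alias the payload to "value" so the output node's default applies.
- type: sql
  component: sql
  settings:
    statement: SELECT CAST(value AS STRING) AS value FROM input_data
  subscribe:
  - component: input
    stream: data
  publish:
  - stream: data
- type: kafka_stream_output
  component: output
  settings:
    topic: new_topic_for_test
    # no column_value: the default column name "value" is used
    mode: Append
    checkpoint_location: "/tmp/kafkatest"
  subscribe:
  - stream: data
    component: sql
```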

Parameters

| Name | Type | Mandatory | Default value | Description |
|---|---|---|---|---|
| brokers | List of JSON | true | NONE | A list of JSON objects with two fields, host (String) and port (Integer), used to open the connection to your Kafka cluster. |
| column_value | String | false | value | Name of the column written as the record value. By default, the incoming dataframe is expected to have a value column. |
| column_key | String | false | key | Name of the column written as the record key. By default, the key is read from a key column. |
| topic | String | true | NONE | Name of the topic to which the data is written. |
| checkpoint_location | String | true | NONE | Path where checkpoint data is stored for failure recovery. |
| kafka_settings | Map K-V | false | NONE | Valid Spark Kafka settings. See the Kafka 0.10 documentation. |
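The brokers parameter takes a list of objects, each with a host and a port field. A sketch of an output node's settings listing two brokers follows; the host names kafka-1 and kafka-2 and the port 9092 are hypothetical placeholders.

```yaml
# Sketch: broker host names and port are placeholder assumptions.
settings:
  topic: new_topic_for_test
  brokers:
  - host: kafka-1
    port: 9092
  - host: kafka-2
    port: 9092
  checkpoint_location: "/tmp/kafkatest"
```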

Kafka Settings

| Name | Type | Mandatory | Default value | Description |
|---|---|---|---|---|
| kafka.security.protocol | String | true with Kafka SSL | NONE | Security protocol used to connect to Kafka. |
| kafka.ssl.truststore.location | String | true with Kafka SSL | NONE | Path to the client truststore used to check the Kafka SSL certificate. |
| kafka.ssl.truststore.password | String | true with Kafka SSL | NONE | Password to open the client truststore. |
| kafka.ssl.keystore.location | String | true with Kafka SSL and client authentication | NONE | Path to the client keystore presented to Kafka for SSL client authentication. |
| kafka.ssl.keystore.password | String | true with Kafka SSL and client authentication | NONE | Password to open the client keystore. |
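A sketch of a kafka_settings map that enables SSL with client authentication using the keys listed above. The file paths and passwords are hypothetical placeholders to replace with your own values.

```yaml
# Sketch: goes under the node's settings; paths and passwords are placeholders.
kafka_settings:
  kafka.security.protocol: SSL
  kafka.ssl.truststore.location: /path/to/client.truststore.jks
  kafka.ssl.truststore.password: "<truststore-password>"
  kafka.ssl.keystore.location: /path/to/client.keystore.jks
  kafka.ssl.keystore.password: "<keystore-password>"
```

Per the table, the two kafka.ssl.keystore.* entries are only required when Kafka also authenticates the client.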