
Kafka Batch Output

Overview

The kafka_batch_output node makes your job act as a Kafka producer. A column named value is expected; each row of your dataset is sent to a Kafka topic one by one.

By design, this node stops once all rows of the input dataset have been inserted into Kafka.

Warning

On production platforms, you are required to create the topic you want to insert into manually beforehand (for example with Kafka's kafka-topics.sh tool).

Runtime Compatibility

  • PySpark
  • Spark

Examples

---
type: punchline
runtime: spark
version: '6.0'
dag:
- settings:
    input_data:
    - date: "{{ from }}"
      name: from_date
    - date: "{{ to }}"
      name: to_date
  component: input
  publish:
  - stream: data
  type: dataset_generator
- settings:
    truncate: false
  component: show
  subscribe:
  - component: input
    stream: data
  type: show
- type: sql
  component: sql
  settings:
    statement: SELECT to_json(map('people', named_struct('date', date, 'name', name)))
      AS value FROM input_data
  subscribe:
  - component: input
    stream: data
  publish:
  - stream: data
- type: kafka_batch_output
  component: output_kafka
  subscribe:
  - component: sql
    stream: data
  settings:
    topic: jonathan
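
This punchline generates a two-row dataset, prints it with the show node, turns each row into a JSON string exposed as a value column through Spark SQL, and finally pushes every row to the jonathan topic.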

Expected input dataset

| key (optional)        | value (required)      | topic (optional) |
| --------------------- | --------------------- | ---------------- |
| string type or binary | string type or binary | string type only |

The key column can be used as an identifier (i.e., to choose the partition number): all rows with the same key are sent to the same partition. It also lets you filter data on the key before parsing the value column.

This can be used to guarantee data ordering within a Kafka partition, while also letting users run Kafka SQL on topics.

If no key is defined, rows are spread over the partitions of the given topic in a round-robin fashion.
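
For instance, the sql node from the example above can be adapted to expose a key column alongside value, so that all rows sharing the same name land in the same partition (a minimal sketch; keying on name is only an illustration):

- type: sql
  component: sql
  settings:
    # Rows sharing the same key are routed to the same Kafka partition.
    statement: SELECT name AS key, to_json(map('people', named_struct('date', date,
      'name', name))) AS value FROM input_data
  subscribe:
  - component: input
    stream: data
  publish:
  - stream: data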

Parameters

| Name           | Type         | Mandatory | Default value | Description |
| -------------- | ------------ | --------- | ------------- | ----------- |
| brokers        | List of JSON | true      | NONE          | A list of JSON objects, each with two fields: host (String) and port (Integer). This list is used to open the connection to your Kafka cluster. |
| topic          | String       | true      | NONE          | Name of the topic to which data is pushed. |
| kafka_settings | Map K-V      | false     | NONE          | Valid Spark Kafka settings. See the Kafka 0.10 documentation. |
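
For instance, the output node from the example above could declare its broker list explicitly (a minimal sketch, assuming a single broker reachable on localhost:9092):

- type: kafka_batch_output
  component: output_kafka
  subscribe:
  - component: sql
    stream: data
  settings:
    topic: jonathan
    # One object per broker, each with a host and a port field.
    brokers:
    - host: localhost
      port: 9092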

Kafka Settings

| Name                          | Type   | Mandatory                                     | Default value | Description |
| ----------------------------- | ------ | --------------------------------------------- | ------------- | ----------- |
| kafka.security.protocol       | String | true with Kafka SSL                           | NONE          | Security protocol used to contact Kafka. |
| kafka.ssl.truststore.location | String | true with Kafka SSL                           | NONE          | Path to the client truststore used to verify the Kafka SSL certificate. |
| kafka.ssl.truststore.password | String | true with Kafka SSL                           | NONE          | Password to open the client truststore. |
| kafka.ssl.keystore.location   | String | true with Kafka SSL and client authentication | NONE          | Path to the client keystore presented to Kafka for SSL client authentication. |
| kafka.ssl.keystore.password   | String | true with Kafka SSL and client authentication | NONE          | Password to open the client keystore. |
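
When the Kafka cluster is secured with SSL and client authentication, these settings go under kafka_settings (a minimal sketch; the broker address, store paths, and passwords below are hypothetical placeholders):

- type: kafka_batch_output
  component: output_kafka
  subscribe:
  - component: sql
    stream: data
  settings:
    topic: jonathan
    brokers:
    - host: localhost   # hypothetical broker address
      port: 9093
    # These settings are passed through to the underlying Kafka client as-is.
    kafka_settings:
      kafka.security.protocol: SSL
      kafka.ssl.truststore.location: /path/to/truststore.jks   # hypothetical path
      kafka.ssl.truststore.password: changeit                  # hypothetical password
      kafka.ssl.keystore.location: /path/to/keystore.jks       # hypothetical path
      kafka.ssl.keystore.password: changeit                    # hypothetical password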