Kafka Batch Output¶
Overview¶
The kafka_batch_output
node makes your job act as a Kafka producer.
A column labeled value
is expected: each row of your dataset is sent to a Kafka topic one by one.
By design, this node stops once every row of the input dataset has been inserted into Kafka.
Warning
On production platforms, the topic you want to write to must be created manually beforehand.
Runtime Compatibility¶
- PySpark : ✅
- Spark : ✅
Examples¶
```yaml
---
type: punchline
runtime: spark
version: '6.0'
dag:
- settings:
    input_data:
    - date: "{{ from }}"
      name: from_date
    - date: "{{ to }}"
      name: to_date
  component: input
  publish:
  - stream: data
  type: dataset_generator
- settings:
    truncate: false
  component: show
  subscribe:
  - component: input
    stream: data
  type: show
- type: sql
  component: sql
  settings:
    statement: SELECT to_json(map('people', named_struct('date', date, 'name', name)))
      AS value FROM input_data
  subscribe:
  - component: input
    stream: data
  publish:
  - stream: data
- type: kafka_batch_output
  component: output_kafka
  subscribe:
  - component: sql
    stream: data
  settings:
    topic: jonathan
```
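For reference, the `value` strings emitted by the `sql` stage above have the JSON shape produced by `to_json(map('people', named_struct(...)))`. A minimal Python sketch of that shape (the row values are hypothetical):

```python
import json

# Hypothetical row values, matching the dataset_generator columns above.
row = {"date": "2024-01-01", "name": "from_date"}

# Same nesting as to_json(map('people', named_struct('date', date, 'name', name))).
value = json.dumps({"people": {"date": row["date"], "name": row["name"]}},
                   separators=(",", ":"))
print(value)  # {"people":{"date":"2024-01-01","name":"from_date"}}
```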
Expected input dataset¶
key (optional) | value (required) | topic (optional) |
---|---|---|
can be string type or binary | can be string type or binary | string type only |
The key column can be used as a partitioning key: all rows sharing the same key are sent to the same partition. This guarantees data ordering within a Kafka partition, and also lets users filter data by key before parsing the value column (for instance with Kafka SQL on the topic).
If no key is defined, rows are distributed across the partitions of the topic in a round-robin fashion.
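The routing behavior described above can be sketched in plain Python. Note that `assign_partition`, `NUM_PARTITIONS`, and the use of Python's `hash` in place of Kafka's murmur2 partitioner are illustrative assumptions, not the node's actual implementation:

```python
from itertools import count
from typing import Optional

NUM_PARTITIONS = 3      # hypothetical partition count for the topic
_round_robin = count()  # counter used for key-less rows

def assign_partition(key: Optional[str]) -> int:
    """Rows sharing a key always land on the same partition;
    key-less rows are spread round-robin across partitions."""
    if key is None:
        return next(_round_robin) % NUM_PARTITIONS
    # Kafka actually hashes the key with murmur2; Python's hash() stands in here.
    return hash(key) % NUM_PARTITIONS

# Same key -> same partition, which preserves per-key ordering.
assert assign_partition("user-1") == assign_partition("user-1")
```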
Parameters¶
Name | Type | Mandatory | Default value | Description |
---|---|---|---|---|
brokers | List of Json | true | NONE | A list of JSON objects with two fields, host (String) and port (Integer), used to establish the connection to your Kafka cluster. |
topic | String | true | NONE | Name of the topic to which data is pushed. |
kafka_settings | Map K-V | false | NONE | Valid Spark Kafka settings. See the Kafka 0.10 doc. |
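A typical brokers block might look like this (hostnames and ports are placeholders, not values from this documentation):

```yaml
type: kafka_batch_output
component: output_kafka
settings:
  topic: jonathan
  brokers:
  - host: kafka-1.example.org   # placeholder hostname
    port: 9092
  - host: kafka-2.example.org
    port: 9092
```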
Kafka Settings¶
Name | Type | Mandatory | Default value | Description |
---|---|---|---|---|
kafka.security.protocol | String | true with Kafka SSL | NONE | Security protocol used to contact Kafka. |
kafka.ssl.truststore.location | String | true with Kafka SSL | NONE | Path to the client truststore used to check the Kafka SSL certificate. |
kafka.ssl.truststore.password | String | true with Kafka SSL | NONE | Password to open the client truststore. |
kafka.ssl.keystore.location | String | true with Kafka SSL and client authentication | NONE | Path to the client keystore presented to Kafka for SSL client authentication. |
kafka.ssl.keystore.password | String | true with Kafka SSL and client authentication | NONE | Password to open the client keystore. |
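With Kafka SSL enabled, these settings are passed through kafka_settings. A sketch, where the paths and passwords are placeholders:

```yaml
settings:
  topic: jonathan
  kafka_settings:
    kafka.security.protocol: SSL
    kafka.ssl.truststore.location: /opt/certs/truststore.jks  # placeholder path
    kafka.ssl.truststore.password: truststore-password        # placeholder
    # Additionally, when client authentication is required:
    kafka.ssl.keystore.location: /opt/certs/keystore.jks      # placeholder path
    kafka.ssl.keystore.password: keystore-password            # placeholder
```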