Kafka Batch Input¶
Overview¶
The kafka_batch_input node makes your job act as a Kafka consumer.
Records are read from Kafka and sent downstream as a Dataset.
Input records are automatically mapped to rows with the following columns:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
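Under the hood, this is the schema produced by Spark's built-in Kafka source. As a rough PySpark sketch (the broker address and topic below are placeholders, not values taken from this node's configuration), an equivalent batch read looks like:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-batch-sketch").getOrCreate()

# Batch read from Kafka: spark.read (not readStream) yields a bounded DataFrame
# with the key/value/topic/partition/offset/timestamp columns listed above.
# Requires the spark-sql-kafka-0-10 connector on the classpath.
df = (spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder broker
      .option("subscribe", "hello_world")                    # placeholder topic
      .load())

df.printSchema()
```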
We provide a custom Kafka offset manager to read from Kafka with at-least-once semantics.
Important
In batch mode, you read from a starting offset to an ending offset. By default, they are set to earliest and latest respectively.
It is also highly recommended not to use Spark checkpointing: it is not suited to production use cases, and checkpoints cannot be reused across Spark version upgrades (see the Databricks documentation).
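For reference, a minimal sketch of how such a bounded read is expressed with the raw Spark options; the per-partition offsets in the JSON form are purely illustrative:

```python
# Reuses the SparkSession and placeholder topic from the sketch above.
# startingOffsets/endingOffsets bound the batch read; "earliest"/"latest" are
# the defaults, while a JSON map pins explicit offsets per partition.
bounded = (spark.read
           .format("kafka")
           .option("kafka.bootstrap.servers", "localhost:9092")
           .option("subscribe", "hello_world")
           .option("startingOffsets", """{"hello_world": {"0": 1200, "1": 980}}""")
           .option("endingOffsets", "latest")
           .load())
```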
The returned DataFrame contains all the familiar fields of a Kafka record and its associated metadata. We can now use all of the familiar DataFrame or Dataset operations to transform the result. Typically, however, we’ll start by parsing the binary values present in the key and value columns. How to interpret these blobs is application specific. Fortunately, Spark SQL contains many built-in transformations for common types of serialization as we’ll show below.
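For example, here is a hedged sketch of that first parsing step in PySpark; the JSON payload schema below is an assumption about the record contents, not something the node imposes:

```python
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

# Assumed payload layout for the example: a JSON object with a single "message" field.
payload_schema = StructType([StructField("message", StringType())])

# df is the Kafka DataFrame from the sketches above: cast the binary columns,
# then parse the value as JSON with Spark's built-in from_json.
parsed = (df
          .selectExpr("CAST(key AS STRING) AS key",
                      "CAST(value AS STRING) AS value",
                      "topic", "partition", "offset")
          .withColumn("payload", from_json(col("value"), payload_schema)))
```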
Runtime Compatibility¶
- PySpark : ✅
- Spark : ✅
Examples¶
---
type: punchline
runtime: spark
version: '6.0'
dag:
  - type: elastic_input
    component: kafka_offset
    settings:
      index: kafka-offset-*
      hits_hits: true
      query:
        size: 1
        sort:
          - processed_date:
              order: desc
        query:
          bool:
            must:
              - constant_score:
                  filter:
                    match:
                      application_offset_id: test_index_offset
    publish:
      - stream: data
  - type: kafka_batch_input
    component: input
    settings:
      topic: hello_world
      punch_offset_manager: true
      application_offset_id: test_index_offset
      kafka_settings:
        failOnDataLoss: false
    publish:
      - stream: data
    subscribe:
      - stream: data
        component: kafka_offset
        alias: last_committed_meta
  - type: sql
    component: cast
    settings:
      statement: |
        SELECT
          key,
          CAST(value AS STRING) AS value,
          topic,
          timestamp,
          timestampType,
          offset,
          partition,
          processed_date,
          application_offset_id
        FROM input_data
    publish:
      - stream: data
    subscribe:
      - stream: data
        component: input
  - type: elastic_output
    component: commit_offset
    settings:
      index:
        type: constant
        value: kafka-offset-2020.06.08
      output_mode: append
    subscribe:
      - stream: data
        component: input
Parameters¶
Name | Type | mandatory | Default value | Description |
---|---|---|---|---|
brokers | List of JSON | true | NONE | A list of JSON objects with two fields: host (String) and port (Integer). This list is used to open the connection to your Kafka cluster. |
topic | String | true | NONE | Name of the topic from which you want to read data. |
punch_offset_manager | Boolean | false | NONE | Activate the punch custom offset manager for reading. |
kafka_settings | Map K-V | false | NONE | Valid Spark Kafka settings. See the Kafka 0.10 documentation. |
application_offset_id | String | false | default | An ID that will be added to the published DataFrame. |
Kafka Settings¶
Name | Type | mandatory | Default value | Description |
---|---|---|---|---|
kafka.security.protocol | String | true with Kafka SSL | NONE | Security protocol to contact Kafka |
kafka.ssl.truststore.location | String | true with Kafka SSL | NONE | Path to client truststore to check Kafka SSL certificate. |
kafka.ssl.truststore.password | String | true with Kafka SSL | NONE | Password to open client truststore. |
kafka.ssl.keystore.location | String | true with Kafka SSL and client authentication | NONE | Path to the client keystore presented to Kafka for SSL client authentication. |
kafka.ssl.keystore.password | String | true with Kafka SSL and client authentication | NONE | Password to open client keystore. |
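As an illustration only (broker address, file paths and passwords are placeholders), kafka.-prefixed options like these are forwarded as-is to the underlying Kafka consumer by the Spark Kafka source; in a punchline they would typically sit under kafka_settings:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-ssl-sketch").getOrCreate()

# The kafka.* options are passed through to the Kafka consumer, which is how
# the SSL settings listed above reach the broker. All values are placeholders.
secure = (spark.read
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9093")
          .option("subscribe", "hello_world")
          .option("kafka.security.protocol", "SSL")
          .option("kafka.ssl.truststore.location", "/opt/ssl/truststore.jks")
          .option("kafka.ssl.truststore.password", "changeit")
          .option("kafka.ssl.keystore.location", "/opt/ssl/keystore.jks")
          .option("kafka.ssl.keystore.password", "changeit")
          .load())
```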