
Kafka Stream Input

Overview

The kafka_stream_input node makes your job act as a Kafka consumer. Records are continuously read from Kafka and sent as a Dataset.

You can then plug a sql node into your job to perform arbitrary continuous queries.

Make sure you are familiar with Spark Structured Streaming concepts.

Input records are automatically mapped to rows with the following columns:

root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)

!!!important

    Due to Spark Structured Streaming concepts, the terminating node of any structured streaming pipeline is expected to be a structured streaming node itself. To keep things simple, make sure that the last node in your graph is a structured streaming output (sink).

From the Databricks documentation:

The returned DataFrame contains all the familiar fields of a Kafka record and its associated metadata. We can now use all of the familiar DataFrame or Dataset operations to transform the result. Typically, however, we’ll start by parsing the binary values present in the key and value columns. How to interpret these blobs is application specific. Fortunately, Spark SQL contains many built-in transformations for common types of serialization as we’ll show below.
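
For instance, if your records carry JSON payloads, a sql node can decode them with Spark SQL built-in functions. The snippet below is a minimal sketch, assuming the value column contains JSON documents with hypothetical name and age fields, and that it subscribes to the data stream published by a kafka_stream_input component named input:

- type: sql
  component: parse
  settings:
    statement: SELECT from_json(CAST(value AS STRING), 'name STRING, age INT') AS parsed, topic, partition, offset FROM input_data
  subscribe:
  - component: input
    stream: data
  publish:
  - stream: data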

Runtime Compatibility

  • PySpark :
  • Spark :

Example

---
type: punchline
version: '6.0'
runtime: spark
dag:
- type: kafka_stream_input
  component: input
  settings:
    topic: mytenant_apache_httpd_archiving
    start_offset_strategy: latest
  publish:
  - stream: data
- type: sql
  component: sql
  settings:
    statement: SELECT CAST(value AS STRING) AS valuestring FROM input_data
  subscribe:
  - component: input
    stream: data
  publish:
  - stream: data
- type: console_stream_output
  component: console
  settings:
    mode: Append
  subscribe:
  - stream: data
    component: sql

What did we achieve? We cast the record value to the String type. If you need to convert and/or select only some of the columns, use a sql node, as sketched below.
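
For instance, replacing the statement of the sql node above with the following (using the column names from the schema shown earlier) keeps only the Kafka metadata columns alongside the decoded value:

    statement: SELECT topic, partition, offset, timestamp, CAST(value AS STRING) AS valuestring FROM input_data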

Parameters

| Name | Type | Mandatory | Default value | Description |
|------|------|-----------|---------------|-------------|
| brokers | List of Json | true | NONE | A list of json objects with two fields: host (String) and port (Integer). This list is used to open a connection with your Kafka cluster. |
| topic | String | true | NONE | Name of the topic from which you want to read data. |
| kafka_settings | Map K-V | false | NONE | Valid Spark Kafka settings. See the Spark Kafka 0.10 integration documentation. |
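
As an illustration, a settings block combining these parameters could look like the following sketch (the broker host and port are placeholders for your own Kafka cluster):

settings:
  brokers:
  - host: localhost
    port: 9092
  topic: mytenant_apache_httpd_archiving
  start_offset_strategy: latest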

Kafka Settings

| Name | Type | Mandatory | Default value | Description |
|------|------|-----------|---------------|-------------|
| kafka.security.protocol | String | true with Kafka SSL | NONE | Security protocol used to contact Kafka. |
| kafka.ssl.truststore.location | String | true with Kafka SSL | NONE | Path to the client truststore used to check the Kafka SSL certificate. |
| kafka.ssl.truststore.password | String | true with Kafka SSL | NONE | Password to open the client truststore. |
| kafka.ssl.keystore.location | String | true with Kafka SSL and client authentication | NONE | Path to the client keystore presented to Kafka for SSL client authentication. |
| kafka.ssl.keystore.password | String | true with Kafka SSL and client authentication | NONE | Password to open the client keystore. |
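
For a Kafka cluster secured with SSL and client authentication, the kafka_settings map could look like the following sketch (paths and passwords are placeholders):

kafka_settings:
  kafka.security.protocol: SSL
  kafka.ssl.truststore.location: /opt/keystores/kafka-truststore.jks
  kafka.ssl.truststore.password: changeme
  kafka.ssl.keystore.location: /opt/keystores/kafka-keystore.jks
  kafka.ssl.keystore.password: changeme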