Kafka Stream Input
The `kafka_stream_input` node makes your job act as a Kafka consumer. Records are continuously read from Kafka and sent downstream as a Dataset.
You can then plug in your job's sql nodes to perform arbitrary continuous queries.
Make sure you are familiar with Spark Structured Streaming concepts.
Input records are automatically mapped to rows with the following columns:
```
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
```
The returned DataFrame contains all the familiar fields of a Kafka record and its associated metadata. We can now use all of the familiar DataFrame or Dataset operations to transform the result. Typically, however, we’ll start by parsing the binary values present in the key and value columns. How to interpret these blobs is application specific. Fortunately, Spark SQL contains many built-in transformations for common types of serialization as we’ll show below.
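As an illustration of what `CAST(value AS STRING)` amounts to at the byte level, here is a plain-Python sketch (no Spark required) that decodes the binary key and value blobs of a record the same way. The sample record and the helper name are illustrative, not part of the node's API:

```python
# A minimal, Spark-free sketch of what casting Kafka's binary key/value
# columns to STRING amounts to: UTF-8 decoding of the raw bytes.
# The record layout mirrors the columns listed above (hypothetical sample data).

record = {
    "key": b"host-42",
    "value": b'{"status": 200, "path": "/index.html"}',
    "topic": "mytenant_apache_httpd_archiving",
    "partition": 0,
    "offset": 1337,
}

def cast_binary_to_string(rec):
    """Return a copy of the record with key and value decoded as UTF-8,
    similar to SELECT CAST(key AS STRING), CAST(value AS STRING)."""
    out = dict(rec)
    out["key"] = rec["key"].decode("utf-8") if rec["key"] is not None else None
    out["value"] = rec["value"].decode("utf-8") if rec["value"] is not None else None
    return out

decoded = cast_binary_to_string(record)
print(decoded["value"])  # the raw JSON payload, now a plain string
```

In Spark itself you would express the same thing with the SQL `CAST` shown in the punchline below; how to interpret the decoded string (JSON, CSV, Avro, ...) remains application specific.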
- PySpark : ✅
- Spark : ✅
```yaml
---
type: punchline
version: '6.0'
runtime: spark
dag:
- type: kafka_stream_input
  component: input
  settings:
    topic: mytenant_apache_httpd_archiving
    start_offset_strategy: latest
  publish:
  - stream: data
- type: sql
  component: sql
  settings:
    statement: SELECT CAST(value AS STRING) AS valuestring FROM input_data
  subscribe:
  - component: input
    stream: data
  publish:
  - stream: data
- type: console_stream_output
  component: console
  settings:
    mode: Append
  subscribe:
  - stream: data
    component: sql
```
What did we achieve? We cast the record value to the String type. If you need to convert and/or select only some of the columns, use a sql node.
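For instance, a sql node that casts both the key and the value and keeps only a few of the metadata columns could look like this (a sketch; the column aliases are illustrative):

```yaml
- type: sql
  component: sql
  settings:
    statement: >
      SELECT CAST(key AS STRING) AS keystring,
             CAST(value AS STRING) AS valuestring,
             topic, partition, offset
      FROM input_data
  subscribe:
  - component: input
    stream: data
  publish:
  - stream: data
```

Note that the subscribed stream `data` from component `input` is exposed to the SQL statement as the table `input_data`, as in the punchline above.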
| Name | Type | Mandatory | Default | Comment |
|------|------|-----------|---------|---------|
| brokers | List of Json | true | NONE | a list of json objects with two fields: |
| topic | String | true | NONE | name of the topic from which you want to read data. |
| kafka_settings | Map K-V | false | NONE | Valid spark kafka settings. See Kafka 0.10 doc. |
| kafka.security.protocol | String | true with Kafka SSL | NONE | Security protocol to contact Kafka. |
| kafka.ssl.truststore.location | String | true with Kafka SSL | NONE | Path to the client truststore used to check the Kafka SSL certificate. |
| kafka.ssl.truststore.password | String | true with Kafka SSL | NONE | Password to open the client truststore. |
| kafka.ssl.keystore.location | String | true with Kafka SSL and client authentication | NONE | Path to the client keystore sent to Kafka SSL. |
| kafka.ssl.keystore.password | String | true with Kafka SSL and client authentication | NONE | Password to open the client keystore. |
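Put together, a `kafka_stream_input` node reading from an SSL-secured Kafka could carry settings along these lines. This is a sketch: the paths and passwords are placeholders, and it assumes the `kafka.*` keys are passed through `kafka_settings` as described in the table above:

```yaml
- type: kafka_stream_input
  component: input
  settings:
    topic: mytenant_apache_httpd_archiving
    start_offset_strategy: latest
    kafka_settings:
      kafka.security.protocol: SSL
      kafka.ssl.truststore.location: /opt/keys/truststore.jks  # placeholder path
      kafka.ssl.truststore.password: changeme                  # placeholder password
      # The keystore entries are only needed when the broker
      # requires client authentication:
      kafka.ssl.keystore.location: /opt/keys/keystore.jks      # placeholder path
      kafka.ssl.keystore.password: changeme                    # placeholder password
  publish:
  - stream: data
```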