Kafka Stream Output

The kafka_stream_output node writes records to a Kafka topic. The input dataset must have a column named value, and may optionally have a column named key. These columns are used as the written record value (respectively key).

In the following example the received dataset has a column parsed_log that we want to write to Kafka. A select expression is set to rename it to value. (A variant that also maps a key column is sketched right after this example.)

{
    type : kafka_stream_output
    component: output
    settings: {
        select_expressions: [
            parsed_log AS value
        ]
        topic: parsed_apache_httpd
        brokers: [
            {
                host : localhost
                port : 9092
            }
        ]
        checkpoint_location : /tmp/checkpoint_location
        mode : append
    }
    subscribe: [
        {
            component: punch
            stream: data
        }
    ]
}
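
If the input dataset also carries a column you want to use as the Kafka record key, a second select expression can map it onto the key column. Below is a minimal sketch of that variant, assuming the incoming stream has a client_ip column (a hypothetical name used for illustration):

{
    type : kafka_stream_output
    component: output
    settings: {
        select_expressions: [
            # the record value written to Kafka
            parsed_log AS value
            # the record key, taken from the hypothetical client_ip column
            client_ip AS key
        ]
        topic: parsed_apache_httpd
        brokers: [
            {
                host : localhost
                port : 9092
            }
        ]
        checkpoint_location : /tmp/checkpoint_location
        mode : append
    }
    subscribe: [
        {
            component: punch
            stream: data
        }
    ]
}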

Note

For now, the Python version of this node is missing the checkpoint_location parameter. On the other hand, it lets you configure the request_timeout option of your Kafka producer (not available in the Java version).

PYTHON

{
    "type": "kafka_structured_stream_output",
    "component": "output",
    "settings": {
        "brokers": [{"host": "localhost", "port": 9092}],
        "column_value": "myKeyValue",
        "request_timeout": 30,
        "topic": "a_topic_name"
    },
    "subscribe": [{"component": "input", "stream": "data"}],
    "publish": [{"stream": "data"}]
}

Below is a full PML version (PYTHON) you can try:

{
    job = [
        {
            "type": "batch_input",
            "component": "input",
            "publish": [{"stream": "data"}],
            "settings": {
                "input_data": [
                    {
                        "age": 21,
                        "name": "phil"
                    },
                    {
                        "age": 22,
                        "name": "dimi"
                    }
                ]
            }
        },
        {
            "type": "kafka_structured_stream_output",
            "component": "output",
            "settings": {
                "brokers": [{"host": "localhost", "port": 9092}],
                "column_value": "myKeyValue",
                "request_timeout": 30
            },
            "subscribe": [{"component": "input", "stream": "data"}],
            "publish": [{"stream": "data"}]
        },
        {
            "type": "show",
            "component": "show",
            "subscribe": [{"component": "output", "stream": "data"}]
        }
    ]
}

Configuration(s)

  • brokers: List

    Description: a list of JSON objects with two fields: host: String and port: Integer. This list is used to open the connection to your Kafka cluster (see the sketch after this list).

  • column_value: String

    Description: each row of your Spark dataframe that is stored into your Kafka cluster will be referenced by the key column_value.

  • request_timeout: Integer (seconds)

    Description: PYTHON only. Because the Kafka producer API works asynchronously, setting this parameter is important. Depending on the size of your input dataframe, you may want to tune it. If not set, it defaults to 30 seconds.

  • topic: String

    Description: name of the topic to which you want to push data.
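
For a cluster with several brokers, the brokers list simply contains one {host, port} object per broker. A minimal sketch, using hypothetical hostnames:

"brokers": [
    {"host": "kafka-1.example.org", "port": 9092},
    {"host": "kafka-2.example.org", "port": 9092},
    {"host": "kafka-3.example.org", "port": 9092}
]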