Elastic Stream Output

Like our elastic_batch_output node, you can use this node to write to your Elasticsearch cluster, but in a streaming fashion.

Examples

Use-cases

Our "hello world" punchline configuration.

Below is an example that continuously inserts data from a Kafka broker into an Elasticsearch cluster:

beginner_use_case.punchline

{
    type: punchline
    version: "6.0"
    runtime: spark
    tenant: default
    dag: [
        {
            type: kafka_stream_input
            component: input
            settings: {
                topic: mytenant_apache_httpd_archiving
                brokers: [
                    {
                        host : localhost
                        port : 9092
                    }
                ]
                start_offset_strategy : latest
            }
            publish: [
                    {
                        stream: data
                    }
                ]
        }
        {
            type: sql
            component: sql
            settings: {
                statement: SELECT  CAST(value AS STRING) AS tmp FROM input_data
            }
            subscribe: [
                {
                    component: input
                    stream: data
                }
            ]
            publish: [
                { 
                    stream: data
                }
            ]
        }
        {
            type: elastic_stream_output
            component: elastic
            settings: {
                nodes: [
                    localhost
                ]
                port: 9200
                index: myesstream
                checkpoint_location: /tmp/my_es_checkpoint
            }
            subscribe: [
                {
                    stream: data
                    component: sql
                }
            ]
        }
    ]
}

Run beginner_use_case.punchline with the command below:

    CONF=beginner_use_case.punchline
    punchlinectl start -p $CONF
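Once the punchline is running, you can check that documents are landing in the target index. The command below is an illustrative sketch, assuming Elasticsearch listens on localhost:9200 and the index name from the example above:

```sh
# Count the documents indexed so far in myesstream
curl "localhost:9200/myesstream/_count?pretty"
```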

Parameters

Common Settings

| Name | Type | Mandatory | Default value | Description |
|------|------|-----------|---------------|-------------|
| index | String | true | NONE | The name of the Elasticsearch index where data will be written. To add a document type, simply append /<type> to your index name. |
| port | Integer | false | 9200 | Your Elasticsearch server port. |
| query | String or Json | false | match all | A valid Elasticsearch query. |
| nodes | List of String | true | NONE | Hostnames of your Elasticsearch nodes. In general, only one hostname is needed. |
| output_columns | List of Json | false | NONE | A list of Json objects, each of which can contain 3 fields: field (String, required): the field of your document that you want to include in the resulting dataset; alias (String, optional): a new name for the retrieved field; type (String, optional): cast the column to a type (string, date, timestamp, long, double, float, boolean). |
| elastic_settings | str(K)-str(V) | false | NONE | Key-value arguments to control the Elasticsearch client. See the official Elasticsearch documentation. |
| checkpoint_location | String | true | NONE | Absolute path where checkpoint data will be stored. |
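As an illustration, the fragment below sketches how output_columns and elastic_settings might be combined in the elastic_stream_output node from the example above. The field name tmp matches the alias produced by the sql stage; the es.batch.size.entries key is one example of an Elasticsearch client setting, not a default:

```hjson
{
    type: elastic_stream_output
    component: elastic
    settings: {
        nodes: [
            localhost
        ]
        port: 9200
        index: myesstream
        checkpoint_location: /tmp/my_es_checkpoint
        # Keep only the tmp column, renamed and cast to string
        output_columns: [
            {
                field: tmp
                alias: raw_log
                type: string
            }
        ]
        # Key-value settings passed as-is to the elasticsearch client
        elastic_settings: {
            es.batch.size.entries: "1000"
        }
    }
}
```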

Advanced Settings

No advanced settings