Elastic Batch Input

The elastic_batch_input node fetches data from an Elasticsearch index. The data read is turned into a Dataset, ready to be processed by subsequent nodes.

Its default behavior is to transform each input document into a row. You must define the fields you are interested in, together with their types.

For all of our examples, we will assume that your Elasticsearch cluster has an index academy-example containing documents like the following:

    {
        "address": {
            "street": "clemenceau"
        },
        "name": "phil",
        "age": 21,
        "friends": [
            "alice"
        ]
    }

Examples

Use-cases

Our "hello world" punchline configuration.

beginner_use_case.punchline

{
    tenant: mytenant
    version: "6.0"
    runtime: spark
    type: punchline
    dag: [
        {
            type: elastic_batch_input
            component: input
            settings: {
                index: academy-example
                es_cluster: es_search
                nodes: [
                    localhost
                ]
                output_columns: [
                    {
                        type: string
                        field: address.street
                        alias: address_street
                    }
                    {
                        type: integer
                        field: age
                    }
                ]
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
    ]
}

Run beginner_use_case.punchline using the command below:

CONF=beginner_use_case.punchline
punchlinectl start -p $CONF
What did we achieve?

This configuration generates rows with one column per requested field, here address_street and age. Notice the id column: by default, each document's id is included.

# a preview of the stdout output after running the command above
+--------------------+--------------+---+
|                  id|address_street|age|
+--------------------+--------------+---+
|eQeYvWkB3c-USBLvf06M|    clemenceau| 21|
|egeYvWkB3c-USBLvf06M|    clemenceau| 23|
|eweYvWkB3c-USBLvf06M|    clemenceau| 53|
+--------------------+--------------+---+

Specifying a query

Warning

ES-Hadoop column naming defaults to the last key of your nested JSON. For instance, { "key1": { "key2": 1 } } will produce a dataset with a column named "key2" instead of "key1.key2". To work around this, use the output_columns parameter and set an alias with the desired column name, for instance "key1_key2", as shown below.
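
Here is a minimal output_columns entry (the field names are the hypothetical ones from the warning above) that exposes the nested key1.key2 field as a key1_key2 column:

output_columns: [
    {
        type: integer
        field: key1.key2
        alias: key1_key2
    }
]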

You may want to read only the documents matching a query. Use the query property:

settings: {
    ...
    query: {
        bool: {
            must: [
                {
                    range: {
                        age: {
                            gte: 20,
                            lte: 30
                        }
                    }
                }
            ]
        }
    } 
}
intermediate_use_case.punchline

{
    tenant: mytenant
    version: "6.0"
    runtime: spark
    type: punchline
    dag: [
        {
            type: elastic_batch_input
            component: input
            settings: {
                query: {
                    query: {
                        bool: {
                            must: [
                                {
                                    range: {
                                        age: {
                                            gte: 20,
                                            lte: 30
                                        }
                                    }
                                }
                            ]
                        }
                    } 
                }
                index: academy-example
                es_cluster: es_search
                nodes: [
                    localhost
                ]
                output_columns: [
                    {
                        type: string
                        field: address.street
                        alias: address_street
                    }
                    {
                        type: integer
                        field: age
                    }
                ]
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
    ]
}

Let's execute it with the command below:
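
CONF=intermediate_use_case.punchline
punchlinectl start -p $CONF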

Controlling requests with elastic_settings

The elastic_settings map lets you pass options straight through to the underlying Elasticsearch connector. For instance, es.index.read.missing.as.empty controls whether reading a missing index returns an empty dataset instead of raising an error:

settings: {
    // Pass in some elastic properties:
    elastic_settings: {
        es.index.read.missing.as.empty: no
    }
}

advanced_use_case.punchline

{   
    type: punchline
    version: "6.0"
    runtime: spark
    tenant: default
    dag:[
        {
            description:
            '''
            read all metricbeat documents from local elasticsearch
and generate a Dataset<Row> out of it
            '''
            type: elastic_batch_input
            component: input
            settings: {
                index: platform-metricbeat-*
                nodes: [ 
                    localhost 
                ]
                port : 9200
                elastic_settings: {
                    es.index.read.missing.as.empty: yes
                }
                id_column: id
                output_columns: [
                    {
                        field: beat.version
                    }
                ]
                query : {
                    query: {
                    bool: {
                        must: [
                        {
                            term : { 
                                metricset.module : system 
                            }
                        }
                        {
                            range : {
                            @timestamp : {
                                gte : now-10m
                                lt :  now
                            }
                            }
                        }
                        ]
                    }
                    }
                }
            }
            publish: [
                { 
                    stream: default 
                }
            ]
        }
        {
            description:
            '''
            store all data into an elasticsearch index
            '''
            type: elastic_batch_output
            component: output
            settings: {
                # the target index name
                index: example
# the target elasticsearch cluster name. This name is the one defined
# in the elasticsearch configuration file.
                es_cluster: es_search
                # The name of the column holding the document id
                id_column: id
                # Optional : the elasticsearch nodes listening port
                # port : 9200
# Optional : insertion mode; defaults to overwrite
                output_mode: overwrite
            }
            subscribe: [
                { 
                    component: input 
                    stream:default
                }
            ]
        }
    ]
}
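
As before, you can launch this punchline with punchlinectl:

CONF=advanced_use_case.punchline
punchlinectl start -p $CONF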

Parameters

Common Settings

| Name | Type | Mandatory | Default value | Description |
|------|------|-----------|---------------|-------------|
| index | String | true | NONE | The name of the Elasticsearch index from which data is fetched. To target a document type, append /<type> to the index name. |
| port | Integer | false | 9200 | Your Elasticsearch server port. |
| query | String or Json | false | match all | A valid Elasticsearch query. |
| nodes | List of String | true | NONE | Hostnames of your Elasticsearch nodes. In general, a single hostname is enough. |
| output_columns | List of Json | false | NONE | A list of Json objects, each of which may contain three fields. field (String, required): the document field to include in the resulting dataset. alias (String, optional): a new name for the retrieved field. type (String, optional): cast the column to a type (string, date, timestamp, long, double, float, boolean). |
| elastic_settings | str(K)-str(V) | false | NONE | Key-value arguments controlling the Elasticsearch client. See the official Elasticsearch documentation. |
| es_cluster | String | false | NONE | Name of your Elasticsearch cluster as declared in your punchplatform.properties. |
| id_column | String | false | id | Name of the column used to hold each document id. Note that _id is a reserved name in Elasticsearch, so avoid it; by default the column is named id. |
| with_null_values | Boolean | false | true | When set to false, columns whose values are null are removed. |
| empty_dataset_columns | List of Json | false | NONE | When no documents match, return a dataset with the given schema and columns instead of an empty one. |
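
As an illustration, here is a settings block combining several of these parameters (the values are hypothetical):

settings: {
    # index name with an appended document type
    index: academy-example/doc
    nodes: [
        localhost
    ]
    port: 9200
    es_cluster: es_search
    # store the document id in a column named doc_id instead of id
    id_column: doc_id
    # drop columns whose values are null
    with_null_values: false
}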

Advanced Settings

No advanced settings