Elastic Batch Output

Overview

The elastic_batch_output node lets you store tabular (dataframe) data into an elasticsearch index. It relies on the Elasticsearch-Hadoop plugin for data insertion.

Its default behavior is to transform each row of the input dataset into a Json document. Say you receive:

|  column1 | column2 | column3 |
|  "hello" | true    | 17      |
...

The corresponding data will be indexed in elasticsearch:

    {
        "column1": "hello",
        "column2": true,
        "column": 17
    }

If some of your columns' String values are nested Json documents, they will automatically be parsed as Json. For example:

|  column1         | column2 | column3 |
|  {"name": "bob"} | true    | 17      |
...
becomes

    {
        "column1" : {
            "name": "bob"
        },
        "column2" : true,
        "column3" : 17
    }

Runtime Compatibility

  • PySpark
  • Spark

Example

{   
type: punchline
version: "6.0"
runtime: spark
tenant: default
    dag:[
        {
            description:
            '''
            read all metricbeat documents from local elasticsearch
            and generate a Dataset<Row> out of it
            '''
            type: elastic_batch_input
            component: input
            settings: {
                index: platform-metricbeat-*
                es_cluster: es_search
                port : 9200
                id_column: id
                output_columns: [
                    {
                        field: beat.version
                    }
                ]
                query : {
                    query: {
                    bool: {
                        must: [
                        {
                            term : { 
                                metricset.module : system 
                            }
                        }
                        {
                            range : {
                            @timestamp : {
                                gte : now-10m
                                lt :  now
                            }
                            }
                        }
                        ]
                    }
                    }
                }
            }
            publish: [
                { 
                    stream: default 
                }
            ]
        }
        {
            description:
            '''
            store all data into an elasticsearch index
            '''
            type: elastic_batch_output
            component: output
            settings: {
                # the target index name
                index: example
                # the target elasticsearch cluster name. This name is the one defined
                # in the punchplatform.properties configuration file.
                es_cluster: es_search
                # The name of the column holding the document id
                id_column: id
                # Optional : the elasticsearch nodes listening port
                # port : 9200
                # Optional : insertion mode, defaults to overwrite
                output_mode: overwrite
            }
            subscribe: [
                { 
                    component: input 
                    stream: default
                }
            ]
        }
    ]
}

Warning

The selected id column must be of type String. The id column is used by the output_mode strategy.
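
If your id column is not already a String, one option (a sketch, not the only way) is to cast it upstream, for instance using the output_columns type cast of the elastic_batch_input node:

    output_columns: [
        {
            # cast the id column to String so it can serve as the document id
            # (the field name "id" matches the id_column setting; adapt as needed)
            field: id
            type: string
        }
    ]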

Tips

If you want to insert only a subset of columns instead of the whole dataset, use an SQL node beforehand to select them, as sketched below.
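
A minimal sketch of such a node (the sql node's statement setting and the input_default table name are illustrative; adapt them to how your runtime exposes the subscribed stream):

    {
        # hypothetical sql node: keep only column1 and column3 before indexing
        type: sql
        component: sql
        settings: {
            statement: SELECT column1, column3 FROM input_default
        }
        subscribe: [
            {
                component: input
                stream: default
            }
        ]
        publish: [
            {
                stream: default
            }
        ]
    }

The elastic_batch_output node would then subscribe to the sql component instead of the input.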

Parameters

Common Settings

| Name             | Type           | Mandatory | Default value | Description |
| index            | String         | true      | NONE          | The name of the elasticsearch index where data will be written. To add a document type, simply append /<type> to the index name. |
| port             | Integer        | false     | 9200          | Your elasticsearch server port. |
| query            | String - Json  | false     | match all     | A valid elasticsearch query. |
| nodes            | List of String | true      | NONE          | Hostnames of your elasticsearch nodes. In general, only one hostname is needed. |
| output_columns   | List of Json   | false     | NONE          | A list of Json objects, each of which can contain three fields: field (String, required), the document field to include in the resulting dataset; alias (String, optional), a new name for the retrieved field; type (String, optional), a type to cast the column to (string, date, timestamp, long, double, float, boolean). |
| elastic_settings | str(K)-str(V)  | false     | NONE          | Key-value arguments to control the elasticsearch client. See the official elasticsearch documentation. |
| es_cluster       | String         | false     | NONE          | Name of your elasticsearch cluster as defined in your punchplatform.properties. |
| id_column        | String         | false     | id            | Name of the column holding each document id. Note that _id is a reserved name in elasticsearch, so refrain from using it; by default the field is named id. |
| with_null_values | Boolean        | false     | true          | Whether to keep columns whose values are null; set to false to drop them. |
| output_mode      | String         | true      | NONE          | The strategy used to store data; see the advanced settings section. |
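
For illustration, an output_columns entry combining the three fields could look like this (the field name and alias are illustrative):

    output_columns: [
        {
            # the document field to retrieve
            field: beat.version
            # optional: rename the resulting column
            alias: version
            # optional: cast the column to a given type
            type: string
        }
    ]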

Elastic Settings

| Elastic settings                 | Type   | Default value | Description |
| es.nodes.path.prefix             | String | NONE          | A path prefix (e.g. /something/to/append) in case your elastic servers are behind a proxy |
| es.size                          | String | 50            | Size of the elastic query, or size of each scroll query |
| es.scroll                        | String | false         | Enable scrolling requests |
| es.scroll.keepalive              | String | 10m           | How long each scroll query should be kept alive: 1m, 1d, 1y, etc. |
| es.net.ssl                       | String | false         | Enable SSL |
| es.net.http.auth.pass            | String | NONE          | Must be used with es.net.http.auth.user |
| es.net.http.auth.user            | String | NONE          | Must be used with es.net.http.auth.pass |
| es.net.http.auth.token           | String | NONE          | Must be used with es.net.http.auth.token_type |
| es.net.http.auth.token_type      | String | NONE          | Must be used with es.net.http.auth.token |
| es.max_concurrent_shard_requests | String | NONE          | The maximum number of shards that can be requested at a time |
| es.nodes.resolve.hostname        | String | false         | Resolve hostnames: make sure /etc/hosts references the proper IP addresses |
| es.doc_type                      | String | NONE          | Add a doc_type to the requested URI; this feature is deprecated by Elastic |
| es.http.timeout                  | String | 1m            | Override the HTTP connection timeout |
| socket_timeout_ms                | String | 2m            | Timeout before receiving data from elasticsearch; increase this if the request uses heavy filtering or targets many indices |
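
These settings are passed as a plain key-value map inside the node settings. A sketch with illustrative values (the credentials are placeholders):

    elastic_settings: {
        # enable ssl and basic authentication (placeholder credentials)
        es.net.ssl: "true"
        es.net.http.auth.user: admin
        es.net.http.auth.pass: secret
        # keep each scroll query alive for 10 minutes
        es.scroll.keepalive: 10m
    }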

Advanced Settings

| output_mode   | Description |
| overwrite     | When saving the DataFrame, if data/table already exists, the existing data is expected to be overwritten by the contents of the DataFrame. |
| append        | When saving the DataFrame, if data/table already exists, the contents of the DataFrame are expected to be appended to the existing data. |
| errorifexists | When saving the DataFrame, if data already exists, an exception is expected to be thrown. |
| ignore        | When saving the DataFrame, if data already exists, the save operation is expected not to save the contents of the DataFrame and not to change the existing data. |
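
For example, to append documents to an existing index instead of overwriting it, adapt the output node settings from the example above:

    settings: {
        index: example
        es_cluster: es_search
        id_column: id
        # append to the existing index rather than overwriting it
        output_mode: append
    }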