Elastic Batch Output

Compatible with Spark/PySpark

The elastic_batch_output node lets you store tabular DataFrame data into an Elasticsearch index. It leverages the Elasticsearch-Hadoop plugin for data insertion.

Its default behavior is simply to transform each row of the input dataset into a JSON document. Say you receive:

| column1 | column2 | column3 |
|---------|---------|---------|
| "hello" | true    | 17      |
...

The corresponding data will be indexed in Elasticsearch as:

    {
        "column1": "hello",
        "column2": true,
        "column3": 17
    }

If some of your columns' String values are nested JSON documents, they will automatically be parsed as JSON. For example:

| column1         | column2 | column3 |
|-----------------|---------|---------|
| {"name": "bob"} | true    | 17      |
...

becomes

    {
        "column1" : {
            "name": "bob"
        },
        "column2" : true,
        "column3" : 17
    }
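This default conversion can be sketched in plain Python (a hypothetical helper mirroring the node's row-to-document behavior, not its actual implementation):

```python
import json

def row_to_document(row):
    """Turn a row (dict of column -> value) into the JSON document to
    be indexed: String values that parse as JSON objects become nested
    documents, everything else is kept as-is."""
    doc = {}
    for column, value in row.items():
        if isinstance(value, str):
            try:
                parsed = json.loads(value)
            except ValueError:
                parsed = None
            # only promote JSON *objects*; plain strings stay strings
            if isinstance(parsed, dict):
                value = parsed
        doc[column] = value
    return doc

row = {"column1": '{"name": "bob"}', "column2": True, "column3": 17}
print(json.dumps(row_to_document(row), indent=4))
```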

Below is an example of how to use this PML node:

{   
    runtime_id: my-job-test
    tenant: job_tenant_test
    job:[
        {
            description:
            '''
            read all metricbeat documents from local elasticsearch
            and generate a Dataset<Row> out of it
            '''
            type: elastic_batch_input
            component: input
            settings: {
                index: platform-metricbeat-*
                type: doc
                cluster_name: es_search
                nodes: [ 
                    localhost 
                ]
                port : 9200
                elastic_settings: {
                    es.index.read.missing.as.empty: yes
                }
                id_column: id
                output_columns: [
                    {
                        field: beat.version
                    }
                ]
                query : {
                    query: {
                      bool: {
                        must: [
                          {
                            term : { 
                                metricset.module : system 
                            }
                          }
                          {
                            range : {
                              @timestamp : {
                                  gte : now-10m
                                  lt :  now
                              }
                            }
                          }
                        ]
                      }
                    }
                }
            }
            publish: [
                { 
                    stream: default 
                }
            ]
        }
        {
            description:
            '''
            store all data into an elasticsearch index
            '''
            type: elastic_batch_output
            component: output
            settings: {
                # the target index name
                index: example
                # the target elasticsearch cluster name. This name is the one defined
                # in the elasticsearch configuration file.
                cluster_name: es_search
                # the document type. 
                type: doc
                # the list of elasticsearch nodes to connect to. 
                # Note that not all of them need to be declared. 
                nodes: [ 
                    localhost 
                ]
                # The name of the column holding the document id
                id_column: id
                # Optional : the elasticsearch nodes listening port
                # port : 9200
                # Optional : insertion mode, by default will result to overwrite
                output_mode: overwrite
            }
            subscribe: [
                { 
                    component: input 
                    stream: default
                }
            ]
        }
    ]
}
Controlling the document id

If you want a given column to be used as the document id, use the id_column property:

{
    type: elastic_batch_output
    component: output
    settings: {
        ...
        id_column: column1
        ...
    }
}

Warning

The selected id column must be of type String. The id column is used by output_mode.

Info

If you want to insert a subset of columns instead of the whole dataset, use an SQL statement node beforehand.
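As a sketch, a column-selecting node placed before the output could look like the following (the sql node type, its statement setting, and the input_default table name are assumptions; adapt them to the nodes available in your PML distribution):

```
{
    type: sql
    component: sql
    settings: {
        # hypothetical statement keeping only the columns to be indexed
        statement: SELECT column1, column2 FROM input_default
    }
    subscribe: [
        {
            component: input
            stream: default
        }
    ]
    publish: [
        {
            stream: default
        }
    ]
}
```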

Configuration(s)

  • index: String

    Description: [Required] The name of the Elasticsearch index where data will be saved.

  • port: Integer

    Description: [Optional] Your Elasticsearch server port.

  • type: String

    Description: [Optional] Document type that will be used upon insertion in your elasticsearch index.

  • nodes: List

    Description: [Required] Hostnames of your elasticsearch nodes. In general, only one hostname is needed.

  • output_mode: String

    Description: [Optional] Takes one of the following values:

    1) overwrite: Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.
    
    2) append: Append mode means that when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.
    
    3) errorifexists: ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.
    
    4) ignore: Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data.
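    These modes follow Spark's SaveMode semantics. A plain-Python illustration of the four behaviors (not the actual Spark or ES-Hadoop code):

```python
def save(existing, incoming, mode):
    """Model an index as a list of documents; existing=None means the
    index does not exist yet."""
    if existing is None:            # nothing there yet: every mode writes
        return list(incoming)
    if mode == "overwrite":         # replace existing contents
        return list(incoming)
    if mode == "append":            # add incoming on top of existing
        return existing + list(incoming)
    if mode == "errorifexists":     # refuse to touch existing data
        raise RuntimeError("data already exists")
    if mode == "ignore":            # keep existing, silently drop incoming
        return existing
    raise ValueError("unknown mode: " + mode)

print(save(["doc1"], ["doc2"], "append"))   # -> ['doc1', 'doc2']
```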
    
  • elastic_settings: Map

    Description: [Optional] Specify ES-Hadoop options as key-value pairs: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html. Some of them are disabled by default as they are used by our internal code.
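    For instance, bulk sizing can be tuned this way (the two option names below come from the ES-Hadoop configuration page linked above; double-check them against your plugin version):

```
elastic_settings: {
    # number of documents per bulk write request (ES-Hadoop default: 1000)
    es.batch.size.entries: 1000
    # maximum total size of a bulk write request (ES-Hadoop default: 1mb)
    es.batch.size.bytes: 1mb
}
```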

  • cluster_name: String

    Description: [Optional] Name of your Elasticsearch cluster.

  • id_column: String

    Description: [Optional] Name of the column that will be used to store each document id. Note that _id is a reserved name within Elasticsearch terminology, so refrain from using it as a column name.