Elastic Batch Input

Compatible with Spark/PySpark

The elastic_batch_input node fetches data from an Elasticsearch index. The data read is transformed into a Dataset, ready to be processed by subsequent nodes.

Its default behavior is simply to transform the input documents into rows. You only have to define the fields you are interested in, together with their types.

Say you have an index "punch-academy-example" with documents:

    {
        "address": {
            "street": "clemenceau"
        },
        "name": "phil",
        "age": 21,
        "friends": [
            "alice"
        ]
    }

Here is how to generate rows containing columns from the address.street and age JSON fields:

    {
        type: elastic_batch_input
        component: input
        settings: {
            index: punch-academy-example
            cluster_name: es_search
            nodes: [
                localhost
            ]
            output_columns: [
                {
                    type: string
                    field: address.street
                    alias: address_street
                }
                {
                    type: integer
                    field: age
                }
            ]
        }
        publish: [
            {
                stream: data
            }
        ]
    }

The generated dataset will look like this:

+--------------------+--------------+---+
|                  id|address_street|age|
+--------------------+--------------+---+
|eQeYvWkB3c-USBLvf06M|    clemenceau| 21|
|egeYvWkB3c-USBLvf06M|    clemenceau| 23|
|eweYvWkB3c-USBLvf06M|    clemenceau| 53|
+--------------------+--------------+---+

Info

Notice the id column. By default we provide each document's id in that column.
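
If you prefer another column name, use the id_column setting described in the configuration section below. A minimal sketch (the doc_id name is purely illustrative):

        settings: {
            ...
            # store each document id in a column named "doc_id" instead of "id"
            id_column: doc_id
        }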

Specifying a query

You may want to read only the documents matching a query. Use the query property:

        settings: {
            ...
            query: {
              bool: {
                must: [
                  {
                    range: {
                      age: {
                        gte: 20,
                        lte: 30
                      }
                    }
                  }
                ]
              }
            } 
        }
    }

You then get:

+--------------------+--------------+---+
|                  id|address_street|age|
+--------------------+--------------+---+
|eQeYvWkB3c-USBLvf06M|    clemenceau| 21|
|egeYvWkB3c-USBLvf06M|    clemenceau| 23|
+--------------------+--------------+---+

Warning

ES-Hadoop column naming defaults to the last key of your nested JSON. For instance, { "key1": { "key2": 1 } } will produce a dataset with a column named "key2" instead of "key1.key2". To bypass this issue, use the output_columns parameter and set an alias for the desired column name, for instance "key1_key2", as shown below.
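
Here is a sketch of that workaround, reusing the key1.key2 example from the warning:

        settings: {
            ...
            output_columns: [
                {
                    field: key1.key2
                    # alias avoids the ambiguous default column name "key2"
                    alias: key1_key2
                }
            ]
        }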

Advanced Options

        settings: {
            // you can control the elasticsearch listening port
            port: 9200

            // Should your document not use the default '_doc' type you can
            // set it explicitly
            type: doc

            // Pass in some elastic properties:
            elastic_settings: {
                es.index.read.missing.as.empty: no
            }
        }
    }
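
Any other ES-Hadoop option can be passed the same way through elastic_settings. For instance, here is a sketch tuning the scroll batch size (es.scroll.size is a standard ES-Hadoop option; the value is purely illustrative):

        settings: {
            ...
            elastic_settings: {
                # fetch 500 documents per scroll request
                es.scroll.size: 500
            }
        }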

Below is an example of how to use this PML node:

{   
    runtime_id: my-job-test
    tenant: job_tenant_test
    job:[
        {
            description:
            '''
            read all metricbeat documents from local elasticsearch
            and generate a Dataset<Row> out of it
            '''
            type: elastic_batch_input
            component: input
            settings: {
                index: platform-metricbeat-*
                type: doc
                cluster_name: es_search
                nodes: [ 
                    localhost 
                ]
                port : 9200
                elastic_settings: {
                    es.index.read.missing.as.empty: yes
                }
                id_column: id
                output_columns: [
                    {
                        field: beat.version
                    }
                ]
                query : {
                    query: {
                      bool: {
                        must: [
                          {
                            term : { 
                                metricset.module : system 
                            }
                          }
                          {
                            range : {
                              @timestamp : {
                                  gte : now-10m
                                  lt :  now
                              }
                            }
                          }
                        ]
                      }
                    }
                }
            }
            publish: [
                { 
                    stream: default 
                }
            ]
        }
        {
            description:
            '''
            store all data into an elasticsearch index
            '''
            type: elastic_batch_output
            component: output
            settings: {
                # the target index name
                index: example
                # the target elasticsearch cluster name. This name is the one
                # defined in the elasticsearch configuration file.
                cluster_name: es_search
                # the document type. 
                type: doc
                # the list of elasticsearch nodes to connect to.
                # Note that not all of them need to be declared.
                nodes: [ 
                    localhost 
                ]
                # The name of the column holding the document id
                id_column: id
                # Optional : the elasticsearch nodes listening port
                # port : 9200
                # Optional : insertion mode, defaults to overwrite
                output_mode: overwrite
            }
            subscribe: [
                { 
                    component: input 
                    stream:default
                }
            ]
        }
    ]
}

Configuration(s)

  • index: String

    Description: [Required] The name of the Elasticsearch index from which data will be fetched.

  • port: Integer

    Description: [Optional] Your Elasticsearch server port.

  • type: String

    Description: [Optional] Document type that will be retrieved from your Elasticsearch index.

  • query: Json

    Description: [Optional] A valid Elasticsearch query.

  • nodes: List

    Description: [Required] Hostnames of your Elasticsearch nodes. In general, only one hostname is needed.

  • elastic_settings: Map

    Description: [Optional] Specify ES-Hadoop options as key-value pairs: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html. Some of them are disabled by default as they are used by our internal code.

  • cluster_name: String

    Description: [Optional] Name of your Elasticsearch cluster.

  • id_column: String

    Description: [Optional] Name of the column that will be used to store each document id. Note that _id is a reserved name in Elasticsearch, so refrain from using it. By default, the field is named id.

  • output_columns: List

    Description: [Optional] A list of JSON objects, each of which can contain 3 fields:

    field: String [Required] the field found in your document that you want to include in the resulting dataset
    alias: String [Optional] give a new name to the field you want to retrieve
    type: String [Optional] cast the column to a type (string, date, timestamp, long, double, float, boolean)

    If the output_columns field is not set, all fields are published with their default types. See the sketch after this list for a complete output_columns example.

  • with_null_values: Boolean

    Description: [Optional] Remove columns whose values are null.
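
As referenced in the output_columns description above, here is a sketch of a complete output_columns list using all three fields. The field names reuse the punch-academy-example document from the beginning of this page; the alias and types are illustrative:

        settings: {
            ...
            output_columns: [
                {
                    # keep the nested field under an unambiguous column name
                    field: address.street
                    alias: address_street
                    type: string
                }
                {
                    # cast explicitly; without an alias the default name applies
                    field: age
                    type: integer
                }
            ]
        }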