
Elastic Input Node

Compatible with Spark/PySpark

The elastic_input node lets you fetch data from your Elasticsearch cluster and load it into your Spark cluster. Two modes are currently available:

  • hits_hits
  • aggregation

Warning

Only one of these modes should be set to true at a time. If either aggregation or hits_hits is set to true, the result is a Spark dataset with an automatically inferred schema.

Info

If only the minimum configuration is set, this node returns the full raw JSON data received from Elasticsearch in a Spark dataset. The number of rows depends on the number of hits and on the size specified in your Elasticsearch query. By default, the size is 10.
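The mode rules above can be sketched in plain Python. This is a minimal illustration, not the node's implementation: `check_modes` and `hits_to_rows` are hypothetical helpers, the sample response is made up, and keeping only each hit's `_source` in `hits_hits` mode is an assumption about what the resulting rows contain.

```python
def check_modes(settings):
    """Return the output mode selected by a (hypothetical) settings dict."""
    hits_hits = settings.get("hits_hits", False)
    aggregation = settings.get("aggregation", False)
    # Only one of the two modes may be set to true at a time.
    if hits_hits and aggregation:
        raise ValueError("set only one of hits_hits and aggregation to true")
    if hits_hits:
        return "hits_hits"
    if aggregation:
        return "aggregation"
    # Neither flag set: the raw Elasticsearch response is returned.
    return "raw"


def hits_to_rows(response):
    """One row per document found under hits.hits (assumed: _source only)."""
    return [hit["_source"] for hit in response["hits"]["hits"]]


# Made-up, trimmed-down _search response used for illustration only.
sample = {"hits": {"hits": [
    {"_id": "1", "_source": {"city": "Paris"}},
    {"_id": "2", "_source": {"city": "Lyon"}},
]}}

print(check_modes({"hits_hits": True}))  # hits_hits
print(hits_to_rows(sample))  # [{'city': 'Paris'}, {'city': 'Lyon'}]
```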

Examples

Basic configuration

This configuration outputs a dataframe with a single column, where each row contains the response from Elasticsearch.

{
  job:[
    {
      type: elastic_input
      component: input
      settings:
      {
        index: mytenant-events-*
        nodes:
        [
          localhost
        ]
      }
      publish:
      [
        {
          stream: data
        }
      ]
    }
  ]
}

Aggregation Query

Below is an aggregation configuration.

{
  job: [
    {
      type: elastic_input
      component: input
      settings:
      {
        index: city_offices
        nodes:
        [
          localhost
        ]
        // The query part corresponds to the body sent to the '_search'
        // endpoint of the query URL. See the equivalent request below.
        query: {
            // Set size to zero to avoid retrieving the matching documents:
            // what you want are the aggregation buckets.
            size: 0
            aggregations: {
              cities: {
                terms: {
                  field: city
                  size: 50
                }
                aggregations: {
                  office_types: {
                    terms: {
                      field : office_types
                    }
                  }
                }
              }
            }
          }
        }
      publish:
      [
        {
          stream: default
          field: aggregates
        }
      ]
      verbose: true
    }
  ]
}

For reference, here is the equivalent Elasticsearch request:

curl -XGET "http://localhost:9200/city_offices/_search" -H 'Content-Type: application/json' -d'
{
  "size": 0,
  "aggregations": {
    "cities": {
      "terms": {
        "field": "city",
        "size": 50
      },
      "aggregations": {
        "office_types": {
          "terms": {
            "field": "office_types"
          }
        }
      }
    }
  }
}'
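The same request can be built from Python, which is handy for checking what an aggregation returns before wiring it into a job. This is a minimal standard-library sketch, assuming Elasticsearch listens on `localhost:9200` as in the curl command; `build_search_request` is a hypothetical helper. It only builds the request: passing it to `urllib.request.urlopen` would actually send it.

```python
import json
import urllib.request


def build_search_request(host, index, body, port=9200):
    """Build (without sending) a _search request carrying a JSON body."""
    url = f"http://{host}:{port}/{index}/_search"
    data = json.dumps(body).encode("utf-8")
    # Elasticsearch accepts a search body on GET, as the curl command does.
    return urllib.request.Request(
        url,
        data=data,
        headers={"Content-Type": "application/json"},
        method="GET",
    )


# Same query as the elastic_input example: no documents, only buckets.
query = {
    "size": 0,
    "aggregations": {
        "cities": {
            "terms": {"field": "city", "size": 50},
            "aggregations": {
                "office_types": {"terms": {"field": "office_types"}}
            },
        }
    },
}

req = build_search_request("localhost", "city_offices", query)
print(req.full_url)  # http://localhost:9200/city_offices/_search
# To send it:
# with urllib.request.urlopen(req) as resp:
#     result = json.load(resp)
```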

Several Aggregations

You can run several aggregations in a single query. Here is an example:

     query: {
          // Set size to zero to avoid retrieving the matching documents:
          // what you want are the aggregation buckets.
          size: 0
          aggregations: {
            cities: {
              terms: {
                field: city
                size: 50
              }
              aggregations: {
                office_types: {
                  terms: {
                    field : office_types
                  }
                }
              }
            }
            citizens: {
                nested: {
                    path: citizens
                }
                aggs: {
                    occupations: {
                        terms: {
                            field: citizens.occupation
                            size: 50
                        }
                    }
                }
            }
          }
        }
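With several aggregations, each one appears under its own name in the `aggregations` object of the response, and a `nested` aggregation adds an extra level (with its own `doc_count`) before its sub-aggregations. The sketch below flattens such a response into tuples with plain Python. The sample response is made up and trimmed to the fields used, and `flatten_terms` is a hypothetical helper; it does not describe how the node builds its dataset.

```python
def flatten_terms(agg, sub_name=None):
    """Flatten a terms aggregation into tuples.

    Without sub_name: (key, doc_count) per bucket.
    With sub_name: (key, sub_key, sub_doc_count) per bucket of the
    terms sub-aggregation called sub_name.
    """
    rows = []
    for bucket in agg["buckets"]:
        if sub_name is None:
            rows.append((bucket["key"], bucket["doc_count"]))
        else:
            for inner in bucket[sub_name]["buckets"]:
                rows.append((bucket["key"], inner["key"], inner["doc_count"]))
    return rows


# Made-up response matching the shape of the query above.
response = {
    "aggregations": {
        "cities": {"buckets": [
            {"key": "Paris", "doc_count": 3, "office_types": {"buckets": [
                {"key": "bank", "doc_count": 2},
                {"key": "post", "doc_count": 1},
            ]}},
        ]},
        "citizens": {  # nested aggregation: one extra level before the buckets
            "doc_count": 5,
            "occupations": {"buckets": [
                {"key": "teacher", "doc_count": 4},
                {"key": "baker", "doc_count": 1},
            ]},
        },
    }
}

aggs = response["aggregations"]
print(flatten_terms(aggs["cities"], "office_types"))
# [('Paris', 'bank', 2), ('Paris', 'post', 1)]
print(flatten_terms(aggs["citizens"]["occupations"]))
# [('teacher', 4), ('baker', 1)]
```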

Range Query

You will often want to run aggregations on a limited range of documents. Here is an example query that does so:

     query:
      {
        size: 0
        query:
        {
          range:
          {
            @timestamp:
            {
              gte: 2019-01-23T15:00:00.000Z
              lt: 2019-01-23T15:01:00.000Z
            }
          }
        }
        aggregations:
        {
          by_channel:
          {
            terms:
            {
              field: channel
            }
            aggregations:
            {
              max_size:
              {
                max:
                {
                  field: size
                }
              }
              total_size: {
                sum: {
                  field: size
                }
              }
            }
          }
        }
      }
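In a range query, `gte` is inclusive and `lt` is exclusive, so the range above covers exactly one minute, and consecutive windows built this way never count a document twice. Below is a minimal sketch of computing such windows in Python, assuming you generate the query body programmatically. `es_time` and `range_window` are hypothetical helpers, and the millisecond `...Z` format mirrors the timestamps in the example.

```python
from datetime import datetime, timedelta, timezone


def es_time(dt):
    """Format an aware datetime as a UTC string like 2019-01-23T15:00:00.000Z."""
    dt = dt.astimezone(timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond // 1000:03d}Z"


def range_window(start, length=timedelta(minutes=1), field="@timestamp"):
    """Half-open [start, start + length) range query on a date field."""
    return {"range": {field: {"gte": es_time(start), "lt": es_time(start + length)}}}


start = datetime(2019, 1, 23, 15, 0, tzinfo=timezone.utc)
body = {"size": 0, "query": range_window(start)}
print(body["query"]["range"]["@timestamp"])
# {'gte': '2019-01-23T15:00:00.000Z', 'lt': '2019-01-23T15:01:00.000Z'}
```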

Full configuration example:

{
  job: [
    {
      component: input
      publish: [
        {
          stream: data
        }
      ]
      settings: {
        aggregation: true
        index: mytenant-events*
        nodes: [
          localhost
        ]
        query: {
          aggregations: {
            by_channel: {
              aggregations: {
                max_size: {
                  max: {
                    field: size
                  }
                }
                total_size: {
                  sum: {
                    field: size
                  }
                }
              }
              terms: {
                field: vendor
              }
            }
          }
          query: {
            bool: {
              must: [
                {
                  range: {
                    @timestamp: {
                      gte: now-100h
                      lt: now
                    }
                  }
                }
              ]
            }
          }
          size: 0
        }
      }
      type: elastic_input

    }
    {
      component: show
      subscribe: [
        {
          component: input
          stream: data
        }
      ]
      type: show
    }
  ]
}
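With `aggregation: true`, the node returns the aggregation results as a Spark dataset with an inferred schema. To give an idea of the data involved, the sketch below turns a `by_channel` response (terms buckets, each carrying the `max_size` and `total_size` metrics) into one flat row per bucket. The sample values are made up, `metric_rows` is a hypothetical helper, and the row layout is an assumption, not necessarily the schema the node infers.

```python
def metric_rows(agg, metrics):
    """One dict per terms bucket: key, doc_count and each metric's value."""
    rows = []
    for bucket in agg["buckets"]:
        row = {"key": bucket["key"], "doc_count": bucket["doc_count"]}
        for name in metrics:
            # Single-value metric aggregations (max, sum, ...) report "value".
            row[name] = bucket[name]["value"]
        rows.append(row)
    return rows


# Made-up response for the by_channel aggregation above.
response = {"aggregations": {"by_channel": {"buckets": [
    {"key": "vendor-a", "doc_count": 2,
     "max_size": {"value": 300.0}, "total_size": {"value": 500.0}},
    {"key": "vendor-b", "doc_count": 1,
     "max_size": {"value": 120.0}, "total_size": {"value": 120.0}},
]}}}

rows = metric_rows(response["aggregations"]["by_channel"], ["max_size", "total_size"])
for row in rows:
    print(row)
```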

Configuration(s)

  • column: String

    Description: [Optional] Defaults to "source". Only taken into account when both aggregation and hits_hits are set to false. In that case, a dataset of N rows is returned, with the source and aggregation results in JSON format.

  • hits_hits: Boolean

    Description: [Optional] Defaults to false. If true, returns only the hits of your Elasticsearch query.

  • aggregation: Boolean

    Description: [Optional] Defaults to false. Set to true to use only the data returned by an aggregation query.

  • index: String

    Description: [Required] The name of the Elasticsearch index from which data is fetched.

  • port: Integer

    Description: [Optional] The port of your Elasticsearch server.

  • type: String

    Description: [Optional] Document type to retrieve from your Elasticsearch index.

  • query: Json

    Description: [Optional] A valid Elasticsearch query.

  • nodes: List

    Description: [Required] Hostnames of your Elasticsearch nodes. In general, a single hostname is enough.

  • timestamp: Map

    Description: [Optional] Takes a valid timestamp string (UTC).

    field_name: String [Required] name of the timestamp column
    field_value: String [Required] timestamp value
    
  • output_columns: List

    Description: [Optional] A list of JSON objects, each of which can contain three fields:

    field: String [Required] the field of your document to include in the resulting dataset
    alias: String [Optional] a new name for the retrieved field
    type: String [Optional] cast the column to a type (string, date, timestamp, long, double, float, boolean)
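The output_columns semantics (keep a field, optionally rename it with `alias`, optionally cast it with `type`) can be modelled on a single document as below. This is a plain-Python sketch for illustration, not the node's code: the node works on Spark columns, `apply_output_columns` and `CASTS` are hypothetical names, and the Python type chosen for each type name (for instance `long` to `int`, `timestamp` to a timezone-aware `datetime`) is an assumption.

```python
from datetime import date, datetime

# Hypothetical mapping from type names to Python converters.
CASTS = {
    "string": str,
    "long": int,
    "double": float,
    "float": float,
    "boolean": lambda v: v if isinstance(v, bool) else str(v).lower() == "true",
    "date": lambda v: date.fromisoformat(str(v)[:10]),
    # Before Python 3.11, fromisoformat() rejects a trailing "Z", so rewrite it.
    "timestamp": lambda v: datetime.fromisoformat(str(v).replace("Z", "+00:00")),
}


def apply_output_columns(doc, output_columns):
    """Select, rename and cast the fields of one document."""
    row = {}
    for spec in output_columns:
        value = doc.get(spec["field"])
        if value is not None and "type" in spec:
            value = CASTS[spec["type"]](value)
        # The alias, when present, becomes the column name.
        row[spec.get("alias", spec["field"])] = value
    return row


doc = {"@timestamp": "2019-01-23T15:00:00.000Z", "size": "42", "channel": "web"}
columns = [
    {"field": "@timestamp", "alias": "ts", "type": "timestamp"},
    {"field": "size", "type": "long"},
    {"field": "channel"},
]
print(apply_output_columns(doc, columns))
```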