Elastic Input Node

The elastic_input node performs a single read from Elasticsearch. This is useful in two circumstances. First, you may need to load a single document to be used by some of your downstream PML nodes. In that case, you retrieve the document by its id.

The second use case is to perform aggregations. In that case, you run a single aggregation query that returns several aggregation buckets.

In either case, the read is performed using the standard Elasticsearch HTTP API. Each node subscribing to an elastic_input node must expect to receive a string value, not a dataset.

Examples

Fetching a Document by Id

Here is an example:

{
    type: elastic_input
    component: input
    settings: {
        // the target index name
        index: example

        // the document unique id
        id: AWBQnjJj8QPf3E_8E-PX

        // the list of elasticsearch nodes to connect to.
        nodes: [ localhost ]
    }
    publish: [
        {
            stream: value
        }
    ]
}
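As a rough illustration of what this configuration resolves to, the Python sketch below builds the Elasticsearch get-document URL from the node, index, id, port and type settings, and extracts the document body from a JSON response string. It assumes the plain http://<node>:<port>/<index>/<type>/<id> endpoint, a default port of 9200, and that the string published downstream is the raw get API response; none of this is confirmed by this page, and document_url and extract_source are hypothetical helpers, not part of the node.

```python
import json
from urllib.parse import quote


def document_url(node, index, doc_id, port=9200, doc_type="_doc"):
    """Build the get-document URL for one Elasticsearch document.

    Assumption: port 9200 and type "_doc" mirror the usual defaults.
    """
    # Escape each path segment so ids containing '/' or '?' stay intact.
    parts = [quote(part, safe="") for part in (index, doc_type, doc_id)]
    return "http://{}:{}/{}".format(node, port, "/".join(parts))


def extract_source(response_text):
    """Parse a get API response string and return the document body."""
    response = json.loads(response_text)
    if not response.get("found", False):
        raise KeyError("document {!r} not found".format(response.get("_id")))
    return response["_source"]


# prints http://localhost:9200/example/_doc/AWBQnjJj8QPf3E_8E-PX
print(document_url("localhost", "example", "AWBQnjJj8QPf3E_8E-PX"))
```

Raising on a missing document, rather than returning None, makes a wrong id fail loudly in a downstream consumer.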

Aggregation Query

Here is an example using an aggregation query. If you have a standalone installation at hand, this example is delivered with the example PML files.

[
  {
    type: elastic_input
    component: input
    settings:
    {
      index: city_offices
      nodes:
      [
        localhost
      ]
      // The query part is the JSON body sent to the '_search'
      // endpoint of the index. See the equivalent curl request below.
      query: {
          // Set size to zero to avoid retrieving the matching documents:
          // what you want are the aggregation buckets.
          size: 0
          aggregations: {
            cities: {
              terms: {
                field: city
                size: 50
              }
              aggregations: {
                office_types: {
                  terms: {
                    field : office_types
                  }
                }
              }
            }
          }
        }
      }
    publish:
    [
      {
        stream: default
        field: aggregates
      }
    ]
    verbose: true
  }
]

For clarity, here is the equivalent Elasticsearch request:

curl -XGET "http://localhost:9200/city_offices/_search" -H 'Content-Type: application/json' -d'
{
  "size": 0,
  "aggregations": {
    "cities": {
      "terms": {
        "field": "city",
        "size": 50
      },
      "aggregations": {
        "office_types": {
          "terms": {
            "field": "office_types"
          }
        }
      }
    }
  }
}'
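To use the result downstream, you have to walk the nested buckets. The Python sketch below flattens the cities and office_types buckets into (city, office type, count) rows. It assumes the string received by subscribers is the raw _search response body, which this page does not confirm; the sample values are invented for illustration, and flatten_city_office_types is a hypothetical helper.

```python
import json


def flatten_city_office_types(response_text):
    """Flatten nested cities -> office_types terms buckets into rows."""
    aggregations = json.loads(response_text)["aggregations"]
    rows = []
    for city in aggregations["cities"]["buckets"]:
        # Each city bucket embeds its sub-aggregation under the same name
        # as in the query ("office_types"), with its own bucket list.
        for office in city["office_types"]["buckets"]:
            rows.append((city["key"], office["key"], office["doc_count"]))
    return rows


# Invented sample shaped like a _search response sent with size: 0.
sample = json.dumps({
    "hits": {"hits": []},
    "aggregations": {
        "cities": {
            "buckets": [
                {
                    "key": "Paris",
                    "doc_count": 3,
                    "office_types": {"buckets": [
                        {"key": "bank", "doc_count": 2},
                        {"key": "post", "doc_count": 1},
                    ]},
                },
            ],
        },
    },
})
print(flatten_city_office_types(sample))
```

Because size is zero, hits.hits is empty and all the useful information sits under aggregations.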

Several Aggregations

You can execute several aggregations in a single query. Here is an example:

      query: {
        // Set size to zero to avoid retrieving the matching documents:
        // what you want are the aggregation buckets.
        size: 0
        aggregations: {
          // first aggregation: offices by city, then by office type
          cities: {
            terms: {
              field: city
              size: 50
            }
            aggregations: {
              office_types: {
                terms: {
                  field: office_types
                }
              }
            }
          }
          // second aggregation: occupations of the nested citizens objects
          citizens: {
            nested: {
              path: citizens
            }
            aggs: {
              occupations: {
                terms: {
                  field: citizens.occupation
                  size: 50
                }
              }
            }
          }
        }
      }
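Both aggregations come back side by side in the same response, each under its own name. The Python sketch below reads them, assuming (as above, and unconfirmed here) that subscribers receive the raw _search response body; summarize is a hypothetical helper. Note the extra level introduced by the nested aggregation: citizens carries its own doc_count, and the occupations buckets sit one level below it.

```python
import json


def summarize(response_text):
    """Read two sibling aggregations from a single _search response."""
    aggregations = json.loads(response_text)["aggregations"]
    # Top-level aggregations appear side by side, keyed by their names.
    cities = [bucket["key"] for bucket in aggregations["cities"]["buckets"]]
    # A nested aggregation adds one level: its own doc_count, then the
    # sub-aggregations ("occupations") computed over the nested objects.
    occupations = {
        bucket["key"]: bucket["doc_count"]
        for bucket in aggregations["citizens"]["occupations"]["buckets"]
    }
    return cities, occupations
```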

Range Query

You will often want to restrict aggregations to a limited range of documents, for example a time window. Here is an example query that achieves this:

     query:
      {
        size: 0
        query:
        {
          range:
          {
            @timestamp:
            {
              gte: 2019-01-23T15:00:00.000Z
              lt: 2019-01-23T15:01:00.000Z
            }
          }
        }
        aggregations:
        {
          by_channel:
          {
            terms:
            {
              field: channel
            }
            aggregations:
            {
              max_size:
              {
                max:
                {
                  field: size
                }
              }
              total_size: {
                sum: {
                  field: size
                }
              }
            }
          }
        }
      }
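The range bounds above are literal ISO 8601 timestamps. If you generate such a query programmatically, the Python sketch below builds a window in the same millisecond UTC format; time_range is a hypothetical helper, not part of the node. Using gte for the start and lt for the end, as the example does, lets consecutive windows tile without counting a document twice.

```python
from datetime import datetime, timedelta, timezone


def time_range(start, width=timedelta(minutes=1), field="@timestamp"):
    """Build a half-open [start, start + width) range clause."""
    if start.tzinfo is None:
        raise ValueError("start must be timezone-aware")

    def fmt(moment):
        # UTC, millisecond precision, e.g. 2019-01-23T15:00:00.000Z
        moment = moment.astimezone(timezone.utc)
        millis = moment.microsecond // 1000
        return moment.strftime("%Y-%m-%dT%H:%M:%S") + ".{:03d}Z".format(millis)

    return {"range": {field: {"gte": fmt(start), "lt": fmt(start + width)}}}


clause = time_range(datetime(2019, 1, 23, 15, 0, tzinfo=timezone.utc))
print(clause)
```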

Optional Settings

You can control the following additional settings:

   settings: {
       // use a different Elasticsearch port number
       port: 9200

       // set the dataset column name to a specific name instead of "source"
       column: my_column_name

       // set the document type if it is not the default "_doc" value
       type: doc
   }
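The way optional settings fall back to defaults can be sketched as below: missing optional settings are filled in and one base URL is derived per node. The "_doc" type and "source" column defaults come from this page, as do the false defaults of hits_hits and aggregation in the configuration reference; port 9200 is an assumption (the usual Elasticsearch HTTP port), and resolve_settings and node_urls are hypothetical helpers, not the node's actual code.

```python
DEFAULTS = {
    "port": 9200,          # assumption: the standard Elasticsearch HTTP port
    "type": "_doc",        # default document type
    "column": "source",    # default dataset column name
    "hits_hits": False,
    "aggregation": False,
}


def resolve_settings(settings):
    """Check required settings and fill in defaults for optional ones."""
    missing = [key for key in ("index", "nodes") if key not in settings]
    if missing:
        raise ValueError("missing required settings: " + ", ".join(missing))
    resolved = dict(DEFAULTS)
    resolved.update(settings)  # user values override the defaults
    return resolved


def node_urls(settings):
    """Derive one base URL per configured node."""
    resolved = resolve_settings(settings)
    return ["http://{}:{}".format(node, resolved["port"]) for node in resolved["nodes"]]
```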

Full configuration example:

{
  job: [
    {
      component: input
      publish: [
        {
          stream: data
        }
      ]
      settings: {
        aggregation: true
        index: mytenant-events*
        nodes: [
          localhost
        ]
        query: {
          aggregations: {
            by_channel: {
              aggregations: {
                max_size: {
                  max: {
                    field: size
                  }
                }
                total_size: {
                  sum: {
                    field: size
                  }
                }
              }
              terms: {
                field: vendor
              }
            }
          }
          query: {
            bool: {
              must: [
                {
                  range: {
                    @timestamp: {
                      gte: now-100h
                      lt: now
                    }
                  }
                }
              ]
            }
          }
          size: 10
        }
      }
      type: elastic_input
    }
    {
      component: show
      subscribe: [
        {
          component: input
          stream: data
        }
      ]
      type: show
    }
  ]
}
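In this job the by_channel aggregation is a terms aggregation on the vendor field, so bucket keys are vendor values, each carrying the max_size and total_size metric sub-aggregations. The Python sketch below turns such buckets into a per-key summary. It assumes, without confirmation from this page, that the string received by the show node is the raw _search response body; channel_sizes is a hypothetical helper.

```python
import json


def channel_sizes(response_text):
    """Summarize by_channel terms buckets and their metric sub-aggregations."""
    aggregations = json.loads(response_text)["aggregations"]
    summary = {}
    for bucket in aggregations["by_channel"]["buckets"]:
        # Single-value metric aggregations (max, sum) report under "value".
        summary[bucket["key"]] = {
            "documents": bucket["doc_count"],
            "max_size": bucket["max_size"]["value"],
            "total_size": bucket["total_size"]["value"],
        }
    return summary
```

With size: 10 the response also contains up to ten hits; the sketch ignores them, in line with aggregation: true.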

Configuration(s)

  • column: String

    Description: [Optional] Defaults to "source". This parameter is taken into account only if aggregation and hits_hits are both set to false. In that case, a dataset of one row is returned, containing the document source and the aggregation results in JSON format.

  • hits_hits: Boolean

    Description: [Optional] Defaults to false. When set to true, only the hits of your Elasticsearch query are returned.

  • aggregation: Boolean

    Description: [Optional] Defaults to false. Set to true to use only the data returned by an aggregation query. This parameter should be used with flatten set to true.

  • index: String

    Description: [Required] The name of the Elasticsearch index from which data is fetched.

  • port: Integer

    Description: [Optional] The port of your Elasticsearch server.

  • type: String

    Description: [Optional] The document type to retrieve from your Elasticsearch index, if it is not the default "_doc".

  • query: Json

    Description: [Optional] A valid Elasticsearch query, i.e. the request body sent to the _search endpoint.

  • nodes: List

    Description: [Required] Hostnames of your Elasticsearch nodes. In general, a single hostname is enough.

  • output_columns: List

    Description: [Optional] A list of JSON objects, each of which can contain the following three fields:

    field: String [Required] the field of your document to include in the resulting dataset
    alias: String [Optional] a new name for the retrieved field
    type: String [Optional] cast the column to a type (string, date, timestamp, long, double, float, boolean)
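The field, alias and type entries can be pictured as a projection of each document onto a row. The Python sketch below illustrates that idea for top-level fields only; project and its casting table are hypothetical, cover only some of the listed types, and may differ from how the node actually converts values.

```python
from datetime import datetime


def to_bool(value):
    """Accept real booleans or strings such as "true"/"false"."""
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() == "true"


def to_timestamp(value):
    # fromisoformat() before Python 3.11 rejects a trailing "Z".
    return datetime.fromisoformat(str(value).replace("Z", "+00:00"))


# Hypothetical casting table covering part of the documented types.
CASTS = {
    "string": str,
    "long": int,
    "double": float,
    "float": float,
    "boolean": to_bool,
    "timestamp": to_timestamp,
}


def project(document, output_columns):
    """Keep, rename and cast top-level fields following output_columns."""
    row = {}
    for spec in output_columns:
        field = spec["field"]                  # required
        name = spec.get("alias", field)        # optional rename
        value = document.get(field)            # missing fields become None
        cast = spec.get("type")                # optional cast
        if cast is not None and value is not None:
            value = CASTS[cast](value)
        row[name] = value
    return row
```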