
Elastic Input

Overview

The elastic_input node enables you to fetch data from your Elasticsearch cluster as a typed or untyped Spark dataset. Two modes are currently available:

  • hits_hits
  • aggregation

warning

Only one of these modes should be set to true at a time. When either aggregation or hits_hits is set to true, the result will be a Spark dataset with an automatically inferred schema.

info

If only the minimum configuration is set, the full raw JSON data (received from Elasticsearch) will be returned by this node as a Spark dataset. The number of rows returned is determined by the number of hits and the size you specified in your Elasticsearch query. By default the size is set to 50.
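For instance, to retrieve only the hits part of the response, the hits_hits mode can be enabled in the node settings. This is a minimal sketch; the index name is illustrative:

```hjson
{
  type: elastic_input
  component: input
  settings:
  {
    index: mytenant-events-*
    hits_hits: true
  }
  publish:
  [
    {
      stream: data
    }
  ]
}
```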

Runtime Compatibility

  • PySpark :
  • Spark :

Examples

{
  type: punchline
  version: "6.0"
  runtime: spark
  tenant: default
  dag:
  [
    {
      type: elastic_input
      component: input
      settings:
      {
        index: mytenant-events-*
      }
      publish:
      [
        {
          stream: data
        }
      ]
    }
  ]
}

The resulting output will be a set of rows, each of which is a valid JSON string.

Aggregation

Say you want to query Elasticsearch using:

curl -XGET "http://localhost:9200/city_offices/_search" -H 'Content-Type: application/json' -d'
        {
          "aggregations": {
            "cities": {
              "terms": { 
                "field": "city",
                "size": 50
              },
              "aggregations": {
                "office_types": {
                  "terms": {
                    "field": "office_type"
                  }
                }
              }
            }
          }
        }'

Here is how it is done in a punchline:

{
  type: punchline
  version: "6.0"
  runtime: spark
  tenant: default
  dag:
  [
    {
      type: elastic_input
      component: input
      settings:
      {
        index: city_offices
        query:
        {
          size: 0
          aggregations:
          {
            cities:
            {
              terms:
              {
                field: city
                size: 50
              }
              aggregations:
              {
                office_types:
                {
                  terms:
                  {
                    field: office_type
                  }
                }
              }
            }
          }
        }
      }
      publish:
      [
        {
          stream: default
        }
      ]
    }
  ]
}

Several Aggregations

Let us now execute several aggregations. You probably want to perform aggregations on a limited range of documents. Here is an example query that achieves that:

{
  query:
  {
    size: 0
    query:
    {
      range:
      {
        @timestamp:
        {
          gte: 2019-01-23T15:00:00.000Z
          lt: 2019-01-23T15:01:00.000Z
        }
      }
    }
    aggregations:
    {
      by_channel:
      {
        terms:
        {
          field: channel
        }
        aggregations:
        {
          max_size:
          {
            max:
            {
              field: size
            }
          }
          total_size:
          {
            sum:
            {
              field: size
            }
          }
        }
      }
    }
  }
}

Below is the complete configuration file that lets you do that:

{
  type: punchline
  version: "6.0"
  runtime: spark
  tenant: default
  dag:
  [
    {
      component: input
      publish:
      [
        {
          stream: data
        }
      ]
      settings:
      {
        aggregation: true
        index: mytenant-events*
        query:
        {
          aggregations:
          {
            by_channel:
            {
              aggregations:
              {
                max_size:
                {
                  max:
                  {
                    field: size
                  }
                }
                total_size:
                {
                  sum:
                  {
                    field: size
                  }
                }
              }
              terms:
              {
                field: vendor
              }
            }
          }
          query:
          {
            bool:
            {
              must:
              [
                {
                  range:
                  {
                    @timestamp:
                    {
                      gte: now-100h
                      lt: now
                    }
                  }
                }
              ]
            }
          }
          size: 0
        }
      }
      type: elastic_input
    }
    {
      component: show
      subscribe:
      [
        {
          component: input
          stream: data
        }
      ]
      type: show
    }
  ]
}

Parameters

Common Settings

Name Type mandatory Default value Description
column String false source By default set to "source". This parameter is taken into account only if aggregation and hits_hits are both set to false. A dataset of N rows will be returned, with the source and aggregation results in JSON format (N is an Integer).
hits_hits Boolean false false Returns only the hits result of your elasticsearch query.
aggregation Boolean false false Set to true to use only data returned by an aggregation query.
index String true NONE The name of your elasticsearch index where data will be fetched. To add a document type, simply append /<type> to your index name.
port Integer false 9200 Your Elasticsearch server Port.
query String - Json false match all A valid Elasticsearch query.
nodes List of String true NONE Hostnames of your elasticsearch nodes. In general, only one hostname is needed.
timestamp str(K)-str(V) false NONE Takes a valid timestamp string (UTC): field_name: String [Required] the name of the timestamp column, and field_value: String [Required] the timestamp value.
output_columns List of Json false NONE A list of Json objects, each of which can contain 3 fields: field: String [Required] the field of your document to include in the resulting dataset, alias: String [Optional] a new name for the retrieved field, and type: String [Optional] a type to cast the column to (string, date, timestamp, long, double, float, boolean).
elastic_settings str(K)-str(V) false NONE Key-value arguments to control the Elasticsearch client.
id_column String false id Name of the column holding the id of each document.
empty_dataset_columns List of Json false NONE Returns a dataset with the given schema and data instead of an empty one.
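As an illustration, the hits_hits, output_columns and id_column settings can be combined to project and cast specific fields of each hit. This is a sketch only; the field names (channel, size) and aliases are hypothetical:

```hjson
{
  type: elastic_input
  component: input
  settings:
  {
    index: mytenant-events-*
    hits_hits: true
    id_column: doc_id
    output_columns:
    [
      {
        field: channel
        alias: source_channel
        type: string
      }
      {
        field: size
        type: long
      }
    ]
  }
  publish:
  [
    {
      stream: data
    }
  ]
}
```

Each published row then contains only the doc_id, source_channel and size columns, cast to the requested types.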

Elastic Settings

Elastic settings Type Default value Description
es.nodes.path.prefix String NONE /something/to/append in case your elastic servers are behind a proxy
es.size String 50 size of elastic query or size of each scroll query
es.scroll String false enable scroll requests
es.scroll.keepalive String 10m how long each scroll query should be kept alive, can be: 1m, 1d, 1y etc...
es.net.ssl String false enable ssl
es.net.http.auth.pass String NONE must be used with es.net.http.auth.user
es.net.http.auth.user String NONE must be used with es.net.http.auth.pass
es.net.http.auth.token String NONE must be used with es.net.http.auth.token_type
es.net.http.auth.token_type String NONE must be used with es.net.http.auth.token
es.max_concurrent_shard_requests String NONE sets the maximum number of shards the elastic_input node can request at a time
es.nodes.resolve.hostname String false resolve a hostname: be sure that /etc/hosts references the proper IP address
es.doc_type String NONE adds doc_type to the requested URI; this feature is deprecated by Elastic
es.http.timeout String 1m to override the HTTP connection timeout
socket_timeout_ms String 2m timeout before receiving data from ES; increase this if the request uses heavy filtering or many indices
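These settings are passed as key-value pairs under elastic_settings. A sketch enabling scrolling with basic authentication might look like this (the credentials and sizes are placeholders, not recommended values):

```hjson
{
  type: elastic_input
  component: input
  settings:
  {
    index: mytenant-events-*
    elastic_settings:
    {
      es.scroll: "true"
      es.scroll.keepalive: 10m
      es.size: "1000"
      es.net.http.auth.user: admin
      es.net.http.auth.pass: secret
    }
  }
  publish:
  [
    {
      stream: data
    }
  ]
}
```

With es.scroll enabled, es.size controls the size of each scroll page rather than the total number of documents fetched.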