
Elastic Batch Input

Overview

The elastic_batch_input node fetches data from an Elasticsearch index. The read data is transformed into a Dataset, ready to be processed by subsequent nodes.

Its default behavior is to transform the input documents into rows. You must define the fields you are interested in together with their types.

For all of our examples, we will assume that your Elasticsearch has an index academy-example containing the following documents:

    {
        "address": {
            "street": "clemenceau"
        },
        "name": "phil",
        "age": 21,
        "friends": [
            "alice"
        ]
    }

Runtime Compatibility

  • PySpark
  • Spark

Examples

---
tenant: mytenant
version: '6.0'
runtime: spark
type: punchline
dag:
- type: elastic_batch_input
  component: input
  settings:
    index: academy-example
    es_cluster: es_search
    nodes:
    - localhost
    output_columns:
    - type: string
      field: address.street
      alias: address_street
    - type: integer
      field: age
  publish:
  - stream: data

Here is how to generate rows containing columns from the address.street and age JSON fields. Notice the id column: by default, each document's id is provided.

# this is a preview on stdout after running the instruction
+--------------------+--------------+---+
|                  id|address_street|age|
+--------------------+--------------+---+
|eQeYvWkB3c-USBLvf06M|    clemenceau| 21|
|egeYvWkB3c-USBLvf06M|    clemenceau| 23|
|eweYvWkB3c-USBLvf06M|    clemenceau| 53|
+--------------------+--------------+---+

Custom Query

Warning

ES-HADOOP column naming defaults to the last key of your nested JSON. For instance, { "key1": { "key2": 1 } } will produce a dataset with a column named "key2" instead of "key1.key2". To bypass this issue, use the output_columns parameter and set an alias with the desired column name, for instance "key1_key2".
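As a sketch of that workaround, the following output_columns fragment (reusing the hypothetical key1/key2 field names from the warning above) renames the nested key to a flat alias:

```yaml
# illustrative fragment: the nested field key1.key2 would otherwise
# surface as a column named "key2"; the alias flattens it explicitly
output_columns:
- type: integer
  field: key1.key2
  alias: key1_key2
```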

You may want to read only the documents matching a query. Use the query property:

---
settings:
  ...
  query:
    query:
      bool:
        must:
        - range:
            age:
              gte: 20
              lte: 30

file.yaml

---
tenant: mytenant
version: '6.0'
runtime: spark
type: punchline
dag:
- type: elastic_batch_input
  component: input
  settings:
    query:
      query:
        bool:
          must:
          - range:
              age:
                gte: 20
                lte: 30
    index: academy-example
    es_cluster: es_search
    nodes:
    - localhost
    output_columns:
    - type: string
      field: address.street
      alias: address_street
    - type: integer
      field: age
  publish:
  - stream: data

Here is the result:

CONF=intermediate_use_case.punchline
punchlinectl start -p $CONF
# stdout result
+--------------------+--------------+---+
|                  id|address_street|age|
+--------------------+--------------+---+
|eQeYvWkB3c-USBLvf06M|    clemenceau| 21|
|egeYvWkB3c-USBLvf06M|    clemenceau| 23|
+--------------------+--------------+---+

Using Elastic Settings

---
type: elastic_batch_input
...
settings:
  elastic_settings:
    es.index.read.missing.as.empty: 'no'

Example

---
type: punchline
version: '6.0'
runtime: spark
dag:
- type: elastic_batch_input
  component: input
  settings:
    index: platform-metricbeat-*
    elastic_settings:
      es.index.read.missing.as.empty: 'yes'
    id_column: id
    output_columns:
    - field: beat.version
    query:
      query:
        bool:
          must:
          - term:
              metricset.module: system
          - range:
              "@timestamp":
                gte: now-10m
                lt: now
  publish:
  - stream: default
- type: elastic_batch_output
  component: output
  settings:
    index: example
    id_column: id
    output_mode: overwrite
  subscribe:
  - component: input
    stream: default

Parameters

| Name | Type | Mandatory | Default value | Description |
|---|---|---|---|---|
| index | String | true | NONE | The name of the Elasticsearch index from which data will be fetched. To add a document type, simply append /<type> to your index name. |
| port | Integer | false | 9200 | Your Elasticsearch server port. |
| query | String - Json | false | match all | A valid Elasticsearch query. |
| nodes | List of String | true | NONE | Hostnames of your Elasticsearch nodes. In general, only one hostname is needed. |
| output_columns | List of Json | false | NONE | A list of Json objects, each of which can contain three fields: field (String, required): the document field to include in the resulting dataset; alias (String, optional): a new name for the retrieved field; type (String, optional): cast the column to a type (string, date, timestamp, long, double, float, boolean). |
| elastic_settings | str(K)-str(V) | false | NONE | Key-value arguments to control the Elasticsearch client. See the official Elasticsearch documentation. |
| es_cluster | String | false | NONE | Name of the Elasticsearch cluster declared in your punchplatform.properties. |
| id_column | String | false | id | Name of the column used to store each document id. Note that _id is a reserved name in Elasticsearch, so refrain from using it; by default, the field is named id. |
| with_null_values | Boolean | false | true | Keep columns whose values are null. Set to false to remove them. |
| empty_dataset_columns | List of Json | false | NONE | When no documents match, return a dataset with this schema and data instead of an empty one. |
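Putting a few of the optional parameters together, a settings block might look as follows (a sketch only: the index name, column names, and values are illustrative, reusing the academy-example index from above):

```yaml
settings:
  index: academy-example
  es_cluster: es_search
  nodes:
  - localhost
  port: 9200
  id_column: doc_id        # store each document id in a column named doc_id instead of id
  with_null_values: false  # drop columns whose values are null
  output_columns:
  - type: string
    field: address.street
    alias: address_street
  - type: integer
    field: age
```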

Elastic Settings

| Elastic setting | Type | Default value | Description |
|---|---|---|---|
| es.nodes.path.prefix | String | NONE | Prefix such as /something/to/append, in case your Elasticsearch servers are behind a proxy |
| es.size | String | 50 | Size of the Elasticsearch query, or size of each scroll query |
| es.scroll | String | false | Enable scroll requests |
| es.scroll.keepalive | String | 10m | How long each scroll query should be kept alive: 1m, 1d, 1y, etc. |
| es.net.ssl | String | false | Enable SSL |
| es.net.http.auth.pass | String | NONE | Must be used with es.net.http.auth.user |
| es.net.http.auth.user | String | NONE | Must be used with es.net.http.auth.pass |
| es.net.http.auth.token | String | NONE | Must be used with es.net.http.auth.token_type |
| es.net.http.auth.token_type | String | NONE | Must be used with es.net.http.auth.token |
| es.net.ssl.keystore.location | String | NONE | Must be a jks, pkcs12 or p12 store containing the private and the public key of the node |
| es.net.ssl.keystore.pass | String | NONE | Do not provide if the keystore is not protected by a password |
| es.net.ssl.truststore.location | String | NONE | Must be a jks, pkcs12 or p12 store containing at least the node certificate and its CA chain, and every other certificate this node should trust |
| es.net.ssl.truststore.pass | String | NONE | Do not provide if the truststore is not protected by a password |
| es.net.ssl.hostname.verification | String | true | Whether the node client should resolve the nodes' hostnames to IP addresses or not |
| es.max_concurrent_shard_requests | String | NONE | Maximum number of shards the elastic_input node can request at a time |
| es.nodes.resolve.hostname | String | false | Resolve a hostname; be sure that /etc/hosts references the proper IP address |
| es.doc_type | String | NONE | Add doc_type to the requested URI (a feature deprecated by Elastic) |
| es.http.timeout | String | 1m | Override the HTTP connection timeout |
| socket_timeout_ms | String | 2m | Timeout before receiving data from Elasticsearch; increase this if the request uses heavy filtering or many indices |
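As an illustrative combination of these settings, the sketch below enables scrolling and basic authentication; the credentials and sizes are placeholders, not recommended values:

```yaml
settings:
  index: academy-example
  nodes:
  - localhost
  elastic_settings:
    es.scroll: 'true'               # fetch results with scroll requests
    es.size: '200'                  # 200 documents per scroll query
    es.scroll.keepalive: '5m'       # keep each scroll context alive for 5 minutes
    es.net.http.auth.user: elastic  # placeholder user, pairs with es.net.http.auth.pass
    es.net.http.auth.pass: changeme # placeholder password
```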