
Elasticsearch Input

The Elasticsearch Input node reads data from Elasticsearch, then forwards it to the punchline. This node leverages the elasticsearch-hadoop consumer.

Here is an example of a punchline that extracts metricbeat documents.

{
  "dag": [
    {
      "type": "elasticsearch_batch_input",
      "settings": {
        "cluster_id": "es_search",
        "index": "*metricbeat-*",
        "query": "?q=beat.version:6*",
        "es.index.read.missing.as.empty": false
      },
      "component": "extractor_spout"
    }
  ]
}

Instead of providing a query property using the query string format, you can use the query_dsl property to pass a JSON query DSL directly, as illustrated next:

"query_dsl": { "query": { "term": { "channel": "sourcefire" }}}

Warning

Without a query or a query_dsl property, you will fetch all the documents from your index(es).
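
For instance, a complete input node using query_dsl could look as follows (the channel value and component name are illustrative):

{
  "dag": [
    {
      "type": "elasticsearch_input",
      "settings": {
        "cluster_id": "es_search",
        "index": "*metricbeat-*",
        "query_dsl": {
          "query": {
            "term": { "channel": "sourcefire" }
          }
        }
      },
      "component": "extractor_spout"
    }
  ]
}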

Streams And Fields

Since Storm semantics require each input node to declare its fields when creating a punchline, the Elasticsearch node by default declares a single generic doc field for its tuples, containing the documents returned from Elasticsearch (one document per tuple).

When dealing with structured data, you can configure the node to declare the document properties as fields, effectively unwrapping each document into a tuple. Set the es.storm.spout.fields property to the fields you want: the node declares them to the Storm topology and extracts them from each returned document.

Here is an example using metricbeat data. Say you only want to extract the @timestamp and type fields, not the whole document. Use the following configuration:

{
  "dag": [
    {
      "type": "elasticsearch_input",
      "settings": {
        "cluster_id": "es_search",
        "index": "*metricbeat-*",
        "es.storm.spout.fields": "@timestamp, type"
      },
      "component": "elasticsearch_input"
    }
  ]
}
Another useful option is to also extract the document metadata: the index name, the document identifier and the document type. This is activated with the es.read.metadata property. You must then add the _metadata field to the declared fields, as illustrated next:

{
  "dag": [
    {
      "type": "elasticsearch_input",
      "settings": {
        "cluster_id": "es_search",
        "index": "*metricbeat-*",
        "query": "?q=beat.version:6*",
        "es.storm.spout.fields": "_metadata, @timestamp, type",
        "es.read.metadata": true
      },
      "storm_settings": {
        "component": "extractor_spout"
      }
    }
  ]
}
That metadata field can be renamed (see the es.read.metadata.field property below). With such a configuration, the Storm tuple emitted in the topology will look like:

{
  "default": {
    "@timestamp": "2018-11-13T15:52:21.946Z",
    "_metadata": {
      "_index": "metricbeat-6.5.4-2018.11.13",
      "_id": "FKfGDWcB3WVvsL2tDpmY"
    },
    "type": "metricbeat"
  }
}

Here, the default root key of the document is the name of the Storm stream.
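
If you prefer another name for the metadata field, rename it with es.read.metadata.field and declare that name in the spout fields. A settings fragment might look like this (the meta name is arbitrary):

{
  "settings": {
    "cluster_id": "es_search",
    "index": "*metricbeat-*",
    "es.read.metadata": true,
    "es.read.metadata.field": "meta",
    "es.storm.spout.fields": "meta, @timestamp, type"
  }
}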

Mandatory Parameters

  • index (string)

    the target index. To add a document type, simply append /<type> to your index name.

Optional Parameters

  • cluster_id (string)

the target Elasticsearch cluster identifier, as defined in the punchplatform.properties file

  • es.nodes (string)

    Instead of providing the elasticsearch cluster identifier cluster_id you can provide the list of Elasticsearch nodes to connect to. Note that the list does not have to contain every node inside the Elasticsearch cluster; these are discovered automatically by elasticsearch-hadoop by default. Each node can also have its HTTP/REST port specified individually (e.g. mynode:9600).

  • es.port (integer)

    Default HTTP/REST port used for connecting to Elasticsearch - this setting is applied to the nodes in es.nodes that do not have any port specified.

  • query (string)

    If not provided, all the documents from the specified index are matched. The query parameter lets you filter specific documents. The query is expected to be a URI API query starting with the char ?.

  • query_dsl (map)

    You can provide an Elasticsearch query DSL instead of an URI query string. A query dsl is expressed directly as a json dictionary.

  • es.index.read.missing.as.empty (boolean: true)

Whether a missing target index is treated as an empty one. Set it to false to make the topology fail if the target index does not exist; this is useful to catch configuration errors instead of having silent topologies doing nothing.

  • es.storm.spout.reliable (boolean: false)

Indicates whether the dedicated EsSpout is reliable, that is, whether it replays documents in case of failure. By default it is set to false, since replaying requires the documents to be kept in memory until they are acknowledged.

  • es.storm.spout.fields (comma-separated strings)

Specifies which fields the spout declares in the topology and extracts from the returned documents. By default it is unset, meaning each document is returned as a Map under the default doc field.
Note: only root document fields are valid; for example, metricset.name will not work. If you need fine-grained field control, use native Elasticsearch queries.

  • es.storm.spout.reliable.queue.size (integer: 0)

Applicable only if es.storm.spout.reliable is true. Sets the size of the queue which holds documents in memory to be replayed until they are acknowledged. By default the queue is unbounded (0); however, in a production environment it is advisable to bound the queue to limit memory consumption. If the queue is full, the spout drops incoming data and throws an exception.

  • es.storm.spout.reliable.retries.per.tuple (integer: -1)

Applicable only if es.storm.spout.reliable is true. Sets the number of retries (replays) of a failed tuple before giving up. A negative value causes the tuple to be replayed until it is acknowledged.

  • es.storm.spout.reliable.handle.tuple.failure (string)

Applicable only if es.storm.spout.reliable is true. Indicates how to handle a failing tuple once its retries are depleted. Possible values are:
- ignore: the tuple is discarded
- warn: a warning message is logged
- abort: an EsHadoopIllegalStateException is thrown

  • es.read.metadata (boolean: false)

Whether to include the document metadata (such as its id and version) in the results. Disabled by default.

  • es.read.metadata.field (string: "_metadata")

    The field under which the metadata information is placed. When es.read.metadata is set to true, the information is returned as a Map under the specified field.

  • es.read.metadata.version (boolean: false)

    Whether to include the document version in the returned metadata. Applicable only if es.read.metadata is enabled.
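
Putting several of these optional settings together, a node connecting through an explicit node list with a bounded reliable spout could look as follows (the host names, queue size and retry count are illustrative):

{
  "dag": [
    {
      "type": "elasticsearch_input",
      "settings": {
        "es.nodes": "esnode1:9200, esnode2",
        "es.port": 9200,
        "index": "*metricbeat-*",
        "query": "?q=beat.version:6*",
        "es.storm.spout.reliable": true,
        "es.storm.spout.reliable.queue.size": 1000,
        "es.storm.spout.reliable.retries.per.tuple": 5,
        "es.storm.spout.reliable.handle.tuple.failure": "warn"
      },
      "component": "elasticsearch_input"
    }
  ]
}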

Tip

The number of input node instances depends highly on your topology and environment.

Typically, use the number of shards of your target index as an indicator: if your index has 5 shards, create 5 node instances. However, the shard count may be considerably larger than the number of nodes you can add to your target runtime cluster; in that case, it is better to limit the number of EsSpout instances.

Last but not least, adding more node instances than the number of shards of the source index does not improve performance; the extra instances will just waste resources without processing anything.

Security

Additional parameters are available to configure the Elasticsearch input node with security settings for:

  • TLS: using keystores
  • Authentication: using credentials with a basic user and password

Example :

{
  "type": "elasticsearch_input",
  "settings": {
    "cluster_id": "es_search",
    "http_hosts": [
      {
        "host": "localhost",
        "port": 9200
      }
    ],
    "credentials": {
      "user": "bob",
      "password": "bob_secret"
    },
    "ssl": true,
    "ssl_keystore_location": "/data/certs/keystore.jks",
    "ssl_truststore_location": "/data/certs/truststore.jks",
    "ssl_keystore_pass": "keystore_secret",
    "ssl_truststore_pass": "truststore_secret",
    "per_stream_settings": [
      ...
    ]
  },
  "storm_settings": {
    ...
  }
}
  • credentials.user: (string)

Username used by the Elasticsearch input node to authenticate to the Elasticsearch cluster. If provided, credentials.password MUST be configured. Cannot work with credentials.token and credentials.token_type.

  • credentials.password: (string)

Password used by the Elasticsearch input node to authenticate to the Elasticsearch cluster. If provided, credentials.user MUST be configured. Cannot work with credentials.token and credentials.token_type.

  • ssl: (boolean: false)

Enables TLS encryption over the Elasticsearch input node's connection to the Elasticsearch cluster. If false, all the following configurations are ignored.

  • ssl_keystore_location: (string)

    Path to the keystore containing the Elasticsearch input node's public and private keys. jks, pkcs12 and p12 keystore types are supported.

  • ssl_keystore_pass: (string)

    Password of the keystore provided with ssl_keystore_location. Do not provide this configuration if no password protects the keystore.

  • ssl_truststore_location: (string)

    Path to the truststore containing the Elasticsearch input node's CA file and all the certificates trusted by this node. jks, pkcs12 and p12 truststore types are supported.

  • ssl_truststore_pass: (string)

Password of the truststore provided with ssl_truststore_location. Do not provide this configuration if no password protects the truststore.

  • ssl_hostname_verification (boolean: true)

Whether the node client should verify that the hostname in the server certificate matches the Elasticsearch node's hostname.

You may also provide the previous configuration using the standard Elasticsearch for Apache Hadoop settings:

  • es.net.http.auth.user: (string)

Username used by the Elasticsearch input node to authenticate to the Elasticsearch cluster. If provided, es.net.http.auth.pass MUST be configured.

  • es.net.http.auth.pass: (string)

Password used by the Elasticsearch input node to authenticate to the Elasticsearch cluster. If provided, es.net.http.auth.user MUST be configured.

  • es.net.ssl: (boolean: false)

Enables TLS encryption over the Elasticsearch input node's connection to the Elasticsearch cluster. If false, all the following configurations are ignored.

  • es.net.ssl.keystore.location: (string)

    Path to the keystore containing the Elasticsearch input node's public and private keys. jks, pkcs12 and p12 keystore types are supported.

  • es.net.ssl.keystore.pass: (string)

    Password of the keystore provided with es.net.ssl.keystore.location. Do not provide this configuration if no password protects the keystore.

  • es.net.ssl.truststore.location: (string)

    Path to the truststore containing the Elasticsearch input node's CA file and all the certificates trusted by this node. jks, pkcs12 and p12 truststore types are supported.

  • es.net.ssl.truststore.pass: (string)

Password of the truststore provided with es.net.ssl.truststore.location. Do not provide this configuration if no password protects the truststore.

  • es.net.ssl.hostname.verification (boolean: true)

Whether the node client should verify that the hostname in the server certificate matches the Elasticsearch node's hostname.
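
For instance, the earlier security example could equivalently be expressed with these standard settings (the paths, user and secrets are illustrative):

{
  "type": "elasticsearch_input",
  "settings": {
    "cluster_id": "es_search",
    "es.net.http.auth.user": "bob",
    "es.net.http.auth.pass": "bob_secret",
    "es.net.ssl": true,
    "es.net.ssl.keystore.location": "/data/certs/keystore.jks",
    "es.net.ssl.keystore.pass": "keystore_secret",
    "es.net.ssl.truststore.location": "/data/certs/truststore.jks",
    "es.net.ssl.truststore.pass": "truststore_secret"
  }
}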

Metrics

See the Elasticsearch spout metrics section.

Also refer to the Elasticsearch for Apache Hadoop Storm documentation.