ElasticsearchSpout

The PunchPlatform ElasticsearchSpout reads data from Elasticsearch, then forwards it to the topology. This spout leverages the elasticsearch-hadoop consumer.

Here is an example of a topology that extracts metricbeat documents.

{
  "spouts": [
    {
      "type": "elasticsearch_spout",
      "spout_settings": {
        "cluster_id": "es_search",
        "index": "*metricbeat-*",
        "type": "doc",
        "query": "?q=beat.version:6*",
        "es.index.read.missing.as.empty": false
      },
      "storm_settings": {
        "component": "extractor_spout"
      }
    }
  ]
}

Instead of providing a query property using the query string format, you can use the query_dsl property to directly pass a JSON query DSL, as illustrated next:

"query_dsl": { "query": { "term": { "channel": "sourcefire" }}},

Warning

Without a query or a query_dsl property, you will fetch all the documents from your index(es).

Streams And Fields

Since Storm requires each Spout to declare its fields when creating a topology, by default the elasticsearch spout declares for its tuples a generic doc field containing the documents returned (one per tuple) from Elasticsearch.

When dealing with structured data, you can configure the spout to declare the document properties as fields, effectively unwrapping the document into a tuple. Set the es.storm.spout.fields property to the properties you want: the spout declares them to the Storm topology as the tuple content and extracts them from each returned document.

Here is an example using metricbeat data. Say you only want to extract the @timestamp and type fields, not the whole document. Use the following configuration:

{
  "spouts": [
    {
      "type": "elasticsearch_spout",
      "spout_settings": {
        "cluster_id": "es_search",
        "index": "*metricbeat-*",
        "type": "doc",
        "es.storm.spout.fields": "@timestamp, type"
      },
      "storm_settings": {
        "component": "extractor_spout"
      }
    }
  ]
}

It is also useful to extract the document metadata: the index name, the document identifier and the document type. This is activated using the es.read.metadata property. You must then list the _metadata field among the fields you are interested in, as illustrated next:

{
  "spouts": [
    {
      "type": "elasticsearch_spout",
      "spout_settings": {
        "cluster_id": "es_search",
        "index": "*metricbeat-*",
        "type": "doc",
        "query": "?q=beat.version:6*",
        "es.storm.spout.fields": "_metadata, @timestamp, type",
        "es.read.metadata": true
      },
      "storm_settings": {
        "component": "extractor_spout"
      }
    }
  ]
}

That metadata field can be renamed (see the es.read.metadata.field property below). With such a configuration, the Storm tuple emitted in the topology will look like:

{
  "default": {
    "@timestamp": "2018-11-13T15:52:21.946Z",
    "_metadata": {
      "_index": "metricbeat-6.5.4-2018.11.13",
      "_type": "doc",
      "_id": "FKfGDWcB3WVvsL2tDpmY"
    },
    "type": "metricbeat"
  }
}

Here, the default root key is the name of the Storm stream.

Mandatory Parameters

  • index (string)

    the target index

  • type (string)

    the target elasticsearch type present in the index. The default value is doc

Optional Parameters

  • cluster_id (string)

    the target elasticsearch cluster id, as defined in the punchplatform.properties file

  • es.nodes (string)

    Instead of providing the elasticsearch cluster identifier cluster_id, you can provide the list of Elasticsearch nodes to connect to. Note that the list does not have to contain every node inside the Elasticsearch cluster; these are discovered automatically by elasticsearch-hadoop by default. Each node can also have its HTTP/REST port specified individually (e.g. mynode:9600). A complete connection example is shown after this list.

  • es.port (integer)

    Default HTTP/REST port used for connecting to Elasticsearch - this setting is applied to the nodes in es.nodes that do not have any port specified.

  • query (string)

    If not provided, all the documents from the specified index are matched. The query parameter lets you filter specific documents. The query is expected to be a URI API query starting with the char ?.

  • query_dsl (map)

    You can provide an Elasticsearch query DSL instead of a URI query string. A query DSL is expressed directly as a JSON dictionary.

  • es.index.read.missing.as.empty (boolean: true)

    If set to false, the topology fails if the target index does not exist. This is useful to catch configuration errors, instead of having silent topologies doing nothing.

  • es.storm.spout.reliable (boolean: false)

    Indicates whether the dedicated EsSpout is reliable, that is, whether it replays the documents in case of failure. By default it is set to false, since replaying requires the documents to be kept in memory until they are acknowledged. A combined example of the reliability settings is shown after this list.

  • es.storm.spout.fields (comma-separated strings)

    Specify what fields the spout will declare in its topology and extract from the returned documents. By default it is unset, meaning the documents are returned as Maps under the default doc field.
    Note: only the root document fields are valid; for example metricset.name will not work. If you need fine-grained field control, use native Elasticsearch queries.

  • es.storm.spout.reliable.queue.size (integer: 0)

    Applicable only if es.storm.spout.reliable is true. Sets the size of the queue that holds documents in memory for replay until they are acknowledged. By default the queue is unbounded (0); however, in a production environment it is recommended to bound the queue to limit memory consumption. If the queue is full, the spout drops any incoming data and throws an exception.

  • es.storm.spout.reliable.retries.per.tuple (integer: -1)

    Applicable only if es.storm.spout.reliable is true. Sets the number of retries (replays) of a failed tuple before giving up. A negative value causes the tuple to be replayed until it is acknowledged.

  • es.storm.spout.reliable.handle.tuple.failure (string)

    Applicable only if es.storm.spout.reliable is true. Indicates how to handle a failing tuple once its number of retries is depleted. Possible values are:
    - ignore: the tuple is discarded
    - warn: a warning message is logged
    - abort: a Java EsHadoopIllegalStateException is thrown

  • es.read.metadata (boolean: false)

    Whether to include the document metadata (such as id and version) in the results or not (the default).

  • es.read.metadata.field (string: "_metadata")

    The field under which the metadata information is placed. When es.read.metadata is set to true, the information is returned as a Map under the specified field.

  • es.read.metadata.version (boolean: false)

    Whether to include the document version in the returned metadata. Applicable only if es.read.metadata is enabled.

  • credential (map)

    If you need basic auth, use a credential dictionary to provide the user and password to use. For example: "credential": { "user": "bob", "password": "bob's password" }

    This setting can be combined with ssl. A token parameter can be specified like this: "credential": { "token": "mytoken", "token_type": "ApiKey" }. Note that if user and password are specified, they are ignored in favor of the token parameter. With the "Basic" token type, the token is the base64-encoded "user:password" string.
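
To illustrate the es.nodes and es.port settings described above, here is a sketch of a configuration that connects to explicitly listed nodes instead of using a cluster_id. The host names esnode1 and esnode2 are hypothetical; note that esnode2 overrides the default port:

{
  "spouts": [
    {
      "type": "elasticsearch_spout",
      "spout_settings": {
        "es.nodes": "esnode1,esnode2:9600",
        "es.port": 9200,
        "index": "*metricbeat-*",
        "type": "doc",
        "query": "?q=beat.version:6*"
      },
      "storm_settings": {
        "component": "extractor_spout"
      }
    }
  ]
}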
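
Similarly, the es.storm.spout.reliable.* settings described above can be combined. The following sketch bounds the replay queue, limits the retries per tuple, and logs a warning when a tuple is finally given up on; the values are illustrative, not recommendations:

{
  "spouts": [
    {
      "type": "elasticsearch_spout",
      "spout_settings": {
        "cluster_id": "es_search",
        "index": "*metricbeat-*",
        "type": "doc",
        "query": "?q=beat.version:6*",
        "es.storm.spout.reliable": true,
        "es.storm.spout.reliable.queue.size": 10000,
        "es.storm.spout.reliable.retries.per.tuple": 5,
        "es.storm.spout.reliable.handle.tuple.failure": "warn"
      },
      "storm_settings": {
        "component": "extractor_spout"
      }
    }
  ]
}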

Tip

The number of Spout instances depends highly on your topology and environment.

Typically you should use the number of shards of your target data as an indicator. If your index has 5 shards, create 5 EsSpouts. However, sometimes the number of shards might be considerably bigger than the number of Spouts you can run in your Apache Storm cluster; in that case, it is better to limit the number of EsSpout instances.

Last but not least, adding more EsSpout instances than the number of shards of the source index does not improve performance; the extra instances will just waste resources without processing anything.

Metrics

See the Elasticsearch spout metrics section.

Also refer to the elasticsearch-hadoop Storm documentation.