ElasticsearchSpout
The PunchPlatform ElasticsearchSpout reads data from Elasticsearch, then forwards it to the topology. This spout leverages the elasticsearch-hadoop consumer.
Here is an example of a topology that extracts metricbeat documents.
```json
{
  "spouts" : [
    {
      "type" : "elasticsearch_spout",
      "spout_settings" : {
        "cluster_id" : "es_search",
        "index" : "metricbeat-*",
        "type" : "doc",
        "query": "?q=beat.version:6*",
        "es.index.read.missing.as.empty": true
      },
      "storm_settings" : {
        "component" : "extractor_spout"
      }
    }
  ]
}
```
Instead of providing a query property in the query string format, you can use the query_dsl property to pass a JSON query DSL directly, as illustrated next:
```json
"query_dsl": { "query" : { "term" : { "channel" : "sourcefire" } } },
```
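As a sketch, the spout of the first example could express its filter with query_dsl instead of query. The query_string query used here is standard Elasticsearch query DSL; treating it as equivalent to the ?q=beat.version:6* query string is an assumption you should check against your Elasticsearch version and mapping.

```json
{
  "type" : "elasticsearch_spout",
  "spout_settings" : {
    "cluster_id" : "es_search",
    "index" : "metricbeat-*",
    "type" : "doc",
    "query_dsl" : {
      "query" : {
        "query_string" : { "query" : "beat.version:6*" }
      }
    },
    "es.index.read.missing.as.empty": true
  },
  "storm_settings" : {
    "component" : "extractor_spout"
  }
}
```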
Warning
Without a query or a query_dsl property, you will fetch all the documents from your index(es).
Streams and Fields
Since Storm requires each spout to declare its output fields when the topology is created, the Elasticsearch spout by default declares a single generic doc field, which holds each document returned by Elasticsearch (one document per tuple).
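For illustration only, a tuple emitted with the default doc field could look roughly like this. The rendering follows the tuple example given later on this page, and the document itself is a hypothetical, abridged metricbeat document.

```json
{
  "default": {
    "doc": {
      "@timestamp": "2018-11-13T15:52:21.946Z",
      "beat": { "version": "6.5.4" },
      "type": "metricbeat"
    }
  }
}
```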
When dealing with structured data, you can instead configure the spout to declare document properties as tuple fields, effectively unwrapping each document into a tuple.
Set the es.storm.spout.fields property to the fields you want: the spout declares them to the Storm topology as the tuple content and extracts their values from each returned document.
Here is an example using metricbeat data. Say you only want to extract the @timestamp and type fields, not the whole document.
Use the following configuration:
```json
{
  "spouts" : [
    {
      "type" : "elasticsearch_spout",
      "spout_settings" : {
        "cluster_id" : "es_search",
        "index" : "metricbeat-*",
        "type" : "doc",
        "query": "?q=beat.version:6*",
        "es.storm.spout.fields" : "@timestamp, type",
        "es.index.read.missing.as.empty": true
      },
      "storm_settings" : {
        "component" : "extractor_spout"
      }
    }
  ]
}
```
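Assuming the same source document as in the metadata example further down this page, the tuple emitted with this configuration would carry only the two requested fields, roughly as follows (stream name and values are illustrative):

```json
{
  "default": {
    "@timestamp": "2018-11-13T15:52:21.946Z",
    "type": "metricbeat"
  }
}
```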
Another useful case is to also extract the document metadata: the index name, the document identifier and the document type.
Enable this with the es.read.metadata property, then declare the _metadata field in es.storm.spout.fields, as illustrated next:
```json
{
  "spouts" : [
    {
      "type" : "elasticsearch_spout",
      "spout_settings" : {
        "cluster_id" : "es_search",
        "index" : "metricbeat-*",
        "type" : "doc",
        "query": "?q=beat.version:6*",
        "es.storm.spout.fields" : "_metadata, @timestamp, type",
        "es.read.metadata" : true
      },
      "storm_settings" : {
        "component" : "extractor_spout"
      }
    }
  ]
}
```
The metadata field can be given another name with the es.read.metadata.field property (see the optional parameters below).
With such a configuration, the Storm tuple emitted in the topology looks like this:
```json
{
  "default": {
    "@timestamp": "2018-11-13T15:52:21.946Z",
    "_metadata": {
      "_index": "metricbeat-6.5.4-2018.11.13",
      "_type": "doc",
      "_id": "FKfGDWcB3WVvsL2tDpmY"
    },
    "type": "metricbeat"
  }
}
```
Tip
Here, "default" is the name of the Storm stream.
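As a sketch, the metadata configuration above could rename the metadata field with es.read.metadata.field. The name meta is purely illustrative, and listing the renamed field in es.storm.spout.fields (in place of _metadata) is an assumption that mirrors the requirement stated for _metadata.

```json
{
  "type" : "elasticsearch_spout",
  "spout_settings" : {
    "cluster_id" : "es_search",
    "index" : "metricbeat-*",
    "type" : "doc",
    "query": "?q=beat.version:6*",
    "es.read.metadata" : true,
    "es.read.metadata.field" : "meta",
    "es.storm.spout.fields" : "meta, @timestamp, type"
  },
  "storm_settings" : {
    "component" : "extractor_spout"
  }
}
```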
Mandatory Parameters
index
: String. The target index.
type
: String (default: doc). The target Elasticsearch type present in the index.
Optional Parameters
cluster_id
: String. The target Elasticsearch cluster identifier, as defined in the punchplatform.properties file.
es.nodes
: String. Instead of providing the Elasticsearch cluster identifier (cluster_id), you can provide the list of Elasticsearch nodes to connect to. The list does not have to contain every node of the Elasticsearch cluster; by default, elasticsearch-hadoop discovers the other nodes automatically. Each node can also have its HTTP/REST port specified individually (e.g. mynode:9600).
es.port
: int. Default HTTP/REST port used to connect to Elasticsearch. This setting applies to the nodes in es.nodes that do not specify a port.
query
: String. Filters the documents to read. The query is expected to be a query string starting with ?. If none is provided, all the documents of the specified index are read. Use to match all documents.
query_dsl
: JSON. A query DSL to use instead of a query string, expressed directly as a JSON dictionary.
es.index.read.missing.as.empty
: Boolean (default: true). Whether reading a missing index returns an empty result (true) or makes the topology fail (false). Setting it to false is useful to catch configuration errors, instead of having silent topologies doing nothing.
es.storm.spout.reliable
: Boolean (default: false). Indicates whether the EsSpout is reliable, that is, whether it replays documents in case of failure. It defaults to false because replaying requires documents to be kept in memory until they are acknowledged.
es.storm.spout.fields
: String (default: unset). Comma-separated list of the fields the spout declares in the topology and extracts from the returned documents. When unset, documents are returned as Maps under the default field doc.
es.storm.spout.reliable.queue.size
: int (default: 0). Applicable only if es.storm.spout.reliable is true. Sets the size of the queue that holds documents in memory, to be replayed, until they are acknowledged. By default the queue is unbounded (0); in a production environment, bound it to limit memory consumption. If the queue is full, the spout drops any incoming data and throws an exception.
es.storm.spout.reliable.retries.per.tuple
: int. Applicable only if es.storm.spout.reliable is true. Sets the number of retries (replays) of a failed tuple before giving up. A negative value causes the tuple to be replayed until it is acknowledged.
es.storm.spout.reliable.handle.tuple.failure
: String. Applicable only if es.storm.spout.reliable is true. Indicates how to handle failing tuples once the number of retries is depleted. Possible values are:
- ignore: the tuple is discarded
- warn: a warning message is logged
- strict: the topology exits
es.read.metadata
: Boolean (default: false). Whether to include the document metadata (such as id and version) in the results.
es.read.metadata.field
: String (default: _metadata). The field under which the metadata information is placed. When es.read.metadata is set to true, the metadata is returned as a Map under this field.
es.read.metadata.version
: Boolean (default: false). Whether to include the document version in the returned metadata. Applicable only if es.read.metadata is enabled.
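The following spout sketch combines several optional parameters: it connects through es.nodes and es.port instead of cluster_id, enables reliable mode with a bounded replay queue, and adds the document version to the metadata. The host names, queue size and retry count are hypothetical values chosen for illustration, not recommendations.

```json
{
  "type" : "elasticsearch_spout",
  "spout_settings" : {
    "es.nodes" : "es-node-1:9200,es-node-2",
    "es.port" : 9200,
    "index" : "metricbeat-*",
    "type" : "doc",
    "query": "?q=beat.version:6*",
    "es.storm.spout.reliable" : true,
    "es.storm.spout.reliable.queue.size" : 10000,
    "es.storm.spout.reliable.retries.per.tuple" : 3,
    "es.storm.spout.reliable.handle.tuple.failure" : "warn",
    "es.read.metadata" : true,
    "es.read.metadata.version" : true,
    "es.storm.spout.fields" : "_metadata, @timestamp, type"
  },
  "storm_settings" : {
    "component" : "extractor_spout"
  }
}
```

Here es-node-2 has no explicit port, so es.port applies to it. Because the queue is bounded, the spout drops incoming data and throws an exception once the queue is full, so the size should be chosen with your acknowledgement latency in mind.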
Tip
The number of spout instances depends highly on your topology and environment.
Typically, use the number of shards of your target data as an indicator: if your index has 5 shards, create 5 EsSpouts. However, the number of shards may be considerably larger than the number of spouts you can add to your Apache Storm cluster; in that case, limit the number of EsSpout instances.
Last but not least, adding more EsSpout instances than the number of shards of the source index does not improve performance; the extra instances just waste resources without processing anything.
Metrics
See metrics_elasticsearch_spout
Also refer to the elasticsearch-hadoop Apache Storm documentation.