ElasticsearchBolt

The Elasticsearch bolt receives Storm tuples, converts them into JSON documents and writes those documents to Elasticsearch. It supports SSL and/or basic authentication.

To achieve the best performance while preserving reliability, this bolt uses a bulked asynchronous strategy. Messages are batched in small groups (1000 by default). That said, tuples are acked only after being acknowledged by Elasticsearch, so your topologies are guaranteed to process all incoming tuples.

Tip

Make sure you understand the punchplatform stream and field concepts. These are explained in detail in the spouts and bolts javadocs.

Overview

The data arrives at the Elasticsearch bolt as part of stream(s) of key-value fields. Some of these fields are payload data (i.e. the ones you want to insert into Elasticsearch), others are metadata (unique ids, timestamps, index hints, etc.).

Consider the following example: on the stream logs, you receive two keys: log and _ppf_id.

logs => [ "log": { "name": "alice", "age": 23}, "_ppf_id": "12asyw" ]

This example is typical of a log management configuration, where one of the fields (here log) contains the JSON document to insert, while the other contains a unique identifier to be used as the document id inside Elasticsearch (to avoid log duplication in case of replay).

Say you want to insert the following document into Elasticsearch:

# document ID: "12asyw"
{
  "@timestamp": "2018-05-24T13:47:32.642Z",
  "name": "alice",
  "age": 23
}

i.e. use 12asyw as the document identifier, and add an ISO @timestamp field. The settings to achieve this are illustrated next:

{
  "type": "elasticsearch_bolt",
  "bolt_settings": {
    "cluster_id": "es_search",
    "per_stream_settings" : [
      # each subscribed stream must have a settings section
      {
        # this section only applies for tuples received on stream "logs"
        "stream" : "logs",

        # send the document to a daily index with a given prefix
        "index" : { "type" : "daily" , "prefix" : "mytenant-events-" },

        # take a given input tuple field as the json document.
        # there can be only one such field.
        "document_json_field" : "log",

        # use another field (_ppf_id) as the unique id for inserting that document into Elasticsearch
        "document_id_field" : "_ppf_id",

        # in addition to the 'log' field, add a '@timestamp' field with the current time.
        "additional_document_value_fields" : [
          { "type" : "date" , "document_field" : "@timestamp", "format" : "iso"}
        ]
      }
    ]
  },
  "storm_settings": {
    "component": "elasticsearch_bolt",
    "subscribe": [ { "component": "punch_bolt", "stream": "logs" } ]
  }
}

Warning

The simple example above may not be suitable for production needs if you do not want to risk losing unprocessed tuples in some error cases (see the reindex_failed_documents setting and the Indexing Error Handling section).

In the rest of this chapter we describe how you can finely control the various metadata: index names, ids, timestamps, and additional fields.

Global Parameters

The Elasticsearch bolt accepts the following settings:

  • cluster_id: (string, mandatory)

    the name of the Elasticsearch cluster

  • batch_size: (integer: 1000)

    the size of the bulk request written at once to Elasticsearch.

  • batch_interval (integer: 1000)

    In milliseconds. When traffic is low, data is flushed regularly; use this interval to set the flush period. Note that a large value introduces an indexing delay.

  • request_timeout (string: "20s")

    The Elasticsearch bolt sends bulk indexation requests as soon as batch_size is reached (or the batch is older than batch_interval). If your topology max_spout_pending is big enough, multiple batch requests may be started at close intervals; of course, when Elasticsearch is loaded, the last submitted request may have to wait for the previous ones to complete before actually being executed, leading to a longer wait for the HTTP response. If you want to avoid Storm tuple failures due to this timeout, increase the request timeout in proportion to the number of concurrent batches allowed by your max_spout_pending/batch_size ratio. For example, with max_spout_pending set to 10000 and batch_size set to 1000, up to 10 bulk requests may be in flight at the same time.

  • reindex_failed_documents (boolean: false)

    When the Elasticsearch bolt tries to insert a document into Elasticsearch and gets a rejection, you may want to keep track of this event (including the faulty document) and record it somewhere else in Elasticsearch. This can be done by setting this value to true AND by providing the error_index setting. See also the reindex_only_mapping_exceptions and fail_tuples_if_an_insertion_of_error_document_is_not_done settings.

When the default (false) value is used, an error is recorded in the Storm topology logs, and the faulty document is not inserted at all and is therefore lost.

  • error_index (string)

    the name of the Elasticsearch error index into which records will be inserted to keep track of documents that failed to be indexed. The faulty document will be escaped and embedded inside the error record. This setting is ignored if 'reindex_failed_documents' is set to false or missing.

Watch out: not defining one means Elasticsearch insertion errors will result in data loss.

  • per_stream_settings (list)

    Each subscribed stream must have a dedicated settings dictionary in this list. Refer to the per-stream documentation hereafter.

  • use_ssl (boolean: false)

    Set to true to use SSL/TLS.

  • ssl_certificate_verification (boolean: true)

    It is advised to leave this value set to true to enforce maximum security. By setting this value to false, the server certificate will not be verified.

  • truststorepass

    Required with ssl_verification. The password you used to generate your trust store.

  • keystorepath

    Required with ssl_verification. The absolute path to your key. Note: the domain names set on your key/certificate (generated by you) matter, as a mismatch may throw unexpected errors.

  • keystoretype

    Required with ssl_verification. The keystore type you are going to use. Supported types are: JKS, JCEKS, PKCS12, PKCS12S2, JCERACFKS.

  • credentials

    If you need basic auth, use a credentials dictionary to provide the user and password to use. For example: "credentials" : { "user" : "bob", "password" : "bob's password" }

    This setting can be combined with SSL; a combined example is sketched after this list. A token parameter can be specified as follows: "credentials": { "token": "mytoken", "token_type": "ApiKey" }. Note that if user and password are specified, they will be ignored in favor of the token parameter. The token is the base64-encoded string "user:password" when token_type is set to Basic.
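
Here is a minimal sketch combining batching, SSL and basic authentication; the keystore path, passwords and cluster name are placeholders to adapt to your platform:

{
  "type": "elasticsearch_bolt",
  "bolt_settings": {
    "cluster_id": "es_search",

    # batching: flush after 1000 tuples or after one second, whichever comes first
    "batch_size": 1000,
    "batch_interval": 1000,
    "request_timeout": "20s",

    # transport security
    "use_ssl": true,
    "ssl_certificate_verification": true,
    "keystorepath": "/path/to/keystore.jks",
    "keystoretype": "JKS",
    "truststorepass": "changeme",

    # basic authentication
    "credentials": { "user": "bob", "password": "bob's password" },

    "per_stream_settings": [
      ...
    ]
  },
  "storm_settings": {
    ...
  }
}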

Target Index

To control the target index name, use the (per stream) property:

# insert into a daily index. You must provide the prefix.
"index" : { "type" : "daily", "prefix" : "some_prefix_" }

# insert into an index whose name is provided by one of the input tuple fields.
"index" : { "type" : "tuple_field", "tuple_field" : "_ppf_index" }

# insert into a fixed named index
"index" : { "type" : "constant", "value" : "constant_index_name" }

Target Type

To control the Elasticsearch type where the documents are written, use the (per stream) property:

# insert documents with the type mytype; the default value is doc.
"document_type": "mytype",

Additional fields

Additional fields allow you to add data to your Elasticsearch document, typically a timestamp. Here are the possibilities:

"additional_document_value_fields" : [
   # insert a generated timestamp
   { "type" : "date", "document_field" : "@timestamp", "format" : "iso" }

   # insert a timestamp provided by one of the received input tuple field.
   # This requires the corresponding field to contain a unix timestamp representation.
   # Typically the one generated by the punchplatform spouts
   { "type" : "date", "tuple_field" : "_ppf_timestamp", "document_field" : "@timestamp",  "format" : "iso" }

   # insert on of the input tuple field as plain leaf value.
   # This requires the corresponding field to contain a string, long,   
   # boolean, or double value. 
   { "type" : "tuple_field", "tuple_field" : "_ppf_channel", "document_field" : "channel" }

   # insert a constant additional field
   "index" : { "type" : "constant", "value" : "banana", "document_field" : "fruit"  }
]

Punch Error Document Handling

If you have punchlets in your channels, they can generate error documents whenever a punchlet fails.

These are emitted just like other documents, with a different content. If you want to save these errors in Elasticsearch (you should!), simply configure your bolt to properly deal with the error stream.

{
  "type" : "elasticsearch_bolt",
  "bolt_settings" : {
    "per_stream_settings" : [
      {
        "stream" : "_ppf_errors",
        "document_value_fields" : [ 
          "_ppf_error_message", "_ppf_error_document",  "_ppf_timestamp" 
        ],
        "index" : { ... },
        "document_id_field" : "_ppf_id",
        "additional_document_value_fields" : [ ... ]
       },
       ...
     ]
  },
  "storm_settings": {
    "component": "elasticsearch_bolt",
    "subscribe": [ { "component": "some_spout", "stream": "_ppf_errors" } ]
  }
}

Remember the reserved content of the punchplatform fields (refer to reserved fields), in particular:

  • _ppf_error_document : contains the input document that generated the error.
  • _ppf_error_message : contains a useful error message including the punchlet error line where the error was raised.

It is worth noticing that:

  • the timestamp property as just illustrated makes the bolt generate a timestamp value based on the current time. Do not confuse it with a business timestamp that might be present in your input document.
  • the index property in this example will generate an index based on the current day.

Idempotency and replay

You can configure your bolt in various ways, but pay attention to two important fields:

  • the _ppf_id field lets you insert documents with a deterministic identifier. If you replay your data (by reading a Kafka queue again), the replayed documents will replace the existing ones in Elasticsearch. This is likely what you want in order to replay and enrich existing documents.
  • the _ppf_timestamp field lets you use the punchplatform entry time as the timestamp. Should you replay your data, that timestamp will still correspond to the original entry time. A per-stream configuration combining both fields is sketched after this list.
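
Here is a minimal per-stream sketch of such an idempotent configuration; the stream name and index prefix are placeholders:

{
  "stream" : "logs",
  "index" : { "type" : "daily", "prefix" : "mytenant-events-" },
  "document_json_field" : "log",

  # deterministic document id: replaying the same tuple overwrites the same document
  "document_id_field" : "_ppf_id",

  # reuse the platform entry time instead of the insertion time
  "additional_document_value_fields" : [
    { "type" : "date", "tuple_field" : "_ppf_timestamp", "document_field" : "@timestamp", "format" : "iso" }
  ]
}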

Elasticsearch Indexing Error Handling

Inserted documents can be refused by Elasticsearch, most likely because of a mapping error, but also for load-related reasons. This chapter explains how to deal with this without losing any data.

When activated by the reindex_failed_documents setting, insertion errors will be converted into "error documents" and inserted into Elasticsearch in a safer way (see the following paragraphs).

If the reason why Elasticsearch rejected the document insertion is NOT a mapping/field-type exception but something else (such as too much load), then the bolt will NOT index this error as an "indexation error" into the error index, but will directly mark the original tuple as failed so that it can be replayed later. This behaviour can be altered with the reindex_only_mapping_exceptions setting (which is true by default when reindexation is activated).

Error Index

First you need to tell the bolt to activate indexing error storage, and to which index the error documents must be sent. Use the reindex_failed_documents, reindex_only_mapping_exceptions and error_index properties as shown next. By default, reindex_failed_documents is set to false:

{
  "type": "elasticsearch_bolt",
  "bolt_settings": {
    "cluster_id": "es_search",
    "reindex_failed_documents": true,
    "reindex_only_mapping_exceptions": true,
    "error_index": {
      "type": "daily",
      "prefix": "mytenant-events-indexation-errors-"
    },
    "per_stream_settings": [
      ...
    ]
  },
  "storm_settings": {
    ...
  }
}

Error Document content

The bolt will generate an error document based on the input tuple fields plus a timestamp (@timestamp by default). The rule is simple:

  • regular user fields: their content is escaped and put as a JSON string property. The key is the corresponding field name.
  • punch reserved fields (i.e. starting with _ppf): their content is put as is as a JSON (string or numerical) property. The key is also the corresponding field name.

Here is an example: say you receive this tuple:

[ "log": { "age": 23 , "ip": "not a valid ip" }, "_ppf_id": "12asyw" ]

The "log" field is expected to hold the document to insert into Elasticsearch. The bolt first attempts to write

{ "age" : 23 , "ip" : "not a valid ip" }

into Elasticsearch. That fails because, in this case, the field ip does not contain a valid IP address. The bolt will then insert the following document into the specified error index:

{ 
  "@timestamp" : "2018-05-24T13:47:32.642Z",
  "_ppf_id": "12asyw", 
  "log" : "{\"age\": 23 , \"ip\": \"not a valid ip\" }"
}

This time the document will be accepted (as long as the error index mapping is properly defined of course).

Escaping strategy

To ensure error documents are not themselves refused, every user field is escaped (i.e. transformed into a plain string with all quotes and reserved JSON characters escaped). In contrast, reserved punchplatform fields (refer to IReservedFields) will be included as is. These can only contain safe strings with no risk of being refused by Elasticsearch.

Rationale

Dealing with errors without losing data is a complex issue. The punchplatform design ensures you will have, saved in Elasticsearch, all the required fields so as to be able to

  1. understand the error cause, and
  2. re-process the error documents using a replay topology that includes a simple punchlet.

Since all the received fields are part of the error document, you must simply decide which ones are useful for your error handling.

For example, it is a good idea to include the platform, tenant and channel information as part of it. Simply arrange to propagate the _ppf_platform, _ppf_tenant and _ppf_channel fields as part of your stream. These will appear as searchable fields in your Elasticsearch error index, as illustrated below.
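
Continuing the previous example, if the subscribed stream also carries these reserved fields, the error document would look like the following sketch; the platform, tenant and channel values are placeholders:

{
  "@timestamp" : "2018-05-24T13:47:32.642Z",
  "_ppf_id": "12asyw",
  "_ppf_platform": "my-platform",
  "_ppf_tenant": "mytenant",
  "_ppf_channel": "apache",
  "log" : "{\"age\": 23 , \"ip\": \"not a valid ip\" }"
}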