
Elasticsearch Output

The ElasticsearchOutput receives Storm tuples, converts them into JSON documents and writes those documents to Elasticsearch. It supports SSL and/or basic authentication.

To get the best performance while preserving reliability, this node uses an asynchronous bulk strategy. Messages are batched in small groups (1000 by default). That said, tuples are acked only after being acknowledged by Elasticsearch, so that your topologies are guaranteed to process all incoming tuples.
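
As a minimal sketch (the per-stream settings are omitted and the values are only examples), the batching behaviour can be tuned with the batch_size and batch_interval settings described in the Global Parameters section below:

{
  type: elasticsearch_output
  settings: {
    cluster_id: es_search
    # flush a bulk request every 500 documents ...
    batch_size: 500
    # ... or after at most 2 seconds when the traffic is low (milliseconds)
    batch_interval: 2000
    per_stream_settings: [
      ...
    ]
  }
  ...
}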

Overview

The data arrives at the ElasticsearchOutput as part of stream(s) of key-value fields. Some of these fields are payload data (i.e. the ones you want to insert into Elasticsearch), others are metadata (unique ids, timestamps, index hints, etc.).

Consider the following example: on the stream logs, you receive two keys: log and _ppf_id.

logs => [ "log": { "name": "alice", "age": 23}, "_ppf_id": "12asyw" ]

This example is typical of a log management configuration, where one of the fields (here log) actually contains the JSON document to insert, while the other field contains a unique identifier to be used as the document's unique identifier inside Elasticsearch (to avoid duplication of logs in case of replay).

Say you want to insert the following document into Elasticsearch:

# document ID: "12asyw"
{
  "@timestamp": "2018-05-24T13:47:32.642Z",
  "name": "alice",
  "age": 23
}

i.e. use 12asyw as the document identifier, and add an ISO @timestamp field. The settings to achieve this are illustrated next:

{
  type: elasticsearch_output
  settings: {
    cluster_id: es_search
    per_stream_settings : [
      # each subscribed stream must have a settings section
      {
        # this section only applies for tuples received on stream "logs"
        stream : logs

        # send the document to a daily index with a given prefix (Note that "monthly" and "yearly" can also be used to reduce ES Cluster cluttering with small indices)
        index : {
          type : daily
          prefix : mytenant-events-
        }

        # take a given input tuple field as the json document.
        # there can be only one such field.
        document_json_field : log

        # use another field (_ppf_id) as the unique id for inserting that document into elasticsearch
        document_id_field : _ppf_id

        # in addition to the 'log' field, add a '@timestamp' field with the current time.
        additional_document_value_fields : [
          {
            type : date
            document_field : @timestamp
            format : iso
          }
        ]
      }
    ]
  }
  subscribe: [
    {
      component: punchlet_node
      stream: logs
    }
  ]
}

Warning

The above simple example may not be suitable for production needs if you do not want to risk losing unprocessed tuples in some error cases (see the reindex_failed_documents setting and the Indexing Error Handling section).

In the rest of this chapter we describe how you can finely control the various metadata: index name, ids, timestamps, and additional fields.

Global Parameters

The elasticsearch output accepts the following settings:

  • cluster_id: (string, mandatory)

    the name of the elasticsearch cluster

  • batch_size: (integer: 1000)

    the size of the bulk written at once to elasticsearch.

  • batch_interval (integer: 1000)

    In milliseconds. When the traffic is low, data is flushed regularly; use this interval to set the flush period. Note that if the value is large, documents will be indexed with a corresponding delay.

  • request_timeout (string: "20s")

    The Elasticsearch output sends a bulk indexation request as soon as batch_size is reached (or the batch is older than batch_interval). If your topology max_spout_pending is big enough, multiple batch requests may be started at close intervals; of course, when Elasticsearch is loaded, the last submitted request may have to wait for the previous ones to complete before being actually executed, leading to a longer wait for the HTTP response. If you want to avoid Storm tuple failures due to this timeout, increase your request timeout in relation to the number of concurrent batches allowed by your max_spout_pending/batch_size ratio. For example, with max_spout_pending set to 5000 and batch_size set to 1000, up to 5 batch requests may be in flight at once, so the last one may have to wait for the 4 previous ones to complete.

  • reindex_failed_documents (boolean: false)

    When the Elasticsearch output tries to insert a document into Elasticsearch and gets a rejection, you may want to keep track of this event (including the faulty document) and record it somewhere else in Elasticsearch. This can be done by setting this value to true AND by providing the error_index setting. See also the reindex_only_mapping_exceptions and fail_tuples_if_an_insertion_of_error_document_is_not_done settings.

  • bulk_failure_action (string: forward)

    When a whole bulk request fails while trying to insert into Elasticsearch, the default behaviour is to try to forward the tuples to the _ppf_errors stream. If there is no such stream, all the tuples are failed. In production, you may want to avoid that, because it could flood the error stream. This parameter allows 3 values: forward (default), ack and fail. ack will simply ack the tuples, i.e. ignore them, and fail will fail the tuples so that they can be processed again later.

When reindex_failed_documents is left to its default (false) value, an error is recorded in the logs of the Storm topology, and the faulty document is not inserted at all and is lost.

  • error_index (string)

    the name of the Elasticsearch error index into which records will be inserted to keep track of documents that failed to be indexed in Elasticsearch. The faulty document is included (escaped) inside the error record. This setting is ignored if reindex_failed_documents is set to false or missing.

Watch out: not defining one will make Elasticsearch insertion errors result in data loss.

  • per_stream_settings (dictionary)

    Each subscribed stream must have a dedicated settings dictionary. Refer to the documentation hereafter.

  • credentials

    If you need basic auth, use a credentials dictionary to provide the user and password to use. For example: "credentials" : { "user" : "bob", "password" : "bob's password" }

    These settings can be combined with ssl. A token parameter can be specified like this: "credentials": { "token": "mytoken", "token_type": "ApiKey" }. Note that if user and password are specified, they are ignored in favor of the token parameter. With token_type set to Basic, the token is the base64-encoded string "user:password", as sketched below.
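
For instance, here is a sketch of a token-based credentials block (the token value shown is just the base64 encoding of "bob:bob_secret", for illustration only):

credentials: {
  # base64 encoding of "bob:bob_secret"; only meaningful with token_type Basic
  token: "Ym9iOmJvYl9zZWNyZXQ="
  token_type: Basic
}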

Target Index

To control the target index name, use the (per stream) property:

# insert into a daily index. You must provide the prefix.
# Note that you can also use 'monthly' or 'yearly' if your index content is expected to be small, to avoid cluttering the Elasticsearch cluster with too many indices to manage.
index : {
  type : daily
  prefix : some_prefix_
}

# insert into an index whose name is provided by one of the subscribed fields.
index : {
  type : tuple_field
  tuple_field : index_name
}

# insert into a fixed named index
index : {
  type : constant
  value : constant_index_name
}

Target Type

To control the Elasticsearch type under which the documents are written, use the (per stream) property:

# insert into an index with the type mytype, the default value is _doc.
document_type: mytype

Warning

Mapping types are deprecated since Elasticsearch 7. Therefore, this parameter is optional and should only be used when dealing with an Elasticsearch version prior to 7.

Additional fields

Additional fields allow you to add data to your Elasticsearch document, typically a timestamp. Here are the possibilities:

additional_document_value_fields : [
  # insert a generated timestamp
  {
    type : date
    document_field : @timestamp
    format : iso
  }

  # insert a timestamp provided by one of the received input tuple fields.
  # This requires the corresponding field to contain a unix timestamp representation.
  # Typically the one generated by the punchplatform spouts
  {
    type : date
    tuple_field : _ppf_timestamp
    document_field : @timestamp
    format : iso
  }

  # insert one of the input tuple fields as a plain leaf value.
  # This requires the corresponding field to contain a string, long,
  # boolean, or double value.
  {
    type : tuple_field
    tuple_field : _ppf_channel
    document_field : channel
  }

  # insert a constant additional field
  {
    type : constant
    value : banana
    document_field : fruit
  }
]

Punch Error Document Handling

If you have punchlet(s) in your channels, they can generate error documents in case the punchlet(s) fail.

These are emitted just like other documents, with a different content. If you want to save these errors in Elasticsearch (you should!), you simply need to configure your node to properly deal with the error stream.

{
  type : elasticsearch_output
  settings : {
    per_stream_settings : [
      {
        stream : _ppf_errors
        document_value_fields : [
          _ppf_error_message
          _ppf_error_document
          _ppf_timestamp
        ]
        index : { ... }
        document_id_field : _ppf_id
        additional_document_value_fields : [ ... ]
      }
       ...
    ]
  }
  subscribe: [
    {
      component: some_spout
      stream: _ppf_errors
    }
  ]
}

Remember the reserved content of the punchplatform fields (refer to reserved fields), in particular:

  • _ppf_error_document : contains the input document that generated the error.
  • _ppf_error_message : contains a useful error message including the punchlet error line where the error was raised.

It is worth noticing that:

  • the timestamp property as just illustrated makes the node generate a timestamp value based on the current time. Do not confuse it with a business timestamp that might be present in your input document.
  • the index property in this example will generate an index based on the current day.

Security

Additional parameters are available to configure the Elasticsearch output node with security settings for:

  • TLS : using keystores or key files
  • Authentication : using credentials with a basic user and password or a token

Example :

{
  type: elasticsearch_output
  settings: {
    cluster_id: es_search
    http_hosts: [
      {
        host: localhost
        port: 9200
      }
    ]
    credentials: {
      user: bob
      password: bob_secret
    }
    ssl: true
    ssl_keystore_location: /data/certs/keystore.jks
    ssl_truststore_location: /data/certs/truststore.jks
    ssl_keystore_pass: keystore_secret
    ssl_truststore_pass: truststore_secret
    per_stream_settings: [
      ...
    ]
  }
}
  • credentials.user: (string)

    Username used by the Elasticsearch output node to authenticate to the Elasticsearch cluster. If provided, credentials.password MUST be configured. Cannot be combined with credentials.token and credentials.token_type.

  • credentials.password: (string)

    Password used by the Elasticsearch output node to authenticate to the Elasticsearch cluster. If provided, credentials.user MUST be configured. Cannot be combined with credentials.token and credentials.token_type.

  • credentials.token: (string)

    Token string used by the Elasticsearch output node to authenticate to the Elasticsearch cluster. If provided, credentials.token_type MUST be configured. Cannot be combined with credentials.user and credentials.password.

  • credentials.token_type: (string)

    Token type used by the Elasticsearch output node to authenticate to the Elasticsearch cluster. If provided, credentials.token MUST be configured. Cannot be combined with credentials.user and credentials.password. Use Basic for a base64-encoded token string, Bearer for tokens based on the OAuth2 specs, or ApiKey for custom API key generation. Check the Token-based authentication services section of the Elasticsearch documentation for more information.

  • ssl: (boolean: false)

    Enable TLS encryption on the Elasticsearch output node's connection to the Elasticsearch cluster. If false, all the following configurations are ignored.

Warning

For SSL files, use key file OR keystore settings, but not both (a key-file variant is sketched after this list).

  • ssl_private_key: (string)

    Path to the PKCS8 private key of the Elasticsearch output node.

  • ssl_certificate: (string)

    Path to the x509 public key certificate of the Elasticsearch output node. This certificate should always be generated with at least the usr_cert extension in its x509v3 extended usage, for client purposes.

  • ssl_trusted_certificate: (string)

    Path to the CA file containing the certificates trusted by the Elasticsearch output node. It should also contain the node's own root CA and all the intermediate certificates if its certificate is not self-signed.

  • ssl_keystore_location: (string)

    Path to the keystore containing the Elasticsearch output node's public and private keys. jks, pkcs12 and p12 keystore types are supported.

  • ssl_keystore_pass: (string)

    Password of the keystore provided with ssl_keystore_location. Do not provide this configuration if no password protects the keystore.

  • ssl_truststore_location: (string)

    Path to the truststore containing the Elasticsearch output node's CA file and all the certificates trusted by this node. jks, pkcs12 and p12 truststore types are supported.

  • ssl_truststore_pass: (string)

    Password of the truststore provided with ssl_truststore_location. Do not provide this configuration if no password protects the truststore.

  • ssl_hostname_verification (boolean: true)

    Whether the node client should resolve the nodes' hostnames to IP addresses or not.
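
As noted in the warning above, you can use key files instead of keystores. Here is a sketch of the previous example rewritten with key files (the paths are only examples):

{
  type: elasticsearch_output
  settings: {
    cluster_id: es_search
    http_hosts: [
      {
        host: localhost
        port: 9200
      }
    ]
    credentials: {
      user: bob
      password: bob_secret
    }
    ssl: true
    # key-file variant: do not combine with the keystore/truststore settings
    ssl_private_key: /data/certs/node-key.pk8
    ssl_certificate: /data/certs/node-cert.pem
    ssl_trusted_certificate: /data/certs/ca.pem
    per_stream_settings: [
      ...
    ]
  }
}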

Idempotency and replay

You can configure your node in various ways, but pay attention to two important fields:

  • the _ppf_id field lets you insert documents with a deterministic identifier. If you replay your data (by re-reading a Kafka queue), it will replace the existing documents in Elasticsearch. This is likely what you want if you need to replay and enrich existing documents.
  • the _ppf_timestamp field lets you use the punchplatform entry time as the timestamp. Again, should you replay your data, that timestamp will always correspond to that entry time. A per-stream configuration combining both is sketched below.
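
Here is a sketch of such an idempotent per-stream configuration; it assumes the upstream node publishes the _ppf_id and _ppf_timestamp fields on the logs stream:

per_stream_settings: [
  {
    stream: logs
    index: {
      type: daily
      prefix: mytenant-events-
    }
    document_json_field: log
    # deterministic document id: replaying the same data overwrites the same documents
    document_id_field: _ppf_id
    additional_document_value_fields: [
      {
        # use the platform entry time, which is stable across replays
        type: date
        tuple_field: _ppf_timestamp
        document_field: @timestamp
        format: iso
      }
    ]
  }
]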

Elasticsearch Indexing Error Handling

An inserted document can be refused by Elasticsearch, most likely because of a mapping error, but also for load-related reasons. This chapter explains how to deal with this without losing any data.

When activated by the reindex_failed_documents setting, insertion errors will be converted to "error documents" and inserted into Elasticsearch in a safer way (see the following paragraphs).

If the reason why Elasticsearch rejected the document insertion is NOT a mapping/field-type exception, but some other reason (such as too much load), then the node will NOT index this error as an "indexation error" into the "error" index, but will directly mark the original tuple as "failed" in order to replay it later. This behaviour can be altered using the reindex_only_mapping_exceptions setting (which is true by default when reindexation is activated).

Error Index

First you need to tell the node to activate indexing error storage, and to which index the error documents must be sent. Use the reindex_failed_documents, reindex_only_mapping_exceptions and error_index properties as shown next. By default, reindex_failed_documents is set to "false":

{
  type: elasticsearch_output
  settings: {
    cluster_id: es_search
    reindex_failed_documents: true
    reindex_only_mapping_exceptions: true
    error_index: {
      type: daily
      prefix: mytenant-events-indexation-errors-
    }
    per_stream_settings: [
      ...
    ]
  }
}

Error Document content

The node will generate an error document based on the input tuple fields plus a timestamp (@timestamp by default). The rule is simple:

  • regular user fields: the content of these is escaped and put as a json string property. The key is the corresponding field key.
  • punch reserved fields (i.e. starting with _ppf): the content of these is put as is as a json (string or numerical) property. The key is also the corresponding field key.

Here is an example: say you receive this tuple:

[ "log": { "age": 23 , "ip": "not a valid ip" }, "_ppf_id": "12asyw" ]

Expecting the "log" field to hold the document to insert to elasticsearch. The node first attempt to write this into elasticsearch :

{ "age" : 23 , "ip" : "not a valid ip" }

It fails because, in this case, the field ip does not contain a valid ip address. The node will then insert into the specified error index the following document:

{
  "@timestamp" : "2018-05-24T13:47:32.642Z",
  "_ppf_id": "12asyw",
  "log" : "{\"age\": 23 , \"ip\": \"not a valid ip\" }"
}

This time the document will be accepted (as long as the error index mapping is properly defined of course).

Escaping strategy

To ensure error documents will not themselves be refused, any user field is escaped (i.e. transformed into a plain string with all quotes and reserved json characters escaped). In contrast, reserved punchplatform fields (refer to IReservedFields) are included as is. These can only contain safe strings with no risk of being refused by Elasticsearch.

Rationale

Dealing with errors without losing data is a complex issue. The punchplatform design ensures you will have, saved in Elasticsearch, all the required fields to be able to:

  1. understand the error cause, and
  2. re-process the error documents using a replay topology that includes a simple punchlet.

Since all the received fields are part of the error document, you must simply decide which ones are useful for your error handling.

For example, it is a good idea to include the platform, tenant and channel information as part of it. Simply arrange to propagate the _ppf_platform, _ppf_tenant and _ppf_channel fields as part of your stream. These will appear as searchable fields in your error Elasticsearch index.
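
As a sketch, the _ppf_errors per-stream settings shown earlier could then be extended as follows, assuming those fields are indeed propagated on the stream:

{
  stream: _ppf_errors
  document_value_fields: [
    _ppf_error_message
    _ppf_error_document
    _ppf_timestamp
    # propagate these so error documents can be searched per platform/tenant/channel
    _ppf_platform
    _ppf_tenant
    _ppf_channel
  ]
  index: { ... }
  document_id_field: _ppf_id
}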

Failing strategy

You may want to avoid failing your tuples in case of indexation errors, for example in order to keep your punchline running. To do that while keeping track of the failing tuples, you can define a _ppf_errors stream in your publish settings. If such a stream exists, the node will not fail the tuples but ack them and send them to the _ppf_errors stream, where they can be processed. A sketch is shown below.
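
Here is a minimal sketch of such a declaration, assuming the usual publish stream syntax; the exact fields published on the _ppf_errors stream depend on what your downstream nodes need, so the field list is left elided:

{
  type: elasticsearch_output
  settings: {
    ...
  }
  subscribe: [
    {
      component: punchlet_node
      stream: logs
    }
  ]
  publish: [
    {
      # if this stream is declared, tuples failing indexation are acked
      # and re-emitted here instead of being failed
      stream: _ppf_errors
      fields: [
        ...
      ]
    }
  ]
}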