public class ElasticsearchOutput
extends org.thales.punch.libraries.storm.api.BaseProcessingNode
To get the best performance while benefiting from reliability, this node uses a bulked asynchronous strategy: messages are batched in small groups and sent to Elasticsearch using the bulk API.
The Elasticsearch node configuration comes in two distinct sets. The first set relates to the overall settings of the node, the second to how the node generates documents from incoming Storm tuples.
property | mandatory | type | default | comment |
---|---|---|---|---|
"reindex_failed_documents" | no | boolean | false | With this option, the node will try to re-index documents refused by Elasticsearch and store them into a dedicated error index. See "error_index". |
"use_ssl" | no | boolean | false | If set to true, you must provide the other mandatory SSL arguments, in particular the certificate to use and its type. |
"credential" | no | json | | If this parameter is not found in your topology configuration, the node writes to your Elasticsearch cluster without authentication. If it is present, each request is authenticated either by specifying a "user": "myuser" key and a "password": "mypass" key, or by providing a "token": "mytoken" field alongside "token_type": "Basic" (the token must then be the base64 encoding of credentials in the "myuser:mypass" format). By default, the token type is ApiKey. |
"ssl_certificate_verification" | no | boolean | true | In production, it is advised to keep the default value (true). Setting it to false allows self-signed certificates and disables hostname verification. |
"error_index" | no | json | | Defines the index used to store documents refused by Elasticsearch, typically because of mapping errors. It lets you define a backup recovery index so that you can reprocess the refused documents later on, after correcting your mapping or your parser. MANDATORY if "reindex_failed_documents" is set to true. Refer to the example below. |
"reindex_only_mapping_exceptions" | no | boolean | true | When `reindex_failed_documents` is set to true and `error_index` is provided, only mapping-related indexation failures are re-indexed into the error index. If you set `reindex_only_mapping_exceptions` to `false`, re-indexation occurs on ALL failure causes, which may avoid the topology getting 'stuck' in a replay loop. This is not the default because perfectly valid documents could then be indexed as 'indexation errors' for other reasons (e.g. Elasticsearch overload). |
"fail_tuples_if_an_insertion_of_error_document_is_not_done" | no | boolean | true | When `reindex_failed_documents` is set to true and `error_index` is provided, an Elasticsearch indexation failure causes the error to be indexed as a json document in the error index. If this second indexation attempt also fails, the original tuple is marked as 'failed' to trigger a replay by the Storm spout if possible. If you set this parameter to `false`, the failure of the attempt to index an 'indexation error' document into the error index causes the original tuple to be acknowledged, so NOTHING is written into Elasticsearch for this source tuple. This may be useful if never processing some input tuples is less harmful than being stuck, but it is not the default because it may lead to loss of user data in some Elasticsearch conditions (e.g. Elasticsearch overload). |
"cluster_id" | yes | String | | The target Elasticsearch cluster id. This MUST be defined accordingly in your punchplatform.properties file. |
"batch_size" | no | long | 1000 | The Elasticsearch node relies on the bulk API. The batch_size parameter indicates how many documents should be sent at once (see the sketch just after this table). |
"batch_interval" | no | long | 1000 | When the traffic is low, data is flushed regularly. Use this interval to set the flush period. Note that if the value is large, documents will be delayed in the target Elasticsearch cluster. |
"per_stream_settings" | yes | json | | See the description below. |
"request_timeout" | no | String | "20s" | Set the Elasticsearch requests timeout. |
"error_cause_key" | no | String | "error_message" | Set the name of the Elasticsearch field containing the cause of errors. Only used for documents rejected at the first Elasticsearch insertion attempt. |
"error_document_type" | no | String | "_doc" | Set the document type of the Elasticsearch documents used to record errors (including the escaped faulty document) when Elasticsearch has rejected documents at insertion time. |
"error_timestamp_key" | no | String | "@timestamp" | Set the name of the Elasticsearch field containing the error timestamp. Only used for rejected documents. |
"error_timestamp_format" | no | String | "ISO" | Define the date timestamp format used for "error_timestamp_key". Only used for rejected documents. |
{
"type": "elasticsearch_output",
"settings": {
"cluster_id": "es_search",
"per_stream_settings" : [
# each subscribed stream must have a settings section
{
# this section only applies for tuples received on stream "logs"
"stream" : "logs",
# send the document to a daily index with a given prefix
"index" : { "type" : "daily" , "prefix" : "mytenant-events-" },
# take a given input tuple field as json document.
# There can be only one such field.
"document_json_field" : "log",
# use another field (_ppf_id) as unique id for inserting that document to elasticsearch
"document_id_field" : "_ppf_id",
# in addition to the 'log' field, add a '@timestamp' field with the current time.
"additional_document_value_fields" : [
{ "type" : "date" , "document_field" : "@timestamp", "format" : "iso"}
]
}
]
},
"component": "elasticsearch_output",
"subscribe": [ { "component": "punch_node", "stream": "logs" } ]
}
With these settings, if you receive an input tuple with [ "log" : "{\"age\":23}", "_ppf_id" : "12asyw" ], the generated document will be:
{ "age" : 23 , "@timestamp" : "2018-05-24T13:47:32.642Z" }
Note how the 'log' key was removed in the generated document.
That document will be inserted into Elasticsearch in the "mytenant-events-yyyy.mm.dd" index, with the id "12asyw".
Another use case is to receive a tuple with several fields that you simply want to insert
as a json document. For example, say you receive a tuple with two values
"name" : "thales", "value" : 354
You may want to generate the document :
{ "name" : "thales", "value" : 354 }
Here is how you do this:
{
"type": "elasticsearch_output",
"settings": {
"cluster_id": "es_search",
"per_stream_settings" : [
{
# this section only applies for tuples received on stream "stock"
"stream" : "stock",
# send the document to a daily index with a given prefix
"index" : { "type" : "daily" , "prefix" : "mytenant-stock-" },
# take selected fields as json document.
"document_value_fields" : [ "name", "value" ],
# you can also use another field (_ppf_id) as unique id for inserting that document to Elasticsearch
"document_id_field" : "_ppf_id",
# and you can also add an extra field, here '@timestamp' with the current time.
"additional_document_value_fields" : [
{ "type" : "date" , "document_field" : "@timestamp", "format" : "iso"}
]
}
]
},
"component": "elasticsearch_node",
"subscribe": [ { "component": "punch_node", "stream": "stock" } ]
}
These settings produce:
{ "name" : "thales", "value" : 354, "@timestamp" : "2018-05-24T13:47:32.642Z" }
Here are the different ways to control the target index.
# insert into a daily index. You must provide the prefix.
"index" : { "type" : "daily", "prefix" : "some_prefix_" }
# insert into an index provided by one of the input tuple field.
"index" : { "type" : "tuple_field", "tuple_field" : "_ppf_index" }
# insert into a fixed named index
"index" : { "type" : "constant", "value" : "constant_index_name" }
Here is how you add extra values that may be useful to your processing logic.
"additional_document_value_fields" : [
# insert a generated timestamp
{ "type" : "date", "document_field" : "@timestamp", "format" : "iso" }
# insert a timestamp from a received storm tuple field
# This requires the corresponding field to contain a unix timestamp representation.
# Typically the one generated by the punchplatform spouts
{ "type" : "date", "tuple_field" : "_ppf_timestamp", "document_field" : "@timestamp", "format" : "iso" }
# insert a field based on some arbitrary tuple input field.
# This requires the corresponding field to contain a json leaf value (i.e. string, long, boolean, double),
# typically one of the punchplatform reserved fields holding the platform, tenant, channel or component
# identifiers.
{ "type" : "tuple_field", "tuple_field" : "_ppf_channel", "document_field" : "channel" }
# insert a constant additional field
{ "type" : "constant", "value" : "banana", "document_field" : "fruit" }
If you have punchlet(s) in your channels, they can generate error documents. These are emitted just like other documents, with a different content. If you want to save these errors in Elasticsearch (you should!), you simply need to configure your node to properly deal with the error stream.
Here is an example:
{
"type" : "elasticsearch_output",
"settings" : {
"per_stream_settings" : [
{
"stream" : "_ppf_errors",
"document_value_fields" : [ "_ppf_error_message", "_ppf_error_document", "_ppf_id", "_ppf_timestamp" ],
"index" : { ... },
"document_id_field" : "_ppf_id",
"additional_document_value_fields" : [ ... ]
},
...
]
},
"component": "elasticsearch_bolt",
"subscribe": [ { "component": "some_spout", "stream": "_ppf_errors" } ]
}
In case the document sent to Elasticsearch is refused, typically because of a mapping error, you may want to save it to a backup index to re-process it later, after correcting your mapping or your punchlet(s). Here is how the Elasticsearch node is processing these errors.
First you need to tell the node to which index the error documents must be sent. Use the error_index property as shown next.
To index errors on a daily basis:
"type" : "elasticsearch_output",
"bolt_settings" : {
"reindex_failed_documents" : true,
"error_index" : {
// supported values are "monthly", "daily" or "hourly"
"type" : "daily",
"prefix" : "mytenant-events-"
}
...
}
To index all errors in a single index with a constant name:
"type" : "elasticsearch_output",
"settings" : {
"reindex_failed_documents" : true,
"error_index" : {
"type" : "constant",
"value" : "mytenant-errors"
}
...
}
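The other error_* settings from the table above are tuned at the same level. Here is an illustrative sketch (the field and index names are assumptions, not required values):
"type" : "elasticsearch_output",
"settings" : {
    "reindex_failed_documents" : true,
    # also re-index failures that are not mapping errors (use with care, see the table above)
    "reindex_only_mapping_exceptions" : false,
    "error_index" : { "type" : "constant", "value" : "mytenant-errors" },
    # name of the field carrying the Elasticsearch rejection cause
    "error_cause_key" : "error_message",
    # timestamp added to each error document, and its format
    "error_timestamp_key" : "@timestamp",
    "error_timestamp_format" : "ISO"
    ...
}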
The node will generate an error document based on the received input tuple fields plus a timestamp (@timestamp by default). The rule is simple:
[ "log" : "{\"age\":23,\"ip\":\"not a valid ip\"}" , "_ppf_id":"12asyw" ]
Expecting the "log" field to hold the document to insert to Elasticsearch. The node first attempt to write
{"age":23 , "ip" : "not a valid ip" }
into Elasticsearch. That fails because (say) the "ip" field does not contain a valid ip. The node then inserts the following document into the error index:
{ "log" : "{\"age\":23,\"ip\":\"not a valid ip\"}" , "_ppf_id":"12asyw", "@timestamp" : "2018-05-24T13:47:32.642Z" }
This time the document will be accepted (as long as the error index mapping is properly defined of course).
To ensure error documents are not themselves refused, every user field is escaped (i.e. transformed into a plain string with all quotes and reserved json characters escaped).
In contrast, reserved punchplatform fields (refer to IReservedFields) are included as is. These can only contain safe strings, with no risk of being refused by Elasticsearch.
Dealing with errors without losing data is a complex issue. The punchplatform design ensures you will have, saved in Elasticsearch, all the required fields to be able to
(i) understand the error cause, and
(ii) re-process the error documents using a simple punchlet.
Since all the received fields are part of the error document, you simply need to decide which ones are useful for your error handling.
For example, it is a good idea to include the platform, tenant and channel information as part of it. Simply arrange to propagate the _ppf_platform, _ppf_tenant and _ppf_channel fields as part of your stream. These will appear as searchable fields in your error Elasticsearch index.
SSL/TLS Elasticsearch cluster
Below is an example of an SSL/TLS Elasticsearch cluster configuration. Tested certificate formats are "PKCS12" and "jks". Make sure that all the nodes of the Elasticsearch cluster use the same Certificate Authority (CA). The certificate provided to this node should be signed by the same CA as the Elasticsearch nodes, and its domain name (or distinguished name) should match your cluster domain name. For instance, if you run the topology below on a node deployed on a local server, the DN should be localhost.
An official Elasticsearch guide explaining how to generate the certificates correctly is available online: https://www.elastic.co/guide/en/elasticsearch/reference/current/configuring-tls.html#node-certificates
{
"dag": [
{
"type": "file_input",
"settings": {
"read_file_from_start": true,
"path": "./AAPL.csv"
},
"component": "file_spout",
"publish": [
{"stream": "logs", "fields": ["log"]}
]
},
{
"type": "punch_node",
"settings": {
"punchlet": "./AAPL.punch"
},
"component": "punch_bolt",
"publish": [
{"stream": "logs", "fields": ["log"]}
],
"subscribe": [
{"component": "file_spout", "stream": "logs"}
]
},
{
"type": "elasticsearch_output",
"settings": {
"use_ssl": true,
"credentials": {
"user": "elastic",
"password": "changeme"
// if token: user and password will be ignored
// token generated from user:password encoded in base64
"token": "my token"
},
"truststorepass": "yuechun",
"keystorepath": "/Users/jonathan/Desktop/kibana-6.4.3-darwin-x86_64/config/certs/localhost/localhost.p12",
"keystoretype": "PKCS12",
"cluster_id": "es_search",
"per_stream_settings" : [
{
"stream" : "logs",
"index" : {
"prefix" : "stock-",
"type" : "daily"
},
"timestamp": "@timestamp",
"document_json_field" : "log"
}
]
},
"component": "elasticsearch_bolt",
"subscribe": [
{
"component": "punch_bolt",
"stream": "logs"
}
]
}
]
}
Constructor and Description |
---|
ElasticsearchOutput(org.thales.punch.libraries.storm.api.NodeSettings config) Constructor |
Modifier and Type | Method and Description |
---|---|
void | ackDocument(ElasticDocument document) |
void | cleanup() |
void | failDocument(ElasticDocument document) |
void | forwardOrFailDocument(ElasticDocument document, String stream) |
Map<String,Object> | getComponentConfiguration() |
void | prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector) |
void | process(org.apache.storm.tuple.Tuple tuple) |
void | reindex(List<ElasticDocument> documents) |
public ElasticsearchOutput(org.thales.punch.libraries.storm.api.NodeSettings config)
config - bolt configuration
public void prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector)
prepare in interface org.apache.storm.task.IBolt
prepare in class org.thales.punch.libraries.storm.api.BaseProcessingNode
public void process(org.apache.storm.tuple.Tuple tuple)
process in class org.thales.punch.libraries.storm.api.BaseProcessingNode
public Map<String,Object> getComponentConfiguration()
getComponentConfiguration in interface org.apache.storm.topology.IComponent
getComponentConfiguration in class org.thales.punch.libraries.storm.api.BaseProcessingNode
public void cleanup()
cleanup in interface org.apache.storm.task.IBolt
cleanup in class org.thales.punch.libraries.storm.api.BaseProcessingNode
public void ackDocument(ElasticDocument document)
public void forwardOrFailDocument(ElasticDocument document, String stream)
public void failDocument(ElasticDocument document)
public void reindex(List<ElasticDocument> documents)