Key design highlights¶
The `elasticsearch_output` node must write JSON documents to the Elasticsearch API.
`elasticsearch_output` is configured to use a JSON document prepared by the processing stage (see processor punchlines). This JSON comes as a JSON string in the 'log' field of the input kafka topic.
`elasticsearch_output` is able to enrich the JSON document on the fly with additional root-level fields if needed.
Here, we use this capability to:
- add the raw log in a `message` field of the JSON document.
- add an indexation timestamp in the `@timestamp` field. Note that this is not truly the event time, which is stored in `obs.ts`. When viewing logs in Kibana, the user has to be aware of the meaning of each timestamp field.

Storing the indexation time is useful to troubleshoot the latency of the collection chain, from log reception in the Log Collector to final indexation in the Log Central.
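Both enrichments above are declared with the node's `additional_document_value_fields` setting. The snippet below is a partial sketch extracted from the full punchline example further down, not a complete node configuration:

```yaml
additional_document_value_fields:
  # copy the raw log (published as 'raw_log' by the kafka input)
  # into a 'message' field of the indexed document
  - document_field: message
    type: tuple_field
    tuple_field: raw_log
  # add the indexation time; this is NOT the event time,
  # which remains stored in obs.ts
  - document_field: "@timestamp"
    type: date
    format: iso
```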
Like some other Punch nodes, `elasticsearch_output` is able to process multiple input streams. This is useful to reduce memory consumption.
This requires that we provide a separate configuration for each input stream through the `per_stream_settings` setting.
`_ppf_metrics` is a special internal stream, used to compute latency frames.
These frames are transformed into metrics automatically. They will be reported through the metrics reporters mechanism.
There is no need to provide an output configuration for this specific stream in the `per_stream_settings`.
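As an illustration, a node subscribed to two kafka streams declares one configuration block per stream, and deliberately none for `_ppf_metrics` (the `reftenant-logs-` prefix below is hypothetical; the other names come from the full example at the bottom of this page):

```yaml
per_stream_settings:
  - stream: logs
    index:
      type: daily
      prefix: reftenant-logs-   # hypothetical index prefix
    document_json_field: log
  - stream: process_errors
    index:
      type: daily
      prefix: reftenant-errors-
    document_json_field: _ppf_error_document
  # no entry is needed for the internal _ppf_metrics stream
```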
Common or split output channels¶
When designing your channels, you will have to decide between two designs:
- dedicating separate channels to output different kinds of logs (apache, windows...) to Elasticsearch.
- having a common channel, possibly with a single 'output' kafka topic.
This is a trade-off between:
- the risk of excessive memory consumption and complex channel management (dedicated channels design).
- the risk that a 'flood' in one log source type induces a heavy backlog on all log source types (shared channel design).

Sometimes, it is a good choice to have dedicated output indexer channels for 'big/verbose' technologies and a common output indexer channel for multiple small ones.
Having multiple dedicated output indexer channels also allows optimizing the throughput of each log type with distinct settings. This is especially true when the targeted Elasticsearch indices are separate for better storage efficiency (different field mappings for different log types).
When the indexer is mutualized and we want to keep the different log types in separate Elasticsearch indices,
the index name has to be computed beforehand in the processor stage.
It is then used in the `index` setting of the `elasticsearch_output` node, read from a field of the incoming tuple.
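In the full example at the bottom of this page, the processor stage publishes the target index name in an `es_index` field, which the output node reads per document:

```yaml
- stream: logs
  index:
    type: tuple_field      # read the index name from a field of the tuple
    tuple_field: es_index  # pre-computed by the processor punchline
  document_id_field: _ppf_id
  document_json_field: log
```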
Scalability / Performance¶
For processor punchlines, the key for performance is running multiple node instances to use as many CPU threads as possible. Here, we are not computing anything: the key for performance is leveraging the Elasticsearch API by issuing multiple asynchronous REST requests.
`elasticsearch_output` already sends its requests this way, so there is no need to have multiple instances/executors of the `elasticsearch_output` node.
The maximum number of concurrent requests is obtained by dividing the `topology.max.spout.pending` punchline setting by the `batch_size` (max number of logs per request).
The optimal setting is usually several thousand logs per batch (say 3000 or 5000), but this should be adapted to log size and memory capacity.
Be careful when decreasing `max.spout.pending`: if it gets close to (or below) the batch size, batches will no longer fill up, and sending many small batches is inefficient.
Be careful when increasing `max.spout.pending`: this will increase the waiting time of a log before it goes to Elasticsearch. The log can then be considered 'in timeout' per the `topology.message.timeout.secs` punchline setting. Timed-out logs will be replayed, regardless of whether the first attempt actually indexed them.
Experimenting is the best way to find the sweet spot where indexing is fast, and the overall processing time of the tuple is less than 1 or 2 seconds.
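With the values used in the example at the bottom of this page, the sizing works out as follows (a sketch to illustrate the arithmetic, not a recommendation; adapt to your log sizes and memory):

```yaml
# punchline-level settings
settings:
  topology.max.spout.pending: 2000   # logs in flight at any time (per spout)
  topology.message.timeout.secs: 30  # pending logs older than this are replayed

# elasticsearch_output node settings:
#   2000 pending / 500 logs per batch => up to 4 concurrent bulk requests
batch_size: 500
```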
In production, we want to avoid log loss. Errors can occur when trying to index logs into Elasticsearch, either because of log anomalies or because of temporary Elasticsearch unavailability.
Indexing rejection by Elasticsearch¶
The `reindex_failed_documents: true` setting prevents log loss due to fields not matching the index mapping.
Depending on the Elastic error:
- 'Field Types Mapping Exception': the document will be "wrapped" as an error document, with a string field containing the whole original document. It will be saved to Elasticsearch, but not indexed as nominal documents would be.
- Other Elastic exceptions: logs will be stuck, and `elasticsearch_output` will publish these logs on the `_ppf_errors_stream`. If this stream is not defined, logs will be failed and replayed. You can also ack and ignore those failed logs. This strategy can be configured through the node settings.
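The corresponding settings in the full example at the bottom of this page look like this:

```yaml
# prevent log loss on mapping exceptions: rejected documents are wrapped
# as error documents and indexed into a dedicated error index
reindex_failed_documents: true
error_index:
  type: daily
  prefix: reftenant-indexing-errors-events_ecs
```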
Processing errors indexing¶
Of course, when a processing error has occurred in the processing stage, an 'error' document is produced there instead of the normal parsed JSON document. So an Elasticsearch indexer punchline has to be configured to read these errors from the associated kafka output topic (cf. processor punchlines).
Because error documents and nominal parsed logs do not have the same associated metadata, the fields configuration in the indexer punchline is slightly different (see the 'published' fields of the kafka input, which match the standard inbuilt metadata fields of parsing error documents, as described in the Punch processing node reference documentation).
Parsed logs indexer Punchline example¶
Here is an example of a 'mutualized multi-log-technologies' indexing punchline, where the index name has been pre-computed by the processor punchline.
```yaml
# The purpose of this punchline is to index all parsed logs (and error logs) into
# Elasticsearch indices.
# This punchline reads from a multi-technology kafka topic (reftenant-parsed-output) and
# sends each processed log to the appropriate Elastic index, whose name has been
# computed at the end of the processing phase (based on log type and event date).
version: "6.0"
tenant: reftenant
channel: indexer
runtime: shiva
type: punchline
name: central_indexer
dag:

  # Kafka input logs
  - type: kafka_input
    component: kafka_back_input_logs
    settings:
      topic: processor-logs-output
      start_offset_strategy: last_committed
      encoding: lumberjack
    publish:
      - stream: logs
        fields:
          - _ppf_id
          - _ppf_timestamp
          - log
          - raw_log
          - es_index
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Kafka unknown logs
  - type: kafka_input
    component: kafka_front_input_unknown_logs
    settings:
      topic: reftenant-receiver-unknown
      start_offset_strategy: last_committed
      encoding: lumberjack
    publish:
      - stream: logs
        fields:
          - _ppf_id
          - _ppf_timestamp
          - log
          - raw_log
          - es_index
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Kafka switch errors
  - type: kafka_input
    component: kafka_front_input_switch_errors
    settings:
      topic: reftenant-receiver-switch-errors
      start_offset_strategy: last_committed
      encoding: lumberjack
    publish:
      - stream: switch_errors
        fields:
          - _ppf_timestamp
          - _ppf_id
          - _ppf_platform
          - _ppf_tenant
          - _ppf_channel
          - _ppf_topology
          - _ppf_component
          - _ppf_error_message
          - _ppf_error_document
          - subsystem
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Kafka process errors
  - type: kafka_input
    component: kafka_back_input_process_errors
    settings:
      topic: processor-errors-output
      start_offset_strategy: last_committed
      encoding: lumberjack
    publish:
      - stream: process_errors
        fields:
          - _ppf_timestamp
          - _ppf_id
          - _ppf_platform
          - _ppf_tenant
          - _ppf_channel
          - _ppf_topology
          - _ppf_component
          - _ppf_error_message
          - _ppf_error_document
          - subsystem
          - device_type
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # ES output logs and errors
  - type: elasticsearch_output
    component: elasticsearch_output_logs_and_errors
    settings:
      cluster_id: es_data

      # Elasticsearch is more efficient if some thousands of logs are sent for indexing
      # in a single web request. Of course this may vary depending on log size and
      # number of fields that have to be indexed.
      # The Elastic output node will send in parallel as many web requests as it has
      # available logs to fill batches. Non-full batches will wait some time (here,
      # the unspecified setting implies 1s) if no more data arrives.
      # Remember that at any time, there are only topology.max.spout.pending logs being
      # processed by the punchline (see at bottom), so ensure that the batch size is lower
      # (or several times lower) than this number!
      batch_size: 500

      per_stream_settings:
        - stream: logs
          index:
            type: tuple_field
            tuple_field: es_index
          document_id_field: _ppf_id
          document_json_field: log
          additional_document_value_fields:
            - document_field: message
              type: tuple_field
              tuple_field: raw_log

        - stream: switch_errors
          index:
            type: daily
            prefix: reftenant-errors-
          document_id_field: _ppf_id
          document_json_field: _ppf_error_document
          additional_document_value_fields:
            - document_field: error_message
              type: tuple_field
              tuple_field: _ppf_error_message
            - document_field: error_timestamp
              type: date
              tuple_field: _ppf_timestamp
              format: iso
            - document_field: tenant
              type: tuple_field
              tuple_field: _ppf_tenant
            - document_field: channel
              type: tuple_field
              tuple_field: _ppf_channel
            - document_field: subsystem
              type: tuple_field
              tuple_field: subsystem
            - document_field: "@timestamp"
              type: date
              format: iso

        - stream: process_errors
          index:
            type: daily
            prefix: reftenant-errors-
          document_id_field: _ppf_id
          document_json_field: _ppf_error_document
          additional_document_value_fields:
            - document_field: error_message
              type: tuple_field
              tuple_field: _ppf_error_message
            - document_field: error_timestamp
              type: date
              tuple_field: _ppf_timestamp
              format: iso
            - document_field: tenant
              type: tuple_field
              tuple_field: _ppf_tenant
            - document_field: channel
              type: tuple_field
              tuple_field: _ppf_channel
            - document_field: subsystem
              type: tuple_field
              tuple_field: subsystem
            - document_field: device_type
              type: tuple_field
              tuple_field: device_type
            - document_field: "@timestamp"
              type: date
              format: iso

      # VERY IMPORTANT. This is because we are in production mode, and we do not want
      # to lose any log due to a temporary Elasticsearch failure or due to
      # rejection of the documents by Elasticsearch (e.g. because of a mapping
      # exception, which is a mismatch between a field generated at processing
      # phase and the expected field type used by Elasticsearch for building indexing tables).
      # So this setting will cause:
      # - mapping exceptions to be transformed into a specific
      #   error document that will be indexed in a separate index (error_index)
      #   to allow troubleshooting.
      # - other kinds of rejection/failures to be retried/replayed automatically later
      #   (this will cause the storm.tuple.fail metric to increase), with possible
      #   interruption of the indexing progress for all logs until then.
      reindex_failed_documents: true
      error_index:
        type: daily
        prefix: reftenant-indexing-errors-events_ecs
    subscribe:
      - stream: logs
        component: kafka_back_input_logs
      - stream: logs
        component: kafka_front_input_unknown_logs
      - stream: process_errors
        component: kafka_back_input_process_errors
      - stream: switch_errors
        component: kafka_front_input_switch_errors

metrics:
  reporters:
    - type: kafka
      reporting_interval: 60

settings:
  # The "max pending" is the number of logs being processed at any moment (per source spout).
  # It must be high enough to achieve good performance, because of the total wait time of
  # a log (while Elasticsearch is indexing previously sent batches).
  # To achieve good performance, this should be a multiple of the batch size set
  # up at elasticsearch_output node level.
  # 2000 pending means that up to 4 requests may be sent to Elasticsearch concurrently.
  # Do NOT overdo this, because Elasticsearch queues will fill up if you send too many queries.
  # Then your tuples will stay 'pending' for too long (as compared to the storm message
  # timeout, which is 1 minute by default).
  # As a result, you will see the Storm 'failure' count going up, and logs replayed.
  # This means more work for Elasticsearch if, in fact, the first request has already
  # entered processing state inside ES.
  topology.max.spout.pending: 2000
  topology.component.resources.onheap.memory.mb: 100
  topology.enable.message.timeouts: true
  topology.message.timeout.secs: 30
```