Key design highlights¶
The `elasticsearch_output` node must write JSON documents to the Elasticsearch API.
`elasticsearch_output` is configured to use a JSON document prepared by the processing stage (see processor punchlines). This JSON comes as a JSON string in the 'log' field of the input kafka topic.
`elasticsearch_output` is able to enrich the JSON document on the fly with additional root-level fields if needed.
Here, we use this capability to:
- add the raw log in a `message` field of the JSON document.
- add an indexation timestamp in the `@timestamp` field. Note that this is not truly the event time, which is stored in `obs.ts`. When viewing logs in Kibana, the user has to be aware of the meaning of each timestamp field.

Storing the indexation time is useful to troubleshoot the latency of the collection chain, from log reception in the Log Collector to final indexation in the Log Central.
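Both enrichments above are declared with the node's `additional_document_value_fields` setting. The snippet below is a partial sketch extracted from the full punchline example further down, not a complete node configuration:

```yaml
additional_document_value_fields:
  # copy the raw log (published as 'raw_log' by the kafka input)
  # into a 'message' field of the indexed document
  - document_field: message
    type: tuple_field
    tuple_field: raw_log
  # add the indexation time; this is NOT the event time,
  # which remains stored in obs.ts
  - document_field: "@timestamp"
    type: date
    format: iso
```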
Like some other Punch nodes, `elasticsearch_output` is able to process multiple input streams. This is useful to reduce memory consumption.
This requires that we provide a separate configuration for each input stream through the `per_stream_settings` setting.
`_ppf_metrics` is a special internal stream, used to compute latency frames.
These frames are transformed into metrics automatically. They will be reported through the metrics reporters mechanism.
There is no need to provide an output configuration for this specific stream in the `per_stream_settings`.
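As an illustration, a node subscribed to two kafka streams declares one configuration block per stream, and deliberately none for `_ppf_metrics` (the `reftenant-logs-` prefix below is hypothetical; the other names come from the full example at the bottom of this page):

```yaml
per_stream_settings:
  - stream: logs
    index:
      type: daily
      prefix: reftenant-logs-   # hypothetical index prefix
    document_json_field: log
  - stream: process_errors
    index:
      type: daily
      prefix: reftenant-errors-
    document_json_field: _ppf_error_document
  # no entry is needed for the internal _ppf_metrics stream
```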
Common or split output channels¶
When designing your channels, you will have to decide between two designs:
- dedicating separate channels to output different kinds of logs (apache, windows...) to Elasticsearch.
- having a common channel, possibly with a single 'output' kafka topic.
This is a trade-off between:
- the risk of excessive memory consumption and complex channel management (dedicated channels design).
- the risk that a 'flood' in one log source type induces a heavy backlog on all log source types (shared channel design).

Sometimes, it is a good choice to have dedicated output indexer channels for 'big/verbose' technologies and a common output indexer channel for multiple small ones.
Having multiple dedicated output indexer channels also allows optimizing the throughput of each log type with distinct settings. This is especially true when the targeted Elasticsearch indices are separate for better storage efficiency (different field mappings for different log types).
When the indexer is mutualized and we want to keep the different log types in separate Elasticsearch indices,
the index name has to be computed beforehand in the processor stage.
It is then used in the `index` setting of the `elasticsearch_output` node, read from a field of the incoming tuple.
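In the full example at the bottom of this page, the processor stage publishes the target index name in an `es_index` field, which the output node reads per document:

```yaml
- stream: logs
  index:
    type: tuple_field      # read the index name from a field of the tuple
    tuple_field: es_index  # pre-computed by the processor punchline
  document_id_field: _ppf_id
  document_json_field: log
```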
Scalability / Performance¶
For processor punchlines, the key for performance is running multiple node instances to use as many CPU threads as possible. Here, we are not computing anything: the key for performance is leveraging the Elasticsearch API by issuing multiple asynchronous REST requests.
`elasticsearch_output` already sends its requests this way, so there is no need to have multiple instances/executors of the `elasticsearch_output` node.
The maximum number of concurrent requests is obtained by dividing the `topology.max.spout.pending` punchline setting by the `batch_size` (max number of logs per request).
The optimal setting is usually several thousand logs per batch (say 3000 or 5000), but this should be adapted to log size and memory capacity.
Be careful when decreasing `max.spout.pending`: if it gets close to (or below) the batch size, batches will no longer fill up, and sending many small batches is inefficient.
Be careful when increasing `max.spout.pending`: this will increase the waiting time of a log before it goes to Elasticsearch. The log can then be considered 'in timeout' per the `topology.message.timeout.secs` punchline setting. Timed-out logs will be replayed, regardless of whether the first attempt actually indexed them.
Experimenting is the best way to find the sweet spot where indexing is fast, and the overall processing time of the tuple is less than 1 or 2 seconds.
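With the values used in the example at the bottom of this page, the sizing works out as follows (a sketch to illustrate the arithmetic, not a recommendation; adapt to your log sizes and memory):

```yaml
# punchline-level settings
settings:
  topology.max.spout.pending: 2000   # logs in flight at any time (per spout)
  topology.message.timeout.secs: 30  # pending logs older than this are replayed

# elasticsearch_output node settings:
#   2000 pending / 500 logs per batch => up to 4 concurrent bulk requests
batch_size: 500
```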
In production, we want to avoid log loss. Errors can occur when trying to index logs into Elasticsearch, either because of log anomalies or because of temporary Elasticsearch unavailability.
Indexing rejection by Elasticsearch¶
The `reindex_failed_documents: true` setting prevents log loss due to fields not matching the index mapping.
Depending on the Elastic error:
- 'Field Types Mapping Exception': the document will be "wrapped" as an error document, with a string field containing the whole original document. It will be saved to Elasticsearch, but not indexed as nominal documents would be.
- Other Elastic exceptions: logs will be stuck, and `elasticsearch_output` will publish these logs on the `_ppf_errors_stream`. If this stream is not defined, logs will be failed and replayed. You can also ack and ignore those failed logs. This strategy can be configured through the node settings.
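The corresponding settings in the full example at the bottom of this page look like this:

```yaml
# prevent log loss on mapping exceptions: rejected documents are wrapped
# as error documents and indexed into a dedicated error index
reindex_failed_documents: true
error_index:
  type: daily
  prefix: reftenant-indexing-errors-events_ecs
```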
Processing errors indexing¶
Of course, when a processing error has occurred in the processing stage, an 'error' document is produced there instead of the normal parsed JSON document. So an Elasticsearch indexer punchline has to be configured to read these errors from the associated kafka output topic (cf. processor punchlines).
Because error documents and nominal parsed logs do not have the same associated metadata, the fields configuration in the indexer punchline is slightly different (see the 'published' fields of the kafka input, which match the standard inbuilt metadata fields of parsing error documents, as described in the Punch processing node reference documentation).
Parsed logs indexer Punchline example¶
Here is an example of a 'mutualized multi-log-technologies' indexing punchline, where the index name has been pre-computed by the processor punchline.
```yaml
# The purpose of this punchline is to index all parsed logs (and error logs) into
# Elasticsearch indices.
# This punchline reads from a multi-technology kafka topic (reftenant-parsed-output) and
# sends each processed log to the appropriate Elastic index, whose name has been
# computed at the end of the processing phase (based on log type and event date).
version: "6.0"
tenant: reftenant
channel: indexer
runtime: shiva
type: punchline
name: central_indexer
dag:

  # Kafka input logs
  - type: kafka_input
    component: kafka_back_input_logs
    settings:
      topic: processor-logs-output
      start_offset_strategy: last_committed
      encoding: lumberjack
    publish:
      - stream: logs
        fields:
          - _ppf_id
          - _ppf_timestamp
          - log
          - raw_log
          - es_index
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Kafka unknown logs
  - type: kafka_input
    component: kafka_front_input_unknown_logs
    settings:
      topic: reftenant-receiver-unknown
      start_offset_strategy: last_committed
      encoding: lumberjack
    publish:
      - stream: logs
        fields:
          - _ppf_id
          - _ppf_timestamp
          - log
          - raw_log
          - es_index
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Kafka switch errors
  - type: kafka_input
    component: kafka_front_input_switch_errors
    settings:
      topic: reftenant-receiver-switch-errors
      start_offset_strategy: last_committed
      encoding: lumberjack
    publish:
      - stream: switch_errors
        fields:
          - _ppf_timestamp
          - _ppf_id
          - _ppf_platform
          - _ppf_tenant
          - _ppf_channel
          - _ppf_topology
          - _ppf_component
          - _ppf_error_message
          - _ppf_error_document
          - subsystem
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Kafka process errors
  - type: kafka_input
    component: kafka_back_input_process_errors
    settings:
      topic: processor-errors-output
      start_offset_strategy: last_committed
      encoding: lumberjack
    publish:
      - stream: process_errors
        fields:
          - _ppf_timestamp
          - _ppf_id
          - _ppf_platform
          - _ppf_tenant
          - _ppf_channel
          - _ppf_topology
          - _ppf_component
          - _ppf_error_message
          - _ppf_error_document
          - subsystem
          - device_type
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # ES output logs and errors
  - type: elasticsearch_output
    component: elasticsearch_output_logs_and_errors
    settings:
      cluster_id: es_data

      # Elasticsearch is more efficient if some thousands of logs are sent for indexing
      # in a single web request. Of course this may vary depending on log size and
      # number of fields that have to be indexed.
      # The Elastic output node will send in parallel as many web requests as it has
      # available logs to fill batches. Non-full batches will wait some time (here,
      # the unspecified setting implies 1s) if no more data arrives.
      # Remember that at any time, there are only topology.max.spout.pending logs being
      # processed by the punchline (see at bottom), so ensure that the batch size is lower
      # (or several times lower) than this number!
      batch_size: 500

      per_stream_settings:
        - stream: logs
          index:
            type: tuple_field
            tuple_field: es_index
          document_id_field: _ppf_id
          document_json_field: log
          additional_document_value_fields:
            - document_field: message
              type: tuple_field
              tuple_field: raw_log

        - stream: switch_errors
          index:
            type: daily
            prefix: reftenant-errors-
          document_id_field: _ppf_id
          document_json_field: _ppf_error_document
          additional_document_value_fields:
            - document_field: error_message
              type: tuple_field
              tuple_field: _ppf_error_message
            - document_field: error_timestamp
              type: date
              tuple_field: _ppf_timestamp
              format: iso
            - document_field: tenant
              type: tuple_field
              tuple_field: _ppf_tenant
            - document_field: channel
              type: tuple_field
              tuple_field: _ppf_channel
            - document_field: subsystem
              type: tuple_field
              tuple_field: subsystem
            - document_field: "@timestamp"
              type: date
              format: iso

        - stream: process_errors
          index:
            type: daily
            prefix: reftenant-errors-
          document_id_field: _ppf_id
          document_json_field: _ppf_error_document
          additional_document_value_fields:
            - document_field: error_message
              type: tuple_field
              tuple_field: _ppf_error_message
            - document_field: error_timestamp
              type: date
              tuple_field: _ppf_timestamp
              format: iso
            - document_field: tenant
              type: tuple_field
              tuple_field: _ppf_tenant
            - document_field: channel
              type: tuple_field
              tuple_field: _ppf_channel
            - document_field: subsystem
              type: tuple_field
              tuple_field: subsystem
            - document_field: device_type
              type: tuple_field
              tuple_field: device_type
            - document_field: "@timestamp"
              type: date
              format: iso

      # VERY IMPORTANT. This is because we are in production mode, and we do not want
      # to lose any log due to a temporary Elasticsearch failure or due to
      # rejection of the documents by Elasticsearch (e.g. because of a mapping
      # exception, which is a mismatch between a field generated at processing
      # phase and the expected field type used by Elasticsearch for building indexing tables).
      # So this setting will cause:
      # - mapping exceptions to be transformed into a specific
      #   error document that will be indexed in a separate index (error_index)
      #   to allow troubleshooting.
      # - other kinds of rejection/failures to be retried/replayed automatically later
      #   (this will cause the storm.tuple.fail metric to increase), with possible
      #   interruption of the indexing progress for all logs until then.
      reindex_failed_documents: true
      error_index:
        type: daily
        prefix: reftenant-indexing-errors-events_ecs
    subscribe:
      - stream: logs
        component: kafka_back_input_logs
      - stream: logs
        component: kafka_front_input_unknown_logs
      - stream: process_errors
        component: kafka_back_input_process_errors
      - stream: switch_errors
        component: kafka_front_input_switch_errors

metrics:
  reporters:
    - type: kafka
      reporting_interval: 60

settings:
  # The "max pending" is the number of logs being processed at any moment (per source spout).
  # It must be high enough to achieve good performance, because of the total wait time of
  # a log (while Elasticsearch is indexing previously sent batches).
  # To achieve good performance, this should be a multiple of the batch size set
  # up at elasticsearch_output node level.
  # 2000 pending means that up to 4 requests may be sent to Elasticsearch concurrently.
  # Do NOT overdo this, because Elasticsearch queues will fill up if you send too many queries.
  # Then your tuples will stay 'pending' for too long (as compared to the storm message
  # timeout, which is 1 minute by default).
  # As a result, you will see the Storm 'failure' count going up, and logs replayed.
  # This means more work for Elasticsearch if, in fact, the first request has already
  # entered processing state inside ES.
  topology.max.spout.pending: 2000
  topology.component.resources.onheap.memory.mb: 100
  topology.enable.message.timeouts: true
  topology.message.timeout.secs: 30
```