
Log Central: Elasticsearch Indexer Punchlines (5)

Key design highlights

Additional fields

The elasticsearch_output node writes JSON documents to the Elasticsearch API. Here, the elasticsearch_output is configured to use a JSON document prepared by the processing stage (see the processor punchlines). This JSON comes as a JSON string in the 'log' field of the input kafka topic.

The elasticsearch_output can also enrich the JSON document on the fly with root-level fields if needed (additional_document_value_fields setting). Here, we use this capability to:

  • add the raw log in a message field of the JSON document.
  • add an indexation timestamp in the @timestamp field. Note that this is not the actual event time, which is stored in obs.ts. When viewing logs in Kibana, the user has to be aware of the meaning of each timestamp field.

Storing the indexation time is useful to troubleshoot the latency of the collection chain, from log reception on the Log Collector to final indexation on the Log Central, as illustrated in the sketch below.
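Here is a minimal sketch of the corresponding stream settings. The field names (log, raw_log, es_index) match the full example at the end of this page; the "@timestamp" entry assumes the same date/iso syntax used for the error streams in that example.

- stream: logs
  document_id_field: _ppf_id
  # the JSON document prepared by the processing stage, received as a JSON string
  document_json_field: log
  additional_document_value_fields:
    # copy the raw log into a 'message' field of the indexed document
    - document_field: message
      type: tuple_field
      tuple_field: raw_log
    # stamp the indexation time (not the event time, which stays in obs.ts)
    - document_field: "@timestamp"
      type: date
      format: iso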

Multiple streams

Like some other Punch nodes, the elasticsearch_output can process multiple input streams. This is useful to reduce memory consumption. It requires providing a separate configuration for each input stream through the per_stream_settings section.

Note that _ppf_metrics is a special internal stream, used to compute latency frames. These frames are automatically transformed into metrics and reported through the metrics reporters mechanism. There is no need to provide an output configuration for this stream in the per_stream_settings.
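For example, an elasticsearch_output subscribing to a nominal logs stream and an errors stream only needs an entry for each of these two streams. The sketch below reuses the settings of the full example at the end of this page:

per_stream_settings:
  - stream: logs
    index:
      type: tuple_field
      tuple_field: es_index
    document_json_field: log
  - stream: process_errors
    index:
      type: daily
      prefix: reftenant-errors-
    document_json_field: _ppf_error_document
  # no entry is needed for the _ppf_metrics stream: its latency frames are
  # turned into metrics automatically and reported by the metrics reporters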

Common or split output channels

When designing your channels, you will have to choose between two designs:

  • dedicating separate channels to output the different natures of logs (apache, windows...) to Elasticsearch.
  • having a common channel, possibly with a single 'output' kafka topic.

This is a trade-off between:

  • the risk of excessive memory consumption and complex channels management (dedicated channels design).
  • the risk that a 'flood' in one of the log source types induces a heavy backlog on all log source types (shared channel design).

Sometimes, it is a good choice to have dedicated output indexer channels for 'big/verbose' technologies and a common output indexer channel for multiple small ones.

Having multiple, dedicated output indexer channels also allows optimizing the throughput of each log type with distinct settings. This is especially true when the targeted Elasticsearch indices are separate for better storage efficiency (different field mappings for different log types).

When the indexer is mutualized and the different log types must be kept in separate Elasticsearch indices, the index name has to be computed beforehand in the processor stage. It is then referenced through the tuple_field setting.
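Assuming the processor punchline publishes the target index name in an es_index field (as in the example at the end of this page), the per-stream index configuration would look like this sketch:

- stream: logs
  index:
    type: tuple_field      # instead of a fixed daily index, read the index name...
    tuple_field: es_index  # ...from a tuple field computed by the processor punchline
  document_json_field: log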

Scalability / Performance

For processor punchlines, the key to performance is running multiple node instances so as to use as many CPU threads as possible. Here, we are not computing anything: the key to performance is leveraging the Elasticsearch API by issuing multiple asynchronous REST requests.

The elasticsearch_output already sends its requests this way, so there is no need for multiple instances/executors of the elasticsearch_output node.

The maximum number of concurrent requests is obtained by dividing the topology.max.spout.pending punchline setting by the batch_size (maximum number of logs per request). The optimal batch size is usually several thousand logs (say 3000 or 5000), but this should be adapted to the log size and memory capacity.
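For instance, with the values used in the example at the end of this page, the concurrency works out as follows (sketch):

# batch_size: 500                   -> maximum number of logs per bulk request
# topology.max.spout.pending: 2000  -> maximum number of logs in flight per input node
# maximum concurrent bulk requests  =  2000 / 500 = 4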

Warning

Be careful when increasing the max.spout.pending too far above the batch_size: sending too many batches at the same time is inefficient, because the Elasticsearch queues fill up.

A high max.spout.pending also increases the time a log waits before being sent to Elasticsearch. Such a log can be considered 'in timeout' according to the topology.message.timeout.secs punchline setting, and timed-out logs will be replayed regardless of whether the first indexing attempt actually succeeded.

Experimenting is the best way to find the sweet spot where indexing is fast, and the overall processing time of the tuple is less than 1 or 2 seconds.

Error management

In production, we want to avoid log loss. Errors can occur when trying to index logs into Elasticsearch, either because of log anomalies or because of a temporary Elasticsearch unavailability.

Indexing rejection by Elasticsearch

The reindex_failed_documents: true setting prevents log loss due to a field not matching the index mapping. Depending on the Elastic error (see the settings sketch after this list):

  • 'Field Types Mapping Exception': the document is "wrapped" as an error document with a string field containing the whole original document. It will be saved to Elasticsearch, but not indexed as nominal documents are.
  • Other Elastic exceptions: logs will be stuck and the elasticsearch_output will publish them on the _ppf_errors_stream. If this stream is not defined, logs will be failed and replayed. You can also ack and ignore those failed logs. This strategy can be configured with the bulk_failure_action parameter.
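For the mapping-exception case, the corresponding settings from the full example at the end of this page are sketched below. The error index name is only an example, and the possible values of bulk_failure_action must be taken from the elasticsearch_output reference documentation:

reindex_failed_documents: true
error_index:
  type: daily
  prefix: reftenant-indexing-errors-events_ecs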

Processing errors indexing

Of course, when a processing error has occurred in the processing stage, an 'error' document is produced there instead of the normal parsed JSON document. The elasticsearch indexer punchline therefore has to be configured to read these errors from the associated kafka output topic (see processor punchlines).

Because error documents and nominal parsed logs do not carry the same associated metadata, the fields configuration in the indexer punchline is slightly different (see the 'published' fields of the kafka input, which match the standard inbuilt metadata fields of parsing error documents, as described in the Punch processing node reference documentation).

Parsed logs indexer Punchline example

Here is an example of a 'mutualized multi-log-technologies' indexing punchline, where the index name has been pre-computed by the processor punchline.

# The purpose of this punchline is to index all parsed logs (and error logs) into 
# Elasticsearch indices.

# This punchline reads from multi-technology kafka topics (such as processor-logs-output)
# and sends each processed log to the appropriate Elasticsearch index, whose name has
# been computed at the end of the processing phase (based on log type and event date).

version: "6.0"
tenant: reftenant
channel: indexer
runtime: shiva
type: punchline
name: central_indexer
dag:
  # Kafka input logs
  - type: kafka_input
    component: kafka_back_input_logs
    settings:
      topic: processor-logs-output
      start_offset_strategy: last_committed
      encoding: lumberjack
    publish:
      - stream: logs
        fields:
          - _ppf_id
          - _ppf_timestamp
          - log
          - raw_log
          - es_index
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Kafka unknown logs
  - type: kafka_input
    component: kafka_front_input_unknown_logs
    settings:
      topic: reftenant-receiver-unknown
      start_offset_strategy: last_committed
      encoding: lumberjack
    publish:
      - stream: logs
        fields:
          - _ppf_id
          - _ppf_timestamp
          - log
          - raw_log
          - es_index
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Kafka switch errors
  - type: kafka_input
    component: kafka_front_input_switch_errors
    settings:
      topic: reftenant-receiver-switch-errors
      start_offset_strategy: last_committed
      encoding: lumberjack
    publish:
      - stream: switch_errors
        fields:
          - _ppf_timestamp
          - _ppf_id
          - _ppf_platform
          - _ppf_tenant
          - _ppf_channel
          - _ppf_topology
          - _ppf_component
          - _ppf_error_message
          - _ppf_error_document
          - subsystem
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Kafka process errors
  - type: kafka_input
    component: kafka_back_input_process_errors
    settings:
      topic: processor-errors-output
      start_offset_strategy: last_committed
      encoding: lumberjack
    publish:
      - stream: process_errors
        fields:
          - _ppf_timestamp
          - _ppf_id
          - _ppf_platform
          - _ppf_tenant
          - _ppf_channel
          - _ppf_topology
          - _ppf_component
          - _ppf_error_message
          - _ppf_error_document
          - subsystem
          - device_type
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # ES output logs and errors
  - type: elasticsearch_output
    component: elasticsearch_output_logs_and_errors
    settings:
      cluster_id: es_data
      # Elasticsearch is more efficient if some thousands of logs are sent for indexing
      # in a single web request. Of course this may vary depending on log size and
      # number of fields that have to be indexed.

      # The Elastic output node will send in parallel as many web requests as it has
      # available logs to fill batches. Non-full batches will wait some time (here,
      # unspecified setting implies 1s) if no more data arrives.

      # Remember that at any time, there are only topology.max.spout.pending logs being
      # processed by the punchline (see the settings at the bottom), so ensure that the
      # batch size is lower (or several times lower) than this number!
      batch_size: 500
      per_stream_settings:

        - stream: logs
          index:
            type: tuple_field
            tuple_field: es_index
          document_id_field: _ppf_id
          document_json_field: log
          additional_document_value_fields:
            - document_field: message
              type: tuple_field
              tuple_field: raw_log

        - stream: switch_errors
          index:
            type: daily
            prefix: reftenant-errors-
          document_id_field: _ppf_id
          document_json_field: _ppf_error_document
          additional_document_value_fields:
            - document_field: error_message
              type: tuple_field
              tuple_field: _ppf_error_message
            - document_field: error_timestamp
              type: date
              tuple_field: _ppf_timestamp
              format: iso
            - document_field: tenant
              type: tuple_field
              tuple_field: _ppf_tenant
            - document_field: channel
              type: tuple_field
              tuple_field: _ppf_channel
            - document_field: subsystem
              type: tuple_field
              tuple_field: subsystem
            - document_field: "@timestamp"
              type: date
              format: iso

        - stream: process_errors
          index:
            type: daily
            prefix: reftenant-errors-
          document_id_field: _ppf_id
          document_json_field: _ppf_error_document
          additional_document_value_fields:
            - document_field: error_message
              type: tuple_field
              tuple_field: _ppf_error_message
            - document_field: error_timestamp
              type: date
              tuple_field: _ppf_timestamp
              format: iso
            - document_field: tenant
              type: tuple_field
              tuple_field: _ppf_tenant
            - document_field: channel
              type: tuple_field
              tuple_field: _ppf_channel
            - document_field: subsystem
              type: tuple_field
              tuple_field: subsystem
            - document_field: device_type
              type: tuple_field
              tuple_field: device_type
            - document_field: "@timestamp"
              type: date
              format: iso

      # VERY IMPORTANT. This is because we are in production mode, and we do not want
      # to lose any log due to a temporary Elasticsearch failure or due to
      # rejection of the documents by Elasticsearch (e.g. because of a mapping
      # exception, which is a mismatch between a field generated at the processing
      # phase and the field type expected by Elasticsearch for building its indices).

      # So this setting will cause:
      # - mapping exceptions to be transformed into a specific error document that
      #   will be indexed in a separate index (error_index) to enable troubleshooting.
      # - other kinds of rejections/failures to be retried/replayed automatically later
      #   (this will cause the storm.tuple.fail metric to increase), with possible
      #   interruption of the indexing progress for all logs until then.
      reindex_failed_documents: true
      error_index:
        type: daily
        prefix: reftenant-indexing-errors-events_ecs
    subscribe:
      - stream: logs
        component: kafka_back_input_logs
      - stream: logs
        component: kafka_front_input_unknown_logs
      - stream: process_errors
        component: kafka_back_input_process_errors
      - stream: switch_errors
        component: kafka_front_input_switch_errors

metrics:
  reporters:
    - type: kafka
  reporting_interval: 60

settings:
  # The "max pending" is the number of logs being processed at any moment (per source spout).
  # It must be high enough to achieve good performance, because a log spends a significant
  # amount of time waiting (while Elasticsearch is indexing previously sent batches).

  # To achieve good performance, this should be a multiple of the batch size set
  # up at `elasticsearch_output` node level.

  # 2000 pending means that up to 4 requests may be sent to Elasticsearch concurrently.

  # Do NOT overdo this, because Elasticsearch queues will fill up if you send too many queries.
  # Your tuples will then stay 'pending' for too long (compared to the storm message timeout, which is 1 minute by default).
  # As a result, the Storm 'failure' count will go up and logs will be replayed.
  # This means more work for Elasticsearch if the first request was in fact already being processed by ES.
  topology.max.spout.pending: 2000
  topology.component.resources.onheap.memory.mb: 100
  topology.enable.message.timeouts: true
  topology.message.timeout.secs: 30