
Central Log Management Reference Architecture: Archiver Punchlines (6)

(Image: reference central site pipelines)

Key design highlights

Two outputs: archive file + metadata

An archive is useless if we cannot exploit it. The most frequent usages are:

  • mass export through filesystem tools (e.g. cp -r).
  • targeted extraction of files for re-indexing/investigation purposes (or for completing/restoring the indexing backend content during incident management).
  • text pattern search in archive files, to extract only the likely matching events for re-indexing/investigation.

For the latter two use cases, and for capacity planning purposes, an efficient indexing of the files contained in the archive is needed. This is why the archiver node (actually a file_output node) also produces JSON metadata records describing each stored archive file.
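
In punchline terms, this means the file_output node has two distinct outputs: the archive files written to the storage destination, and a metadatas stream that a downstream elasticsearch_output node indexes. Here is a trimmed excerpt of the full punchline shown later on this page:

  # Excerpt: the file_output node both writes archive files and publishes
  # one JSON metadata record per written file.
  - type: file_output
    component: file_output_logs
    settings:
      destination: file:///tmp/storage-dir    # first output: the archive files
      # (other settings omitted, see the full example below)
    publish:
      - stream: metadatas                     # second output: JSON metadata records
        fields:
          - metadata

  # The metadata records are then indexed, making the archive content searchable.
  - type: elasticsearch_output
    component: elasticsearch_output_metadatas
    subscribe:
      - component: file_output_logs
        stream: metadatas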

Sorting by tags

The file_output node can build separate files for logs that have different tag values. The tag fields are a custom choice (usually the log type/technology, the emitting datacenter, the user community...).

This means the tag values have to be computed at log processing time and provided as additional fields in the log tuple stream, as illustrated by the excerpt below.
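
For instance, in the example punchline below, the processor stage has already placed subsystem and device_type fields in the Kafka tuples; the file_output node declares them as tags and reuses them in the file naming pattern:

  # Excerpt: the tag fields travel as ordinary stream fields and drive both
  # the file grouping and the file naming.
  - type: file_output
    component: file_output_logs
    settings:
      file_prefix_pattern: "%{tag:subsystem}/%{tag:device_type}/%{date}/%{offset}"
      tags:
        - subsystem       # tag values were computed upstream, at processing time
        - device_type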

RAM vs Streaming vs Replay complexity

We want to build big files for efficient filesystem usage and for fast mass extraction, so batches can take a long time to fill.

Because we do not want to use too much RAM to accumulate logs, log files are actually written as streams to the target storage (except for S3 object storage). This means the punchline has to acknowledge part of the logs (to flush them out of the tracking tables) BEFORE the files are finished (in fact, all the logs except the first one in the batch).

In case of a filesystem or storage backend communication failure, this makes it quite complicated to determine what needs to be replayed (because at the same time, other batches may have succeeded with another part of the logs).

This is why, in case of batch failure, we configure the archiver punchline to die and restart at the last known "persisted offset" in the source Kafka. This also means that we cannot mix, in the same batch file, logs that have been read from distinct Kafka topic partitions. The file_output node takes care of this by using the input partition id as yet one more "sorting tag".
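
The settings involved in this trade-off are visible in the example punchline below: the replay point on the kafka_input side, and the delivery strategy and batch sizing on the file_output side.

  # Excerpt: the settings governing batching, streaming and replay.
  - type: kafka_input
    component: kafka_back_input_logs
    settings:
      start_offset_strategy: last_committed   # on restart, resume from the last persisted offset
  - type: file_output
    component: file_output_logs
    settings:
      strategy: at_least_once                 # after a failure, logs may be archived twice, never lost
      batch_size: 750                         # big files: many logs per archive file
      batch_life_timeout: 1m                  # but do not wait forever on low-traffic tags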

Errors management

Unparsed logs (processing errors or exceptions) often have to be preserved as carefully as parsed logs (for legal reasons, or for later reprocessing once the processing chain has been fixed).

For this reason, a punchline dedicated to archiving the errors is needed (see the second example at the end of this page).

NFS mount point and non-auto-created storage 'root'

When the storage backend is an NFS device mounted on all potential archiver servers, there is a risk if the NFS share is, for some reason, not currently mounted: a writer application could then write to the LOCAL filesystem (often the operating system root filesystem), filling it up with un-centralized data that can endanger the operating system availability.

In addition, this unwanted local data may be 'hidden' under the actual mount point once the NFS share is back online, making it difficult to understand why the root filesystem has been exhausted and where the 'archived' data has gone.

To avoid this, the good pattern is to never write at the root of an NFS mount, but always in an EXPECTED sub-folder. The absence of this sub-folder means that the NFS share is not currently mounted, and therefore no write should be attempted.

This pattern can be implemented with the file_output node by providing it with a storage address that points to a pre-created folder inside the target filesystem, and by requesting the file_output node to NOT CREATE this folder if it does not exist, as sketched below.

In addition, remember that the same storage mount point should be used on all archiver nodes, so that whichever node is used, the data ends up in the same filesystem and is therefore easy to locate later using the metadata.
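
Here is a minimal sketch of this pattern, assuming the folder auto-creation behaviour is controlled by the create_root flag used in the examples below (set here to false), and assuming /data/nfs-archive is the NFS mount point with a manually pre-created storage sub-folder:

  # Sketch (assumptions: /data/nfs-archive is the NFS mount point, and the
  # 'storage' sub-folder was created once, by hand, on the NFS share itself).
  - type: file_output
    component: file_output_logs
    settings:
      # Point INSIDE the mount, never at the mount root itself.
      destination: file:///data/nfs-archive/storage
      # Do not auto-create the folder: if it is missing, the NFS share is not
      # mounted and the node must fail instead of silently filling the local disk.
      create_root: false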

Choice of archived fields: raw log vs parsed log vs both

Of course the raw log takes less space (2 to 3 times less), and we often NEED to archive it for legal reasons. But keeping only the raw log means we will need to parse the log again if we want to extract it from the archive towards the indexing backend (Elasticsearch). For mass extraction/indexing this may imply a huge amount of CPU and computation time.

For this reason, ALSO keeping the parsed log is often advisable.

Note also that it is always a good idea to keep the unique log id (for deduplication purposes during replays): it helps seamlessly restore missing time ranges in the indexing backend caused by production incidents that prevented normal indexing.
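
This is exactly what the fields list of the example punchline below does: each archived record keeps the unique id, the timestamp, the parsed log and the raw log.

  # Excerpt from the file_output settings of the example below:
  fields:
    - _ppf_id          # unique log id, usable for deduplication during replays
    - _ppf_timestamp
    - log              # parsed log: ready for direct re-indexing
    - raw_log          # raw log: kept for legal/compliance reasons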

Parsed logs archiver Punchline example

Here is an example of a shared, multi-technology archiving punchline, where the index name has been pre-computed by the processor punchline.

version: "6.0"
tenant: reftenant
channel: archiver
runtime: shiva
type: punchline
name: logs_archiver
dag:
  # Kafka input
  - type: kafka_input
    component: kafka_back_input_logs
    settings:
      topic: processor-logs-output
      start_offset_strategy: last_committed
    publish:
      - stream: logs
        fields:
          - _ppf_id
          - _ppf_timestamp
          - log
          - raw_log
          - device_type
          - subsystem
          - _ppf_partition_id
          - _ppf_partition_offset
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Archive logs in csv format
  - type: file_output
    component: file_output_logs
    settings:
      strategy: at_least_once
      destination: file:///tmp/storage-dir
      create_root: true
      timestamp_field: _ppf_timestamp
      file_prefix_pattern: "%{tag:subsystem}/%{tag:device_type}/%{date}/%{offset}"
      encoding: csv
      separator: _|_
      compression_format: GZIP
      batch_size: 750
      batch_life_timeout: 1m
      fields:
        - _ppf_id
        - _ppf_timestamp
        - log
        - raw_log
      tags:
        - subsystem
        - device_type
    executors: 1
    subscribe:
      - component: kafka_back_input_logs
        stream: logs
      - component: kafka_back_input_logs
        stream: _ppf_metrics
    publish:
      - stream: metadatas
        fields:
          - metadata
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # The `elasticsearch_output` publishes `_ppf_errors` tuples if the metadata indexing fails.
  # This prevents the upstream `file_output` from blocking when no ack is received from `elasticsearch_output`.
  - type: elasticsearch_output
    component: elasticsearch_output_metadatas
    settings:
      cluster_id: es_data
      unavailability_failure_action: forward
      bulk_failure_action: forward
      reindex_failed_documents: false
      per_stream_settings:
        - stream: metadatas
          index:
            type: daily
            prefix: reftenant-archive-
          document_json_field: metadata
          batch_size: 1
          error_index:
            type: daily
            prefix: reftenant-indexing-errors-archive-
    subscribe:
      - component: file_output_logs
        stream: metadatas
      - component: file_output_logs
        stream: _ppf_metrics
    publish:
      - stream: _ppf_errors
        fields:
          - _ppf_error_document
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # The `kafka_output` saves the metadata records that `elasticsearch_output` failed to index.
  - type: kafka_output
    component: kafka_back_output_metadatas
    settings:
      topic: archiver-metadata
      encoding: lumberjack
      producer.acks: "1"
    subscribe:
      - stream: _ppf_errors
        component: elasticsearch_output_metadatas
      - stream: _ppf_metrics
        component: elasticsearch_output_metadatas

metrics:
  reporters:
    - type: kafka
  reporting_interval: 60

settings:
  topology.max.spout.pending: 500
  topology.component.resources.onheap.memory.mb: 100
  topology.enable.message.timeouts: true
  topology.message.timeout.secs: 600

Processing errors archiver Punchline example

Here is an example of the special-case 'processing errors' archiver punchline.

Please refer to Processing errors indexing or to the inline comments for explanations.

version: "6.0"
tenant: reftenant
channel: archiver
runtime: shiva
type: punchline
name: errors_archiver
dag:
  # Kafka input Switch errors
  - type: kafka_input
    component: kafka_front_input_switch_errors
    settings:
      topic: reftenant-receiver-switch-errors
      start_offset_strategy: last_committed
    publish:
      - stream: errors
        fields:
          - _ppf_id
          - _ppf_timestamp
          - _ppf_error_document
          - subsystem
          - _ppf_partition_id
          - _ppf_partition_offset
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Kafka input process errors
  - type: kafka_input
    component: kafka_back_input_process_errors
    settings:
      topic: processor-errors-output
      start_offset_strategy: last_committed
    publish:
      - stream: errors
        fields:
          - _ppf_id
          - _ppf_timestamp
          - _ppf_error_document
          - subsystem
          - device_type
          - _ppf_partition_id
          - _ppf_partition_offset
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # File output switch errors
  - type: file_output
    component: file_output_switch_errors
    settings:
      strategy: at_least_once
      destination: file:///tmp/storage-dir
      create_root: true
      timestamp_field: _ppf_timestamp
      file_prefix_pattern: "%{tag:subsystem}/errors/central/%{topic}/%{date}/%{offset}"
      encoding: csv
      compression_format: GZIP
      batch_size: 750
      batch_life_timeout: 1m
      fields:
        - _ppf_id
        - _ppf_timestamp
        - _ppf_error_document
      tags:
        - subsystem
    executors: 1
    subscribe:
      - component: kafka_front_input_switch_errors
        stream: errors
      - component: kafka_front_input_switch_errors
        stream: _ppf_metrics
    publish:
      - stream: metadatas
        fields:
          - metadata
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # File output process errors
  - type: file_output
    component: file_output_process_errors
    settings:
      strategy: at_least_once
      destination: file:///tmp/storage-dir
      create_root: true
      timestamp_field: _ppf_timestamp
      file_prefix_pattern: "%{tag:subsystem}/%{tag:device_type}/errors/%{date}/%{offset}"
      encoding: csv
      compression_format: GZIP
      batch_size: 750
      batch_life_timeout: 1m
      fields:
        - _ppf_id
        - _ppf_timestamp
        - _ppf_error_document
      tags:
        - subsystem
        - device_type
    executors: 1
    subscribe:
      - component: kafka_back_input_process_errors
        stream: errors
      - component: kafka_back_input_process_errors
        stream: _ppf_metrics
    publish:
      - stream: metadatas
        fields:
          - metadata
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # The `elasticsearch_output` publishes `_ppf_errors` tuples if the metadata indexing fails.
  # This prevents the upstream `file_output` nodes from blocking when no ack is received from `elasticsearch_output`.
  - type: elasticsearch_output
    component: elasticsearch_output_metadatas
    settings:
      cluster_id: es_data
      per_stream_settings:
        - stream: metadatas
          index:
            type: daily
            prefix: reftenant-archive-errors-
          document_json_field: metadata
          batch_size: 1
          bulk_failure_action: forward
          reindex_failed_documents: true
          reindex_only_mapping_exceptions: true
          error_index:
            type: daily
            prefix: reftenant-indexing-errors-archive-errors-
    subscribe:
      - component: file_output_switch_errors
        stream: metadatas
      - component: file_output_switch_errors
        stream: _ppf_metrics
      - component: file_output_process_errors
        stream: metadatas
      - component: file_output_process_errors
        stream: _ppf_metrics
    publish:
      - stream: _ppf_errors
        fields:
          - _ppf_error_document
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # The `kafka_output` saves the metadata records that `elasticsearch_output` failed to index.
  - type: kafka_output
    component: kafka_back_output_metadatas
    settings:
      topic: archiver-metadata
      encoding: lumberjack
      producer.acks: "1"
    subscribe:
      - stream: _ppf_errors
        component: elasticsearch_output_metadatas
      - stream: _ppf_metrics
        component: elasticsearch_output_metadatas

metrics:
  reporters:
    - type: kafka
  reporting_interval: 60

settings:
  topology.max.spout.pending: 500
  topology.component.resources.onheap.memory.mb: 100
  topology.enable.message.timeouts: true
  topology.message.timeout.secs: 600