
Log Central: Receiver punchlines (1)

(Image: Reference central site pipelines)

Key design highlights

Transport / interface / HA

Logs are received from the Log Collectors over the lumberjack protocol, load-balanced by the built-in balancing mechanism of the sender site: a Punch lumberjack output node acts as the emitter in a forwarder punchline of the Log Collector.

To achieve High Availability, at least two receiver punchlines are started on different nodes. To pin a punchline to a given node, we use Shiva tags: each Shiva node has a default tag matching its hostname. In legacy configurations, single-node Storm clusters could be used instead, with a copy of the punchline running inside each single-node Storm cluster.

With multiple receivers we also gain input scalability, because the lumberjack sender (the lumberjack output node) load-balances over all available lumberjack receivers. The receivers combine their throughput capacity, as long as the target Kafka cluster is sized accordingly.
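The following sketch shows the matching collector-side emitter, i.e. the lumberjack output node at the end of the forwarder punchline, assuming the usual destination list settings of the lumberjack_output node. The hostnames (centralfront1, centralfront2), the listening port (2002) and the subscribed component name are illustrative: they must match your own collector punchline and the receiver's listening settings shown further below.

  # Collector-side emitter (sketch): the forwarder punchline ends with a
  # lumberjack_output node that load-balances the 'logs' stream over the
  # two central receivers. TLS/certificate settings are omitted for brevity.
  - type: lumberjack_output
    component: lumberjack_output_to_central
    settings:
      destination:
        - host: centralfront1
          port: 2002
          compression: true
        - host: centralfront2
          port: 2002
          compression: true
    subscribe:
      - stream: logs
        component: collector_input # hypothetical name of the collector input node
      - stream: _ppf_metrics
        component: collector_input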

Write-fast principle

The receiver punchlines are expected to use very little CPU before writing to Kafka, which means doing as little processing as possible. This allows fast consumption of the remote logs from the Log Collectors, and safe persistence of the logs as early as possible. This is critical in the case of non-replicated Log Centrals, or of Log Collectors that have little persistence capacity.

Multiple front topics and log dispatching by device type

To allow specialized log processor punchlines, it is often easier to separate the different kinds of logs into different input queues. This is implemented with different Kafka topics in the front Kafka cluster.

So this receiver punchline also acts as a dispatcher towards different processing queues. To comply with the 'write-fast' principle, it is advised to identify each log with simple patterns. For example, the dispatching can rely on available metadata such as _ppf_remote_port and _ppf_remote_host. Of course, there are other ways to identify the log type efficiently, such as parsing the syslog envelope to identify the source device IP and enriching it with a configuration map.
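As an illustration of the configuration map approach, the syslog device type switcher node in the punchline example below loads a JSON resource (punchlets/receiver/device_to_device_type.json) that the enrichment punchlet uses to tag each log with a device_type. The exact structure of this file depends on your enrichment punchlet; a hypothetical shape, mapping source device IPs to device types with purely illustrative values, could be:

{
  "10.0.1.21": "linux_os",
  "10.0.1.22": "linux_os",
  "10.0.2.31": "apache"
}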

It is also important that the dispatching mechanism has an 'unknown type' stream with an associated processing queue, to handle unexpected device types that may be added later, or malformed input logs.

Error management

Because the punchline contains a punchlet_node (for parsing/normalization/enrichment), there is a risk of exceptions or errors in the punchlet code.

This means that a specific processing queue has to be identified to retrieve these unexpected documents, so as not to lose them (and so that someone can later investigate the problem, which may require changes in the source device configuration or in the device type discovery mechanism).

Receiver channel HA configuration example

As explained above, the receiving (input) application must be run as multiple instances, one on each of the front servers. Here is an example of how to use Shiva tags in channel_structure.yaml:

version: "6.0"
start_by_tenant: true
stop_by_tenant: true
applications:

  # Central receiver on centralfront1
  - name: central_receiver1
    runtime: shiva
    shiva_runner_tags:
    - centralfront1
    cluster: shiva_front
    command: punchlinectl
    args:
      - start
      - --punchline
      - logs_receiver.yaml
      - --childopts
      - -Xms96m -Xmx96m # will override topology.component.resources.onheap.memory.mb

  # Central receiver on centralfront2
  - name: central_receiver2
    runtime: shiva
    shiva_runner_tags:
    - centralfront2
    cluster: shiva_front
    command: punchlinectl
    args:
      - start
      - --punchline
      - logs_receiver.yaml
      - --childopts
      - -Xms96m -Xmx96m

# Resources
resources:

  - type: kafka_topic
    cluster: kafka_front
    name: reftenant-receiver-linux-os
    partitions: 3
    replication_factor: 2

  - type: kafka_topic
    cluster: kafka_front
    name: reftenant-receiver-apache
    partitions: 3
    replication_factor: 2

  - type: kafka_topic
    cluster: kafka_front
    name: reftenant-receiver-winlogbeat
    partitions: 3
    replication_factor: 2

  - type: kafka_topic
    cluster: kafka_front
    name: reftenant-receiver-unknown
    partitions: 1
    replication_factor: 2

  - type: kafka_topic
    cluster: kafka_front
    name: reftenant-receiver-switch-errors
    partitions: 1
    replication_factor: 2

Receiver Punchline example

# This punchline receives logs from the Log Collector (LTR)
# through the Lumberjack protocol and writes them to Kafka topics.
# Of course we could process the logs 'on the fly', but writing them directly to Kafka
# (before doing too much computation) makes it possible to absorb any backlog of
# non-transmitted logs coming from the Log Collectors.
# 
# This application also acts as a "dispatcher" of log types towards different kafka queues.
# In our sample, multiple flows have been received on different ports on the Log Collector, but they
# are mixed in the same lumberjack flow. Here we separate them into different Kafka topics
# based on the original input port on the Log Collector.
# 
# So here we do a sample dispatch:
# - logs received on port 9999 are sent to the 'reftenant-receiver-apache' kafka topic
# - logs received on port 8888 are sent to the 'reftenant-receiver-linux-os' kafka topic

version: "6.0"
tenant: reftenant
channel: receiver
runtime: shiva
type: punchline
name: central_receiver
dag:
  # Lumberjack input
  - type: lumberjack_input
    component: lumberjack_input_logs
    settings:
      listen:
        host: 0.0.0.0 # listen on all interfaces; set a specific address or resolvable hostname to restrict
        port: 2002
        compression: true
        ssl: true
        ssl_private_key: '@{PUNCHPLATFORM_SECRETS_DIR}/server.pem'
        ssl_certificate: '@{PUNCHPLATFORM_SECRETS_DIR}/server.crt'
        ssl_trusted_certificate: '@{PUNCHPLATFORM_SECRETS_DIR}/fullchain-collector.crt'
      self_monitoring.activation: true
      self_monitoring.period: 10
    publish:
      - stream: logs
        fields:
          - log
          - subsystem
          - log_format
          - _ppf_local_host
          - _ppf_local_port
          - _ppf_remote_host
          - _ppf_remote_port
          - _ppf_timestamp
          - _ppf_id
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Punchlet log format switcher
  - type: punchlet_node
    component: log_format_switcher
    settings:
      punchlet:
        - punchlets/receiver/log_format_switcher.punch
    publish:
      - stream: ecs
        fields:
          - log
          - subsystem
          - log_format
          - _ppf_local_host
          - _ppf_local_port
          - _ppf_remote_host
          - _ppf_remote_port
          - _ppf_timestamp
          - _ppf_id
      - stream: syslog
        fields:
          - log
          - subsystem
          - log_format
          - _ppf_local_host
          - _ppf_local_port
          - _ppf_remote_host
          - _ppf_remote_port
          - _ppf_timestamp
          - _ppf_id
      - stream: _ppf_metrics
        fields:
          - _ppf_latency
    subscribe:
      - stream: logs
        component: lumberjack_input_logs
      - stream: _ppf_metrics
        component: lumberjack_input_logs

  # Punchlet SYSLOG device type switcher
  - type: punchlet_node
    component: syslog_device_type_switcher
    settings:
      punchlet_json_resources:
        - punchlets/receiver/device_to_device_type.json
      punchlet:
        - punchlets/receiver/input.punch
        - punchlets/receiver/parsing_syslog_header_ecs.punch
        - punchlets/receiver/device_type_enrichment.punch
        - punchlets/receiver/adding_es_index.punch
        - punchlets/receiver/remove_data.punch
        - punchlets/receiver/device_type_switcher.punch
    subscribe:
      - stream: syslog
        component: log_format_switcher
      - stream: _ppf_metrics
        component: log_format_switcher
    publish:
      - stream: linux_os_logs
        fields:
          - log
          - raw_log
          - subsystem
          - device_type
          - es_index
          - _ppf_id
          - _ppf_timestamp
      - stream: apache_logs
        fields:
          - log
          - raw_log
          - subsystem
          - device_type
          - es_index
          - _ppf_id
          - _ppf_timestamp
      - stream: unknown_device_logs
        fields:
          - log
          - raw_log
          - subsystem
          - device_type
          - es_index
          - _ppf_id
          - _ppf_timestamp
      - stream: _ppf_errors
        fields:
          - _ppf_timestamp
          - _ppf_id
          - _ppf_platform
          - _ppf_tenant
          - _ppf_channel
          - _ppf_topology
          - _ppf_component
          - _ppf_error_message
          - _ppf_error_document
          - subsystem
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Punchlet ECS device type switcher
  - type: punchlet_node
    component: ecs_device_type_switcher
    settings:
      punchlet:
        - punchlets/receiver/input_ecs.punch
        - punchlets/receiver/device_type_enrichment_ecs.punch
        - punchlets/receiver/adding_es_index.punch
        - punchlets/receiver/device_type_switcher.punch
    subscribe:
      - stream: ecs
        component: log_format_switcher
      - stream: _ppf_metrics
        component: log_format_switcher
    publish:
      - stream: winlogbeat_logs
        fields:
          - log
          - raw_log
          - subsystem
          - device_type
          - es_index
          - _ppf_id
          - _ppf_timestamp
      - stream: unknown_device_logs
        fields:
          - log
          - raw_log
          - subsystem
          - device_type
          - es_index
          - _ppf_id
          - _ppf_timestamp
      - stream: _ppf_errors
        fields:
          - _ppf_timestamp
          - _ppf_id
          - _ppf_platform
          - _ppf_tenant
          - _ppf_channel
          - _ppf_topology
          - _ppf_component
          - _ppf_error_message
          - _ppf_error_document
          - subsystem
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Kafka output
  - type: kafka_output
    component: kafka_front_output_switcher
    settings:
      encoding: lumberjack
      producer.acks: "1"
      topics:
        - stream: linux_os_logs
          topic: reftenant-receiver-linux-os
        - stream: apache_logs
          topic: reftenant-receiver-apache
        - stream: winlogbeat_logs
          topic: reftenant-receiver-winlogbeat
        - stream: unknown_device_logs
          topic: reftenant-receiver-unknown
        - stream: _ppf_errors
          topic: reftenant-receiver-switch-errors
    subscribe:
      - stream: winlogbeat_logs
        component: ecs_device_type_switcher
      - stream: linux_os_logs
        component: syslog_device_type_switcher
      - stream: apache_logs
        component: syslog_device_type_switcher
      - stream: unknown_device_logs
        component: ecs_device_type_switcher
      - stream: unknown_device_logs
        component: syslog_device_type_switcher
      - stream: _ppf_errors
        component: syslog_device_type_switcher
      - stream: _ppf_errors
        component: ecs_device_type_switcher

metrics:
  reporters:
    - type: kafka
  reporting_interval: 60

settings:
  topology.max.spout.pending: 5000
  topology.component.resources.onheap.memory.mb: 200
  topology.enable.message.timeouts: true
  topology.message.timeout.secs: 30