
Log Central : Receiver punchlines (1)

Figure: Reference central site pipelines

Key design highlights

Transport / interface / HA

Logs are received from the Log Collectors over the lumberjack protocol. They are load-balanced by the built-in balancing mechanism of the sending site. On the Log Collector side, a Punch lumberjack output node in a forwarder punchline acts as the emitter.

To achieve High Availability, at least two receiver punchlines are started on different nodes. To pin a punchline to a specific node, we use Shiva tags: each Shiva node has a default tag matching its hostname. In legacy configurations, a single-node Storm cluster could be used on each server instead, each one running its own copy of the punchline.
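The pinning rule can be illustrated with a simplified model. The following minimal Python sketch is not Shiva's actual scheduler. It assumes that an application may run only on nodes whose tag set contains all of its required tags, and that each node carries its hostname as a default tag. The helpers eligible_nodes and receivers_on_distinct_hosts are hypothetical. They check that each receiver is pinned to exactly one node and that at least two receivers sit on distinct hosts.

```python
from typing import Dict, List, Set

def eligible_nodes(required_tags: Set[str], node_tags: Dict[str, Set[str]]) -> List[str]:
    """Return the nodes whose tag set contains every required tag (simplified model)."""
    return sorted(node for node, tags in node_tags.items() if required_tags <= tags)

def receivers_on_distinct_hosts(apps: Dict[str, Set[str]], node_tags: Dict[str, Set[str]]) -> bool:
    """HA check: each receiver is pinned to exactly one node, and no two share a node."""
    placements: List[str] = []
    for required in apps.values():
        nodes = eligible_nodes(required, node_tags)
        if len(nodes) != 1:
            return False  # not pinned to a single node
        placements.append(nodes[0])
    # Distinct hosts, and at least two instances for High Availability.
    return len(set(placements)) == len(placements) >= 2

# Each node carries a default tag equal to its hostname (as described above).
nodes = {"centralfront1": {"centralfront1"}, "centralfront2": {"centralfront2"}}
apps = {"central_receiver1": {"centralfront1"}, "central_receiver2": {"centralfront2"}}
print(receivers_on_distinct_hosts(apps, nodes))  # True
```

Pinning both receivers to the same hostname tag would make the check fail. That is exactly the situation the two-node layout is meant to avoid.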

With multiple receivers, input capacity also scales out, because the lumberjack sender (lumberjack output node) load-balances over all available lumberjack receivers. Their throughput capacities add up, as long as the target Kafka cluster (brokers and topic partitions) is sized to absorb the combined load.
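The exact balancing algorithm of the lumberjack output node is not described on this page. As an assumption, the following Python sketch models it as plain round-robin with failover. It shows how two receivers share the load when both are up, and how one receiver takes all batches when the other is down. The distribute function and the "alive" map are hypothetical.

```python
from itertools import cycle
from typing import Dict, Iterable, List

def distribute(batches: Iterable[str], receivers: List[str], alive: Dict[str, bool]) -> Dict[str, List[str]]:
    """Assign each batch to the next alive receiver, round-robin (simplified model)."""
    assigned: Dict[str, List[str]] = {r: [] for r in receivers}
    ring = cycle(receivers)
    for batch in batches:
        # Try at most one full turn of the ring for this batch.
        for _ in range(len(receivers)):
            target = next(ring)
            if alive.get(target, False):
                assigned[target].append(batch)
                break
        else:
            # No receiver alive: this model simply raises; retry policy is out of scope.
            raise RuntimeError("no lumberjack receiver available")
    return assigned

receivers = ["centralfront1", "centralfront2"]
both_up = distribute(["b1", "b2", "b3", "b4"], receivers, {"centralfront1": True, "centralfront2": True})
one_down = distribute(["b1", "b2", "b3", "b4"], receivers, {"centralfront1": True, "centralfront2": False})
print(both_up)   # {'centralfront1': ['b1', 'b3'], 'centralfront2': ['b2', 'b4']}
print(one_down)  # {'centralfront1': ['b1', 'b2', 'b3', 'b4'], 'centralfront2': []}
```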

Write-fast principle

The receiver punchlines should spend as little CPU as possible before writing to Kafka, that is, do as little processing as possible. This allows remote logs to be drained quickly from the Log Collectors and persisted safely as soon as possible. This is critical for non-replicated Log Centrals, or for Log Collectors that have little persistence capacity.

Multiple front topics and log dispatching by device type

To allow specialized log processing punchlines, it is often easier to separate different kinds of logs into different input queues. This is implemented with different Kafka topics in the front Kafka cluster.

This receiver punchline therefore acts as a dispatcher towards different processing queues. To comply with the 'write-fast' principle, it is advised to identify the type of each log with simple patterns. For example, the dispatcher can rely on available metadata such as _ppf_remote_port and _ppf_remote_host. There are other efficient ways to identify the log type, such as parsing the syslog envelope to find the source device IP and enriching it from a configuration map.

It is also important that the dispatching mechanism has an 'unknown type' stream with an associated processing queue. This stream handles unexpected device types that may be added later, as well as malformed input logs.
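Punchlets are written in the Punch language, so the following Python sketch only mirrors the dispatching logic described above, under stated assumptions. It first looks up the device type from the remote host in a configuration map, then falls back to the port. Anything unrecognized goes to the 'unknown type' stream, and every step is a cheap dictionary lookup, in line with the write-fast principle.

Several details are assumptions:
- The host map entry (10.0.0.12) is hypothetical.
- Ports 9999 and 8888 are taken from the comments of the receiver punchline example.
- Which metadata field actually carries the Log Collector input port depends on the forwarding setup, so reading _ppf_remote_port here is an assumption.
- The stream names match the example punchline.

```python
from typing import Any, Dict, Mapping, Optional

# Hypothetical configuration maps (in a real channel, such a map would live in a resource file).
DEVICE_TYPE_BY_HOST: Dict[str, str] = {"10.0.0.12": "linux_os"}
DEVICE_TYPE_BY_PORT: Dict[int, str] = {9999: "apache", 8888: "linux_os"}

# Output stream for each known device type; stream names match the punchline example.
STREAM_BY_DEVICE_TYPE: Dict[str, str] = {"linux_os": "linux_os_logs", "apache": "apache_logs"}
UNKNOWN_STREAM = "unknown_device_logs"

def dispatch(metadata: Mapping[str, Any]) -> str:
    """Choose an output stream from metadata only, using dictionary lookups (no full parsing)."""
    device_type: Optional[str] = None
    host = metadata.get("_ppf_remote_host")
    if isinstance(host, str):
        device_type = DEVICE_TYPE_BY_HOST.get(host)
    if device_type is None:
        try:
            port: Optional[int] = int(metadata.get("_ppf_remote_port"))
        except (TypeError, ValueError):  # missing or malformed port
            port = None
        if port is not None:
            device_type = DEVICE_TYPE_BY_PORT.get(port)
    # Anything not recognized falls back to the 'unknown type' stream.
    if device_type is None:
        return UNKNOWN_STREAM
    return STREAM_BY_DEVICE_TYPE.get(device_type, UNKNOWN_STREAM)

print(dispatch({"_ppf_remote_port": 9999}))    # apache_logs
print(dispatch({"_ppf_remote_port": "8888"}))  # linux_os_logs
print(dispatch({"_ppf_remote_host": "10.9.9.9", "_ppf_remote_port": 514}))  # unknown_device_logs
```

Keeping the fallback explicit means a new device type never blocks the flow. Its logs simply accumulate in the unknown queue until a mapping is added.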

Errors management

Because the punchline contains punchlet_node components (for parsing, normalization and enrichment), there is a risk of exceptions or errors in the punchlet code.

This means that a dedicated processing queue must be identified to retrieve these failed documents, so that they are not lost. Someone can then investigate the problem later, which may require changes in the source device configuration or in the device type discovery mechanisms. In the example below, this is the reftenant-receiver-switch-errors topic, fed by the _ppf_errors stream.
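The following Python sketch illustrates the idea of never dropping a document that fails processing. It is a sketch only: how the Punch runtime itself builds error documents is not shown here. On any exception, the document is re-emitted on an error stream. The error document reuses field names published on the _ppf_errors stream in the example punchline (_ppf_error_message, _ppf_error_document, _ppf_component, and so on). The helper names and the parse_priority step are hypothetical.

```python
import time
from typing import Any, Callable, Dict, Tuple

Doc = Dict[str, Any]
Step = Callable[[Doc], Tuple[str, Doc]]  # returns (output stream, document)

def process_or_route_error(doc: Doc, step: Step, component: str) -> Tuple[str, Doc]:
    """Run one processing step; on any exception, emit an error document instead of dropping it."""
    try:
        return step(dict(doc))  # shallow copy: top-level keys of the caller's document stay intact
    except Exception as exc:
        return "_ppf_errors", {
            "_ppf_timestamp": int(time.time() * 1000),  # epoch milliseconds
            "_ppf_id": doc.get("_ppf_id"),
            "_ppf_component": component,
            "_ppf_error_message": f"{type(exc).__name__}: {exc}",
            "_ppf_error_document": doc.get("log"),  # original input kept for later analysis
        }

def parse_priority(doc: Doc) -> Tuple[str, Doc]:
    """Hypothetical parsing step: expects a syslog '<PRI>' prefix."""
    log = str(doc["log"])
    if not log.startswith("<") or ">" not in log:
        raise ValueError("missing syslog priority")
    doc["priority"] = int(log[1:log.index(">")])
    return "linux_os_logs", doc

ok = process_or_route_error({"_ppf_id": "a1", "log": "<13>Jan 1 host msg"}, parse_priority, "syslog_device_type_switcher")
ko = process_or_route_error({"_ppf_id": "a2", "log": "garbage"}, parse_priority, "syslog_device_type_switcher")
print(ok[0], ok[1]["priority"])           # linux_os_logs 13
print(ko[0], ko[1]["_ppf_error_message"])  # _ppf_errors ValueError: missing syslog priority
```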

Receiver channel HA configuration example

As explained above, the receiving application (here central_receiver1 and central_receiver2) must run as multiple instances, one on each of the servers. Here is an example of how to use Shiva tags in channel_structure.yaml:

version: "6.0"
start_by_tenant: true
stop_by_tenant: true

applications:
  # Central receiver on centralfront1
  - name: central_receiver1
    runtime: shiva
    shiva_runner_tags:
    - centralfront1
    cluster: shiva_front
    command: punchlinectl
    args:
      - start
      - --punchline
      - logs_receiver.yaml
      - --childopts
      - -Xms96m -Xmx96m # will override topology.component.resources.onheap.memory.mb

  # Central receiver on centralfront2
  - name: central_receiver2
    runtime: shiva
    shiva_runner_tags:
    - centralfront2
    cluster: shiva_front
    command: punchlinectl
    args:
      - start
      - --punchline
      - logs_receiver.yaml
      - --childopts
      - -Xms96m -Xmx96m

# Resources
resources:

  - type: kafka_topic
    cluster: kafka_front
    name: reftenant-receiver-linux-os
    partitions: 3
    replication_factor: 2

  - type: kafka_topic
    cluster: kafka_front
    name: reftenant-receiver-apache
    partitions: 3
    replication_factor: 2

  - type: kafka_topic
    cluster: kafka_front
    name: reftenant-receiver-winlogbeat
    partitions: 3
    replication_factor: 2

  - type: kafka_topic
    cluster: kafka_front
    name: reftenant-receiver-unknown
    partitions: 1
    replication_factor: 2

  - type: kafka_topic
    cluster: kafka_front
    name: reftenant-receiver-switch-errors
    partitions: 1
    replication_factor: 2

Receiver Punchline example

# This punchline receives logs from the Log Collector (LTR)
# through the Lumberjack protocol and writes them to Kafka topics.
# We could process the logs 'on the fly', but writing them directly to Kafka (before doing much computation)
# makes it possible to quickly consume any backlog of logs not yet transmitted by the Log Collectors.
# This application also acts as a "dispatcher" of log types towards different Kafka queues.
# In our sample, multiple flows have been received on different ports on the Log Collector, but they
# are mixed in the same lumberjack flow. Here we separate them into different Kafka topics
# based on the original input port on the Log Collector.
# So here we do a sample dispatch:
# - logs received on port 9999 are sent to the 'reftenant-receiver-apache' Kafka topic
# - logs received on port 8888 are sent to the 'reftenant-receiver-linux-os' Kafka topic

version: "6.0"
tenant: reftenant
channel: receiver
runtime: shiva
type: punchline
name: central_receiver
dag:
  # Lumberjack input
  - type: lumberjack_input
    component: lumberjack_input_logs
    settings:
      listen:
        host: # set to a resolvable hostname or IP address to listen on a specific interface
        port: 2002
        compression: true
        ssl: true
        ssl_private_key: '@{PUNCHPLATFORM_SECRETS_DIR}/server.pem'
        ssl_certificate: '@{PUNCHPLATFORM_SECRETS_DIR}/server.crt'
        ssl_trusted_certificate: '@{PUNCHPLATFORM_SECRETS_DIR}/fullchain-collector.crt'
      self_monitoring.activation: true
      self_monitoring.period: 10
    publish:
      - stream: logs
        fields:
          - log
          - subsystem
          - log_format
          - _ppf_local_host
          - _ppf_local_port
          - _ppf_remote_host
          - _ppf_remote_port
          - _ppf_timestamp
          - _ppf_id
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Punchlet log format switcher
  - type: punchlet_node
    component: log_format_switcher
    settings:
      punchlet:
        - punchlets/receiver/log_format_switcher.punch
    publish:
      - stream: ecs
        fields:
          - log
          - subsystem
          - log_format
          - _ppf_local_host
          - _ppf_local_port
          - _ppf_remote_host
          - _ppf_remote_port
          - _ppf_timestamp
          - _ppf_id
      - stream: syslog
        fields:
          - log
          - subsystem
          - log_format
          - _ppf_local_host
          - _ppf_local_port
          - _ppf_remote_host
          - _ppf_remote_port
          - _ppf_timestamp
          - _ppf_id
      - stream: _ppf_metrics
        fields:
          - _ppf_latency
    subscribe:
      - stream: logs
        component: lumberjack_input_logs
      - stream: _ppf_metrics
        component: lumberjack_input_logs

  # Punchlet SYSLOG device type switcher
  - type: punchlet_node
    component: syslog_device_type_switcher
    settings:
      punchlet:
        - punchlets/receiver/device_to_device_type.json
        - punchlets/receiver/input.punch
        - punchlets/receiver/parsing_syslog_header_ecs.punch
        - punchlets/receiver/device_type_enrichment.punch
        - punchlets/receiver/adding_es_index.punch
        - punchlets/receiver/remove_data.punch
        - punchlets/receiver/device_type_switcher.punch
    subscribe:
      - stream: syslog
        component: log_format_switcher
      - stream: _ppf_metrics
        component: log_format_switcher
    publish:
      - stream: linux_os_logs
        fields:
          - log
          - raw_log
          - subsystem
          - device_type
          - es_index
          - _ppf_id
          - _ppf_timestamp
      - stream: apache_logs
        fields:
          - log
          - raw_log
          - subsystem
          - device_type
          - es_index
          - _ppf_id
          - _ppf_timestamp
      - stream: unknown_device_logs
        fields:
          - log
          - raw_log
          - subsystem
          - device_type
          - es_index
          - _ppf_id
          - _ppf_timestamp
      - stream: _ppf_errors
        fields:
          - _ppf_timestamp
          - _ppf_id
          - _ppf_platform
          - _ppf_tenant
          - _ppf_channel
          - _ppf_topology
          - _ppf_component
          - _ppf_error_message
          - _ppf_error_document
          - subsystem
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Punchlet ECS device type switcher
  - type: punchlet_node
    component: ecs_device_type_switcher
    settings:
      punchlet:
        - punchlets/receiver/input_ecs.punch
        - punchlets/receiver/device_type_enrichment_ecs.punch
        - punchlets/receiver/adding_es_index.punch
        - punchlets/receiver/device_type_switcher.punch
    subscribe:
      - stream: ecs
        component: log_format_switcher
      - stream: _ppf_metrics
        component: log_format_switcher
    publish:
      - stream: winlogbeat_logs
        fields:
          - log
          - raw_log
          - subsystem
          - device_type
          - es_index
          - _ppf_id
          - _ppf_timestamp
      - stream: unknown_device_logs
        fields:
          - log
          - raw_log
          - subsystem
          - device_type
          - es_index
          - _ppf_id
          - _ppf_timestamp
      - stream: _ppf_errors
        fields:
          - _ppf_timestamp
          - _ppf_id
          - _ppf_platform
          - _ppf_tenant
          - _ppf_channel
          - _ppf_topology
          - _ppf_component
          - _ppf_error_message
          - _ppf_error_document
          - subsystem
      - stream: _ppf_metrics
        fields:
          - _ppf_latency

  # Kafka output
  - type: kafka_output
    component: kafka_front_output_switcher
    settings:
      encoding: lumberjack
      producer.acks: "1"
      topics:
        - stream: linux_os_logs
          topic: reftenant-receiver-linux-os
        - stream: apache_logs
          topic: reftenant-receiver-apache
        - stream: winlogbeat_logs
          topic: reftenant-receiver-winlogbeat
        - stream: unknown_device_logs
          topic: reftenant-receiver-unknown
        - stream: _ppf_errors
          topic: reftenant-receiver-switch-errors
    subscribe:
      - stream: winlogbeat_logs
        component: ecs_device_type_switcher
      - stream: linux_os_logs
        component: syslog_device_type_switcher
      - stream: apache_logs
        component: syslog_device_type_switcher
      - stream: unknown_device_logs
        component: ecs_device_type_switcher
      - stream: unknown_device_logs
        component: syslog_device_type_switcher
      - stream: _ppf_errors
        component: syslog_device_type_switcher
      - stream: _ppf_errors
        component: ecs_device_type_switcher

metrics:
  reporters:
    - type: kafka
  reporting_interval: 60

settings:
  topology.max.spout.pending: 5000
  topology.component.resources.onheap.memory.mb: 200
  topology.enable.message.timeouts: true
  topology.message.timeout.secs: 30
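In these examples, every topic written by kafka_front_output_switcher is also declared as a kafka_topic resource in channel_structure.yaml. A quick cross-check helps keep the two files in sync, for instance by catching a typo in a topic name. The following minimal Python sketch is not a Punch tool, and check_routing is a hypothetical helper. To stay within the standard library, it hard-codes the values from the two examples instead of parsing the YAML files.

```python
from typing import Dict, List, Set

# stream -> topic mapping of kafka_front_output_switcher, copied from the punchline above
STREAM_TO_TOPIC: Dict[str, str] = {
    "linux_os_logs": "reftenant-receiver-linux-os",
    "apache_logs": "reftenant-receiver-apache",
    "winlogbeat_logs": "reftenant-receiver-winlogbeat",
    "unknown_device_logs": "reftenant-receiver-unknown",
    "_ppf_errors": "reftenant-receiver-switch-errors",
}

# Streams the Kafka output subscribes to (the same stream from two components collapses in a set)
SUBSCRIBED_STREAMS: Set[str] = {"winlogbeat_logs", "linux_os_logs", "apache_logs", "unknown_device_logs", "_ppf_errors"}

# kafka_topic resources declared in channel_structure.yaml
DECLARED_TOPICS: Set[str] = {
    "reftenant-receiver-linux-os",
    "reftenant-receiver-apache",
    "reftenant-receiver-winlogbeat",
    "reftenant-receiver-unknown",
    "reftenant-receiver-switch-errors",
}

def check_routing(stream_to_topic: Dict[str, str], subscribed: Set[str], declared: Set[str]) -> List[str]:
    """List problems: subscribed streams without a topic, or topics not declared as resources."""
    problems: List[str] = []
    for stream in sorted(subscribed):
        topic = stream_to_topic.get(stream)
        if topic is None:
            problems.append(f"stream '{stream}' has no topic mapping")
        elif topic not in declared:
            problems.append(f"topic '{topic}' is not declared as a kafka_topic resource")
    return problems

print(check_routing(STREAM_TO_TOPIC, SUBSCRIBED_STREAMS, DECLARED_TOPICS))  # []
```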