Log Central : Receiver punchlines (1)
Key design highlights
Transport / interface / HA
The logs are received from the Log Collectors over the lumberjack protocol. They arrive load-balanced, thanks to a built-in balancing mechanism on the sender side: there, a Punch lumberjack output node acts as the emitter in a forwarder punchline of the Log Collector.
To achieve High Availability, at least two receiver punchlines are started on different nodes. To pin a punchline to a given node, we use Shiva tags: each Shiva node has a default tag matching its hostname. In legacy configurations, single-node Storm clusters could be used instead, with a copy of the punchline running inside each of them.
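The placement rule can be pictured with a small Python sketch. It models the idea only and is not Shiva's scheduler. It assumes that an application may run on a runner only when all of its shiva_runner_tags are carried by that runner, and that every runner carries its hostname as a default tag. The helper name eligible_runners is hypothetical.

```python
from typing import Dict, List, Set


def eligible_runners(required_tags: List[str], runners: Dict[str, Set[str]]) -> List[str]:
    """Return the hostnames (sorted) of runners carrying every required tag.

    Assumption: a runner qualifies only if all required tags are among its tags.
    """
    needed = set(required_tags)
    return sorted(host for host, tags in runners.items() if needed <= tags)


# Each runner gets its hostname as a default tag, as described above.
runners: Dict[str, Set[str]] = {host: {host} for host in ("centralfront1", "centralfront2")}

# One receiver instance pinned to each front node, mirroring the HA example on this page.
placement = {
    "central_receiver1": eligible_runners(["centralfront1"], runners),
    "central_receiver2": eligible_runners(["centralfront2"], runners),
}

if __name__ == "__main__":
    for app, hosts in placement.items():
        print(f"{app} -> {hosts}")
```

Because the two instances resolve to different hosts, losing one node leaves the other receiver running.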
Multiple receivers also provide input scalability, because the lumberjack sender (the lumberjack output node) load-balances over all available lumberjack receivers. The receivers' throughput capacities add up, provided the target Kafka cluster is sized and configured to absorb the combined flow.
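The following Python sketch shows why HA and scalability both come from sender-side balancing. It assumes a simple round-robin over the receivers the sender currently considers available. The real lumberjack output node may use a different policy, and the distribute helper is hypothetical. The receiver addresses reuse the hostnames and listening port from the examples on this page.

```python
from itertools import cycle
from typing import Dict, Iterable, List


def distribute(batches: Iterable[str], receivers: List[str], up: Dict[str, bool]) -> Dict[str, List[str]]:
    """Assign each batch to the next available receiver, round-robin.

    Assumption: unavailable receivers are simply skipped; if none is up,
    the batches have to stay buffered on the Log Collector side.
    """
    alive = [r for r in receivers if up.get(r, False)]
    if not alive:
        raise RuntimeError("no receiver available: batches must stay buffered on the Log Collector")
    assigned: Dict[str, List[str]] = {r: [] for r in alive}
    # zip stops when the (finite) batches are exhausted; cycle() never ends.
    for batch, receiver in zip(batches, cycle(alive)):
        assigned[receiver].append(batch)
    return assigned


receivers = ["centralfront1:2002", "centralfront2:2002"]

if __name__ == "__main__":
    print(distribute(["b1", "b2", "b3", "b4"], receivers, {r: True for r in receivers}))
    print(distribute(["b1", "b2"], receivers, {"centralfront1:2002": True}))
```

Marking a receiver as down shifts its share to the remaining ones, which is the HA behaviour. Adding a receiver spreads the flow further, which is where the extra capacity comes from.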
Write-fast principle
The receiver punchlines should use as little CPU as possible before writing to Kafka, which means doing as little processing as possible. This lets the remote logs be consumed quickly from the Log Collectors and persisted safely as early as possible. This is critical for non-replicated Log Centrals, or for Log Collectors that have little persistence capacity.
Multiple front topics and log dispatching by device type
To allow specialized log processor punchlines, it is often easier to separate the different kinds of logs into different input queues. This is implemented with different Kafka topics in the front Kafka cluster.
This receiver punchline therefore acts as a dispatcher towards the different processing queues.
To comply with the 'write-fast' principle, it is advisable to identify each log with simple patterns. For example, the identification can rely on available metadata such as _ppf_remote_port and _ppf_remote_host.
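Metadata-based identification can be as cheap as a dictionary lookup, as this Python sketch shows. The port numbers come from the sample dispatch described in the punchline comments (9999 for Apache, 8888 for Linux OS). Which metadata field actually carries the Log Collector input port depends on your flow, so the field name is a parameter. Using _ppf_local_port in the usage lines is an assumption. The helper and table names are hypothetical.

```python
from typing import Dict, Mapping

# Sample port -> device type table, from the dispatch described in the punchline comments.
PORT_TO_DEVICE_TYPE: Dict[str, str] = {"9999": "apache", "8888": "linux_os"}


def device_type_from_port(doc: Mapping[str, object], port_field: str) -> str:
    """Map the port found in port_field to a device type, or 'unknown' if missing or unmapped.

    Assumption: the caller knows which metadata field holds the input port.
    """
    port = doc.get(port_field)
    if port is None:
        return "unknown"
    # Ports may arrive as strings or integers; normalise before the lookup.
    return PORT_TO_DEVICE_TYPE.get(str(port), "unknown")


if __name__ == "__main__":
    print(device_type_from_port({"_ppf_local_port": "9999"}, "_ppf_local_port"))
    print(device_type_from_port({"_ppf_local_port": 514}, "_ppf_local_port"))
```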
Of course, there are other efficient ways to identify the log type, such as parsing the syslog envelope to find the source device IP, then enriching the log from a configuration map.
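The following Python sketch illustrates envelope-based identification. It assumes RFC 3164 (BSD syslog) headers of the form <PRI>Mmm dd hh:mm:ss HOST message. The sample punchline loads a device_to_device_type.json resource, but this page does not show its schema. The DEVICE_MAP content and the helper names below are therefore hypothetical.

```python
import re
from typing import Dict, Optional

# RFC 3164 header: "<PRI>Mmm dd hh:mm:ss HOST " (single-digit days are space-padded).
SYSLOG_HEADER = re.compile(r"^<(\d{1,3})>([A-Z][a-z]{2} [ \d]\d \d{2}:\d{2}:\d{2}) (\S+) ")

# Hypothetical configuration map: source device address -> device type.
DEVICE_MAP: Dict[str, str] = {"10.1.2.3": "linux_os", "10.1.2.4": "apache"}


def source_host(log: str) -> Optional[str]:
    """Return the HOST field of the syslog envelope, or None if the header does not match."""
    match = SYSLOG_HEADER.match(log)
    return match.group(3) if match else None


def device_type_from_syslog(log: str) -> str:
    """Identify the device type from the envelope; malformed or unmapped logs are 'unknown'."""
    host = source_host(log)
    if host is None:
        return "unknown"
    return DEVICE_MAP.get(host, "unknown")


if __name__ == "__main__":
    print(device_type_from_syslog("<34>Oct 11 22:14:15 10.1.2.3 sshd[123]: Failed password"))
```

Only the header is matched, never the message body, which keeps the cost per log low.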
It is also important that the dispatching mechanism has an 'unknown type' stream with an associated processing queue. This stream catches unexpected device types that may be added later, as well as malformed input logs.
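The 'unknown type' fallback can be sketched in Python as a routing table with a default entry. The stream and topic names are copied from the kafka_output settings of the sample punchline. The '{device_type}_logs' naming rule and the route helper are hypothetical.

```python
from typing import Dict, Tuple

# Stream -> Kafka topic pairs, as in the sample punchline's kafka_output settings.
STREAM_TO_TOPIC: Dict[str, str] = {
    "linux_os_logs": "reftenant-receiver-linux-os",
    "apache_logs": "reftenant-receiver-apache",
    "winlogbeat_logs": "reftenant-receiver-winlogbeat",
    "unknown_device_logs": "reftenant-receiver-unknown",
}
UNKNOWN_STREAM = "unknown_device_logs"


def route(device_type: str) -> Tuple[str, str]:
    """Return (stream, topic); any type without a dedicated stream goes to the unknown stream."""
    stream = f"{device_type}_logs"  # hypothetical naming convention
    if stream not in STREAM_TO_TOPIC:
        stream = UNKNOWN_STREAM
    return stream, STREAM_TO_TOPIC[stream]


if __name__ == "__main__":
    for device_type in ("linux_os", "winlogbeat", "new_firewall"):
        print(device_type, "->", route(device_type))
```

Because every device type resolves to some stream, a newly added device never blocks the flow. It simply lands in the unknown queue until a dedicated stream is configured.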
Errors management
Because there is a punchlet_node in the punchline (for parsing/normalization/enrichment), there is a risk of exceptions or errors in the punchlet code.
This means that a specific processing queue has to be identified to retrieve these unexpected documents, so that they are not lost. Someone can then identify the problem later, which may require changes in the source device configuration or in the device type discovery mechanisms.
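In the sample punchline of this page, the device type switchers publish a _ppf_errors stream carrying _ppf_error_message and _ppf_error_document fields, which the Kafka output writes to a dedicated topic. The following Python sketch shows the principle behind such a stream: a failing processing step never drops the document but diverts it to an error stream. It does not reproduce Punch's internal mechanism. The safe_process helper is hypothetical, and storing the raw log as the error document is an assumption.

```python
from typing import Callable, Dict, Tuple

Doc = Dict[str, object]


def safe_process(doc: Doc, punchlet: Callable[[Doc], Tuple[str, Doc]], component: str) -> Tuple[str, Doc]:
    """Run one processing step; on any exception, emit the document on the _ppf_errors stream."""
    try:
        return punchlet(doc)
    except Exception as exc:  # keep the document whatever went wrong
        return "_ppf_errors", {
            "_ppf_id": doc.get("_ppf_id"),
            "_ppf_timestamp": doc.get("_ppf_timestamp"),
            "_ppf_component": component,
            "_ppf_error_message": f"{type(exc).__name__}: {exc}",
            "_ppf_error_document": doc.get("log"),  # assumption: keep the raw log for replay
            "subsystem": doc.get("subsystem"),
        }


if __name__ == "__main__":
    print(safe_process({"_ppf_id": "a1", "log": "garbage"}, lambda d: 1 / 0, "syslog_device_type_switcher"))
```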
Receiver channel HA configuration example
As explained above, the receiving application (here the logs_receiver.yaml punchline) must be run in multiple instances, one on each of the servers. This is an example of how to use Shiva tags in channel_structure.yaml:
version: "6.0"
start_by_tenant: true
stop_by_tenant: true
applications:
  # Central receiver on centralfront1
  - name: central_receiver1
    runtime: shiva
    shiva_runner_tags:
      - centralfront1
    cluster: shiva_front
    command: punchlinectl
    args:
      - start
      - --punchline
      - logs_receiver.yaml
      - --childopts
      - -Xms96m -Xmx96m # will override topology.component.resources.onheap.memory.mb
  # Central receiver on centralfront2
  - name: central_receiver2
    runtime: shiva
    shiva_runner_tags:
      - centralfront2
    cluster: shiva_front
    command: punchlinectl
    args:
      - start
      - --punchline
      - logs_receiver.yaml
      - --childopts
      - -Xms96m -Xmx96m
# Resources
resources:
  - type: kafka_topic
    cluster: kafka_front
    name: reftenant-receiver-linux-os
    partitions: 3
    replication_factor: 2
  - type: kafka_topic
    cluster: kafka_front
    name: reftenant-receiver-apache
    partitions: 3
    replication_factor: 2
  - type: kafka_topic
    cluster: kafka_front
    name: reftenant-receiver-winlogbeat
    partitions: 3
    replication_factor: 2
  - type: kafka_topic
    cluster: kafka_front
    name: reftenant-receiver-unknown
    partitions: 1
    replication_factor: 2
  - type: kafka_topic
    cluster: kafka_front
    name: reftenant-receiver-switch-errors
    partitions: 1
    replication_factor: 2
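Before deploying, the HA intent of such a channel structure can be checked mechanically. The following Python sketch works on a hand-copied dictionary version of the example, so no YAML library is needed. The thresholds are assumptions: at least two distinct placements for the receivers, and replication_factor of at least 2. The ha_problems helper is hypothetical. Distinct tag sets are a necessary but not sufficient condition for distinct nodes. With hostname tags, as here, they do map to distinct hosts.

```python
from typing import Any, Dict, List

# Hand-copied subset of the channel_structure.yaml example above.
channel: Dict[str, Any] = {
    "applications": [
        {"name": "central_receiver1", "shiva_runner_tags": ["centralfront1"]},
        {"name": "central_receiver2", "shiva_runner_tags": ["centralfront2"]},
    ],
    "resources": [
        {"type": "kafka_topic", "name": "reftenant-receiver-linux-os", "partitions": 3, "replication_factor": 2},
        {"type": "kafka_topic", "name": "reftenant-receiver-apache", "partitions": 3, "replication_factor": 2},
        {"type": "kafka_topic", "name": "reftenant-receiver-winlogbeat", "partitions": 3, "replication_factor": 2},
        {"type": "kafka_topic", "name": "reftenant-receiver-unknown", "partitions": 1, "replication_factor": 2},
        {"type": "kafka_topic", "name": "reftenant-receiver-switch-errors", "partitions": 1, "replication_factor": 2},
    ],
}


def ha_problems(structure: Dict[str, Any], min_instances: int = 2, min_replication: int = 2) -> List[str]:
    """Return human-readable HA issues; an empty list means both checks pass."""
    problems: List[str] = []
    # Receivers sharing the same tag set could all land on the same node.
    tag_sets = {tuple(sorted(app.get("shiva_runner_tags", []))) for app in structure.get("applications", [])}
    if len(tag_sets) < min_instances:
        problems.append(f"receivers pinned to fewer than {min_instances} distinct tag sets")
    for res in structure.get("resources", []):
        if res.get("type") == "kafka_topic" and res.get("replication_factor", 1) < min_replication:
            problems.append(f"topic {res['name']} has replication_factor < {min_replication}")
    return problems


if __name__ == "__main__":
    print(ha_problems(channel) or "HA checks passed")
```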
Receiver Punchline example
# This punchline receives logs from the Log Collector (LTR)
# through the Lumberjack protocol and writes them to Kafka topics.
# Of course, we could process the logs 'on the fly', but writing them directly to Kafka (before doing too much computation)
# makes it possible to consume any backlog of logs not yet transmitted by the Log Collectors.
#
# This application also acts as a "dispatcher" of log types towards different Kafka queues.
# In our sample, multiple flows have been received on different ports on the Log Collector, but they
# are mixed in the same lumberjack flow. Here we separate them into different Kafka topics
# based on the original input port on the Log Collector.
#
# So here we do a sample dispatch:
# - logs received on port 9999 will be sent to the 'reftenant-receiver-apache' Kafka topic
# - logs received on port 8888 will be sent to the 'reftenant-receiver-linux-os' Kafka topic
version: "6.0"
tenant: reftenant
channel: receiver
runtime: shiva
type: punchline
name: central_receiver
dag:
  # Lumberjack input
  - type: lumberjack_input
    component: lumberjack_input_logs
    settings:
      listen:
        host: 0.0.0.0 # can be resolved to specify an interface
        port: 2002
        compression: true
        ssl: true
        ssl_private_key: '@{PUNCHPLATFORM_SECRETS_DIR}/server.pem'
        ssl_certificate: '@{PUNCHPLATFORM_SECRETS_DIR}/server.crt'
        ssl_trusted_certificate: '@{PUNCHPLATFORM_SECRETS_DIR}/fullchain-collector.crt'
      self_monitoring.activation: true
      self_monitoring.period: 10
    publish:
      - stream: logs
        fields:
          - log
          - subsystem
          - log_format
          - _ppf_local_host
          - _ppf_local_port
          - _ppf_remote_host
          - _ppf_remote_port
          - _ppf_timestamp
          - _ppf_id
      - stream: _ppf_metrics
        fields:
          - _ppf_latency
  # Punchlet log format switcher
  - type: punchlet_node
    component: log_format_switcher
    settings:
      punchlet:
        - punchlets/receiver/log_format_switcher.punch
    publish:
      - stream: ecs
        fields:
          - log
          - subsystem
          - log_format
          - _ppf_local_host
          - _ppf_local_port
          - _ppf_remote_host
          - _ppf_remote_port
          - _ppf_timestamp
          - _ppf_id
      - stream: syslog
        fields:
          - log
          - subsystem
          - log_format
          - _ppf_local_host
          - _ppf_local_port
          - _ppf_remote_host
          - _ppf_remote_port
          - _ppf_timestamp
          - _ppf_id
      - stream: _ppf_metrics
        fields:
          - _ppf_latency
    subscribe:
      - stream: logs
        component: lumberjack_input_logs
      - stream: _ppf_metrics
        component: lumberjack_input_logs
  # Punchlet SYSLOG device type switcher
  - type: punchlet_node
    component: syslog_device_type_switcher
    settings:
      punchlet_json_resources:
        - punchlets/receiver/device_to_device_type.json
      punchlet:
        - punchlets/receiver/input.punch
        - punchlets/receiver/parsing_syslog_header_ecs.punch
        - punchlets/receiver/device_type_enrichment.punch
        - punchlets/receiver/adding_es_index.punch
        - punchlets/receiver/remove_data.punch
        - punchlets/receiver/device_type_switcher.punch
    subscribe:
      - stream: syslog
        component: log_format_switcher
      - stream: _ppf_metrics
        component: log_format_switcher
    publish:
      - stream: linux_os_logs
        fields:
          - log
          - raw_log
          - subsystem
          - device_type
          - es_index
          - _ppf_id
          - _ppf_timestamp
      - stream: apache_logs
        fields:
          - log
          - raw_log
          - subsystem
          - device_type
          - es_index
          - _ppf_id
          - _ppf_timestamp
      - stream: unknown_device_logs
        fields:
          - log
          - raw_log
          - subsystem
          - device_type
          - es_index
          - _ppf_id
          - _ppf_timestamp
      - stream: _ppf_errors
        fields:
          - _ppf_timestamp
          - _ppf_id
          - _ppf_platform
          - _ppf_tenant
          - _ppf_channel
          - _ppf_topology
          - _ppf_component
          - _ppf_error_message
          - _ppf_error_document
          - subsystem
      - stream: _ppf_metrics
        fields:
          - _ppf_latency
  # Punchlet ECS device type switcher
  - type: punchlet_node
    component: ecs_device_type_switcher
    settings:
      punchlet:
        - punchlets/receiver/input_ecs.punch
        - punchlets/receiver/device_type_enrichment_ecs.punch
        - punchlets/receiver/adding_es_index.punch
        - punchlets/receiver/device_type_switcher.punch
    subscribe:
      - stream: ecs
        component: log_format_switcher
      - stream: _ppf_metrics
        component: log_format_switcher
    publish:
      - stream: winlogbeat_logs
        fields:
          - log
          - raw_log
          - subsystem
          - device_type
          - es_index
          - _ppf_id
          - _ppf_timestamp
      - stream: unknown_device_logs
        fields:
          - log
          - raw_log
          - subsystem
          - device_type
          - es_index
          - _ppf_id
          - _ppf_timestamp
      - stream: _ppf_errors
        fields:
          - _ppf_timestamp
          - _ppf_id
          - _ppf_platform
          - _ppf_tenant
          - _ppf_channel
          - _ppf_topology
          - _ppf_component
          - _ppf_error_message
          - _ppf_error_document
          - subsystem
      - stream: _ppf_metrics
        fields:
          - _ppf_latency
  # Kafka output
  - type: kafka_output
    component: kafka_front_output_switcher
    settings:
      encoding: lumberjack
      producer.acks: "1"
      topics:
        - stream: linux_os_logs
          topic: reftenant-receiver-linux-os
        - stream: apache_logs
          topic: reftenant-receiver-apache
        - stream: winlogbeat_logs
          topic: reftenant-receiver-winlogbeat
        - stream: unknown_device_logs
          topic: reftenant-receiver-unknown
        - stream: _ppf_errors
          topic: reftenant-receiver-switch-errors
    subscribe:
      - stream: winlogbeat_logs
        component: ecs_device_type_switcher
      - stream: linux_os_logs
        component: syslog_device_type_switcher
      - stream: apache_logs
        component: syslog_device_type_switcher
      - stream: unknown_device_logs
        component: ecs_device_type_switcher
      - stream: unknown_device_logs
        component: syslog_device_type_switcher
      - stream: _ppf_errors
        component: syslog_device_type_switcher
      - stream: _ppf_errors
        component: ecs_device_type_switcher
metrics:
  reporters:
    - type: kafka
      reporting_interval: 60
settings:
  topology.max.spout.pending: 5000
  topology.component.resources.onheap.memory.mb: 200
  topology.enable.message.timeouts: true
  topology.message.timeout.secs: 30
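With several switchers and a shared output, it is easy to subscribe to a stream that no component publishes, or to forget a topic mapping. The following Python sketch checks the wiring of the sample punchline above. The dictionaries are hand-copied from it, and the wiring_errors helper is hypothetical rather than a Punch tool.

```python
from typing import Dict, List, Set, Tuple

Subs = Dict[str, List[Tuple[str, str]]]  # subscriber -> [(stream, publishing component)]

PUBLISHES: Dict[str, Set[str]] = {
    "lumberjack_input_logs": {"logs", "_ppf_metrics"},
    "log_format_switcher": {"ecs", "syslog", "_ppf_metrics"},
    "syslog_device_type_switcher": {"linux_os_logs", "apache_logs", "unknown_device_logs", "_ppf_errors", "_ppf_metrics"},
    "ecs_device_type_switcher": {"winlogbeat_logs", "unknown_device_logs", "_ppf_errors", "_ppf_metrics"},
}
KAFKA_OUTPUT = "kafka_front_output_switcher"
SUBSCRIBES: Subs = {
    "log_format_switcher": [("logs", "lumberjack_input_logs"), ("_ppf_metrics", "lumberjack_input_logs")],
    "syslog_device_type_switcher": [("syslog", "log_format_switcher"), ("_ppf_metrics", "log_format_switcher")],
    "ecs_device_type_switcher": [("ecs", "log_format_switcher"), ("_ppf_metrics", "log_format_switcher")],
    KAFKA_OUTPUT: [
        ("winlogbeat_logs", "ecs_device_type_switcher"),
        ("linux_os_logs", "syslog_device_type_switcher"),
        ("apache_logs", "syslog_device_type_switcher"),
        ("unknown_device_logs", "ecs_device_type_switcher"),
        ("unknown_device_logs", "syslog_device_type_switcher"),
        ("_ppf_errors", "syslog_device_type_switcher"),
        ("_ppf_errors", "ecs_device_type_switcher"),
    ],
}
KAFKA_TOPICS: Dict[str, str] = {
    "linux_os_logs": "reftenant-receiver-linux-os",
    "apache_logs": "reftenant-receiver-apache",
    "winlogbeat_logs": "reftenant-receiver-winlogbeat",
    "unknown_device_logs": "reftenant-receiver-unknown",
    "_ppf_errors": "reftenant-receiver-switch-errors",
}


def wiring_errors(publishes: Dict[str, Set[str]], subscribes: Subs, topics: Dict[str, str]) -> List[str]:
    """List subscriptions to unpublished streams, and Kafka-bound streams lacking a topic."""
    errors: List[str] = []
    for subscriber, subs in subscribes.items():
        for stream, publisher in subs:
            if stream not in publishes.get(publisher, set()):
                errors.append(f"{subscriber}: '{publisher}' does not publish '{stream}'")
    for stream, _ in subscribes.get(KAFKA_OUTPUT, []):
        if stream not in topics:
            errors.append(f"{KAFKA_OUTPUT}: no topic for stream '{stream}'")
    return errors


if __name__ == "__main__":
    print(wiring_errors(PUBLISHES, SUBSCRIBES, KAFKA_TOPICS) or "wiring OK")
```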