Log Central : Platform Events dispatcher punchlines (3)¶
Key design highlights¶
Transport / interface / HA¶
Logs are received from the Log Collectors over the lumberjack protocol, in a load-balanced way through the built-in balancing mechanism of the sender side: a Punch lumberjack output node acts as the emitter in a forwarder punchline of the Log Collector.
To achieve High Availability, at least two receiver punchlines are started on different nodes. To pin a punchline to a given node, we use Shiva tags: each Shiva node has a default tag matching its hostname. In legacy configurations, a 1-node Storm cluster could be used instead, with a copy of the punchline running inside each single-node Storm cluster.
With multiple receivers we also get input scalability, because the lumberjack sender (the lumberjack output node) load-balances over all available lumberjack receivers. The receivers combine their throughput capacity, as long as the target Kafka cluster is sized accordingly.
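For reference, the sender side of this transport is the lumberjack output node of the Log Collector forwarder punchline. The sketch below is only illustrative: the host names are examples matching the Shiva tags used further down, and the exact setting names of the lumberjack output node (in particular the destination list) should be checked against the node reference documentation:
# Illustrative extract of the Log Collector forwarder punchline (sender side).
# The lumberjack output node load-balances the published docs stream over the
# two central receiver punchlines, which provides both HA and combined throughput.
- type: lumberjack_output
  settings:
    destination:              # assumed setting name: list of target receivers
      - host: centralfront1   # illustrative host running the first receiver
        port: 1711
      - host: centralfront2   # illustrative host running the second receiver
        port: 1711
    compression: true
    ssl: true                 # plus the matching ssl key/certificate settings
  subscribe:
    - stream: docs
      component: kafka_input  # hypothetical upstream node of the forwarder punchline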
Multiple target indices¶
The different kinds of platform events are written to separate target indices. The target index depends on the source of the data and on the desired retention duration. For example, operator start/stop commands must be kept for more than 1 year, to ensure that automated channels monitoring services take into account the last known operator action on ALL applications of the platform, even those that are rarely stopped.
The standard index naming for dispatched events is indicated in the Tenants configuration indices table.
The switching rule is implemented in this punchlet:
/**
 * This punchlet takes platform events coming from an LTR or a Back Office "platform-monitoring" Kafka topic.
 * Events are written in a Kafka topic using the "JSON" format, so only one field is generated.
 * Here, we re-do the "Y" to send each event to the right Elasticsearch index.
 * We need separate indices, because retention rules are not the same for the different platform event types,
 * and to ease querying
 * (e.g. 1 year for punchctl operator commands audit events,
 * 3 days for system metrics, a few days more for platform health...).
 * This is a reference configuration item for the DAVE 6.0 release - checked 21/07/2020 by CVF
 */
{
    root = [docs];
    String docType = "undetermined";
    String indexPrefix = "platform-errors-";
    String indexType = "daily";
    String platform = "undetermined";
    String forwardingHost = "undetermined";
    String tenant = "undetermined";

    if (world:[meta][platform_id]) {
        platform = world:[meta][platform_id];
    }
    if (world:[meta][tenant]) {
        tenant = world:[meta][tenant];
    }
    String release_index_prefix = "";
    if (world:[meta][index_prefix]) {
        release_index_prefix = world:[meta][index_prefix];
    }

    // If we have a json document that already holds platform id or tenant info, then retrieve it...
    if ([doc][platform][id]) {
        platform = [doc][platform][id];
    }

    // If we have received the event from a remote sender, and the punchline publishes the info,
    // then we keep track of the host that sent the event on the network.
    // This is useful if the actual platform id is not available or ambiguous.
    if ([_ppf_remote_host]) {
        forwardingHost = [_ppf_remote_host];
        [doc][platform][forwarding_host] = [_ppf_remote_host];
    }

    if ([doc][platform][tenant]) {
        tenant = [doc][platform][tenant];
    } else {
        [doc][platform][tenant] = tenant;
    }

    if ([doc][type]) {
        docType = [doc][type];
    } else if ([doc][@metadata][beat]) {
        docType = [doc][@metadata][beat];
    } else if ([doc][service][type]) {
        docType = [doc][service][type];
    } else if ([doc][target][type]) {
        docType = [doc][target][type];
    }

    // The timestamp of the event is normally available in the incoming json standard field, following the beats convention:
    String dateString = null;
    if ([doc][@timestamp]) {
        dateString = [doc][@timestamp];
    }

    switch (docType) {
        case "platform":
        case "platform-monitoring":
            indexPrefix = "platform-monitoring-";
            break;
        case "channels-monitoring":
            indexPrefix = tenant + "-channels-monitoring-";
            break;
        case "platform-health":
            indexPrefix = "platform-health-";
            break;
        case "storm":
            indexPrefix = tenant + "-metrics-";
            break;
        case "gateway-execution":
        case "gateway":
            indexPrefix = tenant + "-gateway-logs-";
            break;
        case "spark":
            indexPrefix = tenant + "-spark-metrics-";
            break;
        case "spark-application":
            indexPrefix = tenant + "-spark-metrics-";
            break;
        case "punch":
            // We separate the operator commands from the verbose logs, to be able to keep these commands
            // for more than a year, because they are needed for channels monitoring activation.
            if (([doc][init][process][name] == "channelctl") || ([doc][init][process][name] == "punchctl")) {
                indexPrefix = "platform-operator-logs-";
                indexType = "monthly";
            } else {
                indexPrefix = "platform-logs-";
            }
            break;
        case "metricbeat":
            indexPrefix = "platform-metricbeat-" + [doc][@metadata][version] + "-";
            break;
        default:
            String errorMessage = "Unable to dispatch unknown monitoring document type ('" + docType + "') forwarded from host '" + forwardingHost + "' (platform '" + platform + "').";
            throw new PunchRuntimeException(errorMessage);
    }

    String elasticsearch_date = "nodate-errors";
    if (dateString != null) {
        if (indexType == "daily") {
            // here, the input date looks like: 2019-10-10T17:07:39.951Z
            elasticsearch_date = date("yyyy.MM.dd", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").on(dateString).get();
        } else {
            // monthly case, same input date format
            elasticsearch_date = date("yyyy.MM", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").on(dateString).get();
        }
    }

    [index] = indexPrefix + release_index_prefix + elasticsearch_date;
}
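As a worked example, consider an operator audit event received on the docs stream; all field values below are made up for illustration:
# Illustrative content of the [doc] field of an incoming tuple:
doc:
  type: punch
  init:
    process:
      name: channelctl
  platform:
    id: central-prod
  "@timestamp": "2020-07-21T10:15:00.951Z"
# The punchlet enters the "punch" case, detects the channelctl operator command,
# switches to a monthly index and therefore produces:
#   [index] = "platform-operator-logs-" + release_index_prefix + "2020.07"
# A metricbeat event with the same timestamp (and, say, @metadata.version 7.8.1)
# would instead be routed to the daily index platform-metricbeat-7.8.1-2020.07.21.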
Reference configuration example¶
Collector monitoring channel_structure example¶
As explained above, the receiving application (here input) must be run as multiple instances, one on each of the servers. This is an example of how to use Shiva tags in channel_structure.yaml:
# This channel handles forwarded LTR site monitoring data.
# It normally runs the same punchline on multiple input nodes that receive lumberjack forwarding from the LTRs.
# The punchline writes the logs/events received from the LTRs into various elasticsearch indices.
version: "6.0"
start_by_tenant: true
stop_by_tenant: true

applications:

  # collector events receiver front1
  - name: collector_events_receiver1
    runtime: shiva
    cluster: shiva_front
    shiva_runner_tags:
      - centralfront1
    command: punchlinectl
    args:
      - start
      - --punchline
      - receiver.yaml
      - --childopts
      - -Xms96m -Xmx96m # will override topology.component.resources.onheap.memory.mb

  # collector events receiver front2
  - name: collector_events_receiver2
    runtime: shiva
    cluster: shiva_front
    shiva_runner_tags:
      - centralfront2
    command: punchlinectl
    args:
      - start
      - --punchline
      - receiver.yaml
      - --childopts
      - -Xms96m -Xmx96m

  # collector events indexer
  - name: collector_events_indexer
    runtime: shiva
    cluster: shiva_front
    shiva_runner_tags:
      - shiva_front
    command: punchlinectl
    args:
      - start
      - --punchline
      - indexer.yaml
      - --childopts
      - -Xms256m -Xmx512m

resources:
  - type: kafka_topic
    cluster: kafka_front
    name: platform-collector-events
    partitions: 1
    replication_factor: 1
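Once this configuration is deployed under the platform tenant, the channel is started and checked by an operator with channelctl. The command lines below are indicative only and should be adapted to your operator environment:
# Indicative operator commands (tenant and channel names taken from the configuration above):
channelctl --tenant platform start --channel collector_monitoring
channelctl --tenant platform status --channel collector_monitoring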
Remote collector monitoring events receiver/dispatcher punchline example¶
The associated receiver punchline for LTR/collection-site forwarded platform events usually looks like this:
# The purpose of this punchline is to receive the various types of platform events forwarded
# over lumberjack by a source (LTR collector) platform, and to queue them into the local
# Kafka topic. The dispatching of these events into separate Elasticsearch indices
# (so that they can have different retention rules) is performed downstream by the
# indexer punchline of this channel.
version: "6.0"
tenant: platform
channel: collector_monitoring
runtime: shiva
type: punchline
name: collector_events_receiver
dag:

  # Lumberjack input
  - type: lumberjack_input
    settings:
      listen:
        host: 0.0.0.0 # listen on all interfaces; a specific interface address can be set instead
        port: 1711
        compression: true
        ssl: true
        ssl_private_key: "@{PUNCHPLATFORM_SECRETS_DIR}/server.pem"
        ssl_certificate: "@{PUNCHPLATFORM_SECRETS_DIR}/server.crt"
        ssl_trusted_certificate: "@{PUNCHPLATFORM_SECRETS_DIR}/fullchain-collector.crt"
    publish:
      - stream: docs
        # The remote host metadata field can be used to track from which remote host (platform)
        # an event has been received. This is just additional traceability information,
        # because the received documents (json) normally include a platform id field.
        fields:
          - doc
          - _ppf_remote_host

  # Kafka output
  - type: kafka_output
    component: kafka_front_output
    settings:
      topic: platform-collector-events
      encoding: lumberjack
      producer.acks: "1"
    subscribe:
      - stream: docs
        component: lumberjack_input

metrics:
  reporters:
    - type: kafka
      reporting_interval: 60

settings:
  topology.max.spout.pending: 6000
  topology.component.resources.onheap.memory.mb: 200
Central site monitoring events dispatcher punchline example¶
# The purpose of this punchline is to receive various types of platform events from a source (LTR collector)
# platform, and to dispatch these different events into different indices
# so that they can have different retention rules.
# This is important because all platform events (platform metrics, operator commands audit events, health monitoring status...)
# come through a single events flow (especially when coming from remote platforms).
version: "6.0"
tenant: platform
channel: collector_monitoring
runtime: shiva
type: punchline
name: collector_events_indexer
dag:

  # Kafka input
  - type: kafka_input
    component: kafka_front_input
    settings:
      topic: platform-collector-events
      encoding: lumberjack
      start_offset_strategy: last_committed
      auto.offset.reset: earliest
      self_monitoring.activation: true
      self_monitoring.period: 60
    publish:
      - stream: docs
        fields:
          - doc
          - _ppf_remote_host

  # Punchlet node
  - type: punchlet_node
    settings:
      punchlet:
        - standard/Platform/monitoring_dispatcher.punch
    subscribe:
      - component: kafka_front_input
        stream: docs
    publish:
      - stream: docs
        fields:
          - doc
          - index
      - stream: _ppf_errors
        fields:
          - _ppf_error_message
          - _ppf_error_document
          - _ppf_timestamp
          - _ppf_tenant
          - _ppf_channel

  # ES output
  - type: elasticsearch_output
    settings:
      cluster_id: es_monitoring
      per_stream_settings:
        - stream: docs
          index:
            type: tuple_field # the ES index has been determined by the dispatcher punchlet
            tuple_field: index
          document_json_field: doc
          additional_document_value_fields:
            - document_field: es_ts
              type: date
              format: iso
        # Now we deal with the error flow, which carries the escaped document of the tuple
        # that caused the punchlet exception, plus additional metadata fields
        # about the error and its context.
        - stream: _ppf_errors
          index:
            type: daily
            prefix: platform-monitoring-errors-
          document_json_field: _ppf_error_document
          additional_document_value_fields:
            - document_field: "@timestamp"
              type: date
              format: iso
            - document_field: tenant
              type: tuple_field
              tuple_field: _ppf_tenant
            - document_field: channel
              type: tuple_field
              tuple_field: _ppf_channel
            - document_field: error_message
              type: tuple_field
              tuple_field: _ppf_error_message
            - document_field: error_ts
              type: tuple_field
              tuple_field: _ppf_timestamp
      # In case Elasticsearch rejects some insertions because of mapping errors,
      # the faulty documents will be inserted here as escaped error documents,
      # to be able to keep all events AND troubleshoot the situation:
      reindex_failed_documents: true
      error_index:
        type: daily
        prefix: platform-monitoring-errors-
    subscribe:
      - component: punchlet_node
        stream: docs
      - component: punchlet_node
        stream: _ppf_errors

metrics:
  reporters:
    - type: kafka
      reporting_interval: 60

settings:
  topology.max.spout.pending: 6000
  topology.component.resources.onheap.memory.mb: 200
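For reference, when the dispatcher punchlet throws an exception (for example on an unknown document type), the _ppf_errors stream produces documents in the daily platform-monitoring-errors-* indices. Based on the per-stream settings above, such a document would look roughly as follows; all values are illustrative:
# Illustrative error document, indexed for example in platform-monitoring-errors-2020.07.21.
# The document body is the escaped faulty document taken from _ppf_error_document,
# enriched with the additional value fields declared above:
"@timestamp": "2020-07-21T10:15:00.951Z"   # insertion date, iso format
tenant: platform                           # from _ppf_tenant
channel: collector_monitoring              # from _ppf_channel
error_message: "Unable to dispatch unknown monitoring document type ('foo') forwarded from host 'collector1' (platform 'remote-site')."
error_ts: "1595326500951"                  # from _ppf_timestamp (illustrative value)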