Metrics/Events Reporters

Introduction

The PunchPlatform processing pipelines and tools publish events and metrics to several backends using common configuration parameters.

These metrics/events are generated by Storm topologies, Spark jobs, or even by the platform itself. In every case the metrics configuration stays exactly the same: a list of "reporters". There is no limit to the number of reporters, and the same reporter configuration can be used more than once (that is why they are defined in a dedicated section of the deployment settings).

Let's take a quick example for each use case.

# storm_topology_example.json
{
  "dag": [
    ...
  ],
  "bolts": [
    ...
  ],
  "metrics": {
    "reporters": [
      {
        "type": "elasticsearch",
        ...
      },
      {
        "type": "logger",
        ...
      }
    ]
  },
  "storm_settings": {
    ...
  }
}
# spark_job_example.hjson
{
  runtime_id: my-job
  tenant: job_tenant
  job: [
    {
        ...
    },
  ],
  metrics: {
    reporters: [
      {
        type: elasticsearch
        ...
      }
      {
        type: console
        ...
      }
    ]
  }
}
# punchplatform.properties
{
  "platform": {
    "platform_id": "my-platform",
    "reporters": [
      {
        "type": "elasticsearch",
         "cluster_name": "es_search"
      },
      {
        "type": "kafka",
         ...
      }
    ]
  },
  ...
}

Common Parameters

The parameters described in this section are common to all reporter configurations.

"metrics": {
  "reporters": [
    {
      "type": "elasticsearch",
      "reporting_interval": 10,
      "regex": [
        "netty\\..*",
        "storm\\.worker\\.uptime"
      ],
      ...
    },
    {
      "type": "kafka",
      ...
    }
  ]
}
  • type (string, mandatory)

    Define the reporter type.

    values: "elasticsearch", "kafka", "lumberjack", "logger", "tcp", "udp".

  • reporting_interval (integer: 10)

    The period of metrics publication, in seconds.

  • regex (array)

    Array of strings that filters the published metrics. For each metric event received, at least one of these regular expression patterns must match its metric "name" (i.e. "netty.app.recv"); all other metrics are dropped. This is especially useful to forward only a subset of the metric events to a specific reporter, as sketched below. The expressions must be compliant with the standard Java "java.util.regex.Pattern" library.
    Note: The regular expression must match the entire metric name, not only a substring. For example, the regex "netty" will not match "netty.app.recv"; to do so, use the expression "netty.*".
    Also note that the character "\" is a special escaping character in JSON, so you need to double it (as in the example above).
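
As an illustration, here is a minimal sketch (using reporter types described later in this section) in which the first reporter receives every metric while the second one only logs the netty ones:

"metrics": {
  "reporters": [
    {
      "type": "elasticsearch",
      "cluster_name": "es_search"
    },
    {
      "type": "logger",
      "reporting_interval": 10,
      "regex": [
        "netty\\..*"
      ]
    }
  ]
}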

Reporters Configurations

Elasticsearch Reporter

In order to make topologies send their metrics to Elasticsearch, add an "elasticsearch" reporter configuration to the topology file, as illustrated next:

"metrics": {
  "reporters": [
    {
      "type": "elasticsearch",
      "cluster_name": "es_search"
    },
    {
      "type": "elasticsearch",
      "http_hosts": [
        {
          "host": "localhost",
          "port": 9200
        }
      ]
    }
  ]
}

If your Elasticsearch cluster is configured to use authentication, the recommended configuration is to use the certificate authentication flow of Opendistro Security and to set the key file locations:

{
  "type": "elasticsearch",
  "cluster_name": "es_search",
  "ssl": true,
  "ssl_private_key": "/path/to/punch-user-client-key.pem",
  "ssl_certificate": "/path/to/punch-user-client-cert.pem",
  "ssl_trusted_certificate": "/path/to/punch-user-ca.pem"
}

or with a truststore/keystore:

{
  "type": "elasticsearch",
  "cluster_name": "es_search",
  "ssl": true,
  "ssl_truststore_location": "/path/to/truststore.jks",
  "ssl_truststore_pass": "password",
  "ssl_keystore_location": "/path/to/keystore.jks",
  "ssl_keystore_pass": "password"
}

The reporters can still be configured with "credentials", but this is not a recommended configuration. Here is an example using basic auth with "user" and "password" fields:

{
  "type": "elasticsearch",
  "cluster_name": "es_search",
  "credentials": {
    "user": "sam",
    "password": "wize"
  }
}

If needed, you can also use a token for authentication with the "token" and "token_type" fields:

{
  "type": "elasticsearch",
  "cluster_name": "es_search",
  "credentials": {
    "token": "dXNlcjpwYXNzd29yZA=",
    "token_type": "Basic"
  }
}

In this case, the "token_type" depends on the security layer protecting the Elasticsearch cluster; adjust it to fit your needs.

  • cluster_name (string)

    Elasticsearch target cluster name. This name must be defined in the properties file. Mandatory if "http_hosts" is not set.

  • http_hosts (array of map)

    An array of maps containing the Elasticsearch cluster host and port information. Mandatory if "cluster_name" is not set.

    example: [{ "host": "<your_host>", "port": 9200 }]

  • index_name (string: "<tenant>-metrics")

    The final Elasticsearch index name is composed of <index_name>-<date>. This parameter allows you to set a custom index name for this reporter. We advise you to keep the keyword "metrics" in the index name so as to rely on the provided mapping templates (i.e. 'mytenant-special-metrics'). See the sketch after this list.

  • index_suffix_date_pattern (string: "yyyy.MM.dd")

    The index name suffix formatted with year, month and day

  • credentials (map)

    Use this section to set up authentication to Elasticsearch. It can rely on either user/password or token strategies.

  • ssl (boolean)

    Enable SSL/TLS authentication to Elasticsearch.

  • ssl_private_key (string)

    When SSL is enabled, you have to provide a path to a private key.

  • ssl_certificate (string)

    When SSL is enabled, you have to provide a path to a certificate.

  • ssl_trusted_certificate (string)

    When SSL is enabled, you have to provide a path to a trusted certificate.

  • ssl_keystore_pass (string)

    Keystore password

  • ssl_keystore_location (string)

    Path to the keystore

  • ssl_truststore_pass (string)

    Truststore password

  • ssl_truststore_location (string)

    Path to the truststore
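
To illustrate the index naming parameters, here is a minimal sketch assuming a monthly "yyyy.MM" suffix pattern fits your retention needs; with this configuration, a metric reported in July 2019 would be indexed into 'mytenant-special-metrics-2019.07':

{
  "type": "elasticsearch",
  "cluster_name": "es_search",
  "index_name": "mytenant-special-metrics",
  "index_suffix_date_pattern": "yyyy.MM"
}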

Here is a sample of an inserted Elasticsearch document:

{
  "_index": "mytenant-metrics-2019.07.16",
  "_type": "_doc",
  "_id": "iKLb-WsBGH4zfCiSNX9f",
  "_score": 1.0,
  "_source": {
    "@timestamp": "2019-07-16T08:17:00.769Z",
    "storm": {
      "worker": {
        "start_date": "2019-07-16T08:16:50.764Z",
        "uptime": {
          "count": 9
        }
      }
    },
    "name": "storm.worker.uptime",
    "rep": {
      "host": {
        "name": "punch-elitebook"
      }
    },
    "type": "storm",
    "platform": {
      "storm": {
        "topology": "tcp_in_to_stdout",
        "container_id": "punch-elitebook"
      },
      "channel": "apache-httpd",
      "id": "punchplatform-primary",
      "tenant": "mytenant"
    }
  }
}

Log Reporter

To make topologies log their metrics using a standard software logger (for example, to append them to a file), add a "logger" reporter to the topology file, as illustrated next:

"metrics": {
  "reporters": [
    {
      "type": "logger",
      "reporting_interval": 5,
      "format": "kv"
    }
  ]
}
  • format (string: "json")

    Select the log output format. "kv" is more log-parsing friendly, while "json" shows the full metric document as it would be recorded into Elasticsearch.

    values: "kv", "json"

You can then configure the Storm logger to either log the metrics to a file or to a centralized logging service (if available on your platform). In the $STORM_INSTALL_DIR/log4j2/worker.xml of each worker machine, include the following logger:

<logger name="org.thales.punch.libraries.reporters.console.LogReporter" level="INFO"/>

Note

On a standalone installation, this file is located in the $PUNCHPLATFORM_CONF_DIR/../external/apache-storm-*/log4j2 folder. Also note that whenever you start a topology using the punchlinectl shell, the logger configuration file used is different and is located at $PUNCHPLATFORM_CONF_DIR/../external/punch-operator-*/bin/log4j2-topology.xml

Here are two output examples, one in "json" format and the other in "kv" format:

# JSON
13:40:56 o.t.p.l.r.c.LogReporter [INFO] {"@timestamp":"2019-07-16T11:40:56.747Z","storm":{"tuple":{"pending":{"count":0}}},"name":"storm.tuple.pending","rep":{"host":{"name":"punch-elitebook"}},"type":"storm","platform":{"storm":{"component":{"name":"tcp_spout_apache_httpd","task_id":1,"type":"syslog_spout"},"topology":"tcp_in_to_stdout","container_id":"punch-elitebook"},"channel":"default","id":"punchplatform-primary","tenant":"mytenant"}}
# KV
13:43:15 o.t.p.l.r.c.LogReporter [INFO] storm.tuple.pending.count=0 platform.id=punchplatform-primary type=storm platform.storm.component.name=tcp_spout_apache_httpd @timestamp=2019-07-16T11:43:15.792Z platform.storm.component.task_id=1 name=storm.tuple.pending platform.storm.component.type=syslog_spout platform.storm.container_id=punch-elitebook rep.host.name=punch-elitebook platform.channel=default platform.storm.topology=tcp_in_to_stdout platform.tenant=mytenant 

Kafka Reporter

In order to make topologies send their metrics to a Kafka topic, add a "kafka" reporter to the topology file, as illustrated next:

"metrics": {
  "reporters": [
    {
      "type": "kafka",
      "brokers": "local",
      "topic": "<topic_name>",
      "metric_document_field_name": "log",
      "reporting_interval": 30,
      "encoding": "lumberjack"
    }
  ]
}

If your Kafka cluster is configured to use authentication:

{
  "type": "kafka",
  "brokers": "local",
  "topic": "<topic_name>",
  "metric_document_field_name": "log",
  "reporting_interval": 30,
  "encoding": "lumberjack",
  "security.protocol": "SSL",
  "ssl.truststore.location": "/path/to/truststore.jks",
  "ssl.truststore.password": "mypassword",
  "ssl.keystore.location": "/path/to/keystore.jks",
  "ssl.keystore.password": "mypassword"
}
  • topic (string, mandatory)

    The Kafka topic where the metrics tuples will be written

  • brokers (string)

    Kafka cluster name, matching a Kafka cluster key declared in punchplatform.properties.

or

  • bootstrap.servers (list)

    A comma-separated list of host:port pairs, e.g. "bootstrap.servers": "host1:9092,host2:9092". Overrides the broker list that is otherwise computed automatically from the 'brokers' parameter. See the sketch after this list.

  • encoding (string: "lumberjack")

    Specify the Kafka encoding, either "json" or "lumberjack".

  • metric_document_field_name (string: "log")

    Set the name of the JSON field that will contain the log document

  • security.protocol (string)

    Set the security protocol (e.g. "SSL").

  • ssl.truststore.location (string)

    When SSL is enabled, you may provide a path to a truststore containing the trusted certificates

  • ssl.truststore.password (string)

    The truststore's password

  • ssl.keystore.location (string)

    When SSL is enabled, you may provide a path to a keystore containing the client's private key and certificate

  • ssl.keystore.password (string)

    The keystore's password
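
As an alternative to the 'brokers' cluster name resolution, here is a minimal sketch of the same reporter targeting the brokers directly through 'bootstrap.servers' (the host names are placeholders):

{
  "type": "kafka",
  "bootstrap.servers": "host1:9092,host2:9092",
  "topic": "<topic_name>",
  "metric_document_field_name": "log",
  "reporting_interval": 30,
  "encoding": "lumberjack"
}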

The resulting metric document sent to the Kafka topic looks like the following one. Note that the document will be Lumberjack-encoded.

{
  "log": {
    "@timestamp": "2019-06-20T06:56:18.543Z",
    "storm": {...},
    "name": "storm.tuple.ack",
    "rep": {...},
    "type": "storm",
    "platform": {...}
  }
}

Lumberjack Reporter

In order to make topologies forward their metrics through Lumberjack, add a "lumberjack" reporter to the topology file, as illustrated next:

"metrics": {
  "reporters": [
    {
      "type": "lumberjack",
      "metric_document_field_name": "log",
      "reporting_interval": 30,
      "destination": [
        {
          "compression": false,
          "host": "<target_address>",
          "port": 9999
        }
      ]
    }
  ]
}
  • metric_document_field_name (string: "log")

    The JSON root field name where the metric document is sent.

  • startup_connection_timeout (integer: 1000)

    In milliseconds, the time waited during the initial startup phase before the decision is made to target the group that has reached the maximum weight (no effect when only one group is targeted).

  • group_connection_timeout (integer: 1000)

    In milliseconds, the time waited during the initial startup phase before having no available target group is considered an error.

  • destination (array, mandatory)

    A list of JSON destinations. To see how to declare a destination, please refer to the Lumberjack bolt documentation. A sketch combining these parameters follows this list.
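
Here is a minimal sketch showing the connection timeout parameters in context (the values are the defaults and the target address is a placeholder):

{
  "type": "lumberjack",
  "metric_document_field_name": "log",
  "reporting_interval": 30,
  "startup_connection_timeout": 1000,
  "group_connection_timeout": 1000,
  "destination": [
    {
      "compression": false,
      "host": "<target_address>",
      "port": 9999
    }
  ]
}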

Here is a result sample:

{
  "log": "{\"@timestamp\":\"2019-06-20T07:28:22.407Z\",\"storm\":{\"tuple\":{\"ack\":{\"count\":0,\"m1_rate\":0.0}}},\"name\":\"storm.tuple.ack\",\"rep\":{\"host\":{\"name\":\"punch-elitebook\"}},\"type\":\"storm\",\"platform\":{\"storm\":{\"component\":{\"name\":\"tcp_spout_sourcefire\",\"task_id\":1,\"type\":\"syslog_spout\"},\"topology\":\"single\",\"container_id\":\"local_punch-elitebook\"},\"channel\":\"sourcefire\",\"id\":\"punchplatform-primary\",\"tenant\":\"mytenant\"}}"
}

TCP Socket Reporter

In order to make topologies forward their metrics through a TCP connection, add a "tcp" reporter to the topology file, as illustrated next:

"metrics": {
  "reporters": [
    {
      "type": "tcp",
      "reporting_interval": 30,
      "destination": [
        {
          "host": "<target_address>",
          "port": 9999
        }
      ]
    }
  ]
}
  • startup_connection_timeout (integer: 1000)

    Time waited during the initial startup phase before the decision is made to target the group that has reached the maximum weight (no effect when only one group is targeted).

  • group_connection_timeout (integer: 1000)

    Time waited during the initial startup phase before having no available target group is considered an error.

  • destination (array, mandatory)

    A list of JSON destinations. To see how to declare a destination, please refer to the Syslog bolt documentation.

Here is a result sample:

{"@timestamp":"2019-06-20T08:49:22.299Z","storm":{"tuple":{"rtt":{"min":0,"max":0,"mean":0.0,"count":0,"stddev":0.0}}},"name":"storm.tuple.rtt","rep":{"host":{"name":"punch-elitebook"}},"type":"storm","platform":{"storm":{"component":{"name":"tcp_spout_sourcefire","task_id":1,"type":"syslog_spout"},"topology":"single","container_id":"local_punch-elitebook"},"channel":"sourcefire","id":"punchplatform-primary","tenant":"mytenant"}}

UDP Socket Reporter

In order to make topologies forward their metrics over the UDP protocol, add a "udp" reporter to the topology file, as illustrated next:

"metrics": {
  "reporters": [
    {
      "type": "udp",
      "reporting_interval": 30,
      "destination": [
        {
          "host": "<target_address>",
          "port": 9999
        }
      ]
    }
  ]
}
  • destination (array, mandatory)

    A list of JSON destinations. To see how to declare a destination, please refer to the Syslog bolt documentation.

Here is a result sample:

{"@timestamp":"2019-06-20T08:58:32.216Z","name":"netty.raw.recv","type":"storm","rep":{"host":{"name":"punch-elitebook"}},"platform":{"storm":{"component":{"name":"tcp_spout_sourcefire","task_id":1,"type":"syslog_spout"},"topology":"single","container_id":"local_punch-elitebook"},"channel":"sourcefire","id":"punchplatform-primary","tenant":"mytenant"},"netty":{"raw":{"recv":{"count":0}},"target":{"protocol":"tcp","port":9902,"host":"0.0.0.0"}}}

Console Reporter

This reporter is not designed for production use and must be used with caution: it is for debugging only. It is a simple wrapper around the Java System.out.println(...) method.

Because it outputs the metrics to the current terminal/console, it is the quickest way to test metric filtering when using the "regex" option, as sketched after the basic example below.

"metrics": {
  "reporters": [
    {
      "type": "console",
      "reporting_interval": 10
    }
  ]
}
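
For instance, here is a minimal sketch that only prints the Storm worker uptime metric to the console, reusing the "regex" filtering described in the common parameters:

"metrics": {
  "reporters": [
    {
      "type": "console",
      "reporting_interval": 10,
      "regex": [
        "storm\\.worker\\.uptime"
      ]
    }
  ]
}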

Production setup

In a deployed platform, we recommend using Kafka reporters for all your punchlines and services, such as Shiva. By sending all logs to Kafka you benefit from all of Kafka's advantages. In addition, you just have to design a simple punchline that reads your Kafka topic and dispatches the logs to the suitable Elasticsearch index using a punchlet such as the following:

    /**
     * This punchlet takes platform events coming from an LTR or from a Back Office "platform-monitoring" Kafka topic.
     * Events are written in the Kafka topic using the "JSON" format, so only
     * one field is generated. Here, we re-do the "Y" to send
     * events to the right Elasticsearch index.
     * We need separate indices because retention rules are not the same for the different platform event types,
     * and to ease querying
     * (e.g. 1 year for punchctl operator command audit events,
     * 3 days for system metrics, a few days more for platform health...).
     *
     * This is a reference configuration item for the DAVE 6.0 release - checked 21/07/2020 by CVF
     */
    {
        root = [docs];
        String docType = "undetermined";
        String indexPrefix = "platform-errors-";
        String indexType = "daily";
        String platform  = "undetermined";
        String forwardingHost = "undetermined";
        String tenant = "undetermined";

        if (world:[meta][platform_id]) {
            platform = world:[meta][platform_id] ;
        }

        if ( world:[meta][tenant] ) {
            tenant = world:[meta][tenant];
        }
        String release_index_prefix = "";
        if (world:[meta][index_prefix]) {
            release_index_prefix = world:[meta][index_prefix];
        }



        // If we have a json document that already holds platform id or tenant info, then retrieve it...
        if ( [doc][platform][id]) {
            platform = [doc][platform][id];
        }

        // If we have received the event from a remote sender, and the punchline publishes the info,
        // then we keep track of the host that sent the event on the network
        // This is useful if the actual platform id is not available or ambiguous.

        if ( [_ppf_remote_host] ) {
            forwardingHost = [_ppf_remote_host];
            [doc][platform][forwarding_host] = [_ppf_remote_host];
        }


        if ( [doc][platform][tenant] ) {
            tenant = [doc][platform][tenant];
        } else {
            [doc][platform][tenant] = tenant;
        }



        if ( [doc][type] ) {
            docType = [doc][type];
        } else if ( [doc][@metadata][beat] ) {
            docType =  [doc][@metadata][beat];
        }
        else if ( [doc][service][type] ) {
            docType =  [doc][service][type];
        } else if ( [doc][target][type] ) {
            docType =  [doc][target][type];
        }

        // The timestamp of the event is normally available in the incoming JSON standard field, following the Beats convention:
        String dateString = null;
        if ([doc][@timestamp]) {
            dateString = [doc][@timestamp];
        }

        switch (docType) {
            case "platform":
            case "platform-monitoring":
                indexPrefix = "platform-monitoring-";
                break;
            case "channels-monitoring":
                indexPrefix = tenant + "-channels-monitoring-";
                break;
            case "platform-health":
                indexPrefix = "platform-health-";
                break;
            case "storm":
                indexPrefix = tenant + "-metrics-";
                break;
            case "gateway-execution":
            case "gateway":
                indexPrefix = tenant + "-gateway-logs-";
                break;
            case "spark":
                indexPrefix = tenant + "-spark-metrics-";
                break;
            case "spark-application":
                indexPrefix = tenant + "-spark-metrics-";
                break;
            case "punch":
                // We separate the operator commands from the verbose logs, to be able to keep these commands for more than a year, because they are needed for channels monitoring activation
                if (([doc][init][process][name] == "channelctl") || ([doc][init][process][name] == "punchctl")) {
                    indexPrefix = "platform-operator-logs-";
                    indexType = "monthly";
                } else {
                    indexPrefix = "platform-logs-";
                }
                break;
            case "metricbeat":
                indexPrefix = "platform-metricbeat-" + [doc][@metadata][version] + "-";
                break;
            default:
                String errorMessage = "Unable to dispatch unknown monitoring document type ('" + docType + "') forwarded from host '" + forwardingHost + "' (platform '" + platform + "').";
                throw new PunchRuntimeException(errorMessage);
            }

        String elasticsearch_date = "nodate-errors";
        if (dateString != null) {
            if (indexType == "daily") {
                    // here, the input date is : 2019-10-10T17:07:39.951Z
                 elasticsearch_date = date("yyyy.MM.dd", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").on(dateString).get();
            } else {
                // Monthly case
                    // here, the input date is : 2019-10-10T17:07:39.951Z
                 elasticsearch_date = date("yyyy.MM", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").on(dateString).get();
            }
        }
        [index] = indexPrefix +  release_index_prefix + elasticsearch_date;

    }

When defining a Kafka reporter in the Reporters section of punchplatform-deployment.settings, the Punch deployer automatically creates the Kafka topics using the following settings: