HOWTO Connect an EXTERNAL Logstash collector to Punchplatform

Why do that

This procedure ensures that no log is lost between an external Logstash component and Punchplatform. As an example, we will compare the number of logs sent by Logstash with the number received by Punchplatform.

Note: if you want to run an internal Logstash task on your Punchplatform in order to receive logs from an external source (for example, to benefit from a specific Logstash connector or feature), then refer to How to use an embedded Logstash as a Punchplatform input connector

Prerequisites

You need:

  • a Punch deployment (or a Punchplatform standalone)
  • Logstash (5.6.2 or newer)

What to do

The lossless transport is achieved through the Lumberjack protocol, which is supported by both Logstash and Punchplatform.
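As a rough illustration of why this transport can be lossless: each Lumberjack data frame carries a sequence number, and the receiver acknowledges the sequence numbers it has safely processed, so the sender can replay anything unacknowledged after a disconnection. A minimal sketch of the version-2 ACK frame, based on the publicly documented protocol layout (illustrative only, not Punchplatform code):

```python
import struct

def encode_ack(seq):
    # Lumberjack v2 ACK frame: protocol version '2', frame type 'A',
    # then the acknowledged sequence number as a 32-bit big-endian int
    return b"2A" + struct.pack(">I", seq)

def decode_ack(frame):
    # Parse the frame back; the sender uses this to know which logs it
    # may discard and which it must replay after a disconnection
    assert frame[:2] == b"2A", "not an ACK frame"
    (seq,) = struct.unpack(">I", frame[2:6])
    return seq

print(decode_ack(encode_ack(300000)))  # -> 300000
```

This acknowledgement loop is what produces the "lumberjack peer stopped reading its acks" messages seen later in the topology logs when the connection drops.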

Please note that to achieve the same level of metadata capture (originator IP, unique log ID, ...) as with direct reception by Punch from a source device, you may have to augment your Logstash configuration with additional settings, and enrich your Punch channel to use and normalize the metadata captured by Logstash (see How to use an embedded Logstash as a Punchplatform input connector).

Configure Logstash for lumberjack emission

Configure your Logstash pipeline with a lumberjack output, for example:

input {
  tcp {
    port => 9901
  }
}
output {
  lumberjack {
    hosts => ["localhost"]
    port => 29901
    ssl_certificate => "/home/user/keys/logstash/logstash.crt"
  }
}
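The certificate referenced above must exist on both sides of the link. One possible way to produce it, assuming a self-signed certificate is acceptable in your setup (paths and names are illustrative; note that the Punch lumberjack input configured later expects the private key in PKCS#8 form, hence the .key8 suffix):

```shell
# Generate a self-signed certificate and private key for the lumberjack link
openssl req -x509 -nodes -newkey rsa:2048 -days 365 \
  -subj "/CN=localhost" \
  -keyout logstash.key -out logstash.crt

# Convert the private key to PKCS#8, the format the Punch side reads
openssl pkcs8 -topk8 -nocrypt -in logstash.key -out punchplatform.key8
```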

See also HOWTO connect lumberjack output of Logstash and lumberjack spout of Storm

Configure a Punchplatform punchline for receiving the lumberjack flow

Create an input topology with the lumberjack input node configuration (in lmr_in_topology.json or another receiving punchline):

{
    "tenant" : "mytenant",
    "channel" : "test",
    "name" : "lmr_in",
    "meta" : {
      "tenant" : "mytenant"
    },
    "spouts" : [
        {
            "type" : lumberjack_input",
            "settings" : { 
              "listen" : {
                "host" : "0.0.0.0",  
                "port" : 29901,
                "compression" : false,
                "ssl" : true,
                "ssl_private_key" : "/home/user/keys/logstash/punchplatform.key8",
                "ssl_certificate" : "/home/user/keys/logstash/logstash.crt"
              },              
              "self_monitoring.activation" : false
            },
            "storm_settings" : {
              "executors": 1,
              "component" : "syslog_spout_lumberjack",
              "publish" : [ 
                { 
                  "stream" : "logs", 
                  "fields" : ["line"]
                }
              ] 
            }
        }
    ],

    // WHAT FOLLOWS IS BUT AN EXAMPLE. USUALLY WE SHOULD ONLY WRITE TO KAFKA

    "bolts" : [        
        {
           "type" : "distinct_log_counter_bolt",
           "settings" : {             
           },
           "storm_settings" : {
              "executors": 1,
              "component" : "distinct_log_counter",
              "subscribe" : [
                { 
                  "component" : "syslog_spout_lumberjack", 
                  "stream" : "logs",
                  "grouping": "localOrShuffle" 
                }
              ],
               "publish" : [ 
                { 
                  "stream" : "logs", 
                  "fields" : ["seq"]
                }
              ]  
            }
        },
        {
          "type": "elasticsearch_bolt",
          "settings": {
            "cluster_id": "es_search",
            "per_stream_settings" : [
              {
                "stream" : "logs",          
                "index" : { "type" : "daily" , "prefix" : "events-%{tenant}-" },          
                "document_json_field" : "log",          
                "document_id_field" : "local_uuid"          
              }
            ]
          },
          "storm_settings": {
            "executors" : 1,
            "component": "elasticsearch_bolt",
            "subscribe": [ { "component": "distinct_log_counter", "stream": "logs", "grouping" : "localOrShuffle" } ]
          }
        }
    ], 

    ...
}
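The distinct_log_counter_bolt above is a test helper: it counts distinct sequence numbers, so duplicates caused by Lumberjack retransmissions are not counted twice. Its logic can be sketched as follows (a hypothetical stand-in for illustration, not the actual bolt code):

```python
# Count distinct "seq=N" payloads while ignoring replays: this is why the
# "size of tuple map" figure can end at exactly 300000 even after
# disconnections and retransmissions.
def count_distinct(lines):
    seen = set()
    for line in lines:
        if line.startswith("seq="):
            seen.add(int(line[len("seq="):]))
    return len(seen)

# A replayed batch (seq=1..5 delivered twice) still counts as 5 distinct logs
replayed = [f"seq={n}" for n in range(1, 6)] * 2
print(count_distinct(replayed))  # -> 5
```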

Configure Injector

In order to inject logs into Logstash, we can use the punchplatform-injector. Define a configuration as follows (sequence_injector.json):

{
  # Set here where you want to send your samples. It must be the input
  # point of your topology
  "destination" : { "proto" : "tcp", "host" : "127.0.0.1", "port" : 9901 },

  "load" :{
    "stats_publish_interval" : "2s",
    "message_throughput" : 59
  },

  # In this section you define what you inject
  "message" : {
    "payloads" : [
      "seq=%{counter}"
    ],

    "fields" : {
      "counter" : {
        "type" : "counter",
        "min" : 1
      }
    }
  }
}
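If the punchplatform-injector is not at hand, the same "seq=%{counter}" traffic can be produced with a few lines of Python (a hypothetical replacement, assuming the Logstash tcp input from the first section is listening on port 9901):

```python
import socket

def payloads(count):
    # "seq=1" .. "seq=<count>", mirroring the injector's counter field
    return [f"seq={n}" for n in range(1, count + 1)]

def inject(host, port, count):
    # One log per line over a plain TCP connection, which is what the
    # Logstash tcp input expects
    with socket.create_connection((host, port)) as sock:
        for p in payloads(count):
            sock.sendall((p + "\n").encode())

# inject("127.0.0.1", 9901, 300000)
```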

Execute

Launch the Logstash pipeline:

bin/logstash -f first-pipeline.conf --config.reload.automatic

Launch the topology:

punchlinectl /home/user/punch-standalone-3.3.6-SNAPSHOT/conf/tenants/mytenant/channels/test/lmr_in_topology.json

Launch the injector:

punchplatform-log-injector.sh -c ./resources/injector/mytenant/sequence_injector.json -n 300000 -t 5000

See the logs directly in Kibana, or change the log level in logback-topology.xml.

Example of results

In the topology logs:

[INFO] message="size of tuple map" size=0
[INFO] message="size of tuple map" size=0
[INFO] message="size of tuple map" size=17316
[ERROR] message="lumberjack peer stopped reading its acks" channel=[id: 0x214338bf, L:/127.0.0.1:29901 - R:/127.0.0.1:38072]
[ERROR] message="closed channel" channel=[id: 0x214338bf, L:/127.0.0.1:29901 ! R:/127.0.0.1:38072]
[ERROR] message="lumberjack peer stopped reading its acks" channel=[id: 0xe350d992, L:/127.0.0.1:29901 - R:/127.0.0.1:41178]
[ERROR] message="closed channel" channel=[id: 0xe350d992, L:/127.0.0.1:29901 ! R:/127.0.0.1:41178]
[INFO] message="size of tuple map" size=166839
[ERROR] message="lumberjack peer stopped reading its acks" channel=[id: 0x883a19f9, L:/127.0.0.1:29901 - R:/127.0.0.1:41316]
[ERROR] message="closed channel" channel=[id: 0x883a19f9, L:/127.0.0.1:29901 ! R:/127.0.0.1:41316]
[ERROR] message="lumberjack peer stopped reading its acks" channel=[id: 0x6b7301d3, L:/127.0.0.1:29901 - R:/127.0.0.1:41486]
[ERROR] message="closed channel" channel=[id: 0x6b7301d3, L:/127.0.0.1:29901 ! R:/127.0.0.1:41486]
[INFO] message="size of tuple map" size=300000
[INFO] message="size of tuple map" size=300000

In Kibana:

{
  "_index": "events-mytenant-2017.10.12",
  "_type": "log",
  "_id": "AV8QZz-GBPkFG95pVBHv",
  "_score": null,
  "_source": {
    "message": "size of tuple map",
    "size": 300000,
    "ts": "2017-10-12T13:45:22.821+02:00"
  },
  "fields": {
    "ts": [
      1507808722821
    ]
  },
  "sort": [
    1507808722821
  ]

}

Well done! 300000 logs: they all arrived, even though we had disconnection problems!