HOWTO Use an EMBEDDED Logstash instance as a Punchplatform input connector

IN PROGRESS

Why do that

Logstash and its community may provide features or protocol connectors that complement the ones already provided by native Punchplatform nodes.

In this situation, you can embed any external task (such as Logstash) inside a Punchplatform channel so that it acts as a Punchplatform connector.

Note: If you instead want to receive logs safely from a Logstash instance that runs outside the platform, please refer to HOWTO Connect an EXTERNAL Logstash collector to Punchplatform.

Prerequisites

You need a deployed Punchplatform, including one shiva runner node (or more, for highly available collection).

What to do

The overall process:

- Prepare a Logstash configuration file that listens on a network input and writes the logs into a Punchplatform Kafka topic.
- Configure a channel (channel_structure.hjson) to include an (input) shiva task that runs Logstash, with a placement tag to ensure that this Logstash task runs on the desired input node(s).
- If high availability is needed at the input point, declare a second task in the channel, with a requirement tag that places it on the other shiva runner node. Then configure a Virtual IP management cluster of daemons (pacemaker, keepalived...) to ensure the listening IP is always assigned to one of the input nodes.
- Adjust the Punchplatform channels in charge of processing the logs (forwarding, parsing, indexing...) so that they handle the Logstash formatting in Kafka (JSON encoding, no punch metadata).

Logstash configuration

Create a Logstash pipeline with a RELP input and a Kafka output (first-pipeline.conf):

input {
  relp {
    id => my_logstash_listener_2514
    port => 2514
  }
}

filter {
  uuid {
     target => "uuid"
  }
}

output {

  kafka {
    codec => json
    topic_id => "reftenant-relp-input"
  }
}

Punch configuration

The metadata that punch usually generates upon document reception (_ppf_timestamp, _ppf_id, ...) is of course not generated by Logstash.

The punchline that follows in the channel is therefore in charge of normalizing the available metadata (e.g. translating the 'uuid' field of the JSON document into a separate '_ppf_id' field, which makes it possible to remove duplicates when the same document is inserted several times into Elasticsearch).

Example of normalizing input punchlet (to be executed before any other usual punchlet for parsing):

{

  /* This punchlet normalizes the data produced by a logstash input application.
      Logstash with its 'uuid' filter produces a single JSON document that looks like this:

      {
          "@timestamp": "2020-12-20T15:36:30.640Z",
          "@version": "1",
          "host": "192.168.56.119:43272",
          "local_host": "thishostname",
          "local_port": 2514,
          "uuid": "3eabce73-aa5c-4d83-84e3-859bc6d192c6",
          "message": "<46>2020-12-20T15:36:30.460097+00:00 ubuntu rsyslogd: Here is a raw log"
      }


    We assume here that this single JSON document arrives in the '[logs][log]' stream/field.


    This punchlet will produce the standard fields that a punch syslog input node
    can produce:

    - _ppf_id: a unique log id string (retrieved from logstash 'uuid')
    - _ppf_timestamp: the input timestamp (retrieved from logstash '@timestamp')
    - _ppf_remote_host: the source network address of the server that connected
       to the input (logstash) collector application (retrieved from logstash 'host')
    - _ppf_remote_port: the source network TCP port (retrieved from logstash 'host')
    - _ppf_local_host: the reception host name (retrieved from logstash 'local_host')
    - _ppf_local_port: the reception port (retrieved from logstash 'local_port')
    - 'log' field will only contain the message payload string (i.e. the 'message' field from logstash)

  */



  // reduce verbosity of following lines by relocating the implicit root of all
  // tuples to [logs]
  root = [logs];  

  Tuple input = [log];

  // Warning: due to the dissect operator behaviour, 
  // this overwrites the content of the 'root' so we have to use 
  // this BEFORE adding up other fields
  if (!dissect("%{_ppf_remote_host}:%{_ppf_remote_port}").on(input:[host]).into(root)) {
    // dissect did not match (no 'address:port' form): keep the raw host value
    [_ppf_remote_host] = input:[host];
  }

  [_ppf_id] = input:[uuid];
  [_ppf_timestamp] = input:[@timestamp];
  [_ppf_local_host] = input:[local_host];
  [_ppf_local_port] = input:[local_port];
  [log] = input:[message];
}
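
To check the expected result of this normalization outside the platform, the same field mapping can be sketched in Python. This is only an illustration of the mapping described above, not punch code and not part of the Punchplatform API: the function name normalize_logstash_event is hypothetical, and the fallback to the raw 'host' value when it contains no port is an assumption.

```python
import json


def normalize_logstash_event(raw_json: str) -> dict:
    """Map a Logstash JSON event to punch-style fields (illustrative sketch)."""
    event = json.loads(raw_json)
    normalized = {}

    # Logstash reports the sender as "address:port"; split on the last ':'
    host = event.get("host", "")
    remote_host, separator, remote_port = host.rpartition(":")
    if separator:
        normalized["_ppf_remote_host"] = remote_host
        normalized["_ppf_remote_port"] = remote_port
    else:
        # Assumption: with no port present, keep the raw host value
        normalized["_ppf_remote_host"] = host

    normalized["_ppf_id"] = event.get("uuid")
    normalized["_ppf_timestamp"] = event.get("@timestamp")
    normalized["_ppf_local_host"] = event.get("local_host")
    normalized["_ppf_local_port"] = event.get("local_port")
    # Only the raw message payload remains in the 'log' field
    normalized["log"] = event.get("message")
    return normalized


if __name__ == "__main__":
    sample = json.dumps({
        "@timestamp": "2020-12-20T15:36:30.640Z",
        "@version": "1",
        "host": "192.168.56.119:43272",
        "local_host": "thishostname",
        "local_port": 2514,
        "uuid": "3eabce73-aa5c-4d83-84e3-859bc6d192c6",
        "message": "<46>2020-12-20T15:36:30.460097+00:00 ubuntu rsyslogd: Here is a raw log",
    })
    print(json.dumps(normalize_logstash_event(sample), indent=2))
```

Comparing this output with the tuple produced by the punchlet on the same sample document is a quick way to confirm that the downstream parsing punchlets receive the fields they expect.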