
Punchlines

Abstract

This track covers the fundamentals of punchlines.

First of all, be sure to read the punchline value proposition chapter.

Overview

Stream or Batch Data Processing

A punchline is an application that processes data.

  • A streaming punchline runs forever. It is typically used to continuously collect, transform and/or save data from various sources up to some sink(s).
  • A batch punchline also processes data, but runs on a finite dataset: it starts, does its work, then ends. There are many use cases for batch punchlines: aggregations, extractions, machine learning, etc.

The beauty of the punch is that you have a single format to deal with both kinds of punchlines. You define a directed graph of nodes. Go through the Punchlines documentation where this is explained clearly.

Designing a Streaming Punchline

We will now design and run a simple punchline. We will start simple, with a punchline dag made of two nodes: one node accepting incoming data on a tcp socket, and another one simply printing out that data. We will then elaborate and make it more realistic and useful.

Along with these exercises, it will be helpful to refer to the Storm Punchline documentation.

Important

This track leverages a (so-called) storm punchline. A storm-like punchline is one that uses the data flow model defined by apache storm, a popular stream processing framework. It does not, however, mean that it actually runs in a storm cluster. The important point is that the streaming model defined by storm is still an excellent model to design powerful streaming apps.

Step 1 : A Simple Punchline

Let us write a simple punchline composed of a syslog input node listening on a tcp socket, and a punch node executing the following inline punchlet:

    { print(root); }

Key elements for the exercise

  • dag stands for Directed Acyclic Graph. A punchline is modeled as a set of nodes linked together with a publish-subscribe relationship. Each node is one element in the dag array.
  • A node is characterized by a type, publish and/or subscribe parameters, and settings.

  • To run an inlined punchlet (the above root printer punchlet), you must provide the punchlet_code setting to the punch node, instead of the punchlet setting used to reference punchlet files.

  • The punch node must subscribe to the stream that your syslog input node publishes.

The dag and node documentation covers these concepts in detail.

The streams and fields configuration of each node explicitly defines the data flow. Input nodes only publish data, processing nodes (most often) subscribe to and publish data, and output nodes subscribe to data and write it to a data sink.
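
To make this concrete, here is a minimal sketch of the generic node shape. The node types, component, stream and field names below are placeholders for illustration, not actual punch node types:

```yaml
# Generic shape of a dag node; 'some_input' and 'some_processor' are placeholder types.
dag:
- type: some_input            # the node implementation to run
  component: some_input       # optional: defaults to the node type
  settings: {}                # node-specific settings
  publish:                    # streams (and their fields) this node emits
  - stream: logs
    fields:
    - log
- type: some_processor
  settings: {}
  subscribe:                  # streams this node consumes, referenced by component and stream name
  - component: some_input
    stream: logs
```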

Exercise

Using your favorite file editor, create a file called `punchline_storm.yaml` based on:

  • the yaml structure of a punchline,
  • the example syslog input/punch node reference documentation,
  • and the above hints.

A solution
version: '6.0'
runtime: storm
dag:
- type: syslog_input
  settings:
    load_control: none
    listen:
      proto: tcp
      host: 0.0.0.0
      port: 9999
  publish:
  - stream: logs
    fields:
    - log
- type: punchlet_node
  settings:
    punchlet_code: '{ print(root); }'
  subscribe:
  - component: syslog_input
    stream: logs

Was the 'load_control' setting useful there ?

Key information

Nodes subscribe to streams by a source component name and by a stream name.

By default, each node has a 'component name' which is the same as its type.

But of course, if you have multiple nodes of the same type, then you have to give them a specific component name yourself by adding the optional component attribute to the node (not to its 'settings' section, but to the node itself).
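
For example, here is a sketch with two punchlet nodes distinguished by explicit component names (the component names are illustrative):

```yaml
# Two nodes of the same type: give each one its own 'component' name (illustrative names).
- type: punchlet_node
  component: parsing
  settings:
    punchlet_code: '{ print(root); }'
  subscribe:
  - component: syslog_input
    stream: logs
  publish:
  - stream: logs
    fields:
    - log
- type: punchlet_node
  component: printing
  settings:
    punchlet_code: '{ print(root); }'
  subscribe:
  - component: parsing        # refer to the upstream node by its component name
    stream: logs
```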

Step 2 : Execute your Punchline

Once your standalone is installed and started, simply run the following command to execute your punchline in foreground mode :

punchlinectl -t mytenant start -p punchline_storm.yaml

Question

Have a look at the output logs. Can you make sense of them?

Step 3 : Inject some data

Your punchline is now running; let us inject some data into it using the punchplatform-log-injector.sh tool. This tool is part of the punch and can send whatever test/arbitrary data you need to various destinations, with flexible protocol settings.

As for the other punch tools, you can refer to the manual page for the usage of the log injector.

In a new terminal, simply run :

punchplatform-log-injector.sh -c $PUNCHPLATFORM_CONF_DIR/resources/injectors/examples/syslog_injector.json -p 9999

As you can see, your punchline is printing each received log to the terminal standard output.

Question

Can you understand the content of that injector file ?

Key points

  • with this tool you can test load/performance, inject errors, etc., so test sets should be developed for all your parsers and pipelines
  • payloads are usually provided as raw strings, double-quoted (with double quotes and newlines escaped as \" and \n)
  • some field tags can be inserted in the payloads, to randomize the data and to keep timestamps consistent with real time
  • supported protocols are tcp, udp and lumberjack (including options for compression, ssl and lumberjack frame encoding)
  • you can override the protocol, target host, port, number of logs and max sending rate with command-line options
  • in tcp and lumberjack you get flow control, so the displayed rate is the max throughput of the network/target punchline

Step 4 : Adding a parsing node

Let us now improve our punchline. As an exercise, insert a new punch node that subscribes to the syslog input node, runs some log parsers (aka punchlets) and publishes the result to our printer punch node.

To help you, here is how you reference punchlet files in the punch node settings instead of executing an inline piece of code.

Location of punchlet resources

  • When a relative punchlet file path is provided (which is usual), the implicit root is the 'resources/punch' folder of your tenant folder.

So, if you are integrating a new tenant, ensure you store the needed resources in your tenant resources! (For this training, you can just use the resources/punch folder of the 'mytenant' example tenant provided by the standalone.)

  • Note that resources can also be located in the channel folder, by using a special %{channel}%/ path prefix (see Punch Node configuration).

type: punchlet_node
component: processing
settings:
  punchlet:
  - punchlets/common/input.punch
  - punchlets/common/parsing_syslog_header.punch
subscribe:
- FILL THIS
publish:
- FILL THIS
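
If you want to check yourself, here is a possible completion of the subscribe and publish sections (it matches the full solution given further below in this track):

```yaml
# Possible completion of the processing node (consistent with the solution shown later).
type: punchlet_node
component: processing
settings:
  punchlet:
  - punchlets/common/input.punch
  - punchlets/common/parsing_syslog_header.punch
subscribe:
- component: syslog_input
  stream: logs
publish:
- stream: logs
  fields:
  - log
```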

Your goal is to rerun your punchline, inject some data, and understand what you see on standard output.

Question

Are you at ease with the punchlet concept? How would you describe what a punchlet is? An application? A service? A function?

Some possible answers
  • punchlets can act as filters, as transformation functions, as data enrichers
  • punchlets can produce zero or many documents upon processing one entry document, and they can send them to various output streams of the punch node,

    So they can be alerters, filters, switches/routers, formatters for output data...

  • more complex punchlets can be stateful (accumulate counters, group session data)

Step 5 : Sending output to Elasticsearch

We now have an interesting punchline. Input logs are read from a tcp socket, then parsed into json documents. Let us now index these documents into the elasticsearch database.

Your next goal is to add an Elasticsearch output node that subscribes to the parsing punchlet_node stream of data, and in turn indexes it into Elasticsearch.

Key naming conventions for Elastic indices

If you are doing this training on a shared platform, then work inside a 'tenant' dedicated to YOUR tests, and customize your target Elasticsearch indices so that they start with '<your tenant>-events-' instead of the sample 'mytenant-events-'.

The 'tenant name' part is a requirement for the housekeeping of your tenant data through the inbuilt service. The following '-events-' part indicates the kind of data, and is used for applying the standard field types mapping on your index.

To help you, here follows the essential configuration of the Elasticsearch output node:

Platform specifics

Be careful, your target elasticsearch cluster may differ if you run on a non-standalone platform.

In this case have a look at your platform configuration (installed at deployment time):

jq . $PUNCHPLATFORM_PROPERTIES_FILE
or, even more targeted:
jq '.elasticsearch.clusters|keys' $PUNCHPLATFORM_PROPERTIES_FILE

type: elasticsearch_output
settings:
  cluster_id: es_search
  per_stream_settings:
  - stream: logs
    index:
      type: daily
      prefix: mytenant-events-
    document_json_field: log
    additional_document_value_fields:
    - type: date
      document_field: '@timestamp'
      format: iso
subscribe:
- FILL THIS
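
The subscribe section simply references the parsing node, for example (assuming your parsing node uses the component name 'processing', as in this track):

```yaml
# Possible completion of the elasticsearch_output subscribe section,
# assuming the parsing node uses the component name 'processing'.
subscribe:
- component: processing
  stream: logs
```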

Step 6 : Check the results in Elasticsearch

Relaunch your punchline:

punchlinectl -t mytenant start -p punchline_storm.yaml

Then in a new terminal, inject some data :

punchplatform-log-injector.sh -c $PUNCHPLATFORM_CONF_DIR/resources/injectors/examples/syslog_injector.json -p 9999

Check that your logs have been correctly inserted into Elasticsearch :

curl localhost:9200/_cat/indices/mytenant*?v

An Elasticsearch index starting with mytenant-events- must have been created with some documents in it.

Tip

Use Kibana and have a look at the Discover tab to better visualise the documents. Take a minute to understand what you see and the structure of the documents, and compare them to the standard output trace of your punchline. The end-to-end processing, from the log emission up to its indexing into Elasticsearch, must be completely clear to you.

Question

Discuss briefly with the trainer the way the json document produced by the punchlet_node is forwarded to Elasticsearch. Is there a one-to-one mapping between streams and fields (which are punchline streaming concepts) and the output document sent to Elasticsearch?

Key points

Usually, the json document is copied from one field of the stream. Some other fields can be used (for the document id, for the targeted index name, or for additional fields).
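
To illustrate with the per_stream_settings shown above (the field values below are made up for this example), a tuple on the 'logs' stream and the resulting Elasticsearch document relate roughly as follows:

```yaml
# Illustration only (values are made up): how a 'logs' stream tuple maps to the indexed document.
stream_tuple:
  log: '{"vendor": "training_inc", "message": "some parsed log"}'  # the json document to index
resulting_document:                       # stored in a daily mytenant-events-* index
  vendor: training_inc                    # the document body is the content of the 'log' field
  message: some parsed log
  '@timestamp': 2024-01-01T12:00:00.000Z  # added through additional_document_value_fields
```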

Optional Exercise

Now publish some more fields in the published output of your 'syslog_input' and 'processing' nodes:

  • _ppf_timestamp
  • _ppf_id
  • _ppf_local_port
  • _ppf_remote_host

Are they present in Kibana at the end? Why not? Is there some change in Elasticsearch, and why?

Answers

Key points

  • The _ppf_xxx fields are not directly in the kibana document, because only the 'log' field was used as the json document.
  • BUT do you notice some additional fields anyway? Have a look at the input.punch resource to understand.
  • The _ppf_id is very useful as the id of the Elastic document, because it avoids duplicates if we replay the logs from a retention!
  • Did you notice that your 'technology' field value is strange? Try adding the 'meta' section to your punchline.

Here is the example file:
version: "6.0" runtime: storm meta: { vendor: training_inc technology: secret_training_device } dag: [ { type: syslog_input settings: { load_control: none listen: { proto: tcp host: 0.0.0.0 port: 9999 } } publish: [ { stream: logs fields: [ log _ppf_timestamp _ppf_id _ppf_local_port _ppf_remote_host ] } ] } { type: punchlet_node component: processing settings: { punchlet: [ punchlets/common/input.punch punchlets/common/parsing_syslog_header.punch ] } subscribe: [ { component: syslog_input stream: logs } ] publish: [ { stream: logs fields: [ log _ppf_timestamp _ppf_id _ppf_local_port _ppf_remote_host

1
2
3
      ]
    }
]

} { type: punchlet_node settings: { punchlet_code: "{ print(root); }" } subscribe: [ { component: processing stream: logs } ] } { type: elasticsearch_output settings: { cluster_id: es_data per_stream_settings : [ { stream: logs index : { type : daily prefix : cedricvf-events- } document_json_field : log document_id_field: _ppf_id additional_document_value_fields : [ { type : tuple_field document_field : @timestamp tuple_field : _ppf_timestamp } ] } ] } subscribe: [ { component: processing stream: logs } ] } ] } ```

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
        ```java tab="3b-parse-index-syslog-meta.hjson"
        {
          version: "6.0"
          runtime: storm
          meta: {
            vendor: training_inc
            technology: secret_training_device
          }
          dag: [
            {
              type: syslog_input
              settings: {
                load_control: none
                listen: {
                    proto: tcp
                    host: 0.0.0.0
                    port: 9999
                }
              }
              publish: [
                {
                  stream: logs
                  fields: [
                    log
                    _ppf_timestamp
                    _ppf_id
                    _ppf_local_port
                    _ppf_remote_host
                  ]
                }
              ]
            }
             {
            type: punchlet_node
            component: processing
            settings: {
              punchlet: [
                punchlets/common/input.punch
                punchlets/common/parsing_syslog_header.punch
              ]
            }
            subscribe: [
                  {
                    component: syslog_input
                    stream: logs
                  }    ]
            publish: [
                {
                  stream: logs
                  fields: [
                    log
                    _ppf_timestamp
                    _ppf_id
                    _ppf_local_port
                    _ppf_remote_host

                  ]
                }
            ]
          }
            {
              type: punchlet_node
              settings: {
                punchlet_code: "{ print(root); }"
              }
              subscribe: [
                  {
                    component: processing
                    stream: logs
                  }
              ]
            }
            {
            type: elasticsearch_output
            settings: {
            cluster_id: es_data
            per_stream_settings : [
              {
                stream: logs
                index : { 
                    type : daily
                    prefix : cedricvf-events-
                }
                document_json_field : log
                document_id_field: _ppf_id
                additional_document_value_fields : [
                  { 
                      type : tuple_field
                      document_field : @timestamp
                      tuple_field : _ppf_timestamp
                  }
                ]
              }
            ]
            }
            subscribe: [ 
                {
                    component: processing
                    stream: logs
                  }
            ]
          }
          ]
        }
        ```

Step 7 : Error handling

What happens if a punchlet fails to correctly parse the incoming data? Most often you:

  • cannot afford to block the whole processing until someone comes and fixes the processing chain to successfully parse this special case
  • cannot afford to lose your data by dropping unparsed data and going for the next input.

Instead, you will want to divert such data to some error queue or index so that you can reprocess it later.

This is why error handling is required, and is a key topic to understand in order to go to production. It is also a good opportunity to keep investigating what you can do with a punchline dag, streams and fields.

Refer to this documentation for a quick overview of error handling.

Let us update our punchline as follows:

        ```java tab="4-index-with-errors.hjson"
        {
          version: "6.0"
          runtime: storm
          meta: {
            vendor: training_inc
            technology: secret_training_device
          }
          dag: [
            {
              type: syslog_input
              settings: {
                load_control: none
                listen: {
                    proto: tcp
                    host: 0.0.0.0
                    port: 9999
                }
              }
              publish: [
                {
                  stream: logs
                  fields: [
                    log
                    _ppf_timestamp
                    _ppf_id
                    _ppf_local_port
                    _ppf_remote_host

                  ]
                }
              ]
          }
          {
            type: punchlet_node
            component: processing
            settings: {
            punchlet: [
                punchlets/common/input.punch
                punchlets/common/parsing_syslog_header.punch
            ]
            }
            subscribe: [
                {
                  component: syslog_input
                  stream: logs
                }
            ]
            publish: [
                {
                  stream: logs
                  fields: [
                    log
                    _ppf_timestamp
                    _ppf_id
                    _ppf_local_port
                    _ppf_remote_host

                  ]
                }
                {
                stream: _ppf_errors
                fields: [
                  _ppf_id
                  _ppf_error_document
                  _ppf_error_message
                  _ppf_timestamp
                  _ppf_channel
                  _ppf_tenant
                  log
                ]
              }
            ]
          }
          {
            type: elasticsearch_output
            settings: {
            cluster_id: es_data
            per_stream_settings : [
                {
                stream: logs
                index : { type : "daily", prefix : "cedricvf-events-"  }
                document_json_field : log
                document_id_field: _ppf_id
                additional_document_value_fields : [
                  { 
                      type : tuple_field
                      document_field : @timestamp
                      tuple_field : _ppf_timestamp
                  }
                ]
              }
              {
                stream : _ppf_errors
                index : { type : "daily", prefix : "cedricvf-events-" }
                document_json_field : _ppf_error_document
                document_id_field: _ppf_id
                additional_document_value_fields : [
                  {   type : "tuple_field", document_field : "@timestamp",  tuple_field : "_ppf_timestamp"  }
                  {   type : "tuple_field", document_field : "error_message",  tuple_field : "_ppf_error_message"  }
                  {   type : "tuple_field", document_field : "channel",  tuple_field : "_ppf_channel"  }
                  {   type : "tuple_field", document_field : "tenant",  tuple_field : "_ppf_tenant"  }
                  {   type : "tuple_field", document_field : "message",  tuple_field : "log"  }

                ]
               }
            ]
            }
            subscribe: [ 
                  { stream: "logs", component: "processing" }
                  { stream: "_ppf_errors", component: "processing" }
              ]
            }
          ]
        }
        ```

As you can see, our processing node now publishes two streams and our elasticsearch node is subscribing to both of them. One is used for regular (correctly parsed) data, the other for handling errors.

Now inject a malformed log into your input port:

  echo "hello world" > /dev/tcp/127.0.0.1/9999

And see the result:

  • on the console
  • ...in which index ?

Question

Could we have used two distinct Elasticsearch output nodes, i.e. one for regular logs and one for error logs? If yes, discuss with the trainer which is the best option.
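
For reference, the alternative layout discussed here would look roughly like this sketch, which simply reuses the settings from the solution above and splits them across two output nodes (the component names are illustrative):

```yaml
# Sketch of the alternative: two elasticsearch_output nodes, one per stream.
- type: elasticsearch_output
  component: events_output
  settings:
    cluster_id: es_data
    per_stream_settings:
    - stream: logs
      index: { type: daily, prefix: mytenant-events- }
      document_json_field: log
      document_id_field: _ppf_id
  subscribe:
  - component: processing
    stream: logs
- type: elasticsearch_output
  component: errors_output
  settings:
    cluster_id: es_data
    per_stream_settings:
    - stream: _ppf_errors
      index: { type: daily, prefix: mytenant-events- }
      document_json_field: _ppf_error_document
      document_id_field: _ppf_id
  subscribe:
  - component: processing
    stream: _ppf_errors
```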

Step 8 : Monitoring

Refer to this documentation to understand why monitoring is important when configuring a punchline.

As an exercise, let us add a monitoring reporter to our punchline. This will activate the generation of valuable performance metrics, and send them to the configured destination. For example, let us simply request that these metrics be printed to standard output. Add the following to your punchline:

metrics:
  reporters:
  - type: console
    reporting_interval: 10

As you can see, every 10 seconds some metric logs are printed to stdout. These are very important to monitor the health and performance of punchlines.

You have many reporters available, check this documentation for the complete list.
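
For instance, in production you would typically ship metrics through Kafka rather than to the console. A rough sketch might look like the following; the exact setting names are assumptions here, so check the reporters reference documentation for the actual ones:

```yaml
# Rough sketch only: the reporter setting names below are assumptions,
# refer to the reporters reference documentation for the actual fields.
metrics:
  reporters:
  - type: kafka
    brokers: local_kafka          # assumed: name of the target kafka cluster
    topic: mytenant-metrics       # assumed: destination topic for metric documents
    reporting_interval: 60        # report every minute, as suggested for production
```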

Step 9: Go to production !

Did you see the warning for production ?

In the logs, you may have seen some warning about possible data loss...

Have a look at a "reference" configuration example of a punchline for indexing logs. Do you see yet more differences for production ?

Answers (among others)
  • you need an explicit setting to activate the "lossless" mode of the Elasticsearch bolt
  • in production, the target index name is computed at the processing stage (so that in case of a replay, the index names match the contained event times)
  • memory settings, max tuples pending in the punchline, batching settings... for optimal ES indexing
  • metrics reporting: every minute or every few minutes (to reduce the amount of metrics)
  • metrics reporting: through kafka to avoid throughput problems (optimized metrics indexing)
  • the raw log ('message') is a separate field, so that we can archive/extract or forward it easily. We index it as an additional field.

Going further

Tip

If you want to go further in understanding punchlines, do not hesitate to follow the Punchline Getting Started section.

When you feel ready, read all the reference architecture punchline examples from the documentation.