Punchlines¶
Abstract
This track covers the fundamentals of punchlines.
First of all, be sure to read the punchline value proposition chapter.
Overview¶
Stream or Batch Data Processing¶
A punchline is an application that processes data.
- A streaming punchline runs forever. It is typically used to continuously collect, transform and/or save data from various sources into one or more sinks.
- A batch punchline also processes data but runs on a finite dataset: it starts, does its work, then ends. There are many use cases for batch punchlines: aggregations, extractions, machine learning, etc.
The beauty of the punch is that you have a single format to deal with both kinds of punchlines. You define a directed graph of nodes. Go through the Punchlines documentation where this is explained clearly.
Designing a Streaming Punchline¶
We will now design and run a simple punchline. We will start simple with a punchline dag made of two nodes: one node accepting incoming data on a tcp socket, and another one that simply prints out that data. We will then elaborate and make it more realistic and useful.
Along with these exercises, it will be helpful to refer to the Storm Punchline documentation.
Important
This track leverages a (so-called) storm punchline. A storm-like punchline is one that uses the data flow model defined by Apache Storm, a popular stream processing framework. It does not, however, mean that it actually runs in a Storm cluster. The important point is that the streaming model defined by Storm is still an excellent model for designing powerful streaming apps.
Step 1 : A Simple Punchline¶
Let us write a simple punchline composed of:
- a syslog input node
- and a punch node running a punch language instruction:
{ print(root); }
key elements for the exercise
- dag stands for Directed Acyclic Graph. A punchline is modeled as a set of nodes linked together with a publish-subscribe relationship. Each node is one element in the dag array.
- A node is characterized by a type, publish and/or subscribe parameters, and settings.
- To run an inlined punchlet (the root printer above), you must provide the punchlet_code setting to the punch node, instead of the punchlet setting (which references punchlet files).
- The punch node must subscribe to the stream that your syslog input node publishes.
This dag and node documentation cover these concepts in detail.
The streams and fields configuration of each node explicitly defines the data flow. Input nodes only publish data, processing nodes (most often) subscribe to and publish data, and output nodes send the data they receive to a data sink.
Exercise
Using your favorite file editor, create a file called `punchline_storm.yaml` based on:
- the yaml structure of a punchline,
- the example syslog input/punch nodes reference documentation,
- and the above hints.
a Solution
version: '6.0'
runtime: storm
dag:
  - type: syslog_input
    settings:
      load_control: none
      listen:
        proto: tcp
        host: 0.0.0.0
        port: 9999
    publish:
      - stream: logs
        fields:
          - log
  - type: punchlet_node
    settings:
      punchlet_code: '{ print(root); }'
    subscribe:
      - component: syslog_input
        stream: logs
Was the 'load_control' setting useful there ?
Key information
Nodes subscribe to streams by a source component name and a stream name.
By default, each node has a 'component name' which is the same as its type.
But if you have multiple nodes of the same type, you must give them distinct component names yourself, by adding the optional component attribute to the node (not in its 'settings' section, but in the node itself).
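For illustration, here is a minimal sketch (the component names are made up) showing two punch nodes of the same type with explicit component names, and a third node subscribing to one of them by component name and stream name:

```yaml
dag:
  - type: punchlet_node
    component: parser_a              # explicit component name, set on the node itself
    settings:
      punchlet_code: '{ print(root); }'
    subscribe:
      - component: syslog_input
        stream: logs
    publish:
      - stream: logs
        fields:
          - log
  - type: punchlet_node
    component: parser_b              # a second node of the same type needs its own name
    settings:
      punchlet_code: '{ print(root); }'
    subscribe:
      - component: syslog_input
        stream: logs
  - type: punchlet_node
    component: printer
    settings:
      punchlet_code: '{ print(root); }'
    subscribe:
      - component: parser_a          # subscription targets a component name plus a stream name
        stream: logs
```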
Step 2 : Execute your Punchline¶
Once your standalone is installed and started, simply run the following command to execute your punchline in foreground mode :
punchlinectl -t mytenant start -p punchline_storm.yaml
Question
Have a look at the output logs. Can you make sense of them ?
Step 3 : Inject some data¶
Your punchline is now running, so let us inject some data into it using the punchplatform-log-injector.sh tool. That tool is part of the punch and can send whatever test or arbitrary data you need to various destinations, with flexible protocol settings.
As for other punch tools, you can refer to the manual page for usage of the logs injector.
In a new terminal, simply run :
punchplatform-log-injector.sh -c $PUNCHPLATFORM_CONF_DIR/resources/injectors/examples/syslog_injector.json -p 9999
As you can see, your punchline is printing each received log to the terminal standard output.
Question
Can you understand the content of that injector file ?
Key points
- with this tool you can test load/performance, inject errors, etc., so test sets should be developed for all your parsers and pipelines
- payloads are usually provided as raw strings, double-quoted (with double quotes and newlines escaped as \" and \n)
- field tags can be inserted in the payloads, to randomize the data and to keep timestamps consistent with real time
- supported protocols are tcp, udp and lumberjack (including options for compression, ssl and lumberjack frame encoding)
- you can override the protocol, target host, port, number of logs and max sending rate with command-line options
- with tcp and lumberjack you have flow control, so the displayed rate is the max throughput of the network/target punchline
Step 4 : Adding a parsing node¶
Let us now improve our punchline. As an exercise, insert a new punch node that subscribes to the syslog input node, runs some log parsers (aka punchlets) and publishes the result to our printer punch node.
To help you, here is how you can reference punchlet files in the punch node settings, i.e. instead of executing an inline piece of code, you reference punchlet files.
Location of punchlets resources
- When a relative punchlet file path is provided (which is usual), the implicit root is the 'resources/punch' folder of your tenant folder.
So, if you are integrating a new tenant, ensure you store the needed resources in your tenant resources !
(For this training, you can just use the resources/punch folder of the 'mytenant' example folder from the standalone.)
- Note that resources can also be located in the channel folder, by using a special %{channel}%/ path start (see the Punch Node configuration); a small sketch follows the node skeleton below.
type: punchlet_node
component: processing
settings:
  punchlet:
    - punchlets/common/input.punch
    - punchlets/common/parsing_syslog_header.punch
subscribe:
  - FILL THIS
publish:
  - FILL THIS
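As mentioned in the resources note above, a punchlet can also be referenced relative to the channel folder. Here is a minimal sketch of the settings section, with a hypothetical channel-local file name:

```yaml
settings:
  punchlet:
    # resolved under your tenant 'resources/punch' folder
    - punchlets/common/input.punch
    # resolved under the channel folder (hypothetical file name)
    - '%{channel}%/my_channel_parser.punch'
```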
Your goal is to rerun your punchline, inject some data, and understand what you see on standard output.
Question
Are you at ease with the punchlet concept ? How would you describe what a punchlet is ? An application ? a service ? a function ?
Some possible answers
- punchlets can act as filters, transformation functions, or data enrichers
- punchlets can produce zero or many documents upon processing one input document, and they can send them to the various output streams of the punch node, so they can be alerters, filters, switches/routers, formatters for output data...
- more complex punchlets can be stateful (accumulate counters, group session data)
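To make this concrete, here is a hypothetical sketch of an inline punchlet used as a simple enricher. The 'enricher' component and 'enrichment' field names are made up for illustration, and the sketch assumes the conventional layout where the subscribed stream fields appear under [logs] in the punchlet root tuple:

```yaml
- type: punchlet_node
  component: enricher
  settings:
    # hypothetical enrichment: add an extra 'enrichment' field to the logs stream
    punchlet_code: '{ [logs][enrichment] = "training"; }'
  subscribe:
    - component: syslog_input
      stream: logs
  publish:
    - stream: logs
      fields:
        - log
        - enrichment
```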
Step 5 : Sending output to Elasticsearch¶
We now have an interesting punchline. Input logs are read from a tcp socket, then parsed into json documents. Let us now index these documents into the Elasticsearch database.
Your next goal is to add an Elasticsearch output node that subscribes to the stream of data published by the parsing punchlet_node, and in turn indexes it into Elasticsearch.
Key naming conventions for Elastic indices
If you are doing this training on a shared platform, then work inside a 'tenant' dedicated to YOUR tests, and customize your target Elasticsearch indices so that they start with your tenant name followed by '-events-'.
The 'tenant name' part is a requirement for housekeeping of your tenant data through the inbuilt service. The following '-events-' part indicates the kind of data, and is used to apply the standard field type mappings to your index.
To help you, here follows the essential configuration of the Elasticsearch output node:
Platform specifics
Be careful, your target elasticsearch cluster may differ if you run on a non-standalone platform.
In this case have a look at your platform configuration (installed at deployment time):
jq . $PUNCHPLATFORM_PROPERTIES_FILE
jq '.elasticsearch.clusters|keys' $PUNCHPLATFORM_PROPERTIES_FILE
type: elasticsearch_output
settings:
  cluster_id: es_search
  per_stream_settings:
    - stream: logs
      index:
        type: daily
        prefix: mytenant-events-
      document_json_field: log
      additional_document_value_fields:
        - type: date
          document_field: '@timestamp'
          format: iso
subscribe:
  - FILL THIS
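As a hint, assuming your parsing punch node keeps the 'processing' component name introduced earlier, the missing subscription would simply be:

```yaml
subscribe:
  - component: processing
    stream: logs
```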
Step 6 : Check the results in Elasticsearch¶
Relaunch your punchline:
punchlinectl -t mytenant start -p punchline_storm.yaml
Then in a new terminal, inject some data :
punchplatform-log-injector.sh -c $PUNCHPLATFORM_CONF_DIR/resources/injectors/examples/syslog_injector.json -p 9999
Check that your logs have been correctly inserted into Elasticsearch :
curl localhost:9200/_cat/indices/mytenant*?v
An Elasticsearch index starting with mytenant-events- must have been created, with some documents in it.
Tip
Use Kibana to have a look at the Discover tab to better visualise the documents. Take a minute to understand what you see and the structure of the document, and compare it to the standard output trace of your punchline. The end-to-end processing, from the log emission up to its indexing into Elasticsearch, must be completely clear to you.
Question
Discuss briefly with the trainer how the json document produced by the punchlet_node is forwarded to Elasticsearch. Is there a one-to-one mapping between streams and fields (which are punchline streaming concepts) and the output document sent to Elasticsearch ?
Key points
Usually, the json document is copied from one field of the stream. Some other fields can be used (for the document id, for the targeted index name, for additional document fields).
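For instance, the per-stream settings of the Elasticsearch output node (the same keys are used in the answer below) illustrate this mapping:

```yaml
per_stream_settings:
  - stream: logs
    # the json document sent to Elasticsearch is taken from this single stream field
    document_json_field: log
    # another stream field can be used as the Elasticsearch document id
    document_id_field: _ppf_id
    # yet other stream fields can be copied as additional document fields
    additional_document_value_fields:
      - type: tuple_field
        document_field: '@timestamp'
        tuple_field: _ppf_timestamp
    index:
      type: daily
      prefix: mytenant-events-
```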
Optional Exercise
Now add some more fields to the published output of your 'syslog_input' and 'processing' nodes:
- _ppf_timestamp
- _ppf_id
- _ppf_local_port
- _ppf_remote_host
Are they present in Kibana at the end ? Why not ? Is there some change in Elasticsearch, and why ?
Answers
Key points
- The _ppf_xxx fields are not directly in the Kibana document, because only the 'log' field was used as the json document.
- BUT do you notice some additional fields anyway ? Have a look at the input.punch resource to understand.
- The _ppf_id field is very useful as the id of the Elastic document, because it avoids duplicates if you replay the logs from a retention !
- Did you notice that your 'technology' field value is strange ? Try adding the 'meta' section to your punchline.
Here is the example file:

```hjson
{
  version: "6.0"
  runtime: storm
  meta: {
    vendor: training_inc
    technology: secret_training_device
  }
  dag: [
    {
      type: syslog_input
      settings: {
        load_control: none
        listen: {
          proto: tcp
          host: 0.0.0.0
          port: 9999
        }
      }
      publish: [
        {
          stream: logs
          fields: [
            log
            _ppf_timestamp
            _ppf_id
            _ppf_local_port
            _ppf_remote_host
          ]
        }
      ]
    }
    {
      type: punchlet_node
      component: processing
      settings: {
        punchlet: [
          punchlets/common/input.punch
          punchlets/common/parsing_syslog_header.punch
        ]
      }
      subscribe: [
        {
          component: syslog_input
          stream: logs
        }
      ]
      publish: [
        {
          stream: logs
          fields: [
            log
            _ppf_timestamp
            _ppf_id
            _ppf_local_port
            _ppf_remote_host
          ]
        }
      ]
    }
    {
      type: punchlet_node
      settings: {
        punchlet_code: "{ print(root); }"
      }
      subscribe: [
        {
          component: processing
          stream: logs
        }
      ]
    }
    {
      type: elasticsearch_output
      settings: {
        cluster_id: es_search
        per_stream_settings: [
          {
            stream: logs
            index: {
              type: daily
              prefix: mytenant-events-
            }
            document_json_field: log
            document_id_field: _ppf_id
            additional_document_value_fields: [
              {
                type: tuple_field
                document_field: "@timestamp"
                tuple_field: _ppf_timestamp
              }
            ]
          }
        ]
      }
      subscribe: [
        {
          component: processing
          stream: logs
        }
      ]
    }
  ]
}
```
Step 7 : Error handling¶
What happens if a punchlet fails to correctly parse the incoming data ? Most often you
- cannot afford to block the whole processing until someone comes and fixes the processing chain to successfully parse this special case
- cannot afford to lose your data by dropping unparsed data and moving on to the next input.
Instead you will want to divert such data to some error queue or index so that you can reprocess it later.
This is why error handling is required, and it is a key topic to understand before going to production. It is also a good opportunity to keep investigating what you can do with a punchline dag, streams and fields.
Refer to this documentation for a quick overview of error handling.
Let us update our punchline as follows:

```hjson
{
  version: "6.0"
  runtime: storm
  meta: {
    vendor: training_inc
    technology: secret_training_device
  }
  dag: [
    {
      type: syslog_input
      settings: {
        load_control: none
        listen: {
          proto: tcp
          host: 0.0.0.0
          port: 9999
        }
      }
      publish: [
        {
          stream: logs
          fields: [
            log
            _ppf_timestamp
            _ppf_id
            _ppf_local_port
            _ppf_remote_host
          ]
        }
      ]
    }
    {
      type: punchlet_node
      component: processing
      settings: {
        punchlet: [
          punchlets/common/input.punch
          punchlets/common/parsing_syslog_header.punch
        ]
      }
      subscribe: [
        {
          component: syslog_input
          stream: logs
        }
      ]
      publish: [
        {
          stream: logs
          fields: [
            log
            _ppf_timestamp
            _ppf_id
            _ppf_local_port
            _ppf_remote_host
          ]
        }
        {
          stream: _ppf_errors
          fields: [
            _ppf_error_document
            _ppf_error_message
            _ppf_id
            _ppf_timestamp
            _ppf_channel
            _ppf_tenant
            log
          ]
        }
      ]
    }
    {
      type: elasticsearch_output
      settings: {
        cluster_id: es_search
        per_stream_settings: [
          {
            stream: logs
            index: {
              type: "daily"
              prefix: "mytenant-events-"
            }
            document_json_field: log
            document_id_field: _ppf_id
            additional_document_value_fields: [
              {
                type: tuple_field
                document_field: "@timestamp"
                tuple_field: _ppf_timestamp
              }
            ]
          }
          {
            stream: _ppf_errors
            index: {
              type: "daily"
              prefix: "mytenant-events-"
            }
            document_json_field: _ppf_error_document
            document_id_field: _ppf_id
            additional_document_value_fields: [
              { type: "tuple_field", document_field: "@timestamp", tuple_field: "_ppf_timestamp" }
              { type: "tuple_field", document_field: "error_message", tuple_field: "_ppf_error_message" }
              { type: "tuple_field", document_field: "channel", tuple_field: "_ppf_channel" }
              { type: "tuple_field", document_field: "tenant", tuple_field: "_ppf_tenant" }
              { type: "tuple_field", document_field: "message", tuple_field: "log" }
            ]
          }
        ]
      }
      subscribe: [
        {
          component: processing
          stream: logs
        }
        {
          component: processing
          stream: _ppf_errors
        }
      ]
    }
  ]
}
```
As you can see, our processing node now publishes two streams and our elasticsearch node is subscribing to both of them. One is used for regular (correctly parsed) data, the other for handling errors.
Now inject some malformed log into your input port :
echo "hello world" > /dev/tcp/127.0.0.1/9999
And see the result:
- on the console
- ...in which index ?
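If you want to check on the Elasticsearch side, a simple query such as the following (assuming a standalone Elasticsearch listening on localhost and the mytenant-events- indices used in this track) should return your error document, including its error_message field:

curl 'localhost:9200/mytenant-events-*/_search?q=error_message:*&pretty'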
Question
Could we have used two distinct Elasticsearch output nodes, i.e. one for regular logs and one for error logs ? If yes, discuss with the trainer which is the best option.
Step 8 : Monitoring¶
Refer to this documentation to understand why monitoring is important when configuring a punchline.
As an exercise, let us add a monitoring reporter to our punchline. This will activate the generation of valuable performance metrics, and send them to the configured destination. For example, let us simply request to print out these metrics to standard output. Add the following to your punchline:
metrics:
  reporters:
    - type: console
      reporting_interval: 10
As you can see, every 10 seconds some metric logs are printed to stdout. These are very important for monitoring punchline health and performance.
You have many reporters available, check this documentation for the complete list.
Step 9: Go to production !¶
Did you see the warning for production ?
In the logs, you may have seen some warnings about possible data loss...
Have a look at a "reference" configuration example of a punchline for indexing logs. Do you see yet more differences for production ?
Answers (among others)
- an explicit setting is needed to activate the "lossless" mode of the Elasticsearch output node
- in production, the target index name is computed at processing stage (so that in case of replay, the index names match the contained event times)
- memory settings, max tuples pending in the punchline, batching settings... for optimal ES indexing
- metrics reporting: every minute or every few minutes (to reduce the amount of metrics)
- metrics reporting: through kafka to avoid throughput problems (optimized metrics indexing)
- the raw log ('message') is a separate field, so that we can archive/extract or forward it easily. We index it as an additional field.
Going further¶
Tip
If you want to go further in understanding punchlines, do not hesitate to follow the Punchline Getting Started section.
When you feel ready, read all the reference architecture punchline examples from the documentation.