Punch Node¶
The punch node executes one or more punchlets on the fly. This node cannot be used to communicate with an external application; it is strictly internal to a punchline. Its configuration looks like this:
{
  "type": "punchlet_node",
  "settings": {
    "punchlet": [
      "standard/common/input.punch",
      "standard/common/parsing_syslog_header.punch"
    ]
  },
  "component": "my_punch_bolt",
  "subscribe": [
    {
      "component": "kafka_input",
      "stream": "logs"
    }
  ],
  "publish": [
    {
      "stream": "logs",
      "fields": [
        "log", "_ppf_id", "_ppf_timestamp"
      ]
    },
    {
      "stream": "_ppf_errors",
      "fields": [
        "_ppf_id", "_ppf_error_document", "_ppf_error_message", "_ppf_timestamp"
      ]
    }
  ]
}
The key concept to understand is the relationship between the punchlet and the subscribed and published streams/fields. In this example your punchlet will receive punch tuples corresponding to the storm tuples received on the logs stream. After it returns, your punchlet will have generated an arbitrary punch tuple (depending on the punchlet logic). The punch node will, however, only emit the tuples/fields that match the publish section.
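With the configuration above, a minimal punchlet can be used to visualize what it receives. The traversing tuple is rooted by stream then field names (here [logs][log] plus the reserved fields, assuming the kafka_input node publishes them on the logs stream):

```punch
{
    // Print the whole traversing tuple. With the settings above it is
    // expected to contain the [logs][log] field, plus the reserved
    // [logs][_ppf_id] and [logs][_ppf_timestamp] fields.
    print(root);
}
```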
Info
Make sure you understand the input nodes and inner nodes stream and field fundamental concepts. The _ppf_errors stream and its fields are explained below in the dedicated error handling section.
Punchlets¶
The punchlet property refers to the punchlet(s) you want to execute in the node. You can execute one or several punchlets, as explained hereafter.
Single Punchlet¶
Here is the simplest example of a single inline punchlet. It will simply print out the traversing tuple. Inline punchlets are useful in development mode.
{
  "type": "punchlet_node",
  "settings": {
    "punchlet_code": "{ print(root); }"
  }
}
You can instead refer to a punchlet file:
{
  "type": "punchlet_node",
  "settings": {
    "punchlet": [
      "example/parsing.punch"
    ]
  }
}
The punchlet property is a path, relative by default to the $PUNCHPLATFORM_CONF_DIR/resources/punch folder. Here you are expected to provide a $PUNCHPLATFORM_CONF_DIR/resources/punch/example/parsing.punch punchlet.
Sequence of Punchlets¶
It is common to require a sequence of punchlets to be executed on your traversing data. You can do that using one punch node per punchlet, but a more efficient solution consists in chaining them all in the same punch node. This avoids extra serialisation between punch nodes and can save considerable CPU resources. It is the recommended setting as it runs the punchlets with optimal performance. The way you do this is very simple:
{
  "type": "punchlet_node",
  "settings": {
    "punchlet": [
      "standard/common/input.punch",
      "standard/common/parsing_syslog_header.punch",
      "standard/apache_httpd/parsing.punch"
    ]
  }
}
Info
If one of the punchlets raises an exception, the subsequent ones are skipped.
Important
Using such a sequence, your punchlets are executed within the same thread; no serialisation price is incurred to pass from one punchlet to the next. If you use several executors in your node settings, as many independent threads will be used. Each thread runs its own sequence of punchlets, completely independently and sharing no context.
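For example, a node running the same punchlet sequence in four independent threads could be configured as follows (a sketch: the exact placement of the executors property at the settings level is an assumption here; the dag example below shows it used per stage):

```json
{
  "type": "punchlet_node",
  "settings": {
    "punchlet": [
      "standard/common/input.punch",
      "standard/common/parsing_syslog_header.punch"
    ],
    "executors": 4
  }
}
```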
Graphs of Punchlets¶
The punchlet node supports a directed acyclic graph (dag) configuration that lets you chain several stages of punchlets, each stage being executed in a pool of threads. Here is an example:
{
  "type": "punchlet_node",
  "component": "punchlet_dag",
  "settings": {
    "punchlet_dag": [
      {
        "punchlet": [
          "standard/common/first_punchlet.punch",
          "standard/common/second_punchlet.punch"
        ],
        "executors": 5
      },
      {
        "punchlet": [
          "standard/common/third_punchlet.punch",
          "standard/common/fourth_punchlet.punch"
        ],
        "executors": 2,
        "affinity": "logs.log.id"
      }
    ]
  }
}
With this configuration:
- the first and second punchlets are executed in 5 independent threads. Each thread runs the sequence of the two punchlets. The incoming node traffic is dispatched among these 5 threads.
- the third and fourth punchlets are similarly executed in two threads. In addition, each pair of (third and fourth) punchlets receives the input traffic with an affinity (sticky) strategy based on the [logs][log][id] field value. I.e. all tuples with the same [logs][log][id] value are directed to the same thread, so that the third and fourth punchlets can perform stateful processing such as correlation rules, or anything that requires related tuples to reach the same punchlet.
Info
This is really the same directed acyclic graph concept as the punchline one. The difference is simply that it is executed in a lightweight local thread engine instead of going from one punchline node to another. The benefit of this mode is to avoid extra tuple serialization; the performance gain can be significant. The only limitation of such embedded dags is that they can be executed only as part of a single unix process.
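The affinity strategy can be pictured as simple hash-based routing: the affinity field value is hashed to pick a target executor, so identical values always land on the same thread. This is an illustrative sketch of the idea, not the actual punch implementation:

```python
def dispatch(affinity_value: str, executors: int) -> int:
    """Pick the executor index for a tuple from its affinity field value.

    Tuples carrying the same value (e.g. the same [logs][log][id])
    are always routed to the same executor thread.
    """
    return hash(affinity_value) % executors

# Two tuples with the same id go to the same executor.
assert dispatch("id-42", 2) == dispatch("id-42", 2)
```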
Resources¶
You can provide external resources to your punchlets. This is useful for enrichment, routing and many other use cases you can invent. Here is a simple example of a punchlet that uses an external resource provided in a local json file users.json:
{
    Tuple t = getResourceTuple("users");
}
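For that punchlet to work, the "users" resource must be declared in the node settings. A sketch, using the punchlet_json_resources property described in the embedded resources mode below (the convention of deriving the resource name from the file name is an assumption for this illustration):

```json
{
  "type": "punchlet_node",
  "settings": {
    "punchlet_json_resources": [
      "example/users.json"
    ],
    "punchlet": [
      "example/enrichment.punch"
    ]
  }
}
```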
The rest of this chapter explains how you can provide such resources.
Embedded Resources¶
The simplest way of providing resources (tables, ...) for the punchlets to use consists in embedding your resources in the punch node task object itself, before sending it to the execution cluster.
Danger
This mode serializes the resource content as part of your node, in turn transferred to the target execution engine.
- In Storm cluster mode, this basically means that the resources are part of the generated jar submitted for execution.
- In Shiva cluster mode, the tenant configuration is uploaded to the execution server with each task, through a kafka message (max allowed: around 20 MB).
Hence, use this mode only for small resource files. For big resource files use the fetch punch operator, which lets you load resources from external locations and allows optional dynamic updates.
Here is an example:
{
  "type": "punchlet_node",
  "settings": {
    #
    # arbitrary JSON files
    #
    "punchlet_json_resources": [
      "standard/apache_httpd/http_codes.json",
      "standard/apache_httpd/taxonomy.json"
    ],
    #
    # Siddhi rules
    #
    "punchlet_rule_resources": [
      "standard/common/detection.rule"
    ],
    #
    # your additional grok patterns. All files in there will be read and
    # expected to contain grok pattern declarations, one per line.
    #
    "punchlet_grok_pattern_dirs": [
      "%{channel}%/patterns"
    ],
    "punchlet": [
      "standard/common/input.punch",
      "standard/common/parsing_syslog_header.punch",
      "standard/apache_httpd/parsing.punch",
      "standard/apache_httpd/enrichment.punch",
      "standard/apache_httpd/normalization.punch",
      "standard/apache_httpd/correlation.punch"
    ]
  }
}
Whenever you use a relative path (i.e. one not starting with '/'), resources are loaded from the $PUNCHPLATFORM_CONF_DIR/tenants/%{tenant}%/resources/punch root folder. If you prefer storing resources relative to the channel folder, you can use the special %{channel}% placeholder. You can also use the %{tenant}% placeholder to refer to the top of your tenant configuration folder: %{tenant}%/resources/punch/standard/common/input.punch is equivalent to standard/common/input.punch.
Here is an example. Say your taxonomy.json file is located under $PUNCHPLATFORM_CONF_DIR/tenants/mytenant/channels/apache/resources/taxonomy.json. Use the following settings:
{
  "type": "punchlet_node",
  "settings": {
    "punchlet_json_resources": [
      "%{channel}%/resources/taxonomy.json"
    ]
  }
}
Last, you can use an absolute file name. In that mode the file is searched directly using the provided path.
Warning
Use absolute path with caution, preferably only for development, since these files will likely not be stored in your platform configuration directory.
Controlling the Output Streams and Fields¶
Single Tuple¶
By default, the PunchNode expects the punchlet(s) to produce an output document with a two-level layout:
{
  "logs": {
    "message": "hello world"
  },
  "alert": {
    "criticity": "high",
    "code": 23
  }
}
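Each first-level key maps to a published stream, and each second-level key to one of its fields. The document above would thus typically be matched by a publish section like the following (a sketch reusing the layout above):

```json
"publish": [
  {
    "stream": "logs",
    "fields": ["message"]
  },
  {
    "stream": "alert",
    "fields": ["criticity", "code"]
  }
]
```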
Note however that fields are converted to JSON or strings. In some cases you may want your fields to be sent as is. For example, say you generate:
{
  "logs": {
    "array": [1, 2, 3]
  }
}
In that case, use the field_publishing_strategy setting:
{
  "type": "punchlet_node",
  "component": "punchlet_node",
  "settings": {
    # Possible values are "tuple", "string" or "json".
    # Default is json.
    "field_publishing_strategy": "tuple",
    "punchlet": [...]
  }
  ...
}
This is useful if you forward the tuple to a subsequent output node that expects native field values, to map them directly into SQL columns or similar structures.
Warning
The tuple field_publishing_strategy requires that you use only serializable data types.
Error Handling¶
A punchlet can raise an exception, either explicitly or because it encounters a runtime error. Most often you cannot afford to lose the input data: you must arrange to get it back together with the exception information, and forward it to a backend for later reprocessing. Doing that on the punchplatform is quite easy. Simply add an additional publish stream to tell the node to emit the error information in the topology:
{
  "type": "punchlet_node",
  "settings": {
    ...
  },
  "storm_settings": {
    "subscribe": [
      ...
    ],
    "publish": [
      {
        "stream": "logs",
        "fields": [
          "log",
          "_ppf_id",
          "_ppf_timestamp"
        ]
      },
      {
        "stream": "_ppf_errors",
        "fields": [
          "_ppf_id",
          "_ppf_error_document",
          "_ppf_error_message",
          "_ppf_timestamp"
        ]
      }
    ]
  }
}
The _ppf_errors stream and the _ppf_error_document and _ppf_error_message fields are reserved. This causes the node to emit a single-field storm tuple in the topology, which you can handle the same way you handle regular data. It basically contains the exception message (which includes the input data). Because it is emitted just like any other data, you can arrange to forward it up to whatever final destination you need, to save it and reprocess it later: archiving, elasticsearch or any other.
Info
The generated error field is a ready-to-use JSON document. Most often you simply need to forward it to save it somewhere. If you would like to enrich or normalise its content in some way, simply deploy a punchlet node that subscribes to it. Your punchlet will then be capable of changing its content. But in turn, that punchlet should not fail. Bottom line: do this only if strictly required, and if so pay extra attention to writing an error-handling punchlet that can never fail.
Additional fields can be published in the error stream. They can either be copied from the input stream (any field name is supported, as long as it is present in the subscribed stream) or generated by the punch node:
- _ppf_timestamp : the standard input timestamp (long number of milliseconds since 1/1/1970)
- _ppf_id : the unique id (string) of the input document
- _ppf_platform : the unique id of the punchplatform instance
- _ppf_tenant : the tenant name of the current channel
- _ppf_channel : the name of the channel containing the failed punchlet
- _ppf_topology : the name of the topology containing the failed punchlet
- _ppf_component : the component name of the PunchNode containing the failed punchlet
- _ppf_error_message : the exception message or class that the punchlet raised at failure time
- _ppf_error_document : the JSON-escaped string document found when the punchlet failure occurred
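For instance, to also capture the tenant and channel names in the error stream, simply add those reserved fields to the _ppf_errors publish declaration (a sketch based on the field list above):

```json
{
  "stream": "_ppf_errors",
  "fields": [
    "_ppf_id",
    "_ppf_tenant",
    "_ppf_channel",
    "_ppf_error_document",
    "_ppf_error_message",
    "_ppf_timestamp"
  ]
}
```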
Dealing with Timers¶
Some punchlets need to be called regularly even if no input data is received. The typical example is so-called stateful punchlets that need to flush some internal data structure. The way you achieve that is by requesting the punch node to periodically invoke your punchlet with a special empty tuple. This tuple is referred to as a tick tuple. Here is how you make your punchlet receive an empty tuple every 2 seconds:
{
  "type": "punchlet_node",
  "settings": {
    "punchlet_tick_frequency": 2,
    ...
  }
}
In your punchlet you can catch these empty tuples as simply as:
{
    if (root.isEmpty()) {
        // deal with your flush or expiration logic
        return;
    } else {
        // deal with a regular tuple
    }
}
Latency Tracking¶
Just like all nodes, you can subscribe to and publish the _ppf_metrics stream and _ppf_latency field to make your node part of the latency tracking path. Here is an example:
{
  "type": "punchlet_node",
  "settings": {
    ...
  },
  "subscribe": [
    {
      "component": "kafka_input",
      "stream": "logs"
    },
    {
      "component": "kafka_input",
      "stream": "_ppf_metrics"
    }
  ],
  "publish": [
    {
      "stream": "logs",
      ...
    },
    {
      "stream": "_ppf_errors",
      ...
    },
    {
      "stream": "_ppf_metrics",
      "fields": [
        "_ppf_latency"
      ]
    }
  ]
}
Info
The only special thing about the _ppf_metrics stream and _ppf_latency field is that they do not traverse your punchlets. You do not have to explicitly protect your punchlet code logic to ignore them. Make sure you understand the input nodes and inner nodes stream and field fundamental concepts.
TroubleShooting and Debugging¶
If you write punchlets, make sure you are aware of the many resources available to easily test them. Check the punch language documentation chapters.
A good trick to know, should streams/fields not be emitted the way you expect, is to add a small punchlet in the chain that simply prints out and forwards the received data. That is easily done by deploying an inline punchlet in a node you add to your topology:
{
  "type": "punchlet_node",
  "settings": {
    "punchlet_code": "{ print(root); }"
  },
  ...
}
Make sure you are aware of the following punch features; they dramatically ease your punchlet coding experience:
- inline punchlets
- sublime text editor
- punch log injector
- the kibana inline plugin to execute grok and punchlets
- punchlinectl.sh
- topologies log levels