Skip to content

Punch Node

The punch node executes one or several punchlets on the fly. This node cannot be used to communicate with an external application, it necessarily is internal to a punchline. Its configuration looks like this :

{
  "type": "punchlet_node",
  "settings": {
    "punchlet": [
        "standard/common/input.punch",
        "standard/common/parsing_syslog_header.punch"
    ]
  },
  "component": "my_punch_bolt",
  "subscribe": [
      {
        "component": "kafka_input",
        "stream": "logs"
      }
  ],
  "publish": [
      {
        "stream": "logs",
        "fields": [
          "log", "_ppf_id", "_ppf_timestamp"
        ]
      },
      {
        "stream": "_ppf_errors",
        "fields": [
          "_ppf_id", "_ppf_error_document", "_ppf_error_message", "_ppf_timestamp"
        ]
      }
  ]
}

The key concept to understand it the relationship between the punchlet and the subscribed and published streams/fields. In this example your punchlet will receive punch tuples corresponding to the storm tuple received on the logs stream. After it returns your punchlet will generate an arbitrary punch tuple (it depends on the punchlet logic). But the punchbolt will only emit these tuples/fields that match the publish section.

Info

make sure you understand input nodes and inner nodes stream and field fundamental concepts.

The _ppf_errors stream and its fields is explained below in the dedicated error handling section.

Punchlets

The punchlet property refer to the punchlet(s) you want to execute in the node. You can execute one or several punchlets as explained hereafter.

Single Punchlet

Here is the simplest example of a single inline punchlet. It will simply print out the traversing tuple. Inline puchlets are useful in development mode.

{
  "type": "punchlet_node",
  "settings": {
    "punchlet_code": "{ print(root); }"
  }
}
Here is how to refer to a punchlet file. Note here the use of an array.
{
  "type": "punchlet_node",
  "settings": {
    "punchlet": [
      "example/parsing.punch"
    ]
  }
}

The punchlet property is a path, relative to the $PUNCHPLATFORM_CONF_DIR/resources/punch folder by default. Here it is expected from you to provide a $PUNCHPLATFORM_CONF_DIR/resources/punch/example/parsing.punch punchlet.

Sequence of Punchlets

It is common to require a sequence of punchlets to be executed on your traversing data. You can do that using one PunchNode per punchlet. But a more efficient solution consists in chaining them all in the same PunchNode. This avoids extra serialisation between PunchNodes and can save up considerable cpu resources. The way you do this is very simple :

if one of the punchlets in there raise an exception, the following ones are skipped.

You can chain punchlets one after the other. This is the recommended settings as it runs the punchlets with optimal performance.

{
  "type": "punchlet_node",
  "settings": {
      "punchlet": [
        "standard/common/input.punch",
        "standard/common/parsing_syslog_header.punch",
        "standard/apache_httpd/parsing.punch",
    ]
  }
}

Info

if one of the punchlets in there raise an exception, the subsequent ones are skipped.

Important

Using such sequence, your punchlets are executed from within the same thread. No serialisation price is incurred to pass from one to the next. If you use several executors in your node settings, as many independent thread will be used. Each sequence of thread is thus completely independent and share no context.

Graphs of Punchlet

The punchlet node supports a direct acycly dag configuration that lets you chain several stages of punchlets, each stages being executed in a pool of threads. Here is an example :

      "type": "punchlet_node",
      "component": "punchlet_dag",
      "settings": {
        "punchlet_dag": [
          { 
            "punchlet" : [
              "standard/common/first_punchlet.punch", 
              "standard/common/second_punchlet.punch"
            ],
            "executors" : 5
          },
          { 
            "punchlet" : [
              "standard/common/third_punchlet.punch" ,
              "standard/common/fourth_punchlet.punch"
            ],
            "executors" : 2,
            "affinity" : "logs.log.id"
          }
        ]
      }
    },

With this configuration :

  • the first and second punchlets are executed in 5 independent threads. Each thread will run the sequence of the two punchlets. The incoming node traffic is dispatched among these 5 threads.
  • the third and fourth punchelts are executed similarly in two threads. Besides, each pair of (third and fourth) punchlets will receive the input traffic with an affinity sticking strategy based on the [logs][log][id] field value. I.e. all tuples with the same [logs][log][id] value will be directed to the same thread so that the third or fourth punchlets can perform some stateful processing such as correlation rules, or anything that demands that all the same tuples go to the same punchlet.

Info

This is really the same directed acyclic graph concept than the punchline one. The difference is simply that it is is executed in a lightweight local thread engine instead of going from one punchline node to another. The benefit of this mode is to avoid extra tuple serialization. The performance gain can be significant. The only limitation of such embedded dags is they can be executed only as part of a single unix process.

Resources

You can provide external resources to your punchlets. This is useful for enrichment, routing an many other use cases you can invent. Here is a simple example of a punchlet that uses an external resource provided in a local or json file users.json:

{
  Tuple t = getResourceTuple("users");
}

In this chapter we explain how you can provide such remote resources.

Embedded Resources

The simplest mode of providing resources (tables...) for the punchlets to use consist of embedding your resource as part of the punch node task object itself, before sending it to the execution cluster.

Danger

This mode will serialize the resource content as part of your node, in turn transfered to the target execution engine.

  • In Storm cluster mode, this basically means that the resource are part of the generated jar submitted for execution.
  • In Shiva cluster mode, the tenant configuration is uploaded to the execution server with each task, through a kafka message (max allowed : around 20 MB)

Hence, use this mode only for small resource files. For big resource files use the fetch punch operator that lets you load resources from external locations, and allows optional dynamic update.

Here is an example:

{
  "type": "punchlet_node",
  "settings": {
    #
    # arbitrary JSON files. 
    #
    "punchlet_json_resources": [
      "standard/apache_httpd/http_codes.json",
      "standard/apache_httpd/taxonomy.json"
    ],

    #
    # Siddhi rules
    #
    "punchlet_rule_resources": [
      "standard/common/detection.rule"
    ],

    #
    # your additional grok patterns. All files in there will be read and expected
    # to contain grok pattern declaraion, one by line.
    #
    "punchlet_grok_pattern_dirs": [
      "%{channel}%/patterns"
    ],

    "punchlet": [
      "standard/common/input.punch",
      "standard/common/parsing_syslog_header.punch",
      "standard/apache_httpd/parsing.punch",
      "standard/apache_httpd/enrichment.punch",
      "standard/apache_httpd/normalization.punch",
      "standard/apache_httpd/correlation.punch"
    ]
  }
}

Whenever you use a relative path (i.e. not starting with '/'), resources will be loaded from the $PUNCHPLATFORM_CONF_DIR/tenants/%{tenant}%/resources/punch root folder. If you prefer storing resource relative to the channel folder you can use the special %{channel}% placeholder.

You can also use the %{tenant}% placeholder to refer to the top of your tenant configuration folder. %{tenant}%/resources/punch/standard/common/input.punch is equivalent to standard/common/input.punch.

Here is an example. Say your taxonomy.json file is located under:

$PUNCHPLATFORM_CONF_DIR/tenants/mytenant/channels/apache/resources/taxonomy.json

HUse the following settings:

{
  "type": "punchlet_node",
  "settings": {
    "punchlet_json_resources": [
      "%{channel}%/resources/taxonomy.json"
    ]
  }
}

Last you can use absolute file name. Using that mode the file will be search directly using the provided path.

Warning

Use absolute path with caution, preferably only for development, since these files will likely not be stored in your platform configuration directory.

Controlling the Ouput stream and fields

Single Tuple

By default, the PunchNode expects the punchlet(s) to output an output document with a two level layout :

{
  "logs" : 
    { 
      "message" : "hello world"
    },
  "alert" : {
      "criticity" : "high",
      "code" : 23
  }
}
The PunchNode will then emit the corresponding tuples onto the "logs" and "alert" streams.

Note however that fields are converted to Json or Strings. In some case you may want your fields to be sent as is. For example if you generate :

{
  "logs" : 
    { 
      "array" : [1,2,3]
    }
}
You can request the PunchNode to send the "array" field as a native (java) array of long, instead of going through a json or string representation. To do that use the field_publishing_strategy

      {
        "type": "punchlet_node",
        "component" : "punchlet_node",
        "settings": {
          # Possible values are "tuple", "string" or "json"
          # default is json. 
          "field_publishing_strategy":"tuple",
          "punchlets": [...]
        }
        ...
      }

This is useful if you forward the tuple to a next ouput node that expects nativ field values, to map them directly into SQL columns or similar structures.

Warn

the tuple field_publishing_strategy requires you use only serializable data types.

Error Handling

A punchlet can raise an exception. Either explicitly or because it encounters a runtime error. Most often you cannot afford to loose the input data and must arrange to get it back together with the exception information and forward it to a backend for later reprocessing. Doing that on the punchplatform is quite easy. Simply add an additional publish stream to indicate to the node to emit the error information in the topology:

{
  "type": "punchlet_node",
  "settings": {
    ...
  },
  "storm_settings": {
    "subscribe": [
      ...
    ],
    "publish": [
      {
        "stream": "logs",
        "fields": [
          "log",
          "_ppf_id",
          "_ppf_timestamp"
        ]
      },
      {
        "stream": "_ppf_errors",
        "fields": [
          "_ppf_id",
          "_ppf_error_document",
          "_ppf_error_message",
          "_ppf_timestamp"
        ]
      }
    ]
  }
}

The _ppf_errors stream and _ppf_error_document, _ppf_error_message fields are reserved. What this cause is the emitting of a single-field storm tuple in the topology, that you can handle the same way you handle regular data. It basically contain the exception message (that includes the input data). Because it is emitted just like any other data, you can arrange to have it forward up to the final destination you need to save it and reprocess it later : archiving, elasticsearch or any other.

Info

the generated error field is a ready to use json document. Most often you simply need to forward it to save it somewhere. If you would like to enrich or normalise its content in some ways, simply deploy a punchlet node that subscribes to it. Your punchlet will then be capable of changing its content. But in turn that punchlet should not fail. Bottom line : do this only if strictly required and if so pay extra attention to write a error handling punchlet that can never fail.

Additional fields can be published in error stream, that can be either copied from the input stream (any field name is supported, as long at it is present the subscribed stream), or generated by the Punch node :

  • _ppf_timestamp : the standard input timestamp (long number of milliseconds since 1/1/1970)
  • _ppf_id : the unique id (string) of the input document
  • _ppf_platform : the unique id of the punchplatform instance
  • _ppf_tenant : the tenant name of the current channel
  • _ppf_channel : the name of the channel containing the failed punchlet
  • _ppf_topology : the name of the topology containing the failed punchlet
  • _ppf_component : the component name of the PunchNode containing the failed punchlet
  • _ppf_error_message : the exception message or class that the punchlet raised at failure time.
  • _ppf_error_document : the JSON-escaped string document found when the punchlet fail occurred

Dealing with Timers

Some punchlets need to be regularly called even if no input data is received. The typical example are so-called stateful punchlets that need to make some internal structure flushed.

The way you can achieve that is be requesting the punch node to periodically invoke your punchlet with a special empty tuple. This tuple is referred to as a tick tuple. Here is how you make your punchlet receive an empty tuple every 5 seconds:

{
  "type": "punchlet_node",
  "settings": {
    "punchlet_tick_frequency" : 2,
    ...
  }

In you punchlet you can catch these empty tuple as simply as :

{
  if (root.isEmpty()) {
    // deal with your flush or expiration logic
    return;
  } else {
    // deal with a regular tuple
  }

Latency Tracking

Just like all nodes, you can subscribe to and publish the _ppf_metrics stream and _ppf_latency field to make your node part of the latency tracking path. Here is an example.

{
  "type": "punchlet_node",
  "settings": {
    ...
  },
  "subscribe": [
      {
        "component": "kafka_input",
        "stream": "logs"
      },
      {
        "component": "kafka_input",
        "stream": "_ppf_metrics"
      }
    ],
  "publish": [
      {
        "stream": "logs",
        ...
      },
      {
        "stream": "_ppf_errors",
        ...
      },
      {
        "stream": "_ppf_metrics",
        "fields": [
          "_ppf_latency"
        ]
      }
    ]
}

Info

the only special thing about _ppf_metrics stream and _ppf_latency field is that they do not traverse your punchlets. You do not have to explicitly protect your punchlet code logic to ignore these. Make sure you understand input nodes and inner nodes stream and field fundamental concepts.

node, i.e. as part of a single process. This mode is thus perfect for high-performance single-process/multi-threaded punchlines.

TroubleShooting and Debugging

If you write punchlet, make sure you are aware of the many resources to easily test them. Check the punch language documentation chapters.

A good trick to know should you have issues with stream/fields being not emitted the way you think is to add a small punchlet in the chain that simply prints out and forward the received data. That is easy by deploying an inline punchlet in a node you add to your topology :

{
    "type": "punchlet_node",
    "settings": {
        "punchlet_code": "{ print(root); }"
    },
        ...
}

Make sure you are aware of the following punch features, they dramatically ease your punchlet coding user experience :

  • inline punchlets
  • sublime text editor
  • punch log injector
  • the kibana inline plugin to execute grok and punchlets
  • punchlinectl.sh
  • topologies log levels