public class PunchNode
extends org.thales.punch.libraries.storm.api.BaseProcessingNode
Let us start with a simple yet complete example:
```json
{
    "type" : "punch_bolt",
    "bolt_settings" : {
        "punchlet" : "standard/Apache_HTTP_Server/apache.punch"
    },
    "storm_settings" : {
        "component" : "punch_bolt",
        "publish" : [ { "stream" : "logs", "fields" : ["log", "_ppf_timestamp", "_ppf_id"] } ],
        "subscribe" : [ { "component" : "kafka_spout", "stream" : "logs", "grouping" : "localOrShuffle" } ]
    }
}
```
The punchlet property refers to the punchlet you want to execute in the bolt. You can declare one or several; they will then be executed in sequence.
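For instance, assuming the punchlet property also accepts a JSON array (check the reference documentation of your punch release), two punchlets chained in sequence could be declared as:

```json
"bolt_settings" : {
    "punchlet" : [
        "standard/Apache_HTTP_Server/apache.punch",
        "standard/Apache_HTTP_Server/enrichment.punch"
    ]
}
```

The punchlets run in declaration order, each one receiving the document produced by the previous one.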
Each punchlet receives the subscribed storm tuple as a JSON document. In our example it has the form:

```json
{ "logs" : { "field1" : ..., "field2" : ..., "fieldn" : ... } }
```

where field1..fieldn represent the tuple fields emitted by the Kafka spout on the "logs" stream. The punchlet can transform that JSON document in many ways: it typically adds or removes fields, with the intent of forwarding the new fields downstream. To do that, the punch bolt must declare the list of published fields. In our example, the emitted document has the form:
```json
{ "logs" : { "log" : ..., "_ppf_timestamp" : ..., "_ppf_id" : ... } }
```
Should the punchlet generate a document with more fields, the extra fields will be ignored and not emitted. Conversely, should it generate a document missing some of the declared published fields, an empty value will be emitted for each missing field.
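For illustration (the field values here are hypothetical), suppose the punchlet produces:

```json
{ "logs" : { "log" : "GET /index.html HTTP/1.1", "extra" : "some debug data" } }
```

With the published fields declared as ["log", "_ppf_timestamp", "_ppf_id"], the "extra" field is dropped, while "_ppf_timestamp" and "_ppf_id" are emitted with empty values because the punchlet did not set them.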
Because a punchlet can act on both the stream part and the fields part of the document, it can generate new streams, discard a tuple, or generate several tuples from a single input tuple. All in all, a punchlet can do almost anything you can think of, from simple stateless forwarding to new event generation (for example alerting).
Punchlets are referenced by a path relative to the $PUNCHPLATFORM_CONF_DIR/resources/punch/punchlet folder. Some punchlets require additional resource files, typically when they use the findByKey or findByInterval punch operators. Others use Siddhi rules that must equivalently be loaded. To add resource files to your punchlet, proceed as follows:
```json
{
    "type" : "punch_bolt",
    "bolt_settings" : {
        "punchlet_json_resources" : [
            "standard/Apache_HTTP_Server/enrichment.json"
        ],
        "punchlet_rule_resources" : [
            "standard/common/detection.rule"
        ],
        "punchlet" : "standard/Apache_HTTP_Server/enrichment.punch"
    },
    "storm_settings" : { ... }
}
```
Whenever a punchlet fails (for example by throwing an exception), the punch bolt emits an error document on a dedicated error stream. To make configurations easier to write, punch bolts may omit the declaration of that error stream: it will be implicitly added at topology build time. You can however declare the error stream explicitly, should you prefer a completely explicit configuration file. Here is the same example as above with the explicit error declaration:
```json
{
    "type" : "punch_bolt",
    "bolt_settings" : {
        "punchlet" : "standard/Apache_HTTP_Server/apache.punch"
    },
    "storm_settings" : {
        "component" : "punch_bolt",
        "publish" : [
            { "stream" : "logs", "fields" : ["log", "_ppf_timestamp", "_ppf_id"] },
            { "stream" : "_ppf_errors", "fields" : ["_ppf_error", "_ppf_error_message", "_ppf_timestamp", "_ppf_id"] }
        ],
        "subscribe" : [ { "component" : "kafka_spout", "stream" : "logs", "grouping" : "localOrShuffle" } ]
    }
}
```
The error stream must be named "_ppf_errors".
Additional fields can be published on the error stream. These can either be copied from the input stream (any field name is supported, as long as it is present in the subscribed stream) or generated by the punch bolt.
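For instance, to also copy the original "log" field into the error documents (assuming "log" is present on the subscribed input stream), the error publish entry could become:

```json
{ "stream" : "_ppf_errors", "fields" : ["_ppf_error", "_ppf_error_message", "_ppf_timestamp", "_ppf_id", "log"] }
```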
| Constructor and Description |
|---|
| `PunchNode(org.thales.punch.libraries.storm.api.NodeSettings config, PunchletConfig punchletConfig)` Constructor. |
| Modifier and Type | Method and Description |
|---|---|
| `void` | `cleanup()` |
| `Map<String,Object>` | `getComponentConfiguration()` We request from Storm a regular callback. |
| `void` | `prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector)` |
| `void` | `process(org.apache.storm.tuple.Tuple tuple)` |
public PunchNode(org.thales.punch.libraries.storm.api.NodeSettings config, PunchletConfig punchletConfig)

Parameters:
- config - the punch bolt settings
- punchletConfig - the punchlet config

public void prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector)

Specified by: prepare in interface org.apache.storm.task.IBolt
Overrides: prepare in class org.thales.punch.libraries.storm.api.BaseProcessingNode

public void process(org.apache.storm.tuple.Tuple tuple)

Specified by: process in class org.thales.punch.libraries.storm.api.BaseProcessingNode
public Map<String,Object> getComponentConfiguration()

We request from Storm a regular callback.

Specified by: getComponentConfiguration in interface org.apache.storm.topology.IComponent
Overrides: getComponentConfiguration in class org.thales.punch.libraries.storm.api.BaseProcessingNode
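The method summary notes that getComponentConfiguration() requests a regular callback from Storm. In Storm this is conventionally done by setting the tick-tuple frequency in the component configuration; the sketch below shows the shape of such an override (the 10 second period and the standalone class are illustrative assumptions, not PunchNode's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class TickConfigExample {

    // Literal value of org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
    // inlined here so the sketch compiles without the Storm dependency.
    static final String TICK_FREQ = "topology.tick.tuple.freq.secs";

    // Shape of a getComponentConfiguration() override that asks Storm to
    // deliver a system "tick" tuple to the bolt at a fixed period.
    static Map<String, Object> componentConfiguration() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(TICK_FREQ, 10); // illustrative 10 second period
        return conf;
    }

    public static void main(String[] args) {
        System.out.println(componentConfiguration().get(TICK_FREQ));
    }
}
```

Storm then delivers tick tuples to the bolt's process method at that period, which a node can use for periodic housekeeping.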
public void cleanup()

Specified by: cleanup in interface org.apache.storm.task.IBolt
Overrides: cleanup in class org.thales.punch.libraries.storm.api.BaseProcessingNode
Copyright © 2023. All rights reserved.