
Cep Rules

Overview

In this chapter we explain how to invoke Siddhi rules from within a punchlet. Siddhi is a complex event processing (CEP) rule engine; refer to the Siddhi documentation for a diagram of its architecture.

Here is a rule example:

// $PUNCHPLATFORM_CONF_DIR/examples/topologies/elasticsearch/aggregation.rule
//
// Aggregate events in batches of 3 events, and generate a single event
// with the sum of the rtt of each event.
//
// The data is received on a stream named "default".
// Every 3 events, output data is generated on the stream "output".
//
define stream default (module string, name string, rtt int);

from default#window.lengthBatch(3)
select module, sum(rtt) as sum_rtt, name
insert all events into output;

As explained, rules are traversed by streams of data. Each stream is named. In our example the data is expected to arrive on the default stream. That data consists of 3-value tuples of types (respectively) string, string, and int. Should the rule match, it emits a new event onto another named data stream, here output. That output data consists of 3-value tuples of types (respectively) string, int, and string.
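To make the batching semantics concrete, here is a minimal Python sketch (an illustration, not the Siddhi engine itself) of what window.lengthBatch(3) combined with sum(rtt) computes. Carrying the last event's module and name into the aggregate is an assumption made for illustration:

```python
def length_batch_sum(events, batch_size=3):
    """Group input tuples (module, name, rtt) into fixed-size batches and
    emit one (module, sum_rtt, name) tuple per full batch."""
    batch = []
    for module, name, rtt in events:
        batch.append((module, name, rtt))
        if len(batch) == batch_size:
            sum_rtt = sum(e[2] for e in batch)
            # Non-aggregated attributes taken from the batch's last event
            # (an illustrative choice, not a Siddhi guarantee)
            last_module, last_name, _ = batch[-1]
            yield (last_module, sum_rtt, last_name)
            batch = []

events = [("system", "load", 10), ("system", "load", 20), ("system", "load", 30),
          ("system", "load", 5),  ("system", "load", 5),  ("system", "load", 5)]
print(list(length_batch_sum(events)))
# → [('system', 60, 'load'), ('system', 15, 'load')]
```

Each full batch of 3 events thus yields exactly one aggregated tuple; a partial batch yields nothing until it fills up.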

With Siddhi rules you can express many complex use cases, from simple alerting to fraud detection to machine-learning-based processing. Check out the Siddhi QL documentation.

Siddhi and Punch

Part of the work of deploying a Siddhi rule consists in preparing the input data, selecting the required fields, and formatting the output data. The punchplatform provides a Cep Bolt configuration guide. To use it you must design a topology and take care of sending the right storm stream and fields so that they match those of your rule.

Using Siddhi directly from within a punchlet is another option, explained here. With punch, the pre- and post-processing is extremely easy to write, and requires only a few lines of code.

In a punchlet you simply feed the rule with input tuples that represent the rule's input stream, and in turn collect the tuples generated on the output stream(s). The question, though, is: how do you get back the events generated by a Siddhi rule? Here is the magic: it is explained in the punch code that follows:

// $PUNCHPLATFORM_CONF_DIR/examples/topologies/elasticsearch/aggregation.punch
{
    if (root:[default]) {

        // You deal with the document received from the spout.
        // In this example we further select a subset of the metricbeat metricset documents.
        if (root:[default][doc][metricset][module] == "system" && root:[default][doc][metricset][name] == "load") {
            print("INPUT");
            print(root:[default][doc][metricset]);
            siddhi().forRule("aggregation")
              .forInputStream("default")
              .forQuery("query")
              .forOutputStream("output")
              .send(root:[default][doc][metricset]);
        }

    } else if (root:[output]) {

        // You deal with the events generated by the Siddhi rule
        print("OUTPUT");
        print(root);

    }
}
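Conceptually, the punchlet dispatches each incoming tuple on its stream name: documents arriving on default are (optionally) forwarded to the rule, while tuples reinjected by Siddhi show up on output. Here is a minimal Python sketch of that routing logic, with plain dicts standing in for punch tuples; the structure of root is taken from the example above, and the helper names are hypothetical:

```python
def route(root, send_to_rule, emitted):
    """Dispatch a tuple on its stream name, mimicking the punchlet's branches."""
    if "default" in root:
        metricset = root["default"]["doc"]["metricset"]
        # Only a subset of the metricbeat documents feeds the rule
        if metricset["module"] == "system" and metricset["name"] == "load":
            send_to_rule.append(metricset)   # stands in for siddhi()...send(...)
    elif "output" in root:
        emitted.append(root)                 # rule-generated event

sent, out = [], []
route({"default": {"doc": {"metricset": {"module": "system", "name": "load", "rtt": 7}}}}, sent, out)
route({"output": {"module": "system", "sum_rtt": 60, "name": "load"}}, sent, out)
print(len(sent), len(out))  # → 1 1
```

The key point is that the same punchlet is invoked for both kinds of tuples; only the stream name tells them apart.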

Here is an example topology using the previous punchlet:

// $PUNCHPLATFORM_CONF_DIR/examples/topologies/elasticsearch/elasticsearch_to_siddhi.json
{
  "spouts": [
    {
      "type": "elasticsearch_spout",
      "spout_settings": {
        "cluster_id": "es_search",
        "index": "metricbeat-6*",
        "query": "?q=beat.version:6*",
        "es.index.read.missing.as.empty": true
      },
      "storm_settings": {
        "component": "elasticsearch_spout"
      }
    }
  ],
  "bolts": [
    {
      "type": "punch_bolt",
      "bolt_settings": {
        "punchlet_rule_resources": [
          "%{conf}%/examples/topologies/elasticsearch/aggregation.rule"
        ],
        "punchlet": "./aggregation.punch"
      },
      "storm_settings": {
        "component": "siddhi_bolt",
        "subscribe": [
          {
            "component": "elasticsearch_spout"
          }
        ]
      }
    }
  ],
  "storm_settings": {
    "topology.worker.childopts": "-server -Xms128m -Xmx128m"
  }
}

To run this using punchplatform-topology.sh, make sure your Siddhi rule is loaded as a visible resource (this is what the punchlet_rule_resources setting above does):

$ punchplatform-topology.sh elasticsearch_to_siddhi.json

On a real punchplatform you will deploy your punchlet as part of a Storm topology. Check out the Punch Storm integration chapter for instructions.

Performance Consideration

Running Siddhi rules from punchlets is extremely easy and powerful. However, you pay a performance penalty because the input and output fields must be transformed back and forth between punch tuples and Siddhi events. You do not incur that extra processing when using the CEP bolt.

You benefit, however, from a lot of flexibility in selecting what you emit downstream. You can use the punch language constructs to take care of additional enrichment or normalisation of your Siddhi rule output.