Cep Bolt

The Cep Bolt (Complex Event Processing Bolt) lets you insert Siddhi rules into your data streams. Refer to the Siddhi and SiddhiQL documentation for details on the rule language.

To deploy a Siddhi rule, you must feed it the right stream and fields. There is a natural correspondence between Siddhi and Storm concepts: both process named streams of events made of named fields. In fact, the Siddhi rule engine was designed to be integrated into Storm.
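The stream/field correspondence can be pictured with a small Python sketch. It assumes the bolt matches Storm field names to the attribute names declared in the Siddhi `define stream` statement; the helpers `parse_stream_definition` and `tuple_to_event` are hypothetical and only illustrate the idea, they are not part of the Cep Bolt API.

```python
import re

# Matches a single "define stream <name> (<attributes>)" statement.
DEFINE_RE = re.compile(r"define\s+stream\s+(\w+)\s*\(([^)]*)\)", re.IGNORECASE)

# Python types accepted for each Siddhi attribute type (illustrative subset).
SIDDHI_TYPES = {"string": str, "int": int, "long": int,
                "float": float, "double": float, "bool": bool}


def parse_stream_definition(plan: str) -> tuple[str, list[tuple[str, str]]]:
    """Return the stream name and its ordered (attribute, type) pairs."""
    match = DEFINE_RE.search(plan)
    if match is None:
        raise ValueError("no 'define stream' statement found")
    attributes = []
    for item in match.group(2).split(","):
        name, attr_type = item.split()
        attributes.append((name, attr_type.lower()))
    return match.group(1), attributes


def tuple_to_event(fields: dict, attributes: list[tuple[str, str]]) -> list:
    """Order the fields of a Storm tuple as the Siddhi stream expects them.

    Fields not declared in the stream are ignored (an assumption of this sketch).
    """
    event = []
    for name, attr_type in attributes:
        if name not in fields:
            raise KeyError(f"Storm tuple lacks field {name!r}")
        value = fields[name]
        expected = SIDDHI_TYPES[attr_type]
        # bool is a subclass of int in Python, so reject it explicitly for numbers.
        if not isinstance(value, expected) or (expected is not bool and isinstance(value, bool)):
            raise TypeError(f"field {name!r} is not a Siddhi {attr_type}")
        event.append(value)
    return event


stream, attrs = parse_stream_definition(
    "define stream input (local_uuid string, battery float, deviceId int);")
print(stream)  # input
print(tuple_to_event({"deviceId": 7, "battery": 0.15, "local_uuid": "a1"}, attrs))
# ['a1', 0.15, 7]
```

A tuple missing one of the declared fields cannot be turned into a Siddhi event, which is why the right fields must be brought to the rule.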

When your rule matches a condition, it emits new events. These events are emitted on a corresponding Storm stream. You can then place downstream bolts to handle data formatting and additional processing, and finally plug in an output bolt (for example a KafkaBolt or an ElasticsearchBolt) to forward the results to the right destination.

Streams and fields

Just like Storm, Siddhi works on data stream(s). The streams described in your rule must match the Storm streams subscribed by your Cep Bolt.

Correlation results are emitted on the output streams defined in the Siddhi rule (the streams named in its insert into clauses). Downstream bolts can in turn subscribe to these output streams.
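Input and output stream wiring can be checked before deployment. The Python sketch below uses naive regular expressions (not a SiddhiQL parser, and only suited to simple single-stream queries) to verify that every stream a plan reads with `from` is subscribed by the bolt, and returns the streams it writes with `insert into`. The function `check_stream_wiring` is hypothetical.

```python
import re

# Naive patterns: enough for simple single-stream queries, not a SiddhiQL parser.
FROM_RE = re.compile(r"\bfrom\s+(\w+)", re.IGNORECASE)
INSERT_RE = re.compile(r"\binsert\s+into\s+(\w+)", re.IGNORECASE)


def check_stream_wiring(plan: str, subscribed: set[str]) -> set[str]:
    """Check that every stream the plan reads is subscribed by the bolt.

    A stream produced by another query of the same plan ("insert into")
    also counts as available. Returns the plan's output streams, which
    downstream bolts may subscribe to.
    """
    inputs = set(FROM_RE.findall(plan))
    outputs = set(INSERT_RE.findall(plan))
    missing = inputs - subscribed - outputs
    if missing:
        raise ValueError(f"streams not subscribed by the Cep Bolt: {sorted(missing)}")
    return outputs


PLAN = """
define stream input (local_uuid string, battery float, deviceId int);
@info(name = 'query') from input[battery < 0.2]
select battery, local_uuid, avg(battery) as battery_avg
group by deviceId
insert into output;
"""

print(check_stream_wiring(PLAN, {"input"}))  # {'output'}
```

Calling it with a subscription set that lacks `input` raises `ValueError`, which mirrors the requirement that rule streams match the Storm streams subscribed by the bolt.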

By default, the Cep Bolt also republishes the input data on the input stream, even when that data does not match the rule condition. You can thus use a Siddhi rule as a transparent proxy that forwards a stream of data and generates new streams of data whenever the rule condition matches.
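The default emission behavior can be modelled in a few lines of Python. This is a toy model under stated assumptions, not the real bolt: `CepBoltModel` is hypothetical, the condition is a plain Python predicate rather than a Siddhi rule, and the matching tuple itself stands in for the attributes a real rule would select.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class CepBoltModel:
    """Toy model of the Cep Bolt default emission behavior (not the real bolt)."""
    input_stream: str
    output_stream: str
    condition: Callable[[dict], bool]
    emitted: list = field(default_factory=list)  # (stream, tuple) pairs

    def execute(self, tup: dict) -> None:
        # Default behavior: every input tuple is forwarded on the input stream.
        self.emitted.append((self.input_stream, tup))
        # A matching tuple additionally produces an event on the output stream.
        # Here the tuple stands in for the attributes the rule would select.
        if self.condition(tup):
            self.emitted.append((self.output_stream, tup))


bolt = CepBoltModel("input", "output", lambda t: t["battery"] < 0.2)
for t in ({"deviceId": 1, "battery": 0.5}, {"deviceId": 1, "battery": 0.1}):
    bolt.execute(t)
print([stream for stream, _ in bolt.emitted])  # ['input', 'input', 'output']
```

Both input tuples are forwarded, and only the matching one also appears on the output stream.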

Parameters

Bolt-level parameters

Parameter  Mandatory             Type    Default  Comment
rule       no (if plan defined)  String  -        Path of the rule resource, relative to resources/siddhi, containing the Siddhi plan (rule description). Mandatory if no inline plan is specified.
plan       no (if rule defined)  String  -        Inline Siddhi plan (full rule description). Specify either an inline plan or a rule path with the rule parameter.
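The rule/plan logic of the table can be expressed as a small Python sketch. The function `resolve_siddhi_plan` and its `read_resource` callback are hypothetical; the sketch rejects settings that define both parameters, since the table does not say which one takes precedence.

```python
from pathlib import PurePosixPath
from typing import Callable

RULES_ROOT = PurePosixPath("resources/siddhi")


def resolve_siddhi_plan(bolt_settings: dict,
                        read_resource: Callable[[PurePosixPath], str]) -> str:
    """Return the Siddhi plan selected by the cep_bolt settings.

    Either an inline 'plan' or a 'rule' path is required. This sketch rejects
    settings that define both, since the table does not say which one wins.
    """
    has_rule, has_plan = "rule" in bolt_settings, "plan" in bolt_settings
    if has_rule and has_plan:
        raise ValueError("set either 'rule' or 'plan', not both")
    if has_plan:
        return bolt_settings["plan"]
    if has_rule:
        # Rule paths are resolved relative to resources/siddhi.
        return read_resource(RULES_ROOT / bolt_settings["rule"])
    raise ValueError("cep_bolt requires a 'rule' or a 'plan' setting")


# Hypothetical in-memory resource store standing in for the real resources.
resources = {RULES_ROOT / "my_siddhi_rule.rule": "define stream input (battery float);"}
print(resolve_siddhi_plan({"rule": "my_siddhi_rule.rule"}, resources.__getitem__))
# define stream input (battery float);
```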

Configuration example

 {
     "type": "cep_bolt",
     "bolt_settings": {
        "rule" : "my_siddhi_rule.rule"
     },
     "storm_settings": {
        "executors": 1,
        "component": "my_cep_bolt",
        "subscribe": [ {
           "component": "my_source",
           "stream": "my_source_stream",
           "grouping": "localOrShuffle"
        } ]
     }
  }

Rule example

/*
 * Generate an event whenever a battery level drops below some threshold
 */
define stream input (local_uuid string, battery float, deviceId int);
@info(name = 'query')
from input[battery < 0.2]
select battery, local_uuid, avg(battery) as battery_avg
group by deviceId
insert into output;
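To make the rule's output concrete, the Python sketch below emulates it on a finite list of events. It assumes Siddhi's semantics for an aggregation without a window: `avg(battery)` is a running average, per `deviceId`, over every event that has passed the filter so far. The function `battery_rule` is hypothetical and is not how the Cep Bolt evaluates rules.

```python
from collections import defaultdict
from typing import Iterable, Iterator


def battery_rule(events: Iterable[dict], threshold: float = 0.2) -> Iterator[dict]:
    """Emulate the example query on a finite list of events.

    Assumes Siddhi's semantics for an aggregation without a window:
    avg(battery) is a running average, per deviceId, over every event
    that passed the [battery < threshold] filter so far.
    """
    sums: dict = defaultdict(float)
    counts: dict = defaultdict(int)
    for event in events:
        if not event["battery"] < threshold:
            continue  # dropped by the filter, so it does not affect the average
        device = event["deviceId"]
        sums[device] += event["battery"]
        counts[device] += 1
        # One output event per matching input event, with the selected attributes.
        yield {"battery": event["battery"],
               "local_uuid": event["local_uuid"],
               "battery_avg": sums[device] / counts[device]}


events = [
    {"local_uuid": "a", "battery": 0.125, "deviceId": 1},
    {"local_uuid": "b", "battery": 0.5, "deviceId": 1},
    {"local_uuid": "c", "battery": 0.15, "deviceId": 2},
    {"local_uuid": "d", "battery": 0.0625, "deviceId": 1},
]
for out in battery_rule(events):
    print(out["local_uuid"], out["battery_avg"])
# a 0.125
# c 0.15
# d 0.09375
```

Because the query has no window, the average covers every matching event since the rule started. Adding a Siddhi window after the filter (for example `#window.length(10)`) would restrict it to recent events.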

Refer to the Cep Getting Started section.

Limitation

The Cep Bolt works on fields in the Storm sense. You cannot use nested fields in a correlation rule. To use a nested field in a rule, place a punchlet in front of your Cep Bolt to extract the nested field into a top-level Storm field (and do not forget to publish it).
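The transformation such a punchlet performs can be illustrated in Python. This is not punchlet code: `flatten_field` and the record layout are hypothetical, and in a real topology the punchlet would also publish the new top-level field as a Storm field.

```python
def flatten_field(record: dict, path: list[str], target: str) -> dict:
    """Copy a nested value to a top-level field, leaving the record intact.

    Only illustrates the transformation; a real deployment does this in a
    punchlet and then publishes `target` as a Storm field.
    """
    value = record
    for key in path:
        value = value[key]  # KeyError if the nested field is absent
    flat = dict(record)  # shallow copy, so the input record is not modified
    flat[target] = value
    return flat


record = {"log": {"device": {"battery": 0.1, "id": 7}}}
flat = flatten_field(record, ["log", "device", "battery"], "battery")
print(flat["battery"])  # 0.1
```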

Alternatively, you can embed a Siddhi rule directly within a punchlet. Refer to Siddhi Rules.