
Complex Event Processing

Overview

This chapter explains how to use the PunchPlatform for various Complex Event Processing (CEP) use cases. To do so, the PunchPlatform relies on the Siddhi rule engine, which provides a simple yet extremely powerful query language (called Siddhi QL) to define complex rules deployed directly in your data streams.

Before going any further, let's get started with the basics: a simple and well-known example of stock market events.


Here is an extract of input logs:

timestamp=21/Feb/2015:00:49:33 symbol=THALES price=22
timestamp=21/Feb/2015:00:49:34 symbol=CISCO price=27
timestamp=21/Feb/2015:00:49:36 symbol=SAMSUNG price=14
timestamp=21/Feb/2015:00:49:37 symbol=AAPL price=30
timestamp=21/Feb/2015:00:49:39 symbol=THALES price=26
timestamp=21/Feb/2015:00:49:40 symbol=CISCO price=22
timestamp=21/Feb/2015:00:49:42 symbol=SAMSUNG price=18
timestamp=21/Feb/2015:00:49:43 symbol=AAPL price=15
timestamp=21/Feb/2015:00:49:45 symbol=THALES price=18
timestamp=21/Feb/2015:00:49:46 symbol=CISCO price=15
timestamp=21/Feb/2015:00:49:48 symbol=SAMSUNG price=23
timestamp=21/Feb/2015:00:49:49 symbol=AAPL price=20
timestamp=21/Feb/2015:00:49:51 symbol=THALES price=10

For example, let's say you would like to compute the average price of "THALES" stocks, considering only stock prices greater than 10.

If you are familiar with traditional SQL queries, you would express it this way:

select symbol, AVG(price) as price_average from Stocks where symbol = "THALES" and price > 10; 

To express the same query using Siddhi QL, you would write:

select symbol, avg(price) as price_average from Stocks[symbol=='THALES' and price > 10]#window.time(10 min); 

As you can see, the two syntaxes are quite close. The main difference comes from the "window" keyword. Because we are working with streams and events, most queries are bound to a time window or to a number of events. The idea is that the stream of items traverses the statement: whenever an item matches the statement, a new event is fired.
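To build intuition for what such a windowed statement computes, here is a minimal plain-Python sketch. It is not the Siddhi engine: the function name `windowed_average` and the second-based timestamps are purely illustrative. It keeps only matching events from the last `window` seconds and emits a running average each time a matching event arrives.

```python
from collections import deque

# Minimal plain-Python sketch of a filtered time-window average (this is
# NOT Siddhi, just an illustration of what the statement above computes).
def windowed_average(events, symbol, min_price, window=600):
    """events: iterable of (timestamp_in_seconds, symbol, price) tuples."""
    kept = deque()   # matching events currently inside the time window
    averages = []
    for ts, sym, price in events:
        if sym != symbol or price <= min_price:
            continue  # the [symbol=='THALES' and price > 10] filter part
        kept.append((ts, price))
        # Expire events older than the window (the #window.time(10 min) part)
        while kept and kept[0][0] <= ts - window:
            kept.popleft()
        averages.append(sum(p for _, p in kept) / len(kept))
    return averages

# The THALES events from the log extract above, timestamps kept as seconds
stream = [(33, "THALES", 22), (39, "THALES", 26), (45, "THALES", 18), (51, "THALES", 10)]
print(windowed_average(stream, "THALES", 10))  # [22.0, 24.0, 22.0] (the price=10 event is filtered out)
```

Note how a new average is emitted for each matching event, not once at the end: this is the "fire on match" behavior described above.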

Getting Started

Let's go a little deeper into the Siddhi Query Language with more advanced usages. Suppose you want to monitor the state of a battery in real time. Each second, you receive as input one JSON event per battery sensor. The input looks like this:

{
  "deviceId": "DzX9d57q",
  "type": "Li-ion",
  "level": 86.2
}

You want to detect when your batteries are no longer healthy and trigger an event or alert when it happens. For example, you want to know when:

  • the battery level of a device goes under 20%
  • the battery level drops sharply, by more than 50% in less than 30 seconds

The first rule can be expressed as follows. We keep all the events over the last minute, compute the average level per device, and trigger an event if that average is lower than 20%. We also prevent a flood of alert events by limiting the output rate to one event every 10 seconds.

define stream battery_metrics (deviceId string, type string, level float);

@info(name = 'low_battery')
from battery_metrics#window.time(1 min)
select deviceId, type, avg(level) as level_avg
group by deviceId
having level_avg < 20
output last every 10 sec
insert into alert_stream;
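The logic of this rule can be mimicked in a few lines of plain Python, which may help to check your understanding. This is an illustrative sketch, not PunchPlatform code; the function name `low_battery_alerts` is made up and timestamps are assumed to be in seconds.

```python
from collections import defaultdict, deque

# Illustrative plain-Python equivalent of the low_battery rule (NOT
# PunchPlatform code; timestamps are assumed to be in seconds).
def low_battery_alerts(events, window=60, threshold=20.0, min_gap=10):
    levels = defaultdict(deque)   # deviceId -> (ts, level) within the window
    last_alert = {}               # deviceId -> ts of the last emitted alert
    alerts = []
    for ts, device, level in events:
        q = levels[device]
        q.append((ts, level))
        # Keep only the last `window` seconds (the window.time(1 min) part)
        while q and q[0][0] <= ts - window:
            q.popleft()
        avg = sum(l for _, l in q) / len(q)
        # Fire when the average is low, at most once every `min_gap` seconds
        if avg < threshold and ts - last_alert.get(device, -min_gap) >= min_gap:
            last_alert[device] = ts
            alerts.append((ts, device, avg))
    return alerts

# One reading per second, level dropping by 1% each second
events = [(t, "DzX9d57q", 25.0 - t) for t in range(15)]
print(low_battery_alerts(events))  # [(11, 'DzX9d57q', 19.5)]
```

The rate limiting matters in practice: without it, every reading below the threshold would produce its own alert.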

For the second rule, here is what we can do.

define stream battery_metrics (deviceId string, type string, level float); 

@info(name = 'query')
from every ( e1 = battery_metrics )
  -> every ( e2 = battery_metrics[ level < (e1.level * 0.5) and deviceId == e1.deviceId ])
  within 30 sec
select e2.deviceId, e2.level, e2.type
group by e2.deviceId
insert current events into alert_stream;
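The intent of this sequence pattern can also be sketched in plain Python (illustrative only; it approximates the `->` sequence by comparing each new reading against the readings seen on the same device in the previous 30 seconds):

```python
from collections import defaultdict, deque

# Plain-Python sketch of the sequence pattern (illustrative only): a new
# reading triggers an alert when it is less than half of any reading seen
# on the same device within the last 30 seconds, i.e. a drop of more than 50%.
def detect_sharp_drops(events, window=30):
    recent = defaultdict(deque)   # deviceId -> (ts, level) inside the window
    alerts = []
    for ts, device, level in events:
        q = recent[device]
        # Drop readings older than the window (the `within 30 sec` part)
        while q and q[0][0] < ts - window:
            q.popleft()
        if any(level < old * 0.5 for _, old in q):
            alerts.append((ts, device, level))
        q.append((ts, level))
    return alerts

events = [(0, "DzX9d57q", 90.0), (10, "DzX9d57q", 80.0), (25, "DzX9d57q", 40.0)]
print(detect_sharp_drops(events))  # [(25, 'DzX9d57q', 40.0)]
```

The reading of 40.0 at t=25 fires because it is below half of the 90.0 seen 25 seconds earlier; had it arrived more than 30 seconds later, nothing would match.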

To get a better view of Siddhi's capabilities, take a look at the official Siddhi Query Guide.

Now that you are more confident with the core usage of Siddhi, let's see how to use it with the PunchPlatform. Currently, Siddhi rules can be set up in two different ways:

Leveraging the Punch Siddhi Operator

The simplest way to deploy and understand a Siddhi rule is to rely on the Punch Siddhi operator.

To use a Siddhi rule named my_rule.rule within a Punchlet named my_punchlet.punch, you will write them this way:

my_rule.rule

define stream input (name string, temperature double);

@info(name = 'temperature')
from input[(temperature > 10 and temperature < 20) and name == 'sensor1']
select name, avg(temperature) as average, max(temperature) as max, min(temperature) as min
group by name
insert all events into output;

my_punchlet.punch

{
  if (root:[logs]) {
    // Here you process a regular input tuple coming from a spout or a previous bolt
    siddhi()
      .forRule("my_rule")
      .forInputStream("input")
      .forOutputStream("output")
      .send(root:[logs][log]);
    print("-- input log");
    print(root);
  } else {
    // Here, you only deal with Siddhi generated events.
    // The received event has an "output" top-level key,
    // the same as the declared output stream.
    print("-- Siddhi trigger event");
    print(root);
  }
}

Now, to test it, the easiest way is to use the HJSON Punchlet format: everything is merged into one single file.

Never heard of the HJSON format? Take a look at the official website.

my_complete_punchlet.punch

{
    description:
    '''
    This example provides an easy way to filter your stream based on event values.

    In this case, we only keep events coming from "sensor1" with a temperature
    in the range 10 < x < 20 (degrees).
    '''

    tests: [
        {
            logs: {
                log: {
                    name: sensor1
                    temperature: 12.6
                }
            }
        },
        {
            logs: {
                log: {
                    name: sensor2
                    temperature: 99.99
                }
            }
        },
        {
            logs: {
                log: {
                    name: sensor1
                    temperature: 7.8
                }
            }
        },
        {
            logs: {
                log: {
                    name: sensor1
                    temperature: 32.0
                }
            }
        },
        {
            logs: {
                log: {
                    name: sensor1
                    temperature: 17.2
                }
            }
        }
    ]

    rules: {
        my_simple_filter: '''
            define stream input (name string, temperature double);

            @info(name = 'query')
            from input[(temperature > 10 and temperature < 20) and name == 'sensor1']
            select name, avg(temperature) as average, max(temperature) as max, min(temperature) as min
            group by name
            insert all events into output;
        '''
    }

    punchlet:
    '''
    {
        if (root:[logs]) {
            // Here you process a regular input tuple coming from a spout or a previous bolt
            siddhi()
                .forRule("my_simple_filter")
                .forInputStream("input")
                .forOutputStream("output")
                .send(root:[logs][log]);
            print("-- Input log:");
        } else {
            // There, you only deal with Siddhi generated events.
            print("-- The Siddhi callback:");
        }
    }
    '''
}

Feel free to copy/paste the code block above into a file called my_complete_punchlet.punch. Now, run it using this command:

$ punchplatform-puncher.sh my_complete_punchlet.punch

The resulting output should look like this one:

-- Input log:
{
  "logs": {
    "log": {
      "name": "sensor1",
      "temperature": 12.6
    }
  }
}
-- Input log:
{
  "logs": {
    "log": {
      "name": "sensor2",
      "temperature": 99.99
    }
  }
}
-- Input log:
{
  "logs": {
    "log": {
      "name": "sensor1",
      "temperature": 7.8
    }
  }
}
-- Input log:
{
  "logs": {
    "log": {
      "name": "sensor1",
      "temperature": 32
    }
  }
}
-- Input log:
{
  "logs": {
    "log": {
      "name": "sensor1",
      "temperature": 17.2
    }
  }
}
-- The Siddhi callback:
{
  "output": {
    "average": 12.6,
    "min": 12.6,
    "max": 12.6,
    "name": "sensor1"
  }
}
-- The Siddhi callback:
{
  "output": {
    "average": 14.899999999999999,
    "min": 12.6,
    "max": 17.2,
    "name": "sensor1"
  }
}

Congratulations!

It's official: you are now able to run any Siddhi rule on the PunchPlatform. If you want to go further, please visit the Official Query Guide to discover all the power and possibilities that Siddhi offers.

Leveraging the CEP Bolt (legacy)

Be careful

The CEP Bolt is not supported anymore. Please use Siddhi embedded within a Punchlet instead.

Another way to use Siddhi is through the dedicated bolt called "cep_bolt". To do so, we define a Storm topology that (typically) chains the following functions:

  1. A spout pulls or receives the data, performs some format adjustment if needed, and emits each stock event as a Storm tuple.
  2. A CEP bolt runs the statement and emits the correlated events.
  3. An Elasticsearch bolt saves these events in the index of your choice.

To see how to set up and configure a CEP bolt in a topology in practice, please refer to the Cep Bolt configuration.