Complex Event Processing

In this chapter we explain how to use the PunchPlatform for CEP (Complex Event Processing) use cases.

Overview

The PunchPlatform leverages the Siddhi rule engine. It provides you with a simple but extremely powerful query language to define complex rules that you can deploy directly in your data streams.

Siddhi rules can be set up in two different ways: directly as part of the Punch language, or through a dedicated Cep bolt. Before explaining how each of these works, let us use a simple and well-known example to explain the basics: a stream of stock market events, as illustrated next.

[Figure: CorrelationStockStream.png]

This example is available as a demo channel. An injector resource file, stock_injector.json, is provided; you can use it to send log lines looking like this:

timestamp=21/Feb/2015:00:49:33 symbol=THALES price=22
timestamp=21/Feb/2015:00:49:34 symbol=CISCO price=27
timestamp=21/Feb/2015:00:49:35 symbol=SIEMENS price=28
timestamp=21/Feb/2015:00:49:36 symbol=SAMSUNG price=14
timestamp=21/Feb/2015:00:49:37 symbol=AAPL price=30
timestamp=21/Feb/2015:00:49:38 symbol=IBM price=15
timestamp=21/Feb/2015:00:49:39 symbol=THALES price=26
timestamp=21/Feb/2015:00:49:40 symbol=CISCO price=22
timestamp=21/Feb/2015:00:49:41 symbol=SIEMENS price=11
timestamp=21/Feb/2015:00:49:42 symbol=SAMSUNG price=18
timestamp=21/Feb/2015:00:49:43 symbol=AAPL price=15
timestamp=21/Feb/2015:00:49:44 symbol=IBM price=22
timestamp=21/Feb/2015:00:49:45 symbol=THALES price=18
timestamp=21/Feb/2015:00:49:46 symbol=CISCO price=15
timestamp=21/Feb/2015:00:49:47 symbol=SIEMENS price=23
timestamp=21/Feb/2015:00:49:48 symbol=SAMSUNG price=23
timestamp=21/Feb/2015:00:49:49 symbol=AAPL price=20
timestamp=21/Feb/2015:00:49:50 symbol=IBM price=13
timestamp=21/Feb/2015:00:49:51 symbol=THALES price=10
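
To make the shape of these lines concrete, here is a minimal Python sketch that turns one of them into a typed record. It is only an illustration: the function name parse_stock_line is hypothetical and is not part of the PunchPlatform, where this kind of formatting is normally done by a spout or a punchlet.

```python
from datetime import datetime


def parse_stock_line(line: str) -> dict:
    """Parse a 'key=value key=value ...' stock line into a typed dict.

    Hypothetical helper, for illustration only.
    """
    # Split on whitespace, then split each token on the first '='.
    fields = dict(token.split("=", 1) for token in line.split())
    return {
        # %b matches the abbreviated month name (English in the default C locale).
        "timestamp": datetime.strptime(fields["timestamp"], "%d/%b/%Y:%H:%M:%S"),
        "symbol": fields["symbol"],
        "price": float(fields["price"]),
    }


tick = parse_stock_line("timestamp=21/Feb/2015:00:49:37 symbol=AAPL price=30")
print(tick["symbol"], tick["price"])
```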

Say you want to select from that stream only the AAPL items for which the average price of the last ten AAPL items is greater than some value. Using the Siddhi query language, you write that as follows:

select * from StockTick(symbol='AAPL').win:length(10) having avg(price) > 10.0

The idea is to run the stream of items through that statement. Whenever an item matches the statement, a new event is fired. In this case the new event is the result of select *, i.e. the latest AAPL item that keeps the average price above the value, or pushes it above.
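
The following Python sketch mimics what such a statement computes: a length-10 sliding window over AAPL prices, emitting each item for which the window average exceeds a threshold. It is a plain-Python approximation under stated assumptions, not the rule engine's implementation; the function name and its parameters are hypothetical.

```python
from collections import deque


def window_average_filter(ticks, symbol="AAPL", length=10, threshold=10.0):
    """Yield each matching tick whose sliding-window average exceeds threshold.

    Hypothetical helper approximating a length window followed by a
    'having avg(price) > threshold' clause.
    """
    window = deque(maxlen=length)  # keeps only the last `length` prices
    for tick in ticks:
        if tick["symbol"] != symbol:
            continue  # other symbols never enter the window
        window.append(tick["price"])
        if sum(window) / len(window) > threshold:
            yield tick


ticks = [
    {"symbol": "AAPL", "price": 30},
    {"symbol": "IBM", "price": 15},
    {"symbol": "AAPL", "price": 15},
    {"symbol": "AAPL", "price": 20},
]
# With a threshold of 22.0 the running averages are 30, 22.5 and about 21.67,
# so only the first two AAPL ticks are emitted.
matches = list(window_average_filter(ticks, threshold=22.0))
print([t["price"] for t in matches])
```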

Doing that with the PunchPlatform is easy. One way is to define a Storm topology that (typically) chains the following functions:

  1. a spout pulls or receives the data, adjusts the format if needed, and emits a stock Storm tuple.
  2. a Cep bolt will run the statement, and will emit the correlated events.
  3. an Elasticsearch bolt will save your events in the Elasticsearch index of your choice.
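
The three stages above can be sketched as chained Python generators. This only illustrates the data flow; the names spout, cep_stage and sink are hypothetical, a plain list stands in for the Elasticsearch index, and a trivial predicate stands in for the Siddhi statement. A real topology is defined through the PunchPlatform configuration files, not through code like this.

```python
def spout(lines):
    """Stage 1: receive raw lines and emit one dict (a 'tuple') per line."""
    for line in lines:
        yield dict(token.split("=", 1) for token in line.split())


def cep_stage(tuples, predicate):
    """Stage 2: stand-in for the Cep bolt; emit only the matching tuples."""
    for item in tuples:
        if predicate(item):
            yield item


def sink(events, index):
    """Stage 3: stand-in for the Elasticsearch bolt; store events in a list."""
    for event in events:
        index.append(event)


lines = [
    "timestamp=21/Feb/2015:00:49:37 symbol=AAPL price=30",
    "timestamp=21/Feb/2015:00:49:38 symbol=IBM price=15",
]
index = []
sink(cep_stage(spout(lines), lambda t: t["symbol"] == "AAPL"), index)
print(index)
```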

Such a processing chain is illustrated next:

[Figure: CorrelationStockTopologyLayout.png]

To configure a Cep bolt in a topology, refer to the Cep Bolt configuration guide.

Another way is to embed your rule directly in a punchlet. This is easier to configure, and you have the full expressive power of the Punch language to format your input and output data.

Getting Started

The PunchPlatform standalone includes a correlation example. The principle is simple:

  • the input events are the battery level for different devices
  • the outputs are the raw events, and in addition alert events fired if one of the following situations occurs:
    • the battery level of a device drops below 20%
    • the battery level decreases sharply, by more than 50%, in less than 30 seconds

Let us start with the correlation rules. The two rules are located in the resources/siddhi/cep folder.

//
// Generate an event whenever a battery level drops lower than some threshold
//
define stream logs (log string, local_uuid string, battery float, deviceId int);
@info(name = 'query') from logs[battery < 0.2] \
  select battery, log, local_uuid, avg(battery) \
  as battery_avg group by deviceId \
  insert into filteredStream
//
// Generate an event whenever a battery decrease within some time frame
// ends up losing more than 50% of its capacity
//
define stream logs (log string, local_uuid string, battery float, deviceId int);
@info(name = 'query') from every( e1 = logs ) \
  -> every( e2 = logs[ ((e1.battery - 0.5) >= battery) AND ( e1.deviceId == deviceId ) ]) within 30 sec \
select e2.battery, e2.log, e2.local_uuid group by e2.deviceId \
insert current events into correlationStream
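
To clarify what the two rules detect, here is a simplified Python sketch of their logic. It is not how Siddhi evaluates them: it emits at most one "drop" alert per incoming event, whereas the pattern query may fire once per matching pair of events. The function name and the event layout (timestamp in seconds, device id, battery level between 0 and 1) are assumptions made for this illustration.

```python
from collections import defaultdict, deque


def detect_battery_alerts(events, low=0.2, drop=0.5, within=30.0):
    """Return alerts for (timestamp, device_id, battery) events sorted by time."""
    low_sum = defaultdict(float)   # per-device sum of below-threshold levels
    low_count = defaultdict(int)   # per-device count of below-threshold levels
    recent = defaultdict(deque)    # per-device (timestamp, battery) history
    alerts = []
    for ts, device, battery in events:
        # Rule 1: the level is below the threshold; report a running average
        # of the below-threshold levels seen so far for that device.
        if battery < low:
            low_sum[device] += battery
            low_count[device] += 1
            avg = low_sum[device] / low_count[device]
            alerts.append({"kind": "threshold", "device": device,
                           "battery": battery, "battery_avg": avg})
        # Rule 2: the level lost at least `drop` compared with an earlier
        # event of the same device seen within the time frame.
        history = recent[device]
        while history and ts - history[0][0] > within:
            history.popleft()  # forget events older than the time frame
        if any(previous - drop >= battery for _, previous in history):
            alerts.append({"kind": "drop", "device": device, "battery": battery})
        history.append((ts, battery))
    return alerts


events = [
    (0, 1, 0.9), (0, 2, 0.9),   # both devices start at 90%
    (10, 1, 0.1),               # device 1 collapses within 10 seconds
    (60, 2, 0.5), (120, 2, 0.15),  # device 2 declines slowly
]
alerts = detect_battery_alerts(events)
print([(a["kind"], a["device"]) for a in alerts])
```

With this input, device 1 triggers both rules on the same event, while device 2 only crosses the low-battery threshold.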

Leveraging the Punch Siddhi Operator

The simplest way to deploy and understand Siddhi rules is to rely on the Punch Siddhi operator. Refer to the Siddhi Rules chapter.

Leveraging the CEP Bolt

In this example we actually combine two PunchPlatform features. First, we use the Cep bolt to run the correlation rules and to emit alerts in real time. Second, we also want the platform to emit the corresponding alerts to an external supervisor. For the latter we leverage the ElastAlert component.

The correlation rules are embedded in the Cep bolt, itself set up inside a Storm topology. The demo template does all that for you. Let us start the demo track.

  1. Install the standalone with the elastalert option:
./install --with-elastalert -s
  2. Create an elastalert_status index in Kibana with match_id.ts as timestamp field. You must create that index before installation.
  3. Manually import the demo dashboard into Kibana, using the Kibana UI. The dashboard is located in $PUNCHPLATFORM_CONF_DIR/resources/kibana/dashboards/cep/cep_battery_dashboard.json
  4. Create and start the cep_battery channel:
> punchplatform-channel.sh --configure tenants/mytenant/configurations_for_channels_generation/lmc_cep/battery_channel.json
> punchplatform-channel.sh --start mytenant/cep_battery
  5. Inject events:
> punchplatform-log-injector.sh -c resources/injector/mytenant/cep/battery_injector.json
  6. Observe the events and alerts in the Dashboard Alerts Kibana dashboard

Note

As you will see, two devices are emitting their battery level. The battery level of the first device decreases sharply and falls below 20%, so two alerts are generated. The battery level of the second device decreases slowly and ends up falling below 20%, so only one alert is raised.

Note that the PunchPlatform standalone offers another correlation example: a cybersecurity use case with brute-force attack detection. To try it, run:

> punchplatform-channel.sh --configure tenants/mytenant/configurations_for_channels_generation/lmc_cep/bruteforce_attack_channel.json
> punchplatform-channel.sh --start mytenant/cep_bruteforce
> punchplatform-log-injector.sh -c resources/injector/mytenant/cep/bruteforce_injector.json