
Channels

Here comes the third concept: the channel. A channel groups several applications (called jobs) into a consistent, useful unit. Once you have defined a channel, you can start or stop it; all of its jobs are started or stopped accordingly.

Channels can, of course, contain topologies and PMLs, but they can also contain Logstash instances, administrative tasks and much more. Using channels you can model complete applications that mix stream, batch and arbitrary tasks. It is that simple and that powerful.
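To give you a feel for it, a channel is described by a configuration file (typically named channel_structure.json) that lists its jobs. The sketch below is purely illustrative: the exact field names depend on your punch release, but the two storm jobs match the job names you will see in the punchctl status output later in this section.

# channel_structure.json: an illustrative sketch, field names may vary per release
{
  "version": "5.0",
  "jobs": [
    { "type": "storm", "topology": "single_topology.json", "cluster": "main" },
    { "type": "storm", "topology": "archiving_topology.json", "cluster": "main" }
  ]
}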

Stream processing

A job can be a streaming or a batch application. Let us consider the simplest channel you can think of: one composed of simple streaming jobs that continuously parse logs received on a TCP socket, transform them into normalised and enriched JSON documents, and index them into Elasticsearch.
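Any tool able to write to a TCP socket can feed such a channel. As a purely illustrative example, assuming the channel input listens on TCP port 9999 (the actual port is defined in the channel's topology file), you could send a single Apache log line by hand:

# hypothetical port 9999: check your topology configuration for the real one
echo '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326' | nc localhost 9999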

Let us now go through that usage step by step. Start again from the configuration directory. You will find some ready-to-use log channels defined for the mytenant example tenant.

ls $PUNCHPLATFORM_CONF_DIR/tenants/mytenant/channels
admin  aggregation  apache_httpd  sourcefire  stormshield_networksecurity  universal  websense_web_security

As you probably guessed, each channel deals with the corresponding log type, except for the universal channel, which we will explain next. To start channels, launch the punchctl command-line tool.

punchctl

Do you have multiple tenants? Use the tenant option: punchctl --tenant mytenant or punchctl -t mytenant

From there you have autocompletion, and all the commands are documented. Try the start --channel command. If you prefer to start the channel directly, type in:

punchctl:mytenant> start --channel apache_httpd
job:storm:apache_httpd/main/single_topology  (mytenant_apache_httpd_single_topology-7-1557926010) ....................... ACTIVE
job:storm:apache_httpd/main/archiving_topology  (mytenant_apache_httpd_archiving_topology-8-1557926011) ................. ACTIVE

Check their running status using:

punchctl:mytenant> status
channel:stormshield_networksecurity ..................................................................................... STOPPED
channel:admin ........................................................................................................... STOPPED
channel:sourcefire ...................................................................................................... STOPPED
channel:aggregation ..................................................................................................... STOPPED
channel:websense_web_security ........................................................................................... STOPPED
channel:universal ....................................................................................................... STOPPED
job:storm:apache_httpd/main/archiving_topology  (mytenant_apache_httpd_archiving_topology-8-1557926011) ................. ACTIVE
job:storm:apache_httpd/main/single_topology  (mytenant_apache_httpd_single_topology-7-1557926010) ....................... ACTIVE
channel:apache_httpd .................................................................................................... ACTIVE

Feel free to explore the various punchctl commands; autocompletion and inline documentation are your friends. In particular, it is important to understand:

punchctl:mytenant> status --help

Good to know: you can also use non-interactive variants. Hit Ctrl-C or Ctrl-D to exit the punchctl shell, then simply type:

punchctl status
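All commands work non-interactively this way, and they combine with the tenant option shown earlier. For example:

punchctl --tenant mytenant start --channel apache_httpd
punchctl --tenant mytenant status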

Once your channel(s) are running, you can inject logs. For that, the punch provides you with an injector tool. Start it by executing the command below:

punchplatform-log-injector.sh -c $PUNCHPLATFORM_CONF_DIR/resources/injector/mytenant/apache_httpd_injector.json

You are now sending generated (fake) apache_httpd logs to your channel. They will be parsed, normalised and indexed into Elasticsearch. Check Kibana (http://localhost:5601) to explore the indexed logs, and the Storm UI (http://localhost:8080) to monitor the running topologies.
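You can also check from a terminal that events are being indexed, using a plain Elasticsearch count query (this assumes Elasticsearch listens on its default port 9200):

# count the documents indexed in the mytenant event indices
curl 'localhost:9200/mytenant-events-*/_count?pretty'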

If you want to generate more types of logs, simply type in the following command to run all the injector files found there at once:

punchplatform-log-injector.sh -c $PUNCHPLATFORM_CONF_DIR/resources/injector/mytenant

This will inject all the logs defined in the $PUNCHPLATFORM_CONF_DIR/resources/injector/mytenant folder. When you are done, stop the injection with Ctrl-C and stop your channels. To do that you can use the punchctl interactive tool again, or simply type in:

punchctl stop

Batch processing

Now that you are comfortable with streaming, let's move on to batch processing. We will run a continuous aggregation channel based on a PML Plan (Spark). The aggregation is executed every minute and fetches all the logs stored in the mytenant-events-* Elasticsearch index over the last minute. For each minute, we want to compute:

  1. how many bytes have been written to this index
  2. what was the size (in bytes) of the biggest log
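Conceptually, this boils down to the following Elasticsearch aggregation. The hand-written query below is only a sketch of what the PML plan computes: the vendor and size field names are assumptions, and the actual names depend on the mytenant mapping.

# illustrative only: the field names (vendor, size) are assumptions
curl -XPOST 'localhost:9200/mytenant-events-*/_search?pretty' \
  -H 'Content-Type: application/json' -d '
{
  "size": 0,
  "query": { "range": { "@timestamp": { "gte": "now-1m" } } },
  "aggs": {
    "by_vendor": {
      "terms": { "field": "vendor" },
      "aggs": {
        "total_size": { "sum": { "field": "size" } },
        "max_size": { "max": { "field": "size" } }
      }
    }
  }
}'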

Before running the aggregation, we need to provide some data. To do so, let's start two channels with punchctl and inject some logs.

punchctl:mytenant> start --channel sourcefire
job:storm:sourcefire/main/single_topology  (mytenant_sourcefire_single-9-1557930620) .................................... ACTIVE
punchctl:mytenant> start --channel websense_web_security
job:storm:websense_web_security/main/single_topology  (mytenant_websense_web_security_single-10-1557930646) ............. ACTIVE

Now that channels are running, let's inject some logs:

punchplatform-log-injector.sh -c $PUNCHPLATFORM_CONF_DIR/resources/injector/mytenant

It is important to keep injecting logs in real time because the aggregation only fetches the last minute's logs. Keep the log injector running and open a new terminal. From the new terminal, type in this command to start the aggregation:

# From another terminal
punchctl start --channel aggregation
job:shiva:aggregation/common/plan-example-1  (punchplatform/mytenant/channels/aggregation/plan-example-1) ............... ACTIVE

Wait about one minute, the time for the first aggregation to complete. Then a new Elasticsearch index named mytenant-aggregations-YYYY.MM.DD should show up. Add this new index pattern to Kibana and look at the results. The documents have the following fields:

{
  "_index": "mytenant-aggregations-2019.05.15",
  "_type": "_doc",
  "_id": "QmHvu2oBm9lH_e9QjytC",
  "_version": 1,
  "_score": null,
  "_source": {
    "total_size_value": 1013339,
    "max_size_value": 298,
    "key": "sourcefire",
    "timestamp": "2019-05-15T16:40:00.148+02:00"
  },
  "fields": {
    "timestamp": [
      "2019-05-15T14:40:00.148Z"
    ]
  },
  "sort": [
    1557931200148
  ]
}

As you can see, we get an overview of the total log size and the largest log size over the last minute, grouped by technology vendor. Note that one document is generated per vendor each minute. The vendor can be found in the "key" field; in this example, the vendor is "sourcefire".
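If you prefer a terminal to Kibana, you can list the aggregation indices and query the documents directly (again assuming Elasticsearch on its default port 9200):

# list the aggregation indices
curl 'localhost:9200/_cat/indices/mytenant-aggregations-*?v'
# fetch the documents computed for a given vendor
curl 'localhost:9200/mytenant-aggregations-*/_search?q=key:sourcefire&pretty'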

To stop everything, run the following commands:

# first, stop the aggregation channel
punchctl stop --channel aggregation

# then, stop all remaining channels
punchctl stop

Congratulations! You are now ready to build high-performance stream and batch processing pipelines!