
Topologies

Overview

Think of a topology as an input-processing-output hop. It takes data from some source using spouts, processes it using bolts, and outputs the resulting data using output bolts. This is illustrated next:

[figure: a topology chaining spouts, processing bolts and output bolts]

Note

Storm does not differentiate processing bolts from output bolts. In the PunchPlatform some bolts (Kafka, Elasticsearch, syslog bolts) are dedicated to output while others (the punch, cep bolts) are dedicated to processing.

The next chapter provides more detailed information on each part.

Topology Configuration

A topology is described by a JSON file which chains spouts and bolts and provides all the required configuration properties. The overall structure of a topology configuration file is as follows:

{
  # the name of the tenant. 
  "tenant" : "mytenant",

  # the name of the channel
  "channel" : "apache",

  # a short name for this topology
  "name" : "parsing",

  # an array of spouts
  "spouts" : [
    ...
  ],

  # an array of bolts
  "bolts" : [
    ...
  ],

  # The per-topology Storm settings. All properties starting
  # with "topology." override the ones defined in the storm.yaml
  # cluster-wide defaults.
  "storm_settings" : {
    "topology.max.spout.pending" : 2000,
    "topology.enable.message.timeouts": true,
    "topology.message.timeout.secs" : 30,
    "topology.worker.childopts": "-Xms256m -Xmx256m",
    "topology.spout.wait.strategy": "backtype.storm.spout.SleepSpoutWaitStrategy",
    "topology.sleep.spout.wait.strategy.time.ms": 1,
    "topology.workers" : 1,
    ...
  }
}

Note the storm_settings section. You can use it to set important Storm properties such as the size of the JVMs, the metrics publish rates, etc. Defaults are defined in the cluster-wide storm.yaml configuration file.

Understanding Streams and Fields

Understanding this section is key to understanding topologies. Please read it carefully.

In a Storm topology, the data flow is implemented using streams and fields. This is clearly explained in the storm documentation. Here is an extract:

A stream is an unbounded sequence of tuples that is processed and created in parallel in a distributed fashion. Streams are defined with a schema that names the fields in the stream's tuples. By default, tuples can contain integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays.

These concepts are illustrated next:

[figure: streams and fields flowing between spouts and bolts]

In the PunchPlatform you are free to create your own topologies. However, to benefit from important features such as data replay or automatic topology supervision, your topologies must include a few special streams and fields.

Example

We will illustrate this with a log management use case. Say you receive logs such as:

"31/Dec/2015:01:00:00 user=bob"

The following picture illustrates how such a log is received or read by a spout, and emitted into the topology as a two-field Storm tuple. Only one bolt subscribes to that spout, to keep the example simple.

[figure: a log received by a spout and emitted as a two-field Storm tuple]

The job of a spout is to insert the data into the topology. It should perform as little processing as possible, because most often it cannot be parallelized. If data transformation must take place, it is better to do it in a bolt. In this example the spout emits the logs on a single stream. The next bolt will thus subscribe to that stream.

The fields in that stream allow bolts to subscribe to the data, and/or to express the need to receive data items grouped according to specific fields. In our example, it is very simple: the complete log is put inside a JSON document and emitted as the "log" field, together with a "time" field that holds a timestamp.

Note

That timestamp is not mandatory and is presented here only to illustrate our example. That said, it is used in production systems to flag the log with either the time it entered the PunchPlatform, or its creation time on some equipment or application. This timestamp is very useful for monitoring and log replay. Inserting it as a dedicated Storm field makes it easy for any downstream bolt to have it at hand.

The bolt in our example topology will receive a Storm tuple with these two fields. In Storm, a bolt is a piece of Java or Python code in charge of processing the field contents, then emitting them (or not) to the next bolt(s). In the PunchPlatform such a bolt typically runs a punchlet to do that. A punchlet makes it extra simple to process the data. First, it receives the data as a JSON document representing the Storm stream and field structure. In this example it would be:

{
  "logs": {
    "log": {
      "message": "31/Dec/2015:01:00:00 user=bob"
    },
    "time": 1389184553774
  }
}

The punchlet can then parse and enrich the data simply by manipulating the JSON document. In this example, the punchlet extracted the timestamp and the user values from the message, and encoded them as dedicated JSON fields. The result is:

{
  "logs": {
    "log": {
      "message": "31/Dec/2015:01:00:00 user=bob",
      "timestamp": "31/Dec/2015:01:00:00",
      "user": "bob"
    },
    "time": 1389184553774
  }
}

And this is what is emitted downstream in the topology to the next bolt(s). This is why the PunchPlatform is so simple to code for: you interact with Storm streams and fields only by manipulating a JSON document.
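To make this concrete, here is a minimal sketch of what such a punchlet could look like for this example. It is not the actual standard parser, and the asString() accessor is an assumption about the punch tuple API; refer to the punchlet documentation for the authoritative syntax:

{
    // illustrative only: split the raw message into its timestamp
    // and "user=..." parts, then store them as dedicated fields
    // next to the original message.
    String msg = [logs][log][message].asString();
    String[] parts = msg.split(" ");
    [logs][log][timestamp] = parts[0];
    [logs][log][user] = parts[1].split("=")[1];
}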

Multi Streams Topologies

Tuples can be sent on more than one stream. For example, to generate an alert on a separate stream, a punchlet can generate something like this:

{
  "logs": {
    "log": {
      "message": "31/Dec/2015:01:00:00 user=bob",
      "timestamp": "31/Dec/2015:01:00:00",
      "user": "bob"
    },
    "time": 1389184553774
  },
  "alert": {
    "user": "bob has been detected !"
  }
}

It is that simple. However, for this to work the topology must correctly chain spouts and bolts so that they subscribe to the required streams and fields. This is explained below.

Spouts and Bolts Configuration Principles

In a topology JSON file, each spout and bolt is described in a dedicated JSON section. Each such section contains two sub-sections: one to set the component-specific properties, and one to set the Storm-related properties.

Here is an example of a syslog spout:

{
  # the type is mandatory. It specifies the type of Storm
  # component. Valid values are "syslog_spout", "kafka_spout",
  # "punch_bolt", "kafka_bolt", "elasticsearch_bolt", etc.
  "type" : "syslog_spout",

  # the (syslog) spout specific settings. Refer to the syslog
  # spout documentation for a complete description of each
  # property.
  "spout_settings" : {
    # the listening address(es). Valid protocols
    # are "tcp", "udp" or "lumberjack"
    "listen" : [
      { "proto" : "tcp", "host" : "0.0.0.0", "port" : 9999 }
    ]
  },

  # this section is required and similar for all types of
  # spouts or bolts.
  "storm_settings" : {

    # the number of threads running this spout
    "executors": 1,

    # the storm name of this component. Bolts further down the chain
    # interested in data from this spout must refer to this name.
    "component" : "syslog_spout_tcp",

    # This spout will emit data to the "logs" stream. Each data
    # item will have two fields: "log" and "time"
    "publish" : [
      {
        "stream" : "logs",
        "fields" : ["log", "time"]
      }
    ]
  }
}

A bolt is similar, except it also has a subscribe declaration. Here is an example of a Punch bolt subscribing to the syslog spout just described.

{
  # the Punch bolt executes a punchlet on the stream of data
  "type" : "punch_bolt",
  "bolt_settings" : {
    # check the documentation for details about loading punchlets
    "punchlet" : "standard/common/parsing_syslog_header.punch"
  },
  "storm_settings" : {
    "executors": 1,
    "component" : "punchlet_1",
    # it subscribes to the syslog tcp spout. Note how it expresses 
    # the stream it subscribes to. 
    "subscribe" : [ 
      { 
        "component" : "syslog_spout_tcp", 
        "stream" : "logs", 
        "grouping": "shuffle"
      } 
    ],

    # this bolt will in turn emit the data further down the topology,
    # using the same streams and fields mechanism.
    # Note that it is mandatory to list the possible fields emitted 
    # by the bolt. 
    "publish" : [ 
      { 
          "stream" : "logs", 
          "fields" : ["log", "time"] 
      } 
    ]
  }
}
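To chain the multi-stream alert example shown earlier, the same publish/subscribe declarations are used: the punch bolt generating the alerts publishes both streams, and a dedicated downstream bolt subscribes to the "alert" stream only. Here is a sketch of the relevant storm_settings sections, reusing the syntax above (the component names are made up for this illustration):

# inside the punch bolt generating the alerts
"storm_settings" : {
  "executors": 1,
  "component" : "punchlet_2",
  "subscribe" : [
    { "component" : "punchlet_1", "stream" : "logs", "grouping": "shuffle" }
  ],
  "publish" : [
    { "stream" : "logs",  "fields" : ["log", "time"] },
    { "stream" : "alert", "fields" : ["user"] }
  ]
},

# inside a downstream bolt dedicated to alerts
"storm_settings" : {
  "executors": 1,
  "component" : "alert_bolt",
  "subscribe" : [
    { "component" : "punchlet_2", "stream" : "alert", "grouping": "shuffle" }
  ]
}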

Reserved Streams

Using streams and fields you have complete control over your pipeline. In addition, the PunchPlatform lets you use reserved streams. These are used to forward metrics and errors.

Refer to the javadocs for a description of the various reserved streams and fields.

Topology Monitoring

Monitoring is a key benefit of using the PunchPlatform. Topologies are monitored by two means:

External monitoring services monitor the resource usage of each topology: Kafka topic backlog, CPU, memory, etc.

In contrast, internal monitoring services make topologies publish their own metrics to an Elasticsearch backend. Internal monitoring relies on punch metric libraries used from within spouts and bolts.

Both external and internal metrics end up being indexed and stored in an Elasticsearch backend.

Note

Check the rationale for using Elasticsearch as the metrics backend in the Metrics and Monitoring Guide. Also refer to Spouts and Bolts for more details on the metrics associated with each component.

Monitoring Metrics

A topology publishes metrics so that it can be finely monitored. It publishes several sorts of metrics:

  • per (bolt or spout) component metrics: each component publishes specific metrics

    Refer to each spout or bolt documentation

  • per (bolt or spout) component uptime metrics

    In addition, each component publishes a so-called uptime metric that makes it easy to keep track of process restarts.

  • per java process system metrics

    A topology is potentially composed of several workers. Each worker publishes useful metrics to monitor the memory, garbage collection and thread usage of each process.

To activate these metrics, simply enrich the topology configuration file with a "metrics" section. Here is an example:

{
  "metrics": {
    #
    # activate the publishing of per jvm process metrics (heap and non heap memory usage)
    # Possible values are:
    # "jvm.memory" : to activate memory related metrics
    # "jvm.gc" : to activate gc related metrics
    # "jvm.threadstates" : to activate thread usage related metrics
    # "jvm.pools" : to activate buffer pools related metrics,
    #
    "jvm.memory" : true,

    #
    # List the destination metric reporters
    #
    "reporters": [
      {
        "type": "elasticsearch",
        "cluster_name": "es_search"
      },
      {
        "type": "logger",
        "reporting_interval": 5
      }
    ]
  }
}

Metrics Reporters

Each topology can be configured with one or several metric reporters, in charge of publishing the generated metrics to a given destination. Here is an example:

{
    "spouts": [
        ...
    ],
    "bolts": [
        ...
    ],
    "metrics": {
        "reporters": [
            {
                "type": "elasticsearch",
                "cluster_name": "es_search"
            },
            {
              "type": "logger",
              "format": "kv",
              "reporting_interval": 5
            }
        ]
    },
    "storm_settings": {
        ...
    }
}

In the above example, the topology forwards its metrics to both an Elasticsearch cluster and a log file.

Please refer to the metric reporters section for a description of all the existing possibilities.

Traversal Time

Among the many published metrics, one deserves particular attention as it is extremely useful to track the end-to-end traversal time of your data, across one or several Kafka hops. Each spout can be configured to periodically generate a traversal time tracking message that will be published onto a reserved stream. Here is an example to activate this every 30 seconds:

{
  "spouts": [
    {
      "type": "some_spout",
      "spout_settings": {
        "self_monitoring.activation": true,
        "self_monitoring.period": 30,
        ...
      },
      "storm_settings": {
        "component": "your_component_id",
        "publish": [
          {
            ...
          },
          {
            "stream": "_ppf_metrics",
            "fields": [
              "_ppf_latency"
            ]
          }
        ]
      }
    },
    ...
  ],
  ...
}

This setting makes the spout publish a special message on the _ppf_metrics stream with a single _ppf_latency field. It is then up to you to have your bolts subscribe to that stream/field and forward it to where you need it, possibly across Kafka or lumberjack hops.
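For example, a downstream punch bolt forwarding both the regular data and the latency tracking messages could declare its subscriptions as follows. This is only a sketch reusing the subscribe/publish syntax shown earlier; it assumes the regular data travels on the "logs" stream of our running example and that the component names match your topology:

"storm_settings": {
  "component": "forwarding_bolt",
  "subscribe": [
    { "component": "your_component_id", "stream": "logs", "grouping": "shuffle" },
    { "component": "your_component_id", "stream": "_ppf_metrics", "grouping": "shuffle" }
  ],
  "publish": [
    { "stream": "logs", "fields": ["log", "time"] },
    { "stream": "_ppf_metrics", "fields": ["_ppf_latency"] }
  ]
}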

Custom Metrics tags

To ease dashboarding, and to allow metrics selection based on user-provided tags, you can include metrics tags in spout or bolt settings. These are key/value pairs that will automatically be appended as context tags to the published metrics.

Here is an example:

"spout_settings": {
  ...
  "self_monitoring.activation": true,
  "metrics_tags": {
    "technology": "{{channel.vendor}}"
  }
}

Topology advanced settings

Tuple acknowledgements

By default Storm is resilient: all tuples are acknowledged by bolts, and these acknowledgments are reported back to the originating spout. This is however not needed for some of your topologies. In particular, front-end topologies (the LTR input topologies for example) do not need it.

Because it has a CPU cost, it is best to disable it if you do not need it. To do so, include the following in the storm_settings section of your topology:

"storm_settings": {
  ...
  "topology.acker.executors": 0,
  ...
}

JVM memory setting

The memory allocated to each topology process is defined in the storm_settings section of the topology configuration file.

"storm_settings": {
  ...
  "topology.worker.childopts": "-Xms512m -Xmx512m",
  ...
}

How much do you need? There are two use cases. Topologies that only process and forward data (such as log parsing or forwarding topologies) require little memory. Make sure you give enough memory so that the number of pending tuples (2000 in our example) fits in.
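As a rough illustration of this sizing (the 2 KB per-tuple size is an assumption of the example, measure it on your own data):

"storm_settings": {
  ...
  # assuming ~2 KB per tuple, 2000 pending tuples represent
  # roughly 2000 x 2 KB = 4 MB of in-flight data; the JVM baseline
  # and the punchlet resources usually dominate the heap usage
  "topology.max.spout.pending": 2000,
  "topology.worker.childopts": "-Xms512m -Xmx512m",
  ...
}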

Be careful though: the more spouts and bolts you have in a topology, the more memory and CPU you need.

Stateful topologies performing aggregations require much more RAM; it is up to you to estimate the requirement as a function of the batch size and/or period.

Warning

If your topology uses big resource files, size your memory accordingly. If you experience OutOfMemory errors in test or production, it means you did not allocate enough.

Overload control

Overload control is an important parameter to understand. If it is not set up correctly, topologies may experience tuple failures and retries, resulting in under-performing behavior.

The native Storm mechanism is based on the number of pending tuples allowed per spout in a topology. For example, with the following setup:

"storm_settings": {
  ...
  "topology.max.spout.pending": 2000,
  ...
}

You allow only 2000 messages (i.e. tuples) in the topology. If your topology contains a bolt that can only cope with fewer than 2000 messages per second, you will quickly reach that number of pending messages and Storm will stop processing incoming data. The way Storm stops processing data is simply to stop requesting data from the spout. That means that your (say) socket or Kafka spout will stop reading data, resulting in slowing down your socket client, or filling your Kafka backlog.

Note that at high rates, even if your output bolt can cope with 2000 messages per second, you are likely to accumulate more tuples in the topology. As a rule of thumb, allocate three to five times the maximum rate your topology is expected to cope with.
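For instance, assuming (for the sake of the example) that your topology must sustain a peak of 1000 messages per second, the rule of thumb gives:

"storm_settings": {
  ...
  # 3 to 5 times the expected peak rate of 1000 msg/s
  "topology.max.spout.pending": 4000,
  ...
}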

Topology timeout

Timeout control is another important parameter to understand. If it is not set up correctly, topologies may experience tuple failures and retries, resulting in under-performing behavior.

"storm_settings": {
  ...
  "topology.enable.message.timeouts": true,
  "topology.message.timeout.secs" : 10,
  ...
}

In this example, timeouts are enabled and the value is set to 10 seconds. It means a tuple will fail if no acknowledgment is received by the spout within the next 10 seconds. The tuple will then be sent again by the spout.

Usually, 30 or 60 seconds is sufficient for a topology between two Kafka clusters. If you send data through a slow network, index data into a large Elasticsearch cluster, or compress a large amount of logs before writing them to block storage, 120 or 240 seconds is a better configuration.

Scale a topology over multiple workers

To increase performance, a topology can be configured to run in multiple processes. The following example shows how to configure a topology to run in two workers:

"storm_settings" : {
  ...
  "topology.workers": 2,
  ...
}
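Keep in mind that each worker is a separate JVM process started with the options defined by topology.worker.childopts. Here is a sketch combining the two settings (the heap sizes are only indicative):

"storm_settings" : {
  ...
  # two workers, each a separate JVM with a 512 MB heap:
  # the topology then uses roughly 2 x 512 MB of heap overall
  "topology.workers": 2,
  "topology.worker.childopts": "-Xms512m -Xmx512m",
  ...
}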