Topologies

Overview

A channel is made up of one or several Storm topologies. The overall channel structure is described in a Channel Structure configuration file, explained in Channels. Each Storm topology is described by an associated topology json file. This chapter focuses on these per-topology configuration files, to give you a quick yet precise understanding of how the PunchPlatform lets you use Storm through simple json files.

Think of a topology as one input-processing-output hop. It takes data from some source using spouts, processes it using bolts, and outputs the resulting data using output bolts. This is illustrated next:

../../../_images/TopologySpoutsAndBolts.png

Note

Storm does not differentiate processing bolts from output bolts. In the PunchPlatform some bolts (the Kafka, Elasticsearch and syslog bolts) are dedicated to output, while others (the Punch and Esper bolts) are dedicated to processing.

The next chapter provides more detailed information on each part.

Topology Configuration Structure

A topology configuration file chains spouts and bolts and provides all the required configuration properties. The overall structure of a topology configuration file is as follows:

{
    # the name of the tenant. 
    "tenant" : "mytenant",

    # the name of the channel
    "channel" : "apache",

    # a short name for this topology
    "name" : "parsing",

    # an array of spouts
    "spouts" : [
        {...}, ..., {...}
    ],
  
    # an array of bolts
    "bolts" : [
      {...}, ..., {...}
    ],

    # The per-topology Storm settings. All properties starting with "topology."
    # override the ones defined in the storm.yaml cluster-wide defaults.
    # The values given here are production compatible.
    "storm_settings" : {
        "metrics_consumers": [ "backtype.storm.metric.LoggingMetricsConsumer" ],
        "topology.builtin.metrics.bucket.size.secs": 30,
        "supervisor.monitor.frequency.secs" : 60,
        "topology.max.spout.pending" : 2000,
        "topology.enable.message.timeouts": true,
        "topology.message.timeout.secs" : 30,
        "topology.worker.childopts": "-Xms256m -Xmx256m",
        "topology.receiver.buffer.size": 32,
        "topology.executor.receive.buffer.size": 16384,
        "topology.executor.send.buffer.size": 16384,
        "topology.transfer.buffer.size": 32,
        "topology.worker.shared.thread.pool.size": 4,
        "topology.disruptor.wait.strategy": "com.lmax.disruptor.BlockingWaitStrategy",
        "topology.spout.wait.strategy": "backtype.storm.spout.SleepSpoutWaitStrategy",
        "topology.sleep.spout.wait.strategy.time.ms": 1,
        "topology.workers" : 1
    }
}

Note the storm_settings section. You can use it to set important Storm properties such as the size of the JVMs, the metric publishing rates, etc. Defaults are defined in the cluster-wide storm.yaml configuration file.
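
For instance, to give this particular topology's worker JVM a larger heap than the cluster-wide default, you could override just that property (the values below are illustrative):

"storm_settings" : {
    # overrides the storm.yaml default for this topology only
    "topology.worker.childopts": "-Xms512m -Xmx512m",
    "topology.workers" : 1
}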

Understanding Streams and Fields

Warning

understanding this section is key to understanding the PunchPlatform. Please read it carefully.

In a Storm topology, the data flow is implemented using streams and fields. This is clearly explained in http://storm.apache.org/documentation/Concepts.html. Here is an extract: A stream is an unbounded sequence of tuples that is processed and created in parallel in a distributed fashion. Streams are defined with a schema that names the fields in the stream’s tuples. By default, tuples can contain integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays. These concepts are illustrated next:

../../../_images/StreamsAndFieldsConcept.png

In the PunchPlatform you are free to use these concepts to create your own topologies; however, to benefit from features such as data replay or automatic topology supervision, your topologies must include a few special streams and fields.

Log Management Example

We will illustrate this with a log management use case. Say you receive logs such as “31/Dec/2015:01:00:00 user=bob”. The following picture illustrates how such a log is received or read by a spout, and emitted into the topology as a two-field Storm tuple. Only one bolt subscribes to that spout, to keep the example simple.

../../../_images/LogStreamsAndFields.png

The job of a spout is to insert the data into the topology. It should perform as little processing as possible, because most often it cannot be parallelized. If data transformation must take place, it is better to do it in a bolt. In this example the spout emits the logs on a single “logs” stream. The next bolt will thus subscribe to that stream.

The fields in that stream allow bolts to subscribe to the data, and/or to express the need to receive data items grouped according to some specific fields. In our example it is very simple: the complete log is put inside a JSON document, emitted as the “log” field of the “logs” stream. The spout then adds a second field called “time”, where it stores a timestamp.

Note

That timestamp is not mandatory and is presented here to illustrate our example. This said, it is used in production systems to flag the log with either the time it entered the PunchPlatform, or its creation time by some equipment or application. This time is very useful for monitoring or log replay. Inserting it as a dedicated Storm field makes it easy for any downstream bolt to have it at hand.
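
In the topology configuration, this stream and field layout corresponds to the spout publish declaration (the complete spout example is shown later in this chapter):

"publish" : [
  { "stream" : "logs", "fields" : ["log", "time"] }
]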

The bolt in our example topology will receive a Storm tuple with these two fields. In Storm a bolt is a piece of Java or Python code in charge of processing the field contents, then emitting them (or not) to the next bolt(s). In the PunchPlatform such a bolt typically runs a punchlet to do that. A punchlet makes it extra simple to process the data. First, it receives the data as a JSON document representing the Storm stream and field structure. In this example it would be:

{
  "logs" : {
     "log" : {
        "message" : "31/Dec/2015:01:00:00 user=bob"
     },
     "time" : 1389184553774
  }
}

The punchlet can then parse and enrich the data simply by manipulating the JSON document. In this example, the punchlet extracts the timestamp and the user values from the “message” part, and encodes them as dedicated JSON fields. The result is:

{
  "logs" : {
     "log" : {
        "message" : "31/Dec/2015:01:00:00 user=bob",
        "timestamp" : "31/Dec/2015:01:00:00",
        "user" : "bob"
     },
     "time" : 1389184553774
  }
}

And this is what is emitted downstream in the topology to the next bolt(s).

Note

this is the reason the PunchPlatform is so simple to code for: you interact with Storm streams and fields only by manipulating a JSON document.

Multi Streams Topologies

Tuples can be emitted on more than one stream. For example, to generate an alert on a separate stream, a punchlet can produce something like this:

{
  "logs" : {
     "log" : {
        "message" : "31/Dec/2015:01:00:00 user=bob",
        "timestamp" : "31/Dec/2015:01:00:00",
        "user" : "bob"
     },
     "time" : 1389184553774
  },
  "alert" : {
     "user" : "bob has been detected !"
  }
}

It is that simple. However, for this to work the topology must correctly chain spouts and bolts so that they subscribe to the required streams and fields. This is explained below.
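
As a quick preview of the chaining mechanism detailed below (component names are illustrative), the punch bolt producing the alert would declare both streams in its publish section, and a downstream bolt interested only in alerts would subscribe to the “alert” stream:

# in the punch bolt generating the alert
"storm_settings" : {
  "component" : "punchlet_1",
  "publish" : [
    { "stream" : "logs",  "fields" : ["log", "time"] },
    { "stream" : "alert", "fields" : ["user"] }
  ]
}

# in a downstream bolt interested only in alerts
"storm_settings" : {
  "component" : "alert_bolt",
  "subscribe" : [
    { "component" : "punchlet_1", "stream" : "alert", "grouping" : "shuffle" }
  ]
}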

PunchPlatform Reserved Stream

The PunchPlatform uses a dedicated stream for monitoring purposes. Each spout and bolt can be configured to publish and subscribe to that stream in order to enable end-to-end channel supervision. This is yet another use case of Storm streams. In a production system a topology looks like this:

../../../_images/CompleteStreamsAndFields.png

which makes it possible to send control data through topologies without it traversing the various punchlets deployed in them, while still traversing the topology graph for testing and monitoring purposes.

Chaining Spouts and Bolts

Each spout and bolt configuration contains two sections: one to set the component-specific properties, and one to set the Storm-related properties. Here is an example of a syslog spout:

{
    # the type is mandatory. It specifies the type of storm component
    # valid values are "syslog_spout", "kafka_spout", "punch_bolt",
    # "kafka_bolt", "esper_bolt", "elasticsearch_bolt", etc ...
    "type" : "syslog_spout",

    # syslog spout specific settings. Refer to the syslog spout javadoc
    # for a complete description of each property.
    "spout_settings" : {
        # the listening address(es). Valid protocols are "tcp", "udp" or "lumberjack"
      "listen" : [
        { "proto" : "tcp" , "host" : "0.0.0.0", "port" : 9999 }
      ],

      # an optional punchlet. It will be looked for from the $PUNCHPLATFORM_CONF_DIR
      # resources directory. 
      "punchlet" : "standard/common/input.punch"
    },

    # this section is required and similar for all types of spouts or bolts.
    "storm_settings" : {
        # the number of threads for running this spout. 
        "executors": 1,
        # the storm name of this component. Bolts further down the chain
        # interested in data from this spout must specify this name.
        "component" : "syslog_spout_tcp",

        # This spout will emit data on the "logs" stream. Each data item will
        # have two fields: "log" and "time"
        "publish" : [ 
          { 
            "stream" : "logs", 
            "fields" : ["log", "time"] 
          }
      ] 
    }
}

A bolt is similar, except it also has a subscribe declaration. Here is an example of a Punch bolt subscribing to the syslog spout just described.

{
    # the Punch bolt executes a punchlet on the stream of data
    "type" : "punch_bolt",
    "bolt_settings" : {
      # the punchlet is looked for in the $PUNCHPLATFORM_CONF_DIR
      # resources directory. 
      "punchlet" : "standard/common/parsing_syslog_header.punch"
    },
    "storm_settings" : {
      "executors": 1,
      "component" : "punchlet_1",

      # it subscribes to the syslog tcp spout. Note how it expresses the stream
      # it subscribes to. 
      "subscribe" : [ 
        { 
          "component" : "syslog_spout_tcp", 
          "stream" : "logs", 
          "grouping": "shuffle"
        } 
      ],

      # this bolt will in turn emit the data further down the topology,
      # using the same stream and fields.
      # Note that it is mandatory to list the possible fields emitted by the
      # bolt. 
      "publish" : [ 
        { 
            "stream" : "logs", 
            "fields" : ["log", "time"] 
        } 
      ]
    }
}

Working With Jinja Templates

Writing topology files by hand is tedious and error prone. The PunchPlatform uses a templating mechanism to make it much simpler.

Hint

check out the samples delivered with the standalone platform. They illustrate a log management use case and a log transport use case. In the following we rely on the log transport use case to explain how Jinja templating works.

Start by defining a json file holding the main configuration properties of your channels. Here is an example:

{

	"tenant" : "ltr",
	"channel_name" : "forwarder",
	"channel_structure_profile" : "ltr",
	"vendor" : "thales",

	"forwarding" : [
		{ 
			"name" : "apache", 
			"input" : { "proto" : "tcp", "host" : "0.0.0.0", "port" : "8999" },
			"output" : { "proto" : "lumberjack", "host" : "localhost", "port" : "29999" }
		},
		{ 
			"name" : "aruba", 
			"input" : { "proto" : "tcp", "host" : "0.0.0.0", "port" : "17076" },
			"output" : { "proto" : "lumberjack", "host" : "localhost", "port" : "27076" }
		},
		{ 
			"name" : "handover", 
			"input" : { "proto" : "tcp", "host" : "0.0.0.0", "port" : "17506" },
			"output" : { "proto" : "lumberjack", "host" : "localhost", "port" : "27506" }
		},
		{ 
			"name" : "postfix", 
			"input" : { "proto" : "tcp", "host" : "0.0.0.0", "port" : "17651" },
			"output" : { "proto" : "lumberjack", "host" : "localhost", "port" : "27651" }
		}
	]
}

What is defined here are four forwarding (proxy) configurations, each from an input tcp port to an output lumberjack destination. That file represents the high-level user view of a channel.

Next, define the templates that produce the channel_structure and topology configuration file(s) you want. These are written using the Jinja notation. As an example, here are two templates: the first produces an input topology in charge of listening on the input tcp ports and forwarding the data to a Kafka broker; the second takes the data from Kafka and forwards it to the lumberjack destinations.

{
    "tenant" : "{{channel.tenant}}",
    "channel" : "{{channel.channel_name}}",
    "name" : "receiver",
    "spouts" : [
      {% for forwarding in channel.forwarding %}
      {
          "type" : "syslog_spout",
          "spout_settings" : { 
              "load_control" : "rate",
              "load_control.rate" : 500,
              "listen" : [
                {  
                  "proto" : "{{ forwarding.input.proto }}", 
                  "host" : "{{ forwarding.input.host }}", 
                  "port" : {{ forwarding.input.port }}
                }
              ]
          },
          "storm_settings" : {
            "executors": 1,
            "component" : "syslog_spout_{{ loop.index }}",
            "publish" : [ {"stream" : "logs", "fields" : ["log", "src_host", "src_port", "dst_host", "dst_port"] } ] 
          }
      }{%if loop.last == False %},{% endif %}{% endfor %}
    ],
    "bolts" : [
     {% for forwarding in channel.forwarding %}
      {
          "type" : "kafka_bolt",
          "bolt_settings" : {
              "brokers" : "local",
              "topic" : "{{channel.tenant}}.{{channel.channel_name}}.{{forwarding.name}}"
          },
          "storm_settings" : {
            "executors": 1,
            "component" :  "kafka_bolt_{{ loop.index }}",
            "subscribe" : [ 
              { 
                "component" : "syslog_spout_{{ loop.index }}", 
                "stream" : "logs", 
                "grouping": "localOrShuffle" 
              } 
            ] 
          }
      }{%if loop.last == False %},{% endif %}{% endfor %}
    ],
    "storm_settings" : {
        "metrics_consumers": [ "backtype.storm.metric.LoggingMetricsConsumer" ],
        "topology.builtin.metrics.bucket.size.secs": 30,
        "supervisor.monitor.frequency.secs" : 60,
        "topology.max.spout.pending" : 500,
        "topology.enable.message.timeouts": true,
        "topology.message.timeout.secs" : 30,
        "topology.worker.childopts": "-Xms350m -Xmx350m",
        "topology.receiver.buffer.size": 32,
        "topology.executor.receive.buffer.size": 16384,
        "topology.executor.send.buffer.size": 16384,
        "topology.transfer.buffer.size": 32,
        "topology.worker.shared.thread.pool.size": 4,
        "topology.disruptor.wait.strategy": "com.lmax.disruptor.BlockingWaitStrategy",
        "topology.spout.wait.strategy": "backtype.storm.spout.SleepSpoutWaitStrategy",
        "topology.sleep.spout.wait.strategy.time.ms": 50,
        "topology.workers" : 1
    }
}

The second template takes the data from Kafka and forwards it to the lumberjack destinations:

{
    "tenant" : "{{channel.tenant}}",
    "channel" : "{{channel.channel_name}}",
    "name" : "forwarder",
    "spouts" : [
      {% for forwarding in channel.forwarding %}
      {
          "type" : "kafka_spout",
          "spout_settings" : {
              "load_control" : "rate",
              "load_control.rate" : 500,
              "brokers" : "local",
              "topic" : "{{channel.tenant}}.{{channel.channel_name}}.{{forwarding.name}}"
          },
          "storm_settings" : {
              "executors": 1,
              "component" : "kafka_spout_{{ loop.index }}",
              "publish" : [ { "stream" : "logs", "fields" : ["log", "kafka_time"]}] 
          }
      }{%if loop.last == False %},{% endif %}{% endfor %}
    ],
    "bolts" : [
     {% for forwarding in channel.forwarding %}
      {
          "type" : "syslog_bolt",
          "bolt_settings" : {
              "acked" : true,
              "punchlet_code" : "{ ... }",
             
              "queue_flush_period" : 100,
              "queue_flush_size" : 100,
              "destination" : [
                  {  
                    "host" : "{{ forwarding.output.host }}", 
                    "port" : {{ forwarding.output.port }},  
                    "proto" : "{{ forwarding.output.proto }}" 
                  }
              ]
          },
          "storm_settings" : {
              "executors": 1,
              "component" :  "syslog_bolt_{{ loop.index }}",
              "subscribe" : [ 
                { 
                  "component" : "kafka_spout_{{ loop.index }}", 
                  "stream" : "logs", 
                  "grouping": "shuffle" 
                } 
              ]
          }
          
      }{%if loop.last == False %},{% endif %}{% endfor %}
    ],
    "storm_settings" : {
        "metrics_consumers": [ "backtype.storm.metric.LoggingMetricsConsumer" ],
        "topology.builtin.metrics.bucket.size.secs": 30,
        "supervisor.monitor.frequency.secs" : 60,
        "topology.max.spout.pending" : 500,
        "topology.enable.message.timeouts": true,
        "topology.message.timeout.secs" : 30,
        "topologyworker.childopts": "-Xms350m -Xmx350m",
        "topology.receiver.buffer.size": 32,
        "topology.executor.receive.buffer.size": 16384,
        "topology.executor.send.buffer.size": 16384,
        "topology.transfer.buffer.size": 32,
        "topology.worker.shared.thread.pool.size": 4,
        "topology.disruptor.wait.strategy": "com.lmax.disruptor.BlockingWaitStrategy",
        "topology.spout.wait.strategy": "backtype.storm.spout.SleepSpoutWaitStrategy",
        "topology.sleep.spout.wait.strategy.time.ms": 50,
        "topology.workers" : 1
    }
}

Check out the generated topology files ltr_input_topology.json and ltr_output_topology.json.

Note

check out how the Jinja templating automates the publish-subscribe chaining of spouts and bolts.

Channels Templating: Advanced Level

An advanced feature lets you template multiple topologies with a single template, injecting topology-level variables (not only channel-level ones as in the previous example). In short, all parameters defined in each topology section of channel_structure.json are available as variables in your topology template.

For example let’s define a channel archiving two technologies: techno1 and techno2. techno1 logs come from a Kafka topic with 3 partitions and techno2 logs come from a Kafka topic with 2 partitions.

For obscure reasons we want to instantiate as many topologies as there are partitions for each technology, all from the same topology template. This is possible by writing your configuration file as follows:

{
  "tenant": "mytenant",
  "channel_name": "archiver",
  "channel_structure_profile": "lmc/mytenant",
  "vendor": "thales",
  "technologies": [
    {
      "name": "techno1",
      "input_topic": "mytenant-techno1-output",
      "output_topic": "techno1_parsed",
      "errors_output_topic": "techno1_errors",
      "partitions": 3
    },
    {
      "name": "techno2",
      "input_topic": "mytenant-techno2-output",
      "output_topic": "techno2_parsed",
      "errors_output_topic": "techno2_errors",
      "partitions": 2
    }
  ],
  "output": {
    "archiving_cluster": "ceph:main-client",
    "archiving_pool": "mytenant-data"
  },
  "metrics": {
    "type": "elasticsearch",
    "elasticsearch_cluster_name": "es_search"
  }
}

Next, write your channel_structure.json.j2 template:

{
	"topologies" : [
		{%- for technology in channel.technologies %}
          {%- for partition in range(technology.partitions) %}
          {
              "topology" : "{{ technology.name }}_{{ partition }}_topology.json",
              "execution_mode" : "cluster",
              "cluster" : "main",
              "template" : "topology.json.j2",
              "reload_action" : "kill_then_start",
              "technology" : "{{ technology.name }}",
              "input_topic" : "{{ technology.input_topic }}",
              "output_topic" : "{{ technology.output_topic }}",
              "errors_output_topic" : "{{ technology.errors_output_topic }}",
              "partition" : {{ partition }}
          }{%if loop.last == False %},{% endif %}{%- endfor %}
		{%if loop.last == False %},{% endif %}{%- endfor %}
	],

	"autotest_latency_control" : { ... },

	"metrics" : { ... },

	"kafka_topics" : {
		  {%- for technology in channel.technologies %}
		  "{{ technology.input_topic }}_topic": {
			    "name" : "{{ technology.input_topic }}",
		  	  "kafka_cluster" : "back",
		  	  "partitions" : {{ technology.partitions }},
		  	  "replication_factor" :  2
		  }
		  {%if loop.last == False %},{% endif %}{%- endfor %}
	}
}

As you can see, the topology template can be specified for each topology via the template parameter. If this parameter is not specified, the template used is the one named after the topology (with a .j2 suffix appended). If no such template is available, an error occurs.
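
For instance (an illustrative sketch of the fallback rule), a topology entry declared without the template parameter is generated from the template named after the topology file:

{
    "topology" : "techno1_0_topology.json",
    "execution_mode" : "cluster",
    "cluster" : "main",
    # no "template" parameter: the generator looks for a template
    # named techno1_0_topology.json.j2
    "partition" : 0
}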

Then write your topology template:

{
    "tenant" : "{{channel.tenant}}",
    "channel" : "{{channel.channel_name}}",
    {%- if topology.partition is defined %}
    "name" : "{{ topology.technology }}_{{ topology.partition }}",
    {%- else %}
    "name" : "{{ topology.technology }}",
    {%- endif %}
    "spouts" : [
	     {
            "type" : "kafka_spout",
            "spout_settings" : {
              "transactional" : true,
              "batch_size" : 15000,
              "batch_interval" : "300s",
              "start_offset_strategy" : "latest",
              "load_control" : "none",
              "load_control.rate" : 10000,
              "load_control.adaptative" : false,
              "brokers" : "back",
              "watchdog_timeout_ms" : 120000,
              "topic" : "{{ topology.input_topic }}",
              "self_monitoring.activation" : true,
              "partition" : {{ topology.partition }},
              "group_name" : "{{ topology.technology }}",
              "metrics_tags" : { "technology" : "{{ topology.technology }}" }
            },
            "storm_settings" : {
              "executors": 1,
              "component" : "kafka_spout_{{ topology.technology }}",
              "publish" : [
                {
                  "stream" : "logs",
                  "fields" : ["raw_log", "log", "local_uuid", "local_timestamp"]
                }
              ]
            }
        }
    ],

    "bolts" : [
        {
                "type" : "file_bolt",
                "bolt_settings" : {
                        "compression" : true,
                        "compression_format": "GZIP",
                        "output_tuple_default_allocation_bytes" : 1500,
                        "add_header" : true,
                        "files": {
                                "raw": { "fields" : [ "raw_log" ] },
                                "parsing": { "fields" : [ "log", "local_uuid" , "local_timestamp"] }
                        },
                        "errors_files" :{
                                "raw": { "fields" : [ "_ppf_err_raw"]},
                                "error":{ "fields" : [ "_ppf_error", "_ppf_err_uuid", "_ppf_err_ts"] }
                        },
                        "timestamp_field" : "local_timestamp",
                        "publication_actions": [
                        {
                                "action_type":"write_to_objects_storage",
                                "cluster": "{{ channel.output.archiving_cluster }}",
                                "pool":"{{ channel.output.archiving_pool }}",
                                "topic" : "{{ topology.output_topic }}"
                        }],
                        "errors_publication_actions" : [
                        {
                                "action_type":"write_to_objects_storage",
                                "cluster": "{{ channel.output.archiving_cluster }}",
                                "pool":"{{ channel.output.archiving_pool }}",
                                "topic" : "{{ topology.errors_output_topic }}"
                        }
                        ]
                },
                "storm_settings" : {
                        "executors": 1,
                        "component" : "archiver_bolt",
                        "subscribe" : [
                        {
                                "component" : "kafka_spout_{{ topology.technology }}",
                                "stream" : "logs",
                                "grouping": "partitioning"
                        }
                        ],
                        "publish" : [
                        ]
                }
        }

    ],

    "metrics" : { ... },

    "storm_settings" : { ... }
}

Finally, after running punchplatform-channel.sh with the --configure option, you get your channel structure and your 5 topologies.

Testing And Developing

If you need to test performance, understand how a topology works, or simply test your punchlet, it is best to design small topologies that you can run locally.

Check the $PUNCHPLATFORM_CONF_DIR/samples/examples folder. There you will find very simple sample files you can start with. If you execute punchplatform-channel.sh --configure on one of them, you will get a new channel (named example) in a new tenant (examples).

These channels are designed to be very easily configurable. You can combine spouts and bolts in simple chains (a sketch of such a chain follows this list) to, say:

  1. read a file and forward its data to elasticsearch
  2. read a file and forward its data to kafka
  3. read a kafka topic and forward the data to elasticsearch
  4. etc.
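
Here is a hedged sketch of the first combination. The file_spout type name and its settings are assumptions used for illustration only; the elasticsearch_bolt type and the publish/subscribe chaining follow the conventions described earlier in this chapter:

{
    "spouts" : [
      {
        # hypothetical file spout reading a sample log file
        "type" : "file_spout",
        "spout_settings" : { "path" : "./sample.log" },
        "storm_settings" : {
          "executors": 1,
          "component" : "file_spout",
          "publish" : [ { "stream" : "logs", "fields" : ["log"] } ]
        }
      }
    ],
    "bolts" : [
      {
        "type" : "elasticsearch_bolt",
        "bolt_settings" : { ... },
        "storm_settings" : {
          "executors": 1,
          "component" : "elasticsearch_bolt",
          "subscribe" : [
            { "component" : "file_spout", "stream" : "logs", "grouping": "shuffle" }
          ]
        }
      }
    ],
    "storm_settings" : { ... }
}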

Topology Monitoring

The PunchPlatform Storm components regularly publish metrics that can be dispatched, by configuration, to various metric collectors. Ultimately these metrics make their way to a final Elasticsearch metric backend. Elasticsearch is a powerful metric backend; check the rationale in Metrics and MonitoringGuide.

Latency Monitoring

In addition to collecting the metrics of each component, the PunchPlatform comes with a powerful mechanism to automatically measure the latency of data traversing between various points. You, the user, can define so-called paths, within a topology or across several topologies, to define the start and end points of a latency measure.

The principle is the following: spouts can be configured to periodically send latency monitoring messages as part of the traffic. These messages traverse your bolts, and pass from one topology to the next across lumberjack or Kafka hops. Along the way, these messages are enriched with each component (bolt, spout) identifier and timestamp.

Each Storm component instantiates a ‘SelfMonitoringHandler’. The handler executes the following actions:

  1. Compute latency information from the injection timestamp and the current time
  2. Add the latency metrics to the reporter queue

There is only one reporter instance of each type per Storm worker. Each reporter has a map of metrics to send, and all the self-monitoring handlers hold a reference to a static map for additional metrics (Storm, Kafka, …).

Each bolt and spout can be set up to send metrics to a metric database. This is configured in the ‘metrics’ section of the topology file.

"metrics" : {
      "reporters" : {
          "myelasticsearch" : {
            "type" : "elasticsearch" ,
            "cluster_name" : "es_search",
            "nodes" : [
              {
                  "transport_address" : "127.0.0.1",
                  "transport_port" : 9300
              }
            ],
            "max_results_size" : 10000,
            "native_es_settings" : {
              "transport.netty.workerCount" : "1"
            }
          },
          "myslf4j_metric_reporter" : { "type" : "slf4j" , "reporting_interval" : 5, "logging_level" : "DEBUG" }
      },
      "metrics_source" : {
        "reporter_name" : myelasticsearch
      },
}

Metric-Source

  • metrics.metrics_source.reporter_name : The name of the reporter defined in the “reporters” section of the same file.
  • metrics.metrics_source.configuration : The configuration of the data source (same format as a reporter). For example:
"metrics" : {
      [...],
       "metrics_source" : {
          "configuration" : {
            "type" : "elasticsearch" ,
            "cluster_name" : "es_search",
            "transport" : "127.0.0.1:9300",
            "max_results_size" : 10000,
            "native_es_settings" : {
              "transport.netty.workerCount" : "1"
            }
          }
      },
     [...],
}

One purpose of these metrics is to give the latency between an injection point and the current bolt or spout.

Each spout can be configured to send metrics to a metric database. This makes it possible to measure the latency between that spout and another spout or bolt downstream. In turn, that latency is used to report the health status of the concerned channel.

{
   "tenant" : "{{channel.tenant}}",
   "channel" : "{{channel.channel_name}}",
   "name" : "input_front",
   [...],

   "spout_settings" : {
       # Default 30.
       "self_monitoring.frequency" : 30,
       # Indicates if the self monitoring is activated for the topology. Default false.
       "self_monitoring.activation" : true
      }
   [...]
}

Warning

the spout self monitoring is only supported for single-executor spouts. Do not activate it for multi-instance spouts.

If there is no metric backend on your platform (typically a Ltr), you can have the metrics forwarded downstream (typically to a Lmc) where a metric database is running.

The PunchPlatform allows the user to configure specific tests to monitor the system. The channel_structure file takes into account the autotest_latency_control property. Do not forget to define the metrics section with reporters in the channel_structure file. The autotest_latency_control section can be defined in the topology file.

"autotest_latency_control" : {
    "path_controls" : {
        "xxxxxxxxxxxxxx" : {
             "input" : {
                "root_node_name" : "punchplatform",
                "tenant_name" : "mytenant",
                "channel_name" : "apache",
                "storm_container_id" : "main",
                "topology_name" : "input",
                "component_name" : "input"
                },
              "output" : {
                "root_node_name" : "punchplatform",
                "tenant_name" : "mytenant",
                "channel_name" : "output",
                "storm_container_id" : "main",
                "topology_name" : "output_search",
                "component_name" : "elasticsearch"
                },
                "warn_threshold_s" : 2,
                "error_threshold_s" : 4
        }
    }
},
The properties are the following:

  • path_controls : the list of path controls.
  • xxxxxxxxxxxxxx : a user-chosen name for the control. This name will appear as a tooltip in the admin console.
  • input : the input metric.
  • output : the output metric.
  • warn_threshold_s : lower threshold. If the latency between the input and output metrics is below it, the health of the path is GREEN.
  • error_threshold_s : upper threshold. If the latency between the input and output metrics is above it, the health of the path is RED. Between the two thresholds the health is YELLOW.

With the thresholds above (2 and 4 seconds), a measured latency of 1 second yields GREEN, 3 seconds YELLOW and 5 seconds RED.
../../../_images/latency.png

Custom metrics tags

To ease dashboarding, and to allow metric selection based on user-provided tags, you can include metrics_tags in any spout or bolt settings. These are key/value pairs that will automatically be appended as context tags to the published metrics.

Note

this is only available with the Elasticsearch metrics backend.

Here is an example:

"spout_settings" : {
              "listen" : {
                "proto" : "tcp",
                "host" : "0.0.0.0",
                "port" : {{channel.input.tcp_port}}
              },
              "load_control" : "rate",
              "load_control.rate" : 500,
              "load_control.adaptative" : true,
              "self_monitoring.activation" : true,
              "metrics_tags" : {"technology": "{{channel.vendor}}" }
            },

Note

Custom metrics tags are also aggregated with the Storm task identifier in order to facilitate dashboarding. For example “metrics_tags” : {“technology”: “{{channel.vendor}}”} will add the “tags.technology” tag but also a “tags.by_task.technology” tag with value {{channel.vendor}}_{task_id}. This additional tag makes it possible to build an accurate average on a metric in Kibana with only one “group by” request, and also avoids split graphs. The tag should be used only once per tenant if you do not specify the Storm component_id in your request.

User Topologies

The PunchPlatform provides you with a set of ready-to-use bolts and spouts. You might however need to write your own and insert them in a PunchPlatform topology, in turn part of a channel. Say, for example, you need an HDFS bolt that you would like to chain with the PunchPlatform Kafka spout, so as to insert the data into an HDFS filesystem.

Develop Your Bolts and Spouts

Write your own spout or bolt in your Java Maven project, as well as a PunchPlatform factory class for creating them. You must add the following Maven dependencies to your project:

<properties>
  <storm.version>1.1.1</storm.version>
  <punchplatform-storm-api.version>0.4.0</punchplatform-storm-api.version>
  <punchplatform-topology.version>0.5.0</punchplatform-topology.version>
</properties>

<dependencies>
  <dependency>
    <groupId>com.thales.services.cloudomc.punchplatform.storm</groupId>
    <artifactId>punchplatform-storm-api</artifactId>
    <version>${punchplatform-storm-api.version}</version>
  </dependency>
  <dependency>
    <groupId>com.thales.services.cloudomc.punchplatform</groupId>
    <artifactId>punchplatform-topology</artifactId>
    <version>${punchplatform-topology.version}</version>
  </dependency>
</dependencies>

Refer to the punchplatform-thirdparty Maven example. Make sure you generate a jar with dependencies, so as to include the required PunchPlatform classes. These provide the topology launcher class you need to run your topology.

Refer to Your Topology

In the topology json file refer to your bolt and to your jar as follows:

{
  "tenant" : "your_tenant",
  "channel" : "your_channel",
  "name" : "your_topology_name",

  # This must be an absolute path
  "jar" : "/opt/punchplatform/lib/punchplatform-topology-0.5.0-jar-with-dependencies.jar",

  "bolt" : {

    # Indicate you are the provider of this bolt
    "type" : "third_party",

    # Provide the factory class for creating your bolt. This class must have a
    # public constructor and implement the PunchPlatformBoltFactory class from
    # the com.thales.services.cloudomc.punchplatform.storm.api package.
    "class" : "com.your.org.YourBoltFactory",

    "bolt_settings" : {
      ...
    },
    "storm_settings" : {
      ...
    }
  }
}

You can then run your topology just like any other.

Placing Spouts and Bolts

The PunchPlatform provides you with a topology scheduler to let you place some of your topologies on specific servers.

Often you have input topologies running socket spouts, listening on some TCP/UDP addresses. If you submit such a topology to a Storm cluster, your spouts may end up not running on the server supposed to hold that listening address. To work around that issue, declare in the topology configuration file the list of servers where your topology should run.

Let us start with an example. Say you have a simple topology with just a spout and a bolt. The storm cluster has four servers, each tagged with a unique identifier: blue1, blue2, green1, green2. Say you would like your topology to run on the blue1 and blue2 servers. Here is your setup:

../../../_images/TopologyPlacementConfig.png

Note

the tags are defined in the storm.yaml configuration file, part of the Storm environment installed on each server.

By requesting the topology to run only on the blue servers, the PunchPlatform Storm scheduler will dispatch the spouts and bolt(s) only on these. When you define such a configuration, you must carefully consider the number of workers (i.e. the number of processes of your topology) together with the number of spout and bolt instances (executors). If your goal is to have a spout running on each blue server, setting two spouts and two workers will do the job. The resulting placement is the following:

../../../_images/TopologyPlacementDispatch.png

The tags are declared in the storm_settings section. Here is the part of the topology file where you declare your tags:

{
  "spouts" : [ ... ],
  "bolts" : [ ... ],
  "storm_settings" : {
    "punchplatform.supervisors" : ["blue1", "blue2"],
    "topology.workers" : 3,
    ...
  }
}

Tags must be declared in the storm.yaml file on each server as follows:

supervisor.scheduler.meta:
  tag: "blue"
storm.scheduler: "com.thales.services.cloudomc.punchplatform.storm.core.TopologyScheduler"

If you use topology placement, make sure:

  1. all your servers have tags; this feature does not work properly on partially tagged clusters
  2. the PunchPlatform scheduler jar is installed in the storm lib directory on the nimbus server(s)

Note

this is, of course, done at installation time by the PunchPlatform deployer tool.

Metrics

The metrics available in the metrics backend depend on the spouts and bolts included in your topology. Please refer to Spouts and Bolts for more details on the metrics associated with each component type.