Kafka Spout

The PunchPlatform KafkaSpout reads data from a Kafka topic. It automatically takes charge of reading one or several of the topic's partitions, depending on the runtime presence of other Kafka spouts from the same topology, or from identically named topologies belonging to other Storm clusters.

Pay attention to the strategy you choose to read data from the topic. The most common strategy is to start reading data from the last saved offset. The Kafka spout saves its offset regularly in Zookeeper. Check the example configuration properties below. In most cases you want the default configuration.

{
     "type" : "kafka_spout",
     "spout_settings" : {
       # Fetch data from the Kafka cluster identified by "front". You must have a corresponding entry
       # in the punchplatform.properties kafka cluster section.
       "brokers" : "front",

       # the topic to read data from
       "topic" : "mytenant.apache",

       # make the jvm exit if the Kafka Spout is not consuming one of the topic
       # partitions for more than the expressed timeout.
       "watchdog_timeout_ms" : 120000,

       # Possible values are "earliest", "latest" or "last_committed".
       "start_offset_strategy" : "last_committed"

       # Below are advanced configuration items

     }
 }

The parameters accepted by the Kafka Spout are the following.

brokers (String, mandatory)
    The id of the source Kafka cluster. It must match an entry in the kafka clusters section of your punchplatform.properties file.

topic (String, mandatory)
    The topic to read data from.

codec (String, optional, default: "lumberjack")
    With the "lumberjack" codec, data read from Kafka is expected to be encoded as lumberjack frames. Use "string" to read plain strings, for example json data. Watch out: with the "string" codec, the string is emitted on a single field, i.e. you must configure your topology to publish only one field.

group_name (String, optional)
    The consumer group name, used so that different spout instances cooperate with (and only with) consumers of the same topic within the same consumer group. Defaults to tenant + "." + channel + "." + topology_name; if 'from_datetime' is defined, "." + yyyyMMddHHmmss (the current date) is appended.

from_datetime (Date string, optional)
    The start of the time scope to replay, in ISO 8601 format with seconds and timezone, e.g. 2016-02-09T14:40:34+01:00. The system computes the offset associated with that timestamp.

from_datetime_timeout_ms (int, optional, default: 10000)
    To compute the offset associated with a timestamp ('from_datetime'), the system queries Kafka. If Kafka takes longer than this timeout to answer, the offset is set to the NO_OFFSET value.

start_offset_strategy (String, mandatory)
    Where to start reading from, for each partition. Possible values are "latest", "earliest" or "last_committed".

failure_rate_enabled (Boolean, optional, default: false)
    If true, makes the spout sleep whenever tuples fail, each time it is upcalled by Storm. This is useful for forwarding topologies that durably fail to forward tuples to a remote application (because of a failure of some downstream component): setting it to true prevents your topology from busy-looping on Kafka forever.

failure_rate_floor (int (ms), optional, default: 20)
    Maximum sleep time in case of failure.

failure_rate_increment (int (ms), optional, default: 2)
    Each tuple failure increases the sleep time by this value.

failure_rate_decrement (int (ms), optional, default: 2)
    Each tuple acknowledgement decreases the sleep time by this value.

transactional (Boolean, optional, default: false)
    Make the spout work using a transactional batch mode.

batch_size (int, optional, default: 0)
    Target size of transactional batches (in content tuples). A new batch is started once the OUTPUT batch contains at least this number of tuples. The actual resulting batch can be slightly bigger (if a big Kafka batch of tuples is handled just before the end of the batch) or significantly smaller (if the batch_interval is reached before the batch is full). This has no effect if 'transactional' is false.

batch_interval (String, mandatory, default: "0m")
    Expiration of transactional batches even though they are not full. This timeout is armed as soon as a batch receives its first tuple. Empty batches are never produced. The value "0" means there is no time-related expiration of batches, and a batch only finishes when batch_size has been reached. This has no effect if 'transactional' is false.

watchdog_timeout_ms (int, mandatory, default: 120000)
    Long running topologies must never stop reading partitions. This timeout protects you from a zombie topic partition reader that would prevent other, alive spouts from processing Kafka data. Set to 0 to disable.

minimum_ms_between_commits (int (ms), optional, default: 15000)
    After a successful state commit, the spout waits for this delay before any new state commit. Note that only a subsequent batch completion (in batch mode), an autocommit trigger (in non-batch mode) or an explicit "commit" call actually triggers a new commit past this delay.
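As an illustration, here is a sketch of a spout combining the transactional batch mode with the failure-rate regulation described above. The broker, topic and tuning values are only examples (in particular the "10s" batch_interval value assumes the usual duration string format, as suggested by the "0m" default), and should be adapted to your own flow:

{
     "type" : "kafka_spout",
     "spout_settings" : {
       "brokers" : "front",
       "topic" : "mytenant.apache",
       "start_offset_strategy" : "last_committed",

       # emit tuples as part of transactional batches of about 1000 tuples,
       # a non-full batch being closed after 10 seconds
       "transactional" : true,
       "batch_size" : 1000,
       "batch_interval" : "10s",

       # progressively slow down reading whenever downstream components keep failing tuples
       "failure_rate_enabled" : true,
       "failure_rate_floor" : 20,
       "failure_rate_increment" : 2,
       "failure_rate_decrement" : 2
     }
 }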

Optional: External inhibition/enabling through UDP commands

If you need to dynamically control (inhibit/enable) your topology throughput depending on external information, you can activate a UDP listener and then send enabling commands through a simple netcat-like command.

When this feature is activated, NEW tuples are only produced by the spout if an enabling command has been sent to the UDP command port recently. An enabling command is typically:

echo "enable_spout 1" >/dev/udp/localhost/8822

If an enabling command is not received within the validity interval, the spout is disabled and no new tuples will be injected until a new enabling command is received.

To activate this feature, enrich your spout settings with the following section:
"remotecontrol_command_listener": {
  "enabler_command_key" : "enable_spout",
  "enabling_validity_in_ms" : 60000,
  "listening_port" : 8822,
  "listening_address" : "127.0.0.1"
}
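Conversely, sending the same key with the value 0 disables the spout; this is what the regulation script below does. With the settings above, the disabling command would be:

echo "enable_spout 0" >/dev/udp/localhost/8822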

When this feature is activated, an additional metric is published by the spout: inhibition.ms (type gauge), which provides the number of ms since the start of the current inhibition period.

A typical cron job can be used to enable the spout only when the backlog of the next topology is low enough:

#!/bin/bash

elasticsearch=myesnode03p:9200


function sendcommand () {
  local technology=$1
  local port=$2
  local value=$3

  # Because we do not know on which node of our cluster the topology to control
  # is running, we just send the command frame to all potential hosts.
  for node in 192.168.98.91 192.168.98.92 192.168.98.93 ; do
    echo "enable_${technology}_extractor ${value}" >/dev/udp/${node}/${port}
  done
}


function regulate () {
  local technology=$1
  local port=$2

  backlog=$(curl -s ${elasticsearch}'/metrics*/_search' -d '{ "query": {"query_string":{"query":"name:kafka.spout.fetch.messages-backlog AND tags.pp.channel:'${technology}' AND ts:>now-2m"}},"aggregations" : {"backlog":{"max":{"field":"kafka.spout.fetch.messages-backlog.gauge"}}}  }' | jq -r '.aggregations.backlog.value')

  # the max aggregation may return a float (e.g. "1234.0"): strip the
  # fractional part so that the arithmetic comparison below works
  backlog=${backlog%.*}

  # HERE IS THE THRESHOLD REGULATION CONDITION:
  # Notice that if no metric can be fetched, we do the same as if we determined a high backlog
  if [ "${backlog}" != "null" ] && [ "${backlog}" -lt 500000 ] ; then
    echo ENABLING $technology FLOW at $(date) because backlog=$backlog ... >> /tmp/regulation.log
    sendcommand $technology $port 1
  else
    echo STOPPING $technology FLOW at $(date) because backlog=$backlog ... >> /tmp/regulation.log
    sendcommand $technology $port 0
  fi
}

echo "" >> /tmp/regulation.log
regulate apache 9851
regulate arkoon 9853
regulate juniper 9859
regulate stormshield 9861
regulate windows 9862
regulate sourcefire 9875
regulate siteminder 9885
regulate sophos 9895
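Assuming the script above is saved as /opt/punchplatform/bin/regulate.sh (an illustrative path), it can be scheduled every minute with a crontab entry such as:

* * * * * /opt/punchplatform/bin/regulate.sh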

Note that it is possible to declare multiple enabler command keys in the same UDP listener instance if you want to control several spouts; you can also use the same enabler command key for multiple spout executors.

Streams And Fields

The KafkaSpout emits n-field tuples into the topology over a single stream. The fields must match the field names received in the Kafka messages.

[Figure: the Kafka spout stream and field contract (KafkaSpoutContract.png)]

As an example, the following configuration will make the KafkaSpout emit only the field "log" on the stream "logs". For this to work, a "log" field must appear in the Kafka message. If other fields are present in the Kafka message, they will not be emitted as fields.

       spout_settings" : { ... },
       "storm_settings" : {
         "executors": 1,
         "component" : "a_kafka_spout",
         "publish" : [
           {
             "stream" : "logs",
             "fields" : [
               "log"
             ]
           }
         ]
       }
   }
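As a second example, here is a sketch of a spout using the "string" codec. As stated in the parameter table, the whole Kafka record is then emitted on a single field, so the published stream must declare exactly one field (the "raw_log" field name is purely illustrative):

    {
       "type" : "kafka_spout",
       "spout_settings" : {
         "brokers" : "front",
         "topic" : "mytenant.apache",
         "start_offset_strategy" : "last_committed",
         # each kafka record is read as a plain string
         "codec" : "string"
       },
       "storm_settings" : {
         "executors": 1,
         "component" : "a_kafka_spout",
         "publish" : [
           {
             # with the "string" codec, exactly one field must be published
             "stream" : "logs",
             "fields" : [ "raw_log" ]
           }
         ]
       }
    }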

Fetching Data From Remote Platforms

A Kafka spout can fetch data from a remote Kafka cluster deployed on a remote site, i.e. one running in another platform. This is typically used to fetch data from a customer site. To do that, you must add a "remote" section to your punchplatform.properties file and set there the kafka and zookeeper settings of the target remote platform. Here is an example:

"remote" :{
      "customer-1" : {
          "kafka" : {
              "clusters" : {
                  "kafka-customer-1" : {
                      "brokers" : [ "x.y.z.t:9092" ],
                      "zk_cluster" : "zk-custmoer-1",
                      "zk_root" : "kafka-customer-1"
                  }
              }
          },

          "zookeeper" : {
              "clusters" : {
                  "zk-customer-1" : {
                      "servers" : ["x.y.z.t:2181"],
                      "punchplatform_root_node" : "/customer-1"
                  }
              }
          }
      }
 }

With such a configuration, you can have your KafkaSpout fetch data from that remote system using the "remote" property as follows:

{
    "type" : "kafka_spout",
    "spout_settings" : {
      ...
      "remote" : "customer-1",
      "brokers" : "kafka-customer-1",
      ...
    },
    "storm_settings" : {...}
}

Implementation Note

The Kafka spout uses Zookeeper to save its offsets, and to share the partition load with the other spouts from the same topology. The zookeeper root used for this is determined as follows:

/<punchplatform_zk_root>/kafka-consumer/<kafka_brokers_name>/<group_name>/<topic>

where:

  • punchplatform_zk_root is the punchplatform root node configured for the zookeeper cluster (the "punchplatform_root_node" setting).
  • kafka_brokers_name is the target kafka cluster name.
  • group_name is the value of the "group_name" spout property. By default, it is the name of the topology (tenant + "." + channel + "." + topology).
  • topic is the topic name.
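For example, assuming a punchplatform_root_node of /punchplatform, a topology named "main" in the "apache" channel of the "mytenant" tenant (hence the default group name mytenant.apache.main), reading the topic "mytenant.apache" from the "front" Kafka cluster, the offsets would be saved under:

/punchplatform/kafka-consumer/front/mytenant.apache.main/mytenant.apache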

Metrics

See Kafka Spout Metrics