SnmpSpout

The SnmpSpout allows your topology to receive events in the SNMP 'trap' format.

The socket-level configuration items are defined in a listen subsection of the spout configuration. Here is an example of an SNMP trap server listening on all network interfaces on port 9999:

{
  "type": "snmp_spout",
  "spout_settings": {
    "load_control": "none",
    "listen": {
      "host": "0.0.0.0",
      "port": 9999
    }
  },
  "storm_settings": {
    ...
  }
}

Info

Refer to the SnmpSpout javadoc documentation.

Streams and fields

The Snmp spout receives SNMP "trap" frames, decodes them, and forwards a json representation of these events:

{
  "type": "snmp_spout",
  "spout_settings": {
    ...
  },
  "storm_settings": {
    "component": "snmp_spout",
    "publish": [
      # each incoming trap event will be emitted as a single-value Tuple.
      # Here, the stream is named "traps", and the json document describing
      # the trap content is stored in the "trap" field of that stream.
      {
        "stream": "traps",
        "fields": [
          "trap"
        ]
      }
    ]
  }
}

The snmp spout can be configured to emit additional information so as to keep track of the sender/receiver addresses, as well as a unique identifier and a timestamp. You add these fields by specifying one or more of the following reserved field names (which therefore cannot be used to name the 'trap' field):

{
  "type": "snmp_spout",
  "spout_settings": {
    ...
  },
  "storm_settings": {
    "component": "snmp_spout",
    "publish": [
      # the incoming trap events will be emitted as multi-value Tuples,
      # here with a "trap" field for the main json document, emitted on the stream "events"
      {
        "stream": "events",
        "fields": [
          "trap",
          # the peer source ip address
          "_ppf_remote_host",
          # the peer source ip port
          "_ppf_remote_port",
          # the local listening ip address
          "_ppf_local_host",
          # the local listening port
          "_ppf_local_port",
          # a unique identifier
          "_ppf_id",
          # the receiving time
          "_ppf_timestamp"
        ]
      }
    ]
  }
}

You can add none, some or all of these additional fields. The reserved streams and fields are documented in the javadoc.

Decoded trap JSON representation

Each received trap includes one or more variables, each composed of an OID, a type, and a value. The trap is decoded into a json structure that provides both trap-level and variable-level information. For example, if you send a trap using the following command:

    sudo snmptrap -v 1 -c public localhost:9901 1.2.3.4 localhost 2 0 18000 1.2.3.4.5 s "hello world" 1.2.4.6.8 i 42

Then the Storm tuple emitted by the SnmpSpout will look like this:

{
  "traps": {
    "_ppf_local_host": "0.0.0.0",
    "_ppf_remote_port": 59577,
    "trap": {
       # The OID of the source agent
      "enterprise_oid": "1.2.3.4",
      "pdu_type": -92,
      "request_id": 0,
      # For v1 or v2c traps, this is the community name (for filtering inputs if needed)
      "security_name": "public",
      # PDU format (v1 or v2/v3)
      "snmp_version": "v1",
      # The agent address that produced the trap (may differ from _ppf_remote_host in case of udp forwarding)
      "source_agent_address": "192.168.253.133", # Decoded for v1 traps only
      # uptime of the source agent (in hundredth of seconds)
      "uptime_in_cs": 18000,  # Decoded for v1 traps only
      "generic_trap_id": 2, # Decoded for v1 traps only
      "specific_trap_id": 0,  # Decoded for v1 traps only
      # Now the actual variables included in the trap frame
      "variables": [
        {
          "oid": "1.2.3.4.5",
          "type": "STRING",
          "value": "hello world"
        },
        {
          "oid": "1.2.4.6.8",
          "type": "INT32",
          "value": 42
        }
      ]
    },
    "_ppf_id": "dqGEK20B9S2OYFKK2L2p",
    "_ppf_local_port": 9901,
    "_ppf_timestamp": 1568393189545,
    "_ppf_remote_host": "127.0.0.1"
  }
}
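As an illustration of how a downstream component might consume this document, here is a small Python sketch (the function and variable names are illustrative, not part of the platform) that filters traps by community name, as suggested by the `security_name` field, and indexes the trap variables by OID:

```python
import json

# A decoded trap tuple as emitted on the "traps" stream (shortened example).
tuple_json = """
{
  "trap": {
    "enterprise_oid": "1.2.3.4",
    "security_name": "public",
    "snmp_version": "v1",
    "variables": [
      {"oid": "1.2.3.4.5", "type": "STRING", "value": "hello world"},
      {"oid": "1.2.4.6.8", "type": "INT32", "value": 42}
    ]
  },
  "_ppf_remote_host": "127.0.0.1"
}
"""

def accept_trap(doc, allowed_communities=("public",)):
    """Keep only traps whose v1/v2c community name is in the allow-list."""
    return doc["trap"].get("security_name") in allowed_communities

doc = json.loads(tuple_json)
variables = {}
if accept_trap(doc):
    # Index the trap variables by OID for easy lookup.
    variables = {v["oid"]: v["value"] for v in doc["trap"]["variables"]}
```

After this runs, `variables["1.2.4.6.8"]` yields the integer `42` from the second trap variable.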

The supported SNMP variable types are:

  • "COUNTER32" => a positive integer
  • "UINT32" => a positive integer (in SNMP, this is a separate type from the 'counter' type)
  • "COUNTER64" => a positive, long integer
  • "INT32" => a signed integer
  • "IPADDRESS" => an SNMP IP address, represented as a json string of the form "value": "x.y.z.u"
  • "STRING" => an SNMP String
  • "OPAQUE" => a binary-encoded SNMP type (such as a float number or a byte array), represented as a json string of colon-separated hex bytes, like "value": "0a:f3:11"
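Should you need to recover the raw bytes of an "OPAQUE" value downstream, the colon-separated hex form is easy to convert back. This is a minimal sketch, not a platform helper:

```python
def opaque_to_bytes(value: str) -> bytes:
    """Convert the colon-separated hex form (e.g. "0a:f3:11") to raw bytes."""
    return bytes(int(part, 16) for part in value.split(":"))

def bytes_to_opaque(data: bytes) -> str:
    """Inverse operation: raw bytes back to the "0a:f3:11" representation."""
    return ":".join(f"{b:02x}" for b in data)

raw = opaque_to_bytes("0a:f3:11")   # -> b"\x0a\xf3\x11"
```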

Latency Tracker Tuple

Besides emitting the received data, you can configure the Snmp spout to periodically generate and emit a so-called punchplatform latency tracker tuple. That tuple traverses your channel and keeps track of the traversal time at each spout and bolt it visits. Here is how you configure the spout to emit such a record every 30 seconds:

{
  "type": "snmp_spout",
  "spout_settings": {
    ...
    "self_monitoring.activation": true,
    "self_monitoring.period": 30
  },
  "storm_settings": {
    "component": "snmp_spout",
    "publish": [
      {
        "stream": "traps",
        "fields": [
          "trap"
        ]
      },
      # the latency tracker record must be emitted on this reserved stream and field.
      {
        "stream": "_ppf_metrics",
        "fields": [
          "_ppf_latency"
        ]
      }
    ]
  }
}

Refer to the monitoring guide to use the resulting measured latency to monitor your channel.

Optional Parameters

  • load_control

    String : "none"

    When enabled, the spout will limit its emission rate to a specified value. You can use this to limit the incoming or outgoing rate of data from/to external systems. Check the related load_control.* properties below.

  • load_control.rate

    Long : 10000

    Only relevant if load_control is set. Limits the rate to this number of messages per second.

  • load_control.adaptative

    Boolean : false

    If true, the load control will not strictly limit the traffic to load_control.rate messages per second, but will allow less or more as long as the topology is not overloaded. The overload situation is determined by monitoring the Storm tuple traversal time. If that traversal TODO

  • queue_size

    Integer : 100000

    To each configured listening address corresponds one running thread, which reads events and stores them in a queue, in turn consumed by the spout executor(s). This parameter sets the size of that queue. If the queue is full, the socket will not be read anymore, which in turn slows down the sender. Use this to give more or less capacity to accept bursts without impacting the senders.
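To make the load_control.rate semantics concrete, here is a token-bucket sketch of how a fixed rate limit with burst tolerance can work. This is an illustration of the general technique, not the platform's actual implementation, and the class and parameter names are invented for the example:

```python
import time

class TokenBucket:
    """Minimal token-bucket limiter: sustains at most `rate` events per
    second, while tolerating short bursts of up to `burst` events."""

    def __init__(self, rate: float, burst: float):
        self.rate = rate            # sustained events per second
        self.capacity = burst       # maximum burst size
        self.tokens = burst         # start full: an initial burst is allowed
        self.last = time.monotonic()

    def try_acquire(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to the elapsed time, capped at the burst size.
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# With rate=10000 (the load_control.rate default) and a burst of 100,
# a tight loop of 500 attempts lets the first ~100 through immediately;
# the rest depend on how many tokens are refilled while looping.
limiter = TokenBucket(rate=10000, burst=100)
accepted = sum(1 for _ in range(500) if limiter.try_acquire())
```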

Metrics

See Common netty-based spout metrics

Socket Configuration

The listen section can take the following parameters:

"listen" : {
   # the listening host address. Use "0.0.0.0" to listen on all interfaces
   "host" : "0.0.0.0",

   # the listening port number
   "port" : 9999,

   # Not mandatory : 1000 by default.
   # the spout uses an internal per spout queue to provide its 
   # data to the topology. This makes it possible to accept burst
   # of data, without hitting the maximum pending tuples configured 
   # for the spout, as well as reading the socket as fast as 
   # possible, without depending on the Storm upcall. Of course 
   # the bigger the better, but the more RAM you will need. 
   # When using UDP, this value should be able to absorb a burst 
   # of traps (i.e. it should be consistent with the EPS expected 
   # on this trap server)
   "queue_size" : 2000
 }
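The reader-thread-plus-queue design described in the comments above can be sketched as follows. This is a simplified model for illustration only (names and structure are invented, not the spout's actual code): one thread per listening address drains the UDP socket into a bounded queue, and a full queue blocks the reader, which stops socket reads and eventually pushes back on the sender.

```python
import queue
import socket
import threading

# Bounded queue between the socket reader and the consumer,
# mirroring the "queue_size" setting.
events = queue.Queue(maxsize=2000)

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("127.0.0.1", 0))          # port 0: let the OS pick a free port
host, port = sock.getsockname()

def udp_reader():
    while True:
        data, peer = sock.recvfrom(65535)
        # put() blocks when the queue is full: the socket is no longer
        # drained, which is how backpressure reaches the sender.
        events.put((data, peer))

threading.Thread(target=udp_reader, daemon=True).start()

# Simulate a sender, then consume one event from the queue.
sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sender.sendto(b"trap-bytes", (host, port))
data, peer = events.get(timeout=5)
```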