public class LumberjackInput extends AbstractSocketInput<Map<String,String>>
Internally this spout launches a single dedicated thread that runs the TCP server. All received messages are enqueued until Storm fetches them by invoking the nextTuple() method, which emits any pending logs to the topology.
The lumberjack spout relies on a blocking queue to slow down the receiver thread (and hence the reading of the socket) should Storm be slow to empty the queue. The queue is configured to hold a large number of messages (100000), so as to tolerate a few seconds of incoming logs without slowing down.
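The hand-off between the receiver thread and nextTuple() is a classic bounded producer/consumer. The sketch below is a hypothetical illustration, not the actual LumberjackInput code: the class ReceiveQueueSketch, its onMessage() method and the use of ArrayBlockingQueue are assumptions. The receiver blocks on put() when the queue is full, which is what throttles socket reads, while nextTuple() uses the non-blocking poll() because Storm expects nextTuple() to return quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of the receiver-thread / nextTuple() hand-off. */
public class ReceiveQueueSketch {

    // Bounded queue: when it is full, put() blocks the receiver thread,
    // which therefore stops reading from the socket (backpressure).
    private final BlockingQueue<Map<String, String>> queue;

    public ReceiveQueueSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Called by the (hypothetical) TCP server thread for each decoded log. */
    public void onMessage(Map<String, String> log) throws InterruptedException {
        queue.put(log); // blocks while the queue is full
    }

    /** Non-blocking: returns true if one queued log was emitted. */
    public boolean nextTuple(List<Map<String, String>> emitted) {
        Map<String, String> log = queue.poll(); // null when empty, never blocks
        if (log == null) {
            return false;
        }
        emitted.add(log); // stands in for collector.emit(...)
        return true;
    }

    public int pending() {
        return queue.size();
    }

    public static void main(String[] args) throws Exception {
        ReceiveQueueSketch spout = new ReceiveQueueSketch(2);
        List<Map<String, String>> emitted = new ArrayList<>();

        // Receiver thread: pushes five logs into a queue that only holds two.
        Thread receiver = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    spout.onMessage(Map.of("line", "log " + i));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        receiver.start();

        // Simulated Storm loop: keep calling nextTuple() until everything is emitted.
        while (emitted.size() < 5) {
            if (!spout.nextTuple(emitted)) {
                Thread.sleep(1); // back off briefly when nothing was emitted
            }
        }
        receiver.join();
        System.out.println(emitted.size());
    }
}
```

Keeping nextTuple() non-blocking matters because, in Storm, ack() and fail() are invoked on the same executor thread as nextTuple().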
Like other socket server spouts, the spout settings must contain a "listen" section where you define the socket end-point characteristics. The listen section has the following settings:
property | mandatory | type | default | comment |
---|---|---|---|---|
host | yes | string | - | The listening host address. Accepted values are dotted-decimal IP addresses or hostnames. Use "0.0.0.0" to listen on all interfaces. |
port | yes | int | - | The listening port. |
ssl | no | string | false | Set to true to listen on SSL. If true, you must provide a certificate and a private key. |
ssl_certificate | no | string | - | An X.509 certificate chain file in PEM format |
ssl_private_key | no | string | - | A PKCS#8 private key file in PEM format. This file must be available on the Storm worker host. If you have another format, refer to the explanation below. |
ssl_secret | no | string | - | An optional secret associated with the key, if the key is password protected. |
secret | no | string | - | The secret to load a TLS PKCS12 SSL context |
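As an illustration of the rules in the table above (host and port mandatory, ssl defaulting to false, certificate and key required when ssl is true), here is a minimal validation sketch. The ListenSettings record and its fromMap() factory are hypothetical and not part of the punch API; the map stands for an already-parsed "listen" JSON object. Since the table lists ssl as a string, the sketch accepts both a JSON boolean and the string "true".

```java
import java.util.Map;

/** Hypothetical holder for one "listen" entry; not part of the punch API. */
public record ListenSettings(String host, int port, boolean ssl,
                             String sslCertificate, String sslPrivateKey) {

    /** Builds settings from a parsed "listen" object, applying the documented defaults. */
    public static ListenSettings fromMap(Map<String, Object> listen) {
        Object host = listen.get("host");
        if (!(host instanceof String) || ((String) host).isEmpty()) {
            throw new IllegalArgumentException("listen.host is mandatory");
        }
        Object port = listen.get("port");
        if (!(port instanceof Number)) {
            throw new IllegalArgumentException("listen.port is mandatory");
        }
        // "ssl" defaults to false; both a JSON boolean and the string "true" are accepted.
        boolean ssl = Boolean.parseBoolean(String.valueOf(listen.getOrDefault("ssl", false)));
        Object cert = listen.get("ssl_certificate");
        Object key = listen.get("ssl_private_key");
        if (ssl && (cert == null || key == null)) {
            throw new IllegalArgumentException(
                "ssl=true requires ssl_certificate and ssl_private_key");
        }
        return new ListenSettings((String) host, ((Number) port).intValue(), ssl,
                                  (String) cert, (String) key);
    }
}
```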
Besides the "listen" section, the spout configuration section has the following parameters:
property | mandatory | type | default | comment |
---|---|---|---|---|
rcv_queue_size | no | int | 10000 | The per-spout receive queue size. If the topology is too slow to process the received requests, the spout eventually stops reading from the incoming sockets. |
load_control | no | string | "none" | If set to "rate", the spout will limit the rate to a specified value. You can use this to limit the incoming or outgoing rate of data from/to external systems. Check the "load_control.adaptative" and "load_control.rate" related properties |
load_control.rate | no | long | 10000 | Only relevant if load_control is set to "rate". Limits the rate to this number of messages per second. |
load_control.adaptative | no | boolean | false | If true, the load control does not cap the traffic at load_control.rate messages per second, but lets it go lower or higher as long as the topology is not overloaded. Overload is detected by monitoring the Storm tuple traversal time: if that time increases (which happens quickly once the topology is overloaded), the load controller lowers the allowed rate. This makes it easy to protect your topology without capping it at an overly conservative rate, since the maximum sustainable rate is difficult, if not impossible, to estimate. |
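The load_control settings can be pictured as a rate limiter. The following is a minimal sketch under stated assumptions, not the punch load controller: RateLimiterSketch is hypothetical, the fixed rate uses a simple one-second window, and the adaptive mode uses an additive-increase/multiplicative-decrease policy with an arbitrary ceiling, which is only one plausible way to lower the allowed rate when traversal time increases. Time is passed in explicitly so that the behaviour is deterministic.

```java
/** Hypothetical sketch of "load_control": a one-second-window rate limiter. */
public class RateLimiterSketch {
    // Assumed additive step (messages per second) for the adaptive mode.
    private static final double ADDITIVE_STEP = 100.0;

    private double ratePerSecond;   // currently allowed messages per second
    private final double maxRate;   // assumed ceiling for the adaptive mode
    private long windowStartNanos;
    private long sentInWindow;

    public RateLimiterSketch(double ratePerSecond, long nowNanos) {
        this.ratePerSecond = ratePerSecond;
        this.maxRate = ratePerSecond * 10; // arbitrary ceiling, an assumption
        this.windowStartNanos = nowNanos;
    }

    /** Returns true if one more message may be emitted at time nowNanos. */
    public boolean tryAcquire(long nowNanos) {
        if (nowNanos - windowStartNanos >= 1_000_000_000L) {
            windowStartNanos = nowNanos; // start a new one-second window
            sentInWindow = 0;
        }
        if (sentInWindow < (long) ratePerSecond) {
            sentInWindow++;
            return true;
        }
        return false;
    }

    /**
     * Adaptive mode (assumed AIMD policy): halve the rate when the observed
     * traversal time exceeds the baseline, otherwise raise it by a fixed step.
     */
    public void adapt(double traversalMillis, double baselineMillis) {
        if (traversalMillis > baselineMillis) {
            ratePerSecond = Math.max(1.0, ratePerSecond / 2);
        } else {
            ratePerSecond = Math.min(maxRate, ratePerSecond + ADDITIVE_STEP);
        }
    }

    public double rate() {
        return ratePerSecond;
    }
}
```

Cutting the rate sharply and raising it slowly makes the controller react fast to overload while probing cautiously for spare capacity.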
For testing, you can generate a certificate and a key using:
openssl req -x509 -batch -nodes -newkey rsa:2048 -keyout punchplatform.key -out punchplatform.crt
This spout expects the private key in PKCS#8 format. Use the following command to convert a non-PKCS#8 key to PKCS#8:
openssl pkcs8 -topk8 -nocrypt -in punchplatform.key -out punchplatform.key.pkcs8
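To see why PKCS#8 matters, this sketch loads an unencrypted PKCS#8 PEM key (the "-----BEGIN PRIVATE KEY-----" form produced by openssl pkcs8 -topk8 -nocrypt) with the standard JDK classes PKCS8EncodedKeySpec and KeyFactory. The Pkcs8PemLoader class is hypothetical and is not how the spout itself reads the key; it assumes an RSA key, matching the rsa:2048 command above. A PKCS#1 file ("-----BEGIN RSA PRIVATE KEY-----") is rejected by this loader, which is the situation the conversion command addresses.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyFactory;
import java.security.PrivateKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.Base64;

/** Hypothetical loader for an unencrypted PKCS#8 PEM RSA private key. */
public final class Pkcs8PemLoader {
    private Pkcs8PemLoader() {}

    /** Parses a "-----BEGIN PRIVATE KEY-----" PEM string into an RSA private key. */
    public static PrivateKey parse(String pem) throws Exception {
        String base64 = pem
            .replace("-----BEGIN PRIVATE KEY-----", "")
            .replace("-----END PRIVATE KEY-----", "")
            .replaceAll("\\s", ""); // drop line breaks and spaces
        byte[] der = Base64.getDecoder().decode(base64);
        return KeyFactory.getInstance("RSA").generatePrivate(new PKCS8EncodedKeySpec(der));
    }

    /** Reads the PEM file (for example punchplatform.key.pkcs8) and parses it. */
    public static PrivateKey load(Path file) throws Exception {
        return parse(Files.readString(file, StandardCharsets.US_ASCII));
    }
}
```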
Here is an example configuration:
"spouts" : [
  {
    "type" : "lumberjack_spout",
    "spout_settings" : {
      "listen" : [
        {
          "proto" : "tcp",
          "host" : "localhost",
          "port" : 9999,
          "compression" : false,
          "ssl" : true,
          "ssl_private_key" : "/opt/keys/punchplatform.key.pkcs8",
          "ssl_certificate" : "/opt/keys/punchplatform.crt"
        }
      ]
    }
  }
]
charset
Constructor | Description |
---|---|
LumberjackInput(org.thales.punch.libraries.storm.api.NodeSettings spoutConfig) | A Spout constructor is in charge of setting the fields that will be required later on, when the spout is effectively started and run somewhere in the cluster. |
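The constructor description reflects how Storm deploys spouts: the spout object is created where the topology is submitted, serialized, then deserialized on a worker, where open() is called. The hypothetical LifecycleSketch below (not the LumberjackInput code) shows the resulting pattern: the constructor keeps only serializable settings, while runtime resources such as the receive queue are marked transient and built in open(). Plain Java serialization stands in for Storm's shipping step, and reading rcv_queue_size with its documented default of 10000 is an illustrative assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of the constructor / open() split used by Storm spouts. */
public class LifecycleSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Serialized and shipped to the worker together with the spout.
    private final HashMap<String, Object> settings;
    // Not serialized: created on the worker in open().
    private transient BlockingQueue<String> queue;

    public LifecycleSketch(Map<String, Object> settings) {
        this.settings = new HashMap<>(settings);
    }

    public void open() {
        int size = ((Number) settings.getOrDefault("rcv_queue_size", 10000)).intValue();
        this.queue = new ArrayBlockingQueue<>(size);
        // A real spout would also start its TCP server thread here.
    }

    public boolean isOpen() {
        return queue != null;
    }

    public int capacity() {
        return queue.remainingCapacity();
    }

    /** Simulates Storm shipping the spout to a worker: serialize, then deserialize. */
    public static LifecycleSketch shipToWorker(LifecycleSketch spout)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(spout);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LifecycleSketch) in.readObject();
        }
    }
}
```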
Modifier and Type | Method | Description |
---|---|---|
void | close() | Close callback, notified by the Storm runtime at topology deactivation. |
void | deactivate() | |
void | fail(Object o) | |
void | nextTuple() | Callback to feed the topology with tuples. |
void | open(Map stormSettings, org.apache.storm.task.TopologyContext topologyContext, org.apache.storm.spout.SpoutOutputCollector collector) | Sets up this spout before it starts receiving data and emitting tuples into the topology. |
org.thales.punch.libraries.storm.api.ISpout | registerNextTupleCallback(org.thales.punch.libraries.storm.api.INextTupleCallback cb) | |
ack, getSocketHostAddress
declareOutputFields, getPublishedStreams, regulate, sendLatencyRecord
public LumberjackInput(org.thales.punch.libraries.storm.api.NodeSettings spoutConfig)
Parameters:
spoutConfig - the punchplatform spout configuration
public void open(Map stormSettings, org.apache.storm.task.TopologyContext topologyContext, org.apache.storm.spout.SpoutOutputCollector collector)
This is called by the Storm runtime at topology activation.
open in interface org.apache.storm.spout.ISpout
open in class AbstractSocketInput<Map<String,String>>
Parameters:
stormSettings - storm configuration
topologyContext - the storm topology context
collector - the output collector for emitting tuples.
public org.thales.punch.libraries.storm.api.ISpout registerNextTupleCallback(org.thales.punch.libraries.storm.api.INextTupleCallback cb)
public void close()
close in interface org.apache.storm.spout.ISpout
close in class org.thales.punch.libraries.storm.api.BaseInputNode
public void nextTuple()
public void fail(Object o)
fail in interface org.apache.storm.spout.ISpout
fail in class AbstractSocketInput<Map<String,String>>
public void deactivate()
deactivate in interface org.apache.storm.spout.ISpout
deactivate in class org.thales.punch.libraries.storm.api.BaseInputNode
Copyright © 2023. All rights reserved.