public class SyslogInput<T> extends AbstractSocketInput<T> implements ISyslogReceiver<T>
This spout launches a single dedicated thread to run the TCP server. All
received messages are enqueued until Storm fetches them by invoking the
nextTuple() method, which emits any pending logs to the topology.
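The hand-off between the receiving thread and nextTuple() can be pictured with a bounded queue. The following is a minimal sketch, not the SyslogInput source: the class QueueDrainSketch, its methods and the in-memory "emitted" list (a stand-in for the Storm output collector) are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical illustration: a receiver fills a bounded queue that nextTuple() drains. */
class QueueDrainSketch {
    private final BlockingQueue<String> queue;
    // Stand-in for the Storm output collector (assumption, for illustration only).
    private final List<String> emitted = new ArrayList<>();

    QueueDrainSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /**
     * Called by the receiving thread for each line read from the socket.
     * Returns false when the queue is full; a real receiver would then stop
     * reading the socket until space is available again.
     */
    boolean onReceive(String line) {
        return queue.offer(line);
    }

    /** Called repeatedly by the runtime; never blocks, emits at most one pending line. */
    void nextTuple() {
        String line = queue.poll();
        if (line != null) {
            emitted.add(line);
        }
    }

    List<String> emitted() {
        return emitted;
    }
}
```

The non-blocking poll() matters here because Storm calls nextTuple() in a loop on the executor thread, so that method should return immediately when nothing is pending.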
When you use a SyslogSpout as the entry point of your platform, it is handy to have it add extra information to the data it reads. You can declare one or more of the following reserved fields in the Storm configuration, in which case the spout emits the corresponding value with each tuple:
field name | comment | type |
---|---|---|
local_uuid | a short yet unique identifier | string |
local_timestamp | a timestamp in unix format | long |
local_port | the local receiving socket port number | long |
remote_port | the remote socket port number | long |
local_host | the local receiving socket ip address | string |
remote_host | the remote socket ip address | string |
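To make the table concrete, here is a hedged sketch of how such metadata could be derived from a connection's local and remote addresses. It is not the spout's actual code: ReservedFieldsSketch and metadata(...) are hypothetical names, a random UUID stands in for the "short yet unique identifier", and the timestamp is assumed to be in milliseconds since the epoch.

```java
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Hypothetical illustration of the reserved fields listed above. */
class ReservedFieldsSketch {

    /** Returns only the reserved fields that were declared, with the types given in the table. */
    static Map<String, Object> metadata(Set<String> declared,
                                        InetSocketAddress local,
                                        InetSocketAddress remote) {
        Map<String, Object> all = new LinkedHashMap<>();
        // Assumption: a UUID stands in for the spout's own identifier scheme.
        all.put("local_uuid", UUID.randomUUID().toString());
        // Assumption: epoch milliseconds; the real unit is not stated in this page.
        all.put("local_timestamp", System.currentTimeMillis());
        all.put("local_port", (long) local.getPort());
        all.put("remote_port", (long) remote.getPort());
        all.put("local_host", local.getHostString());
        all.put("remote_host", remote.getHostString());
        // Keep only what the configuration asked for.
        all.keySet().retainAll(declared);
        return all;
    }
}
```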
"listen" :
{
"proto" : "tcp",
"host" : "target.ip.address",
"port" : 9999,
"ssl" : true,
"ssl_private_key" : "/opt/keys/punchplatform.key.pkcs8",
"ssl_certificate" : "/opt/keys/punchplatform.crt",
}
property | mandatory | type | default | comment |
---|---|---|---|---|
load_control | no | string | "none" | If set to "rate", the spout will limit the rate to a specified value. You can use this to limit the incoming or outgoing rate of data from/to external systems. See the related "load_control.rate" and "load_control.adaptative" properties. |
load_control.rate | no | long | 10000 | Only relevant if load_control is set to "rate". Limits the rate to this number of messages per second. |
load_control.adaptative | no | boolean | false | If true, the load controller does not cap the traffic at exactly load_control.rate messages per second, but allows less or more as long as the topology is not overloaded. Overload is detected by monitoring the Storm tuple traversal time: if that time increases (which happens quickly as soon as the topology is overloaded), the load controller lowers the allowed rate. This option makes it easy to protect your topology without imposing an overly conservative rate, since the maximum sustainable rate is difficult, if not impossible, to estimate. |
rcv_queue_size | no | int | 100000 | Each configured listening address has one running thread, which reads lines and stores them in a queue that is in turn consumed by the spout executor(s). This parameter sets the size of that queue. If the queue is full, the socket is no longer read, which in turn slows down the sender. Use this to give more or less capacity to absorb bursts without impacting the senders. |
multiline | no | boolean | false | If true, the spout will aggregate subsequent log lines using a prefix such as "\t". See the other multiline properties. |
multiline.regex | no | string | "" | Sets the regex used to determine whether a log line is an initial or a subsequent line. |
multiline.delimiter | no | string | " " | Once completed, the aggregated log line is made up of the sequence of its lines. You can insert a line delimiter to make further parsing easier. |
multiline.timeout | no | long | 1000 | If a single log line is received, it will be forwarded downstream after this timeout, expressed in milliseconds. |
ssl | no | boolean | false | Set to true to listen on SSL. If true, you must provide a certificate and a private key. |
ssl_certificate | no | string | - | An X.509 certificate chain file in PEM format |
ssl_private_key | no | string | - | A PKCS#8 private key file in PEM format. This file must be available on the Storm worker host. If you have another format, refer to the explanation below. |
ssl_secret | no | string | - | An optional secret associated with the key if it is password protected. |
secret | no | string | - | The secret to load a TLS PKCS12 SSL context |
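The multiline properties can be illustrated with a small aggregation routine. This is only a sketch under stated assumptions, not the spout's implementation: MultilineSketch and aggregate(...) are hypothetical names, the regex is assumed to match continuation lines (the table does not say whether it matches initial or subsequent lines), and the multiline.timeout flush is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical illustration of multiline aggregation with a regex and a delimiter. */
class MultilineSketch {

    /**
     * Joins each initial line with the continuation lines that follow it.
     * Assumption: 'continuation' matches subsequent lines, e.g. "^\\t".
     */
    static List<String> aggregate(List<String> lines, Pattern continuation, String delimiter) {
        List<String> out = new ArrayList<>();
        StringBuilder current = null;
        for (String line : lines) {
            if (current != null && continuation.matcher(line).find()) {
                // Subsequent line: append it to the log being built.
                current.append(delimiter).append(line);
            } else {
                // Initial line: flush the previous log, if any, and start a new one.
                if (current != null) {
                    out.add(current.toString());
                }
                current = new StringBuilder(line);
            }
        }
        if (current != null) {
            out.add(current.toString());
        }
        return out;
    }
}
```

In a streaming receiver the last log cannot be flushed at "end of input" as it is here, which is the situation a timeout such as multiline.timeout addresses.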
openssl req -x509 -batch -nodes -newkey rsa:2048 -keyout punchplatform.key -out punchplatform.crt
The Syslog spout expects the private key in PKCS#8 format. Use the following
command to convert a non-PKCS#8 key to PKCS#8.
openssl pkcs8 -topk8 -nocrypt -in punchplatform.key -out punchplatform.key.pkcs8
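To check that a converted key is usable from Java, the unencrypted PKCS#8 PEM content can be parsed with the standard JDK classes. The sketch below is an assumption-laden illustration, not how the spout loads its key: Pkcs8PemSketch and parseRsaKey(...) are hypothetical names, and only unencrypted RSA keys (the -nocrypt case above) are handled.

```java
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.PrivateKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.Base64;

/** Hypothetical illustration: parse an unencrypted PKCS#8 PEM private key with the JDK. */
class Pkcs8PemSketch {
    private static final String HEADER = "-----BEGIN PRIVATE KEY-----";
    private static final String FOOTER = "-----END PRIVATE KEY-----";

    /**
     * Parses the PEM text of an unencrypted PKCS#8 RSA key.
     * Other PEM types, such as "BEGIN RSA PRIVATE KEY", are rejected
     * because they are not unencrypted PKCS#8.
     */
    static PrivateKey parseRsaKey(String pem) throws GeneralSecurityException {
        if (!pem.contains(HEADER)) {
            throw new IllegalArgumentException("not an unencrypted PKCS#8 PEM key");
        }
        // Strip the armor lines and all whitespace to get the raw Base64 payload.
        String base64 = pem.replace(HEADER, "").replace(FOOTER, "").replaceAll("\\s", "");
        byte[] der = Base64.getDecoder().decode(base64);
        return KeyFactory.getInstance("RSA").generatePrivate(new PKCS8EncodedKeySpec(der));
    }
}
```

A key written by openssl in the traditional format starts with "BEGIN RSA PRIVATE KEY" and is rejected by this sketch, which is a quick way to tell that the conversion step is still needed.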
charset
Constructor and Description |
---|
SyslogInput(org.thales.punch.libraries.storm.api.NodeSettings spoutConfig) A spout constructor is in charge of setting the fields that will be required later on, when the spout is actually started and run somewhere in the cluster. |
SyslogInput(org.thales.punch.libraries.storm.api.NodeSettings spoutConfig, org.apache.logging.log4j.Logger subLogger) Alternative constructor that uses a Logger coming from the subclass. |
Modifier and Type | Method and Description |
---|---|
void | close() Close callback, notified by the Storm runtime at topology deactivation. |
void | fail(Object o) |
void | nextTuple() Callback to feed the topology with tuples. |
void | onReceive(org.thales.punch.netty.impl.NettyTupleImpl<T> log) |
void | open(Map stormSettings, org.apache.storm.task.TopologyContext topologyContext, org.apache.storm.spout.SpoutOutputCollector collector) Set up this spout before it starts receiving data and emitting tuples into the topology. |
org.thales.punch.libraries.storm.api.ISpout | registerNextTupleCallback(org.thales.punch.libraries.storm.api.INextTupleCallback cb) |
ack, getSocketHostAddress
deactivate, declareOutputFields, getPublishedStreams, regulate, sendLatencyRecord
public SyslogInput(org.thales.punch.libraries.storm.api.NodeSettings spoutConfig)
spoutConfig
- the punchplatform spout configuration
public SyslogInput(org.thales.punch.libraries.storm.api.NodeSettings spoutConfig, org.apache.logging.log4j.Logger subLogger)
spoutConfig
- the spout configuration
subLogger
- the subclass logger
public void open(Map stormSettings, org.apache.storm.task.TopologyContext topologyContext, org.apache.storm.spout.SpoutOutputCollector collector)
This is called by the Storm runtime at topology activation.
open
in interface org.apache.storm.spout.ISpout
open
in class AbstractSocketInput<T>
stormSettings
- the Storm configuration
topologyContext
- the Storm topology context
collector
- the output collector for emitting tuples.
public org.thales.punch.libraries.storm.api.ISpout registerNextTupleCallback(org.thales.punch.libraries.storm.api.INextTupleCallback cb)
public void close()
close
in interface org.apache.storm.spout.ISpout
close
in class org.thales.punch.libraries.storm.api.BaseInputNode
public void nextTuple()
nextTuple
in interface org.apache.storm.spout.ISpout
public void fail(Object o)
fail
in interface org.apache.storm.spout.ISpout
fail
in class AbstractSocketInput<T>
public void onReceive(org.thales.punch.netty.impl.NettyTupleImpl<T> log)
onReceive
in interface ISyslogReceiver<T>
Copyright © 2023. All rights reserved.