public class HttpInput extends AbstractSocketInput<org.thales.punch.netty.impl.NettyHttpTuple>
This spout launches a single dedicated thread for running the TCP server. All
received messages are enqueued, waiting for Storm to come and get them by
invoking the nextTuple()
method. In there, if there are logs they are
emitted to the topology.
The Http spout relies on a blocking queue in a way to slow down the receiver thread (hence the reading of the socket) should Storm be slow to empty the queue. The queue is configured to hold a decent number of message (100000), so as to tolerate a few seconds of incoming logs without slowing down.
By default the HttpSpout sends the http response directly to the client after Storm aknowledges the tuple. Another mode is available: instead of sending the response to the client once the tuple is acknowledged, the Http spout subscribes to a whiteboard, specifying a unique id corresponding to the tuple and a callback to use when a WhiteBoardBolt produces on the whiteboard. This is the callback's job to respond to the client. You can enable this mode by setting direct_response parameter to false.
property | mandatory | type | default | comment |
---|---|---|---|---|
host | yes | string | - | the listening host address. Accepted values are dotted number IP addresses, or hostname. Use ""0.0.0.0" to listen to all interfaces. |
port | yes | int | - | The listening port. |
ssl | no | string | false | Set to true to listen on SSL. If true, you must provide a certificate and a private key. |
ssl_certificate | no | string | - | An X.509 certificate chain file in PEM format |
ssl_private_key | no | string | - | A PKCS#8 private key file in PEM format. This file must be available on the Storm worker host. If you have another format, refer to the explanation below. |
ssl_secret | no | string | - | An optional secret associated to the key if it is password protected. |
secret | no | string | - | The secret to load a TLS PKCS12 SSL context |
multipart_body_size | no | Integer | 16*1024*1024 | the maximum size allowed for decoding multipart content. If you set it to 0, multiparts will not be decoded at all. |
Besides the "listen" section, the spout configuration section has the following parameter:
property | mandatory | type | default | comment |
---|---|---|---|---|
rcv_queue_size | no | int | 10000 | The per spout receive queue size. If the topology is slower to process the received requests, it will end up stop reqding the incpming sockets. |
load_control | no | string | "none" | If set to "rate", the spout will limit the rate to a specified value. You can use this to limit the incoming or outgoing rate of data from/to external systems. Check the "load_control.adaptative" and "load_control.rate" related properties |
load_control.rate | no | long | 10000 | Only relevant if load_control is set to "rate". Limit the rate to this number of message per seconds. |
load_control.adaptative | no | long | false | If true, the load control will not limit the traffic to load_control.rate message per second, but to less or more as long as the topology is not overloaded. Overload situation is determined by monitoring the Storm tuple traversal time. If that traversal time increases (which occurs quicly as soon as the topology is overloaded), the load controller will limit the allowed rate. This option makes it easy to protect your topology without limiting its rate to a too conservative rate, as it is difficult if not impossible to estimate the maximum rate. |
openssl req -x509 -batch -nodes -newkey rsa:2048 -keyout punchplatform.key -out punchplatform.crt
The Syslog spout expects private key in PKCS8. Use the following to convert a
non PKCS8 to PKCS8 key.
openssl pkcs8 -topk8 -nocrypt -in punchplatform.key -out punchplatform.key.pcks8
Here is an example configuration :
"spouts" : [ { "type" : "syslog_spout", "spout_settings" : { "listen" : { "proto" : "tcp", "host" : "localhost", "port" : 9999, "direct_response": true "ssl" : true, "ssl_private_key" : "/opt/keys/punchplatform.key.pkcs8", "ssl_certificate" : "/opt/keys/punchplatform.crt" }, } } ]
charset
Constructor and Description |
---|
HttpInput(org.thales.punch.libraries.storm.api.NodeSettings spoutConfig)
A Spout constructor is in charge of setting the fields that will be
required later on when the spout will be effectively started and run
somewhere in the cluster.
|
Modifier and Type | Method and Description |
---|---|
void |
ack(Object o) |
void |
close()
Close callback, notified by Storm runtime at topology deactivation.
|
void |
deactivate() |
void |
fail(Object o) |
void |
nextTuple() |
void |
open(Map stormSettings,
org.apache.storm.task.TopologyContext topologyContext,
org.apache.storm.spout.SpoutOutputCollector collector)
Setup this spout before it starts receiving data and emitting tuples into
the topology.
|
org.thales.punch.libraries.storm.api.ISpout |
registerNextTupleCallback(org.thales.punch.libraries.storm.api.INextTupleCallback cb) |
getSocketHostAddress
declareOutputFields, getPublishedStreams, regulate, sendLatencyRecord
public HttpInput(org.thales.punch.libraries.storm.api.NodeSettings spoutConfig)
spoutConfig
- the punchplatform spout configurationpublic void open(Map stormSettings, org.apache.storm.task.TopologyContext topologyContext, org.apache.storm.spout.SpoutOutputCollector collector)
This is called by the Storm runtime at topology activation.
open
in interface org.apache.storm.spout.ISpout
open
in class AbstractSocketInput<org.thales.punch.netty.impl.NettyHttpTuple>
stormSettings
- storm configurationtopologyContext
- the storm topology contextcollector
- the output collector for emitting tuples.public org.thales.punch.libraries.storm.api.ISpout registerNextTupleCallback(org.thales.punch.libraries.storm.api.INextTupleCallback cb)
public void close()
close
in interface org.apache.storm.spout.ISpout
close
in class org.thales.punch.libraries.storm.api.BaseInputNode
public void nextTuple()
public void ack(Object o)
ack
in interface org.apache.storm.spout.ISpout
ack
in class AbstractSocketInput<org.thales.punch.netty.impl.NettyHttpTuple>
public void fail(Object o)
fail
in interface org.apache.storm.spout.ISpout
fail
in class AbstractSocketInput<org.thales.punch.netty.impl.NettyHttpTuple>
public void deactivate()
deactivate
in interface org.apache.storm.spout.ISpout
deactivate
in class org.thales.punch.libraries.storm.api.BaseInputNode
Copyright © 2023. All rights reserved.