public class FilterNode
extends org.thales.punch.libraries.storm.api.BaseProcessingNode
The FilterBolt offers IP blacklisting and whitelisting capabilities, as well as content-based filtering. Each filter operates on an expected input tuple field. Note that you cannot combine several filter types: the filter bolt must be configured with a single filtering type.
{
"type" : "filter",
"bolt_settings" : {
"blacklist" : {
"field" : "remote_host",
"list" : [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "127.0.0.1" ]
}
},
"storm_settings" : {
"component" : "my_filter_bolt",
"publish" : [
{"stream" : "logs", "fields" : ["log", "local_host", "local_port", "remote_host", "remote_port", "local_uuid", "local_timestamp"] }
],
"subscribe" : [
{ "component" : "some_spout", "stream" : "logs", "grouping": "localOrShuffle" }
]
}
}
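The bolt's internal matching logic is not shown here; as an illustration only, CIDR membership for a field such as remote_host can be sketched in plain Java (the CidrCheck class and inCidr method are hypothetical names, not part of the punch API):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class CidrCheck {
    /** Returns true if the IPv4 address falls inside the CIDR block (e.g. "10.0.0.0/8").
     *  A bare address such as "127.0.0.1" is treated as a /32. */
    static boolean inCidr(String address, String cidr) {
        try {
            String[] parts = cidr.split("/");
            int prefix = parts.length > 1 ? Integer.parseInt(parts[1]) : 32;
            int ip = toInt(InetAddress.getByName(address).getAddress());
            int net = toInt(InetAddress.getByName(parts[0]).getAddress());
            int mask = prefix == 0 ? 0 : -1 << (32 - prefix);
            return (ip & mask) == (net & mask);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("invalid address or cidr: " + cidr, e);
        }
    }

    /** Packs 4 address bytes into a single int for mask arithmetic. */
    static int toInt(byte[] b) {
        return ((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16) | ((b[2] & 0xff) << 8) | (b[3] & 0xff);
    }

    public static void main(String[] args) {
        System.out.println(inCidr("10.1.2.3", "10.0.0.0/8"));    // true
        System.out.println(inCidr("192.168.1.1", "10.0.0.0/8")); // false
    }
}
```

With a blacklist, a tuple whose remote_host matches any listed block is dropped; with a whitelist, only matching tuples are let through.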
Instead of setting the blacklist explicitly, you can refer to an external JSON file. For example:
{
"type" : "filter",
"bolt_settings" : {
"blacklist" : {
"field" : "remote_host",
"list" : "filter/blacklist.json"
}
},
...
}
Make sure your JSON file is located under $PUNCHPLATFORM_CONF_DIR/resources/filter/blacklist.json.
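The exact schema of the referenced file is defined by the bolt and is not shown in this page; one plausible layout, assuming the file simply holds the list value, would be:

[ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "127.0.0.1" ]

Check the file format against your installation before relying on this sketch.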
{
"type" : "filter",
"bolt_settings" : {
"whitelist" : {
"field" : "remote_host",
"list" : [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "127.0.0.1/32" ]
}
},
"storm_settings" : {
...
}
}
It is handy to dispatch the whitelisted traffic onto different Storm streams. This lets you, for example, dispatch traffic received from "172.16.0.0/12" to a Kafka topic "cisco" and traffic from "192.168.0.0/16" to another topic "apache". To do that, use the following format:
{
"type" : "filter",
"bolt_settings" : {
"whitelist" : {
"field" : "remote_host",
"list" : [
{ "address" : "10.0.0.0/8", "stream" : "checkpoint" },
{ "address" : "172.16.0.0/12", "stream" : "cisco" },
{ "address" : "192.168.0.0/16", "stream" : "handover" },
{ "address" : "127.0.0.1/32", "stream" : "local" }
],
# If you want unmatched data to be forwarded on a default stream, add the
# following setting. In that case no data is dropped.
"default_stream" : "default"
}
},
"storm_settings" : {
"publish" : [
{
"stream" : "checkpoint",
"fields" : ["log", "local_host", "local_port", "remote_host", "remote_port", "local_uuid", "local_timestamp"]
},
{
"stream" : "cisco",
"fields" : ["log", "local_host", "local_port", "remote_host", "remote_port", "local_uuid", "local_timestamp"]
},
{
"stream" : "handover",
"fields" : ["log", "local_host", "local_port", "remote_host", "remote_port", "local_uuid", "local_timestamp"]
},
{
"stream" : "local",
"fields" : ["log", "local_host", "local_port", "remote_host", "remote_port", "local_uuid", "local_timestamp"]
}
]
}
}
Note that you must declare all the possible output streams in your storm_settings section, including the default stream if any.
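Conceptually, the dispatching selects the stream of the first matching CIDR block, falling back to the default stream. Here is a minimal illustration in plain Java (StreamDispatch and streamFor are hypothetical names; the textual prefix check keeps the sketch short, whereas the real bolt does proper CIDR arithmetic):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StreamDispatch {
    /** Returns the stream of the first CIDR rule whose network textually
     *  prefixes the address, or the default stream if none matches.
     *  The crude prefix check is only valid for /8, /16, /24 style blocks. */
    static String streamFor(String address, Map<String, String> cidrToStream, String defaultStream) {
        for (Map.Entry<String, String> e : cidrToStream.entrySet()) {
            String network = e.getKey().split("/")[0];
            // "192.168.0.0" becomes the prefix "192.168."
            String prefix = network.replaceAll("(\\.0)+$", ".");
            if (address.startsWith(prefix)) {
                return e.getValue();
            }
        }
        return defaultStream;
    }

    public static void main(String[] args) {
        Map<String, String> rules = new LinkedHashMap<>();
        rules.put("10.0.0.0/8", "checkpoint");
        rules.put("172.16.0.0/12", "cisco");
        rules.put("192.168.0.0/16", "handover");
        System.out.println(streamFor("192.168.4.2", rules, "default")); // handover
        System.out.println(streamFor("8.8.8.8", rules, "default"));     // default
    }
}
```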
Besides IP filtering, the filter bolt supports content-based filtering. To drop the data whose field contains one of the given substrings, use:
{
"type" : "filter",
"bolt_settings" : {
"exclude_substring" : {
"field" : "log",
"substrings" : [ "accept" , "traffic" ],
"case_sensitive" : true
}
}
}
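As an illustration of the substring matching, here is a small plain-Java sketch (SubstringFilter and containsAny are hypothetical names, not the bolt's actual internals):

```java
import java.util.List;
import java.util.Locale;

public class SubstringFilter {
    /** Returns true if the value contains any of the substrings;
     *  comparison ignores case when caseSensitive is false. */
    static boolean containsAny(String value, List<String> substrings, boolean caseSensitive) {
        String haystack = caseSensitive ? value : value.toLowerCase(Locale.ROOT);
        for (String s : substrings) {
            String needle = caseSensitive ? s : s.toLowerCase(Locale.ROOT);
            if (haystack.contains(needle)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> subs = List.of("accept", "traffic");
        // With exclude_substring, a match means the tuple is dropped.
        System.out.println(containsAny("inbound TRAFFIC accepted", subs, false)); // true
        System.out.println(containsAny("deny tcp 10.0.0.1", subs, false));        // false
    }
}
```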
If you want the opposite, i.e. only let through the data that contains the substrings, use:
{
"type" : "filter",
"bolt_settings" : {
"include_substring" : {
"field" : "log",
"substrings" : [ "accept" , "traffic" ],
"case_sensitive" : true
}
}
}
To dispatch the matching data onto different streams depending on the matched substring, use substrings_to_streams:
{
"type" : "filter",
"bolt_settings" : {
"include_substring" : {
"field" : "log",
"substrings_to_streams" : [
{ "substring" : "accept" , "stream" : "stream1" },
{ "substring" : "traffic" , "stream" : "stream2" },
{ "substring" : "traffic" , "stream" : "default" }
],
"case_sensitive" : true
}
}
}
To drop the data whose field matches a regular expression, use:
{
"type" : "filter",
"bolt_settings" : {
"exclude_regex" : {
"field" : "log",
"pattern" : ".*(traffic|accept).*"
}
}
}
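For reference, the pattern above can be exercised with the standard java.util.regex API; note that matches() anchors on the whole field value, which is why the documented pattern carries leading and trailing ".*" (the RegexFilter class and drops method below are hypothetical names):

```java
import java.util.regex.Pattern;

public class RegexFilter {
    // Pre-compile once; the pattern from the configuration above.
    static final Pattern PATTERN = Pattern.compile(".*(traffic|accept).*");

    /** Returns true when the log line would be dropped by exclude_regex. */
    static boolean drops(String log) {
        return PATTERN.matcher(log).matches();
    }

    public static void main(String[] args) {
        System.out.println(drops("fw: accept tcp 443")); // true
        System.out.println(drops("fw: deny udp 53"));    // false
    }
}
```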
Dropped tuples are reported through the metrics system. Here is an example log reporter output showing the storm.drop counter:
07:19:33 c.t.s.c.p.s.c.m.LogReporter [INFO]
name="storm.drop"
count=144321
m1_rate=1534.9477088294025
storm.task_id="2"
hostname="ltr01"
storm.component_type="filter_bolt"
pp.channel="ltr"
storm.topology_name="ltr_in"
storm.container_id="local_Dimitris-MacBook-Pro.local"
storm.component_id="filter_bolt"
pp.platform_id="punchplatform"
pp.tenant="perf"
| Constructor and Description |
|---|
| FilterNode(org.thales.punch.libraries.storm.api.NodeSettings boltConfig, org.thales.punch.libraries.storm.api.ITopologySettings topoConfig) <br> Create a new Filter |

| Modifier and Type | Method and Description |
|---|---|
| void | cleanup() |
| void | prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector) <br> Prepare this bolt. |
| void | process(org.apache.storm.tuple.Tuple tuple) <br> The execute method is called by Storm whenever new tuples traverse the topology. |
public FilterNode(org.thales.punch.libraries.storm.api.NodeSettings boltConfig, org.thales.punch.libraries.storm.api.ITopologySettings topoConfig) throws org.thales.punch.exceptions.ConfigurationException
Parameters:
boltConfig - the bolt configuration
topoConfig - the topo configuration
Throws:
UnknownHostException - if one of your settings contains an invalid host
org.thales.punch.exceptions.ConfigurationException

public void prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector)
Specified by:
prepare in interface org.apache.storm.task.IBolt
Overrides:
prepare in class org.thales.punch.libraries.storm.api.BaseProcessingNode

public void process(org.apache.storm.tuple.Tuple tuple)
Overrides:
process in class org.thales.punch.libraries.storm.api.BaseProcessingNode
Parameters:
tuple - the input storm tuple.

public void cleanup()
Specified by:
cleanup in interface org.apache.storm.task.IBolt
Overrides:
cleanup in class org.thales.punch.libraries.storm.api.BaseProcessingNode
Copyright © 2022. All rights reserved.