public class FilterNode
extends org.thales.punch.libraries.storm.api.BaseProcessingNode
The FilterBolt offers IP blacklisting or whitelisting capabilities, as well as content-based filtering. Each filter works on a given field of the input tuple. Note that you cannot combine several filter types: the filter bolt can only be configured with a single filtering type.
Here is how you filter out tuples whose field contains an IP address you wish to blacklist. Simply indicate on which (Storm) field you receive the IP address, typically the one set by a syslog spout.
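The single-filter-type rule can be illustrated with a small validation sketch. It assumes the bolt settings are available as a Map, and it uses the hypothetical key strings "blacklist", "whitelist", "exclude_substring", "include_substring" and "exclude_regex" for the filter types (the real key values are defined in IKeys and may differ). IllegalArgumentException stands in for the ConfigurationException that the constructor actually declares.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: enforce "exactly one filter type" on the bolt settings. */
final class FilterSettingsCheck {
    // Assumed key names for illustration only; the real values are defined in IKeys.
    static final List<String> FILTER_TYPES = List.of(
            "blacklist", "whitelist", "exclude_substring", "include_substring", "exclude_regex");

    /** Returns the configured filter type, or throws if none or several are configured. */
    static String singleFilterType(Map<String, Object> boltSettings) {
        String found = null;
        for (String type : FILTER_TYPES) {
            if (!boltSettings.containsKey(type)) {
                continue;
            }
            if (found != null) {
                // Stand-in for org.thales.punch.exceptions.ConfigurationException.
                throw new IllegalArgumentException(
                        "only one filter type is allowed, found '" + found + "' and '" + type + "'");
            }
            found = type;
        }
        if (found == null) {
            throw new IllegalArgumentException("no filter type configured");
        }
        return found;
    }
}
```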
{
"type" : "filter",
"bolt_settings" : {
IKeys.BLACKLIST : {
IKeys.FIELD : "remote_host",
"list" : [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "127.0.0.1" ]
}
},
"storm_settings" : {
"component" : "my_filter_bolt",
"publish" : [
{"stream" : "logs", "fields" : ["log", "local_host", "local_port", "remote_host", "remote_port", "local_uuid", "local_timestamp"] }
],
"subscribe" : [
{ "component" : "some_spout", "stream" : "logs", "grouping": "localOrShuffle" }
]
}
}
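To make the blacklist semantics concrete, here is a minimal sketch of IPv4 CIDR matching. It assumes the list entries are IPv4 addresses or CIDR ranges as in the example above, and that a bare address such as "127.0.0.1" means a /32. The class names Ipv4Cidr and IpBlacklist are hypothetical; this is not the FilterNode implementation, and IPv6 is not handled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical IPv4 CIDR range, e.g. "10.0.0.0/8"; a bare address is treated as /32. */
final class Ipv4Cidr {
    final long network;
    final long mask;

    Ipv4Cidr(String spec) {
        String[] parts = spec.split("/");
        int prefix = parts.length == 2 ? Integer.parseInt(parts[1]) : 32;
        if (prefix < 0 || prefix > 32) {
            throw new IllegalArgumentException("bad prefix length: " + spec);
        }
        // Keep the top 'prefix' bits of a 32-bit value stored in a long.
        this.mask = prefix == 0 ? 0L : (0xFFFFFFFFL << (32 - prefix)) & 0xFFFFFFFFL;
        this.network = toLong(parts[0]) & mask;
    }

    /** Converts a dotted-quad IPv4 address into an unsigned 32-bit value. */
    static long toLong(String ip) {
        String[] octets = ip.trim().split("\\.");
        if (octets.length != 4) {
            throw new IllegalArgumentException("not an IPv4 address: " + ip);
        }
        long value = 0;
        for (String octet : octets) {
            int b = Integer.parseInt(octet);
            if (b < 0 || b > 255) {
                throw new IllegalArgumentException("bad octet in: " + ip);
            }
            value = (value << 8) | b;
        }
        return value;
    }

    boolean contains(String ip) {
        return (toLong(ip) & mask) == network;
    }
}

/** Hypothetical blacklist: a tuple is dropped when its IP falls in any listed range. */
final class IpBlacklist {
    private final List<Ipv4Cidr> ranges = new ArrayList<>();

    IpBlacklist(List<String> specs) {
        for (String spec : specs) {
            ranges.add(new Ipv4Cidr(spec));
        }
    }

    boolean drop(String remoteHost) {
        for (Ipv4Cidr range : ranges) {
            if (range.contains(remoteHost)) {
                return true;
            }
        }
        return false;
    }
}
```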
Instead of setting the blacklist explicitly, you can refer to an external JSON file. For example:
{
"type" : "filter",
"bolt_settings" : {
IKeys.BLACKLIST : {
IKeys.FIELD : "remote_host",
"list" : "filter/blacklist.json"
}
},
"storm_settings" : {
...
}
}
Make sure your JSON file is located at $PUNCHPLATFORM_CONF_DIR/resources/filter/blacklist.json.
The whitelist works the other way around: it only lets a tuple pass through the topology if its IP field falls within one of the addresses or ranges in your list.
{
"type" : "filter",
"bolt_settings" : {
"whitelist" : {
IKeys.FIELD : "remote_host",
"list" : [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "127.0.0.1/32" ]
}
},
"storm_settings" : {
...
}
}
It is often handy to dispatch whitelisted traffic onto different Storm streams. This lets you, for instance, send traffic received from "172.16.0.0/12" to a Kafka topic "cisco" and traffic from "192.168.0.0/16" to another topic (say) "apache".
To do so, use the following format:
{
"type" : "filter",
"bolt_settings" : {
"whitelist" : {
IKeys.FIELD : "remote_host",
"list" : [
{ "address" : "10.0.0.0/8", "stream" : "checkpoint" },
{ "address" : "172.16.0.0/12", "stream" : "cisco" },
{ "address" : "192.168.0.0/16", "stream" : "handover" },
{ "address" : "127.0.0.1/32", "stream" : "local" }
],
# If you want unmatched data to be forwarded on a default stream, add this
# setting. If you do, no data is dropped.
"default_stream" : "default"
}
},
"storm_settings" : {
"publish" : [
{
"stream" : "checkpoint",
"fields" : ["log", "local_host", "local_port", "remote_host", "remote_port", "local_uuid", "local_timestamp"]
},
{
"stream" : "cisco",
"fields" : ["log", "local_host", "local_port", "remote_host", "remote_port", "local_uuid", "local_timestamp"]
},
{
"stream" : "handover",
"fields" : ["log", "local_host", "local_port", "remote_host", "remote_port", "local_uuid", "local_timestamp"]
},
{
"stream" : "local",
"fields" : ["log", "local_host", "local_port", "remote_host", "remote_port", "local_uuid", "local_timestamp"]
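},
{
"stream" : "default",
"fields" : ["log", "local_host", "local_port", "remote_host", "remote_port", "local_uuid", "local_timestamp"]
}
]
}
}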
Note that you must declare every possible output stream in the "publish" section of your storm settings, including the default stream if any.
Here is how you filter out (say) logs containing the substring "accept" or "traffic":
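The dispatching logic can be sketched as follows. This is a hypothetical illustration, not the bolt's code: it assumes rules are evaluated in list order with the first matching range winning, that a null default stream means unmatched tuples are dropped, and it uses java.net.InetAddress to parse address literals (passing a host name instead of a literal would trigger a DNS lookup). WhitelistDispatcher is a hypothetical name.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical whitelist router: maps an IP address to the stream it should be emitted on. */
final class WhitelistDispatcher {
    private static final class Rule {
        final byte[] network;
        final int prefix;
        final String stream;

        Rule(byte[] network, int prefix, String stream) {
            this.network = network;
            this.prefix = prefix;
            this.stream = stream;
        }
    }

    private final List<Rule> rules = new ArrayList<>();
    private final String defaultStream; // null: unmatched tuples are dropped

    WhitelistDispatcher(String defaultStream) {
        this.defaultStream = defaultStream;
    }

    WhitelistDispatcher add(String cidr, String stream) {
        String[] parts = cidr.split("/");
        byte[] net = parse(parts[0]);
        int prefix = parts.length == 2 ? Integer.parseInt(parts[1]) : net.length * 8;
        if (prefix < 0 || prefix > net.length * 8) {
            throw new IllegalArgumentException("bad prefix length: " + cidr);
        }
        rules.add(new Rule(net, prefix, stream));
        return this;
    }

    /** Stream to emit on, or empty if the tuple must be dropped (first match wins, assumed). */
    Optional<String> route(String ip) {
        byte[] addr = parse(ip);
        for (Rule rule : rules) {
            if (matches(addr, rule.network, rule.prefix)) {
                return Optional.of(rule.stream);
            }
        }
        return Optional.ofNullable(defaultStream);
    }

    /** Compares the first 'prefix' bits of two addresses of the same family. */
    private static boolean matches(byte[] addr, byte[] net, int prefix) {
        if (addr.length != net.length) {
            return false; // IPv4 vs IPv6
        }
        int fullBytes = prefix / 8;
        int remBits = prefix % 8;
        for (int i = 0; i < fullBytes; i++) {
            if (addr[i] != net[i]) {
                return false;
            }
        }
        if (remBits == 0) {
            return true;
        }
        int mask = (0xFF << (8 - remBits)) & 0xFF;
        return (addr[fullBytes] & mask) == (net[fullBytes] & mask);
    }

    private static byte[] parse(String literal) {
        try {
            return InetAddress.getByName(literal).getAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("invalid address: " + literal, e);
        }
    }
}
```

Without a default stream, the same class models the plain whitelist: route() returns an empty Optional for every address outside the list.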
{
"type" : "filter",
"bolt_settings" : {
IKeys.EXCLUDE_SUBSTRING : {
IKeys.FIELD : "log",
"substrings" : [ "accept" , "traffic" ],
"case_sentitive" : true
}
}
}
If you want the opposite, i.e. only let through the data that contains the substrings, use:
{
"type" : "filter",
"bolt_settings" : {
IKeys.INCLUDE_SUBSTRING : {
IKeys.FIELD : "log",
"substrings" : [ "accept" , "traffic" ],
"case_sentitive" : true
}
}
}
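The include and exclude variants differ only in how a match is interpreted. The sketch below assumes that turning the case sensitivity flag off means both the field value and the substrings are lower-cased before comparison; SubstringFilter is a hypothetical name, not the bolt's implementation.

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

/** Hypothetical substring filter covering both the include and the exclude modes. */
final class SubstringFilter {
    private final List<String> substrings;
    private final boolean caseSensitive;
    private final boolean include; // true: keep matching values; false: drop them

    SubstringFilter(List<String> substrings, boolean caseSensitive, boolean include) {
        this.caseSensitive = caseSensitive;
        this.include = include;
        // Lower-case the needles once, up front, when matching case-insensitively.
        this.substrings = caseSensitive
                ? List.copyOf(substrings)
                : substrings.stream().map(s -> s.toLowerCase(Locale.ROOT)).collect(Collectors.toList());
    }

    boolean anyMatch(String value) {
        String haystack = caseSensitive ? value : value.toLowerCase(Locale.ROOT);
        for (String s : substrings) {
            if (haystack.contains(s)) {
                return true;
            }
        }
        return false;
    }

    /** True if the tuple carrying this field value should be forwarded. */
    boolean keep(String value) {
        return include == anyMatch(value);
    }
}
```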
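You can also dispatch the matching data onto different streams, each substring being associated with a target stream:
{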
"type" : "filter",
"bolt_settings" : {
IKeys.INCLUDE_SUBSTRING : {
IKeys.FIELD : "log",
"substrings_to_streams" : [
{ "substring" : "accept" , "stream" : "stream1"},
{ "substring" : "traffic" , "stream" : "stream2"},
{ "substring" : "traffic" , "stream" : "default"}
],
"case_sentitive" : true
}
}
}
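The example above maps "traffic" to two streams. The sketch below assumes that every matching entry contributes its stream, so a single tuple may be emitted on several streams, and that a tuple matching nothing is dropped; both points are assumptions about the bolt's behavior, and SubstringRouter is a hypothetical name.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/** Hypothetical router: returns every stream whose substring occurs in the field value. */
final class SubstringRouter {
    private final List<Map.Entry<String, String>> rules; // (substring, stream) in config order
    private final boolean caseSensitive;

    SubstringRouter(List<Map.Entry<String, String>> rules, boolean caseSensitive) {
        this.rules = List.copyOf(rules);
        this.caseSensitive = caseSensitive;
    }

    /** Streams to emit on, in configuration order; an empty set means the tuple is dropped. */
    Set<String> streamsFor(String value) {
        Set<String> streams = new LinkedHashSet<>();
        String haystack = caseSensitive ? value : value.toLowerCase(Locale.ROOT);
        for (Map.Entry<String, String> rule : rules) {
            String needle = caseSensitive ? rule.getKey() : rule.getKey().toLowerCase(Locale.ROOT);
            if (haystack.contains(needle)) {
                streams.add(rule.getValue());
            }
        }
        return streams;
    }
}
```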
You can use regexes as well, but check performance carefully if your pattern is complex: executing a regex costs about 100 times more CPU than a simple substring check ("contains"). Do not use regexes on high-traffic channels.
{
"type" : "filter",
"bolt_settings" : {
IKeys.EXCLUDE_REGEX : {
IKeys.FIELD : "log",
"pattern" : ".*(traffic|accept).*"
}
}
}
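The sketch below contrasts full-match and search semantics in java.util.regex for the pattern above, alongside the plain substring check. Whether the bolt applies matches() or find() is not stated here; the ".*" wrappers suggest a full match, but that is an assumption. RegexExclude is a hypothetical name.

```java
import java.util.regex.Pattern;

/** Hypothetical sketch comparing ways to apply the ".*(traffic|accept).*" exclusion. */
final class RegexExclude {
    // Compile once (for instance when the bolt is prepared), never once per tuple.
    private static final Pattern FULL = Pattern.compile(".*(traffic|accept).*");
    private static final Pattern PARTIAL = Pattern.compile("traffic|accept");

    /** Full-match semantics: the pattern must cover the whole value, hence the ".*" wrappers. */
    static boolean dropWithMatches(String log) {
        return FULL.matcher(log).matches();
    }

    /** Search semantics: find() scans for the pattern anywhere, so no ".*" is needed. */
    static boolean dropWithFind(String log) {
        return PARTIAL.matcher(log).find();
    }

    /** Same decision without a regex engine: the cheap option for high-traffic channels. */
    static boolean dropWithContains(String log) {
        return log.contains("traffic") || log.contains("accept");
    }
}
```

For single-line values the three methods agree. Because "." does not match line terminators by default, the full-match variant can differ on multi-line values.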
The filter bolt publishes a single punchplatform metric: the drop rate. Its fields are the following:
07:19:33 c.t.s.c.p.s.c.m.LogReporter [INFO]
name="storm.drop"
count=144321
m1_rate=1534.9477088294025
storm.task_id="2"
hostname="ltr01"
storm.component_type="filter_bolt"
pp.channel="ltr"
storm.topology_name="ltr_in"
storm.container_id="local_Dimitris-MacBook-Pro.local"
storm.component_id="filter_bolt"
pp.platform_id="punchplatform"
pp.tenant="perf"
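The count and m1_rate fields follow the naming of Dropwizard (Codahale) Metrics meters, where m1_rate is an exponentially weighted moving average over one minute, updated on a 5-second tick. Assuming the drop metric works the same way, a minimal self-contained sketch of such a meter looks like this; DropMeter is hypothetical, and the scheduler that calls tick() is not shown.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical drop meter with a Dropwizard-style one-minute EWMA rate. */
final class DropMeter {
    private static final double TICK_SECONDS = 5.0;
    // Smoothing factor for a 1-minute window sampled every 5 seconds.
    private static final double M1_ALPHA = 1.0 - Math.exp(-TICK_SECONDS / 60.0);

    private final LongAdder count = new LongAdder();      // total drops ("count")
    private final LongAdder uncounted = new LongAdder();  // drops since the last tick
    private volatile double rate;                         // drops per second ("m1_rate")
    private boolean initialized;                          // only touched by tick()

    /** Called for every tuple the filter drops. */
    void markDropped() {
        count.increment();
        uncounted.increment();
    }

    /** Expected to be called every TICK_SECONDS by a single scheduler thread. */
    void tick() {
        double instantRate = uncounted.sumThenReset() / TICK_SECONDS;
        if (initialized) {
            rate += M1_ALPHA * (instantRate - rate);
        } else {
            rate = instantRate;
            initialized = true;
        }
    }

    long count() {
        return count.sum();
    }

    double m1Rate() {
        return rate;
    }
}
```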
Constructor | Description |
---|---|
FilterNode(org.thales.punch.libraries.storm.api.NodeSettings boltConfig, org.thales.punch.libraries.storm.api.ITopologySettings topoConfig) | Create a new Filter |
Modifier and Type | Method | Description |
---|---|---|
void | cleanup() | |
void | prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector) | Prepare this bolt. |
void | process(org.apache.storm.tuple.Tuple tuple) | Called by Storm whenever a new tuple traverses the topology and reaches this bolt. |
public FilterNode(org.thales.punch.libraries.storm.api.NodeSettings boltConfig, org.thales.punch.libraries.storm.api.ITopologySettings topoConfig) throws org.thales.punch.exceptions.ConfigurationException
Parameters:
boltConfig - the bolt configuration
topoConfig - the topo configuration
Throws:
org.thales.punch.exceptions.ConfigurationException
public void prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector)
prepare in interface org.apache.storm.task.IBolt
prepare in class org.thales.punch.libraries.storm.api.BaseProcessingNode
public void process(org.apache.storm.tuple.Tuple tuple)
process in class org.thales.punch.libraries.storm.api.BaseProcessingNode
Parameters:
tuple - the input storm tuple.
public void cleanup()
cleanup in interface org.apache.storm.task.IBolt
cleanup in class org.thales.punch.libraries.storm.api.BaseProcessingNode
Copyright © 2023. All rights reserved.