public class RetentionNode
extends org.thales.punch.libraries.storm.api.BaseProcessingNode
The Retention Bolt stores all incoming tuples in an in-memory table until a defined event arrives. It then lets them flow downstream.
This bolt can subscribe to one or several Storm stream(s).
WARNING: To work properly, this bolt needs the topology setting 'topology.enable.message.timeouts' to be set to 'false'. This disables the spout's re-emission of tuples on timeout. Retained tuples stay unacked until they are flushed, so disabling timeouts is what avoids the tuple duplication issue.
Limit: Currently, all pending tuples are stored in RAM, so capacity is bounded by the JVM heap size. Before using this bolt in production, you should load-test it (for example by sending a large number of tuples) and see when it fails.
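The hold-then-release behaviour described above can be sketched in a few lines. This is a minimal illustration, not the actual RetentionNode code: the class `RetentionSketch` and its methods are hypothetical, payloads are plain strings instead of Storm tuples, and it assumes the triggering tuple is released together with the tuples held before it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the bolt's in-memory table (not the real implementation). */
class RetentionSketch {
    private final String triggerValue;
    // Every held payload lives on the heap until the trigger arrives.
    private final List<String> pending = new ArrayList<>();

    RetentionSketch(String triggerValue) {
        this.triggerValue = triggerValue;
    }

    /**
     * Holds the payload. If the trigger field carries the expected value,
     * returns everything held so far (assumption: including this payload)
     * and empties the table; otherwise returns an empty list.
     */
    List<String> onTuple(String payload, String triggerFieldValue) {
        pending.add(payload);
        if (!triggerValue.equals(triggerFieldValue)) {
            return Collections.emptyList();
        }
        List<String> released = new ArrayList<>(pending);
        pending.clear();
        return released;
    }

    /** Number of payloads currently held in memory. */
    int pendingCount() {
        return pending.size();
    }
}
```

Because `pendingCount()` only drops when a trigger arrives, a stream that never sends the trigger value grows the table without bound, which is the memory limit mentioned above.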
property | default | description |
---|---|---|
trigger_field | MANDATORY | Name of the Storm stream field whose value is checked against 'trigger_value' |
trigger_value | MANDATORY | The value that the received 'trigger_field' must match to trigger the flushing of tuples |
batch_id_field | "default" | (Optional) Name of the field used to keep a separate tuple flushing list per received value. This way, you can flush only the tuples associated with a specific ID |
retention_sec | 0 | The maximum time, in seconds, allowed between the last event received and the current tick tuple. If this limit is reached, the trigger is pulled: all tuples are acked and (depending on the 'emit_on_expire' value) emitted |
emit_on_expire | true | Controls what happens to tuples when 'retention_sec' is reached. If set to 'true', each tuple is emitted to the next bolt and acked. If set to 'false', tuples are acked but NOT emitted, which means all of these tuples are lost |
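To make the interplay of these settings concrete, here is a rough sketch of per-batch flushing and expiry. It is an illustration under assumptions, not the bolt's code: `BatchRetentionSketch`, `onTuple` and `onTick` are hypothetical names, the `emitted` list and `acked` counter stand in for the Storm collector, time is passed in explicitly, and a 'retention_sec' of 0 is assumed to disable expiry (the source does not say how 0 is treated).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the settings table; not the actual RetentionNode code. */
class BatchRetentionSketch {
    /** Payloads held for one batch id, plus the arrival time of the latest one. */
    private static final class Batch {
        final List<String> payloads = new ArrayList<>();
        long lastEventMillis;
    }

    private final String triggerValue;
    private final long retentionMillis;
    private final boolean emitOnExpire;
    private final Map<String, Batch> batches = new HashMap<>();

    final List<String> emitted = new ArrayList<>(); // stands in for downstream emission
    int acked = 0;                                  // stands in for collector acks

    BatchRetentionSketch(String triggerValue, long retentionSec, boolean emitOnExpire) {
        this.triggerValue = triggerValue;
        this.retentionMillis = retentionSec * 1000L;
        this.emitOnExpire = emitOnExpire;
    }

    /** Holds the payload in its batch; a matching trigger flushes only that batch. */
    void onTuple(String batchId, String triggerFieldValue, String payload, long nowMillis) {
        Batch batch = batches.computeIfAbsent(batchId, k -> new Batch());
        batch.payloads.add(payload);
        batch.lastEventMillis = nowMillis;
        if (triggerValue.equals(triggerFieldValue)) {
            release(batches.remove(batchId), true);
        }
    }

    /** Called on each tick: expires batches idle for at least the retention time. */
    void onTick(long nowMillis) {
        if (retentionMillis <= 0) {
            return; // assumption: 0 means "no expiry"
        }
        Iterator<Batch> it = batches.values().iterator();
        while (it.hasNext()) {
            Batch batch = it.next();
            if (nowMillis - batch.lastEventMillis >= retentionMillis) {
                it.remove();
                release(batch, emitOnExpire); // emit_on_expire=false drops the payloads
            }
        }
    }

    /** Acks every payload of the batch and emits them only when requested. */
    private void release(Batch batch, boolean emit) {
        if (emit) {
            emitted.addAll(batch.payloads);
        }
        acked += batch.payloads.size();
    }
}
```

In this model a trigger on batch "a" leaves batch "b" untouched, and an expired batch is always acked but only reaches `emitted` when `emitOnExpire` is true.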
{
  "tenant": ...,
  "channel": ...,
  "name": ...,
  "meta": {...},
  "spouts": [
    ...
  ],
  "bolts": [
    ...,
    {
      "type": "retention_bolt",
      "bolt_settings": {
        "trigger_field": "trigger",
        "trigger_value": "whateveryouwant",
        "batch_id_field": "batch_id",
        "retention_sec": 120,
        "emit_on_expire": true
      },
      "storm_settings": {
        "executors": 1,
        "component": "retention_bolt",
        "subscribe": [{
          "component": "retention_kafka_spout",
          "stream": "logs",
          "grouping": "localOrShuffle"
        }],
        "publish": [{
          "stream": "logs",
          "fields": ["raw_log", "log", "local_uuid", "es_index", "trigger", "batch_id"]
        }]
      }
    },
    ...
  ],
  "storm_settings": {
    ...
    "topology.enable.message.timeouts": false,
    ...
  }
}
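As an illustration of how the "bolt_settings" block above maps onto the property table, the following sketch reads the settings from a plain map, applies the documented defaults, and checks the topology-level timeout flag. The class `RetentionSettingsSketch` and its methods are hypothetical helpers written for this example; they are not part of the punch API, and the real RetentionNode receives its configuration through NodeSettings.

```java
import java.util.Map;

/** Hypothetical settings reader mirroring the property table (not the punch API). */
class RetentionSettingsSketch {
    final String triggerField;
    final String triggerValue;
    final String batchIdField;
    final long retentionSec;
    final boolean emitOnExpire;

    RetentionSettingsSketch(Map<String, Object> boltSettings) {
        // Mandatory settings: fail fast when absent.
        triggerField = mandatory(boltSettings, "trigger_field");
        triggerValue = mandatory(boltSettings, "trigger_value");
        // Optional settings: fall back to the defaults listed in the table.
        batchIdField = String.valueOf(boltSettings.getOrDefault("batch_id_field", "default"));
        retentionSec = ((Number) boltSettings.getOrDefault("retention_sec", 0)).longValue();
        emitOnExpire = (Boolean) boltSettings.getOrDefault("emit_on_expire", true);
    }

    private static String mandatory(Map<String, Object> settings, String key) {
        Object value = settings.get(key);
        if (value == null) {
            throw new IllegalArgumentException("missing mandatory setting: " + key);
        }
        return value.toString();
    }

    /** Rejects topology settings that leave message timeouts enabled. */
    static void requireTimeoutsDisabled(Map<String, Object> stormSettings) {
        if (!Boolean.FALSE.equals(stormSettings.get("topology.enable.message.timeouts"))) {
            throw new IllegalStateException(
                "topology.enable.message.timeouts must be set to false");
        }
    }
}
```

Failing at construction time on a missing 'trigger_field' or 'trigger_value' is a design choice of this sketch; it surfaces a misconfigured topology before any tuple is retained.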
Constructor and Description |
---|
RetentionNode(org.thales.punch.libraries.storm.api.NodeSettings boltSettings, org.thales.punch.libraries.storm.api.ITopologySettings topoConfig) Create a new retention bolt |
Modifier and Type | Method and Description |
---|---|
void | cleanup() |
Map<String,Object> | getComponentConfiguration() |
void | prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector) Prepare this bolt. |
void | process(org.apache.storm.tuple.Tuple tuple) The execute method is called by Storm whenever some new tuples traverse the topology. |
public RetentionNode(org.thales.punch.libraries.storm.api.NodeSettings boltSettings, org.thales.punch.libraries.storm.api.ITopologySettings topoConfig) throws UnknownHostException
boltSettings - the bolt configuration
topoConfig - the topo configuration
UnknownHostException - if one of your settings contains an invalid host
public void prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector)
prepare in interface org.apache.storm.task.IBolt
prepare in class org.thales.punch.libraries.storm.api.BaseProcessingNode
public void process(org.apache.storm.tuple.Tuple tuple)
process in class org.thales.punch.libraries.storm.api.BaseProcessingNode
tuple - the input storm tuple.
public Map<String,Object> getComponentConfiguration()
getComponentConfiguration in interface org.apache.storm.topology.IComponent
getComponentConfiguration in class org.thales.punch.libraries.storm.api.BaseProcessingNode
public void cleanup()
cleanup in interface org.apache.storm.task.IBolt
cleanup in class org.thales.punch.libraries.storm.api.BaseProcessingNode
Copyright © 2022. All rights reserved.