RetentionBolt
The RetentionBolt accumulates all incoming tuples until a defined trigger event arrives. It is useful when you need a batch-like behaviour in a streaming topology.
Bolt-level parameters
trigger_field
: String. Name of the trigger Storm field (see 'Use cases' below).

trigger_value
: String. Expected value triggering the emission of the accumulated tuples.

batch_id_field
: String, default "default". Name of the Storm field used to group tuples by batch ID. If left empty, every tuple goes into the same 'default' bucket and all tuples are emitted together.

retention_sec
: Long, default 0. Maximum time (per batch ID) since the last tuple was received. A flush is triggered when this limit is reached.

emit_on_expire
: Boolean, default true. If set to true, ack and emit the tuples when 'retention_sec' is reached; otherwise, only ack them (which results in a loss of data).
Use cases
1) Retention with one bucket (no batch ID needed)
Say you have a Storm stream 'logs' containing a field 'action'. If, for example, you want a batch-processing-like behaviour every 30 seconds, you can use this bolt configuration:
- trigger_field: 'action' (referring to the Storm field 'action')
- trigger_value: a string of your choice (tuple emission is triggered only when this value is matched)
- retention_sec: 30
In that case, even if you never send a tuple whose 'logs.action' field matches the trigger value, the accumulated tuples will periodically be sent to the following bolts.
If at some point you want to flush all the pending tuples (emit and ack them), send a tuple with the field 'logs.action' set to the configured 'trigger_value'.
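As a minimal sketch, the corresponding 'bolt_settings' could look like the following (the trigger value "flush" is a hypothetical placeholder; pick any string that never occurs in normal traffic):

```json
"bolt_settings": {
  "trigger_field": "action",
  "trigger_value": "flush",
  "retention_sec": 30
}
```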
2) Retention with multiple batch IDs
If you need to accumulate tuples depending on their IDs, use 'batch_id_field' to indicate which Storm field should be used as this ID. This way, when a tuple containing the right 'trigger_value' is sent, only the tuples sharing its batch ID are emitted.
Note
If the value received in 'batch_id_field' is null, the 'default' bucket will be used.
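For illustration, here is a hedged sketch of such a configuration, assuming a hypothetical 'session_id' field carrying the batch ID:

```json
"bolt_settings": {
  "trigger_field": "action",
  "trigger_value": "flush",
  "batch_id_field": "session_id",
  "retention_sec": 120,
  "emit_on_expire": true
}
```

With these settings, a tuple whose 'action' field is set to "flush" and whose 'session_id' field is set to "42" would flush only the tuples accumulated under batch ID "42"; the other buckets keep accumulating.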
Configuration example
```json
{
  "tenant": ...,
  "channel": ...,
  "name": ...,
  "meta": {...},
  "spouts": [...],
  "bolts": [
    ...,
    {
      "type": "retention_bolt",
      "bolt_settings": {
        "trigger_field": "trigger",
        "trigger_value": "whateveryouwant",
        "batch_id_field": "batch_id",
        "retention_sec": 120,
        "emit_on_expire": true
      },
      "storm_settings": {
        "executors": 1,
        "component": "retention_bolt",
        "subscribe": [
          {
            "component": "retention_kafka_spout",
            "stream": "logs",
            "grouping": "localOrShuffle"
          }
        ],
        "publish": [
          {
            "stream": "logs",
            "fields": ["raw_log", "log", "local_uuid", "es_index", "trigger", "batch_id"]
          }
        ]
      }
    },
    ...
  ],
  "storm_settings": {
    ...
    "topology.enable.message.timeouts": false,
    ...
  }
}
```
Limits
To work properly, you should set the topology property 'topology.enable.message.timeouts' to 'false'. This disables spout tuple re-emission on timeout, which would otherwise cause duplicated tuples while they are being retained.
Currently, all pending tuples are stored in RAM, so you are limited by the JVM heap size. Before using this bolt in production, you should test it and size the topology accordingly (e.g. via 'topology.worker.childopts').
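As an illustrative sketch only (the heap values below are arbitrary assumptions, not sizing recommendations), the topology-level settings could look like:

```json
"storm_settings": {
  "topology.enable.message.timeouts": false,
  "topology.worker.childopts": "-Xms2g -Xmx2g"
}
```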