RetentionBolt

The Retention Bolt accumulates all incoming tuples until a defined event arrives. It is useful when you want 'batch-like' behaviour.

Bolt-level parameters

  • trigger_field: String

    Name of the Storm field that carries the trigger (see ‘Use cases’ below)

  • trigger_value: String

    Value of the trigger field that causes the accumulated tuples to be emitted

  • batch_id_field: String

    Default: “default”. Name of the Storm field used to group tuples by batch ID. If left empty, every tuple goes into the same ‘default’ bucket and all tuples are emitted together.

  • retention_sec: Long

    Default: 0. Maximum time, in seconds, since the last tuple was received (per batch ID). A flush is triggered when this time is reached.

  • emit_on_expire: Boolean

    Default: true. If set to true, tuples are acked and emitted when ‘retention_sec’ is reached. Otherwise, tuples are only acked, which results in a loss of data.

Use cases

1) Retention with one bucket (no batch ID needed)

Let's say you have a Storm stream 'logs' whose tuples carry an 'action' field. For example, if you want batch-processing-like behaviour every 30 seconds, you can use this bolt configuration:

  • trigger_field: action (referring to the Storm field 'action')
  • trigger_value: any value of your choice (accumulated tuples are emitted only when this value is matched)
  • retention_sec: 30

In that case, even if you never send a tuple with the trigger value in 'logs.action', the accumulated tuples are still periodically sent to the following bolts.

If at some point you would like to flush all the tuples (emit and ack them), send a tuple with the field 'logs.action' set to the configured 'trigger_value'.
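
The single-bucket behaviour described above can be modelled with a small Python sketch. This is an illustration only, not the bolt's implementation: the class name, the method names, the explicit 'now' argument and the trigger value "flush" are all hypothetical. It assumes the retention timer is measured from the most recent tuple, as the 'retention_sec' description states, and that the trigger tuple is emitted together with the accumulated ones.

```python
class SingleBucketRetention:
    """Toy model of one 'default' bucket (hypothetical, not the bolt's code)."""

    def __init__(self, trigger_field, trigger_value, retention_sec,
                 emit_on_expire=True):
        self.trigger_field = trigger_field
        self.trigger_value = trigger_value
        self.retention_sec = retention_sec
        self.emit_on_expire = emit_on_expire
        self.pending = []       # tuples held in memory until the next flush
        self.last_seen = None   # arrival time of the most recent tuple

    def receive(self, tup, now):
        """Store the tuple; return the tuples to emit if it carries the trigger."""
        self.pending.append(tup)
        self.last_seen = now
        if tup.get(self.trigger_field) == self.trigger_value:
            return self._flush()
        return []

    def tick(self, now):
        """Periodic check: flush once retention_sec has passed since the last tuple."""
        if self.pending and now - self.last_seen >= self.retention_sec:
            flushed = self._flush()
            # With emit_on_expire=False the tuples are dropped (acked only).
            return flushed if self.emit_on_expire else []
        return []

    def _flush(self):
        flushed, self.pending = self.pending, []
        return flushed


# Usage with the settings of this use case; "flush" is an assumed trigger value.
bolt = SingleBucketRetention("action", "flush", retention_sec=30)
bolt.receive({"log": "a"}, now=0)
bolt.receive({"log": "b"}, now=10)
early = bolt.tick(now=39)    # 29 s since the last tuple: nothing is emitted
expired = bolt.tick(now=40)  # 30 s since the last tuple: both tuples are emitted
```

In this model a tuple whose 'action' equals the trigger value flushes the bucket immediately, without waiting for the timer.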

2) Retention with multiple batch IDs

If you need to accumulate tuples depending on their IDs, use 'batch_id_field' to indicate which Storm field should be used to set this ID. This way, when a tuple containing the right 'trigger_value' is sent, only the tuples having the same ID will be emitted.

Note

If the value received in 'batch_id_field' is null, the 'default' bucket is used.
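
The grouping rule can be sketched in Python as follows. This is a minimal model under stated assumptions, not the bolt's code: the helper 'route' and the constant 'DEFAULT_BUCKET' are hypothetical names, and the field names and trigger value are borrowed from the configuration example on this page. It also assumes the trigger tuple is emitted with its bucket.

```python
from collections import defaultdict

DEFAULT_BUCKET = "default"  # bucket used when the batch ID is null


def route(buckets, tup, batch_id_field, trigger_field, trigger_value):
    """Add tup to its bucket; return that bucket's tuples if tup is a trigger."""
    batch_id = tup.get(batch_id_field)
    key = DEFAULT_BUCKET if batch_id is None else batch_id
    buckets[key].append(tup)
    if tup.get(trigger_field) == trigger_value:
        # Only the bucket of the trigger tuple is flushed; others keep waiting.
        return buckets.pop(key)
    return []


settings = {
    "batch_id_field": "batch_id",
    "trigger_field": "trigger",
    "trigger_value": "whateveryouwant",
}
buckets = defaultdict(list)
route(buckets, {"batch_id": "a", "log": "a1"}, **settings)
route(buckets, {"batch_id": "b", "log": "b1"}, **settings)
route(buckets, {"batch_id": None, "log": "n1"}, **settings)  # 'default' bucket
emitted = route(
    buckets, {"batch_id": "a", "trigger": "whateveryouwant"}, **settings
)
# 'emitted' holds only the tuples of batch "a"; "b" and "default" stay pending.
```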

Configuration example

{
     "tenant": ...,
     "channel": ...,
     "name": ...,
     "meta": {...},

     "spouts": [...],

     "bolts": [
        ...,
        {
             "type": "retention_bolt",
             "bolt_settings": {
                 "trigger_field": "trigger",
                 "trigger_value": "whateveryouwant",
                 "batch_id_field" : "batch_id",
                 "retention_sec" : 120,
                 "emit_on_expire" : true
             },
             "storm_settings": {
                 "executors": 1,
                 "component": "retention_bolt",
                 "subscribe": [{
                     "component": "retention_kafka_spout",
                     "stream": "logs",
                     "grouping": "localOrShuffle"
                 }],
                 "publish": [{
                     "stream": "logs",
                     "fields": ["raw_log", "log", "local_uuid", "es_index", "trigger", "batch_id"]
                 }]
             }
         },
        ...
     ],

     "storm_settings": {
         ...
         "topology.enable.message.timeouts": false,
         ...
     }
 }

Limits

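For this bolt to work properly, set the topology property 'topology.enable.message.timeouts' to 'false'. This disables spout tuple re-emission on timeouts. Since retained tuples are only acked when they are flushed, they could otherwise time out while waiting in the bolt and be replayed by the spout, which would produce duplicate tuples.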

Currently, all pending tuples are stored in RAM, so you are limited by the memory available to the JVM. Before using this bolt in production, you should test it and size the topology accordingly (e.g. with 'topology.worker.childopts').