
FileBolt

The file bolt writes tuples as part of batched files. It supports writing to a POSIX file system (typically an NFS mount used to archive logs on a third-party storage solution), or to a CEPH distributed object store. A CEPH cluster is an optional PunchPlatform component that provides users with a robust, scalable and compact storage solution.

The FileBolt uses an aggregation strategy: it accumulates a configurable number of tuples in memory and, once a threshold is reached, appends them all to a batch file. The file bolt supports compression (GZIP or SNAPPY) as well as ciphering. The resulting files are then dumped to the configured backend.

Refer to the file bolt javadoc documentation.

Warning

This bolt can work using a batch strategy. This is its default behavior. It then expects to receive end-of-batch markers so as to flush its content to the destination archive. In that mode you must use the batch Kafka spout as the emitter in your topology. You can also use it to save each incoming tuple in a dedicated file. This latter mode only makes sense if you receive (say) HTTP requests with large content.
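
As a minimal sketch (the destination value is simply the one reused from the minimal example below, and batch_mode is documented in the Parameters section), the per-tuple mode is enabled by turning the batch behaviour off:

"bolt_settings": {
  "destination": "file:///tmp/archive-logs/storage",
  "batch_mode": false
}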

Archive Types

The file bolt writes batches: a batch is simply a collection of events. For example, on a log management solution a batch typically contains 20000 logs.

Batches are written either as files onto a POSIX filesystem, or as objects onto a CEPH object storage.

Filesystem

The batch files are written to a filesystem. The folder layout follows a time-based tree structure, and files are named using a time-scope naming scheme. The naming scheme for the directory structure is:

source<year>/<month number>/<day number>/<filename>

The naming scheme for the filename is:

<file_prefix><start year>-<start month number>-<start day>-<start hour>h<start minute>-<end datetime with same format>-<batch id>-<partition id><file suffix>< '.gz' if compression is active>
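
As a purely illustrative example (the "archive-" prefix, the ".csv" suffix, the batch id 1207 and the partition id 0 are made-up values), a GZIP-compressed batch covering 2018-01-02 10h15 to 2018-01-02 10h20 would be written to a path such as:

source2018/01/02/archive-2018-01-02-10h15-2018-01-02-10h20-1207-0.csv.gz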

Object Storage

Batch files are written to an object storage (CEPH cluster or NFS/shared filesystem), together with an indexing structure designed to support time- and topic-based extraction.

This mode guarantees idempotent writing: in case of failure/replay of the data writing component, the batch files will be overwritten, ultimately achieving exactly-once semantics.

Info

Starting from the Craig version, the archiving indexes are stored in Elasticsearch. Only the batched data goes to the object storage.

Streams And Fields

You can finely control the data you write into archives. The file bolt stores all or some of the received fields: it simply transforms the input tuple fields into a CSV line.

Consider the following example, where the Kafka spout is configured to emit tuples with 7 fields:

"storm_settings" : { 
  "publish" : { "stream" : "logs" , "fields" : [ "Date","Open","High","Low","Close","Volume","Adj Close" ]    
}

The file bolt will then receive tuples like these:

{
  "stream" : "logs",
  "fields" : [ "2017-12-28", 171.000000, 171.850006, 170.479996, 171.080002, 171.080002, 16480200 ]
},
{
  "stream" : "logs",
  "fields" : [ "2017-12-29", 170.520004, 170.589996, 169.220001, 169.229996, 169.229996, 25999900 ]
}

The file bolt will generate archives with the corresponding lines:

2017-12-28|171.000000|171.850006|170.479996|171.080002|171.080002|16480200
2017-12-29|170.520004|170.589996|169.220001|169.229996|169.229996|25999900

Parameters

Mandatory parameters

  • destination : String
    • The URL of the destination archive store.

Optional Parameters

  • es_cluster_id : String
    • If a ceph or indexed_filestore (indexed shared-filesystem object storage) destination is configured, you must specify an Elasticsearch cluster name. This cluster is used by the FileBolt to store the batches index. The cluster name must match one of the Elasticsearch cluster names referenced in your punchplatform.properties. Example: "es_search".
  • compression_format : String
    • Use one of .gz or .snappy. The corresponding file extension will be added to your batches if you write to a POSIX filesystem.
    • default: none (no compression)
  • required_ciphering_content : boolean
    • Set to true to enable ciphering. If true, you need an enciphering section as described by the next four properties.
    • default: false
  • [enciphering].keystore : String
    • Specify keystore absolute path.
  • [enciphering].keystore_pass : String
    • Specify the keystore password.
  • [enciphering].keys : Dictionary
    • Specify list of key ids (aliases).
  • [enciphering].[keys].key_id : String
    • id (alias) of the public and private rsa key.
  • fields : [String]
    • the FileBolt generates CSV files. You must specify the fields you want, in the required order. Each value must correspond to a Storm field name.
  • add_header : boolean
    • if true, a header line will be generated at the top of each file, indicating the included fields and the earliest/latest timestamps
    • default: true
  • separator : String
    • the CSV fields separator
    • default: '__|__'
  • batch_mode : Boolean
    • if set to false, the FileBolt will write each incoming tuple to a dedicated file (see the sketch after this list).
    • default: true
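
As a hedged sketch combining several of the optional parameters above (the values are illustrative only, taken from this page where possible; the separator is deliberately overridden to a plain pipe for the example):

"bolt_settings": {
  "destination": "file:///tmp/archive-logs/storage",
  "compression_format": ".gz",
  "fields": ["log", "local_uuid"],
  "add_header": true,
  "separator": "|",
  "batch_mode": true
}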

FileSystem Archive Parameters

If you use a plain filesystem destination, there are no additional parameters.

Object Storage Archive Parameters

If you use an object storage destination, you must set the following additional parameters:

  • cluster : String
    • For CEPH storage: the Ceph client configuration file, which has to be available at this path on the server that will run the topology. A filesystem emulation can be obtained for test purposes without CEPH by using an indexed_filestore destination.
  • pool : String
    • name of the Ceph pool of objects (usually one per tenant).
  • topic : String
    • logical subset of data in the Ceph pool. This in turn lets you extract data within that topic.
  • es_index_prefix : String
    • By default, object indexing documents are stored into a dedicated time-stamped index. This index name is the concatenation of an optional es_index_prefix, the pool name, an optional es_index_suffix and the current date.
  • es_index_suffix : String
    • By default, object indexing documents are stored into a dedicated time-stamped index. This index name is the concatenation of an optional es_index_prefix, the pool name, an optional es_index_suffix and the current date.
    • Default: "-objects-storage".
  • es_timeout : String
    • If an ES indexing request exceeds this timeout, the FileBolt will raise an exception. Example: "10s" (for 10 seconds).
    • default: "10s"
  • folders_hierachy_pattern : String
    • If the object storage is implemented over a filesystem, batches of logs are written to multiple folders and sub-folders. The folder hierarchy is described by this parameter as a pattern. It can contain a static string such as "my_archives/my_technology" and/or specific patterns: %{topic} for the topic, %{partition} for the partition number, %{date} for the earliest tuple date in the batch (YYYY.mm.dd format only), or %{batches_set}, which is the leading digits of the batch number. This last pattern is tricky: it allows storing a fixed maximum number of batches per folder. For example, if "batches_per_folder" equals 1000 and the pattern is "%{batches_set}", then batches 1098 and 1099 will be stored in folder "1" while batches 2000 and 2001 will be stored in folder "2" (see the sketch after this list).
    • default: "%{topic}/%{partition}/%{date}/%{batches_set}"
  • batches_per_folder : Integer
    • If the object storage is implemented over a filesystem, batches of logs are written to multiple folders and sub-folders. With the default folder hierarchy pattern, each sub-folder will contain at most this number of batches.
    • default: 1000
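
As an illustrative sketch of an object storage configuration over a shared filesystem (the pool, topic and prefix values are made up; the folder pattern and batches_per_folder simply restate the defaults documented above), the settings could look like the block below. With the default "-objects-storage" suffix, index documents would then go to an Elasticsearch index named along the lines of "archive-mytenant-objects-storage-<current date>":

"bolt_settings": {
  "destination": "indexed_filestore://tmp/archive-logs/storage",
  "es_cluster_id": "es_search",
  "pool": "mytenant",
  "topic": "apache_httpd",
  "es_index_prefix": "archive-",
  "es_timeout": "10s",
  "folders_hierachy_pattern": "%{topic}/%{partition}/%{date}/%{batches_set}",
  "batches_per_folder": 1000,
  "fields": ["log"]
}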

Example(s)

Example: minimal

Here is a simple yet complete topology example to archive logs taken from a Kafka topic. In this example we only select two fields (log and local_uuid) from the records read from Kafka.

{
  "tenant": "mytenant",
  "channel": "apache_httpd",
  "name": "logs_archiving",
  "spouts": [
    {
      "type": "kafka_spout",
      "spout_settings": {
        "topic": "mytenant_output_logs",
        "start_offset_strategy": "last_committed",
        "brokers": "local",
        "batch_size" : 10000
      },
      "storm_settings": {
        "executors": 1,
        "component": "kafka_spout",
        "publish": [
          {
            "stream" : "logs", "fields" : ["log", "local_uuid"]
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "file_bolt",
      "bolt_settings": {
        "destination": "file:///tmp/archive-logs/storage"
      },
      "storm_settings": {
        "executors": 1,
        "component": "file_bolt",
        "subscribe": [
          {
            "component": "kafka_spout",
            "stream": "logs",
            "grouping": "partitioning"
          }
        ]
      }
    }
  ]
}

Example: archiving with index (Elasticsearch)

Execute this topology first:

{
  "tenant": "mytenant",
  "name": "logs_archiving",
  "spouts": [
    {
      "type": "kafka_spout",
      "spout_settings": {
        "topic": "mytenant.apache2",
        "start_offset_strategy": "last_committed",
        "brokers": "local",
        "batch_size" : 1
      },
      "storm_settings": {
        "executors": 1,
        "component": "kafka_spout",
        "publish": [
          {
            "stream" : "logs",
            "fields": ["log"]       
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "file_bolt",
      "bolt_settings": {
        "destination": "indexed_filestore://tmp/archive-logs/storage",
        "es_cluster_id": "es_search",
        "pool": "testpoolname",
        "topic": "mytenant.apache2",
        "fields": [
          "log"
        ]
      },
      "storm_settings": {
        "executors": 1,
        "component": "file_bolt",
        "subscribe": [
          {
            "component": "kafka_spout",
            "stream": "logs",
            "grouping": "localOrShuffle"
          }
        ]
      }
    }
  ],
  "storm_settings": {
    "topology.worker.childopts": "-Xms512m -Xmx512m"
  }
}

Fill your Kafka with some data and see the result in your Kibana instance with the newly created index.

You will need the file below (AAPL.csv) in the same directory where your topology is located:

Date,Open,High,Low,Close,Adj Close,Volume
2017-12-28,171.000000,171.850006,170.479996,171.080002,171.080002,16480200
2017-12-29,170.520004,170.589996,169.220001,169.229996,169.229996,25999900
2018-01-02,170.160004,172.300003,169.259995,172.259995,172.259995,25555900
2018-01-03,172.529999,174.550003,171.960007,172.229996,172.229996,29517900
2018-01-04,172.539993,173.470001,172.080002,173.029999,173.029999,22434600
2018-01-05,173.440002,175.369995,173.050003,175.000000,175.000000,23660000
2018-01-08,174.350006,175.610001,173.929993,174.350006,174.350006,20567800
2018-01-09,174.550003,175.059998,173.410004,174.330002,174.330002,21584000
2018-01-10,173.160004,174.300003,173.000000,174.289993,174.289993,23959900
2018-01-11,174.589996,175.490005,174.490005,175.279999,175.279999,18667700
2018-01-12,176.179993,177.360001,175.649994,177.089996,177.089996,25418100
2018-01-16,177.899994,179.389999,176.139999,176.190002,176.190002,29565900
2018-01-17,176.149994,179.250000,175.070007,179.100006,179.100006,34386800
2018-01-18,179.369995,180.100006,178.250000,179.259995,179.259995,31193400
2018-01-19,178.610001,179.580002,177.410004,178.460007,178.460007,32425100
2018-01-22,177.300003,177.779999,176.600006,177.000000,177.000000,27108600
2018-01-23,177.300003,179.440002,176.820007,177.039993,177.039993,32689100
2018-01-24,177.250000,177.300003,173.199997,174.220001,174.220001,51105100
2018-01-25,174.509995,174.949997,170.529999,171.110001,171.110001,41529000
2018-01-26,172.000000,172.000000,170.059998,171.509995,171.509995,37687500

To fill Kafka, run the following injection topology: it reads AAPL.csv, transforms each line with the AAPL.punch punchlet (also expected in the same directory), and writes the records to the mytenant.apache2 topic:

{
  "spouts": [
    {
      "type": "file_spout",
      "spout_settings": {
        "read_file_from_start": true,
        "path": "./AAPL.csv"
      },
      "storm_settings": {
        "component": "file_spout",
        "publish": [
          {
            "stream": "logs",
            "fields": [
              "log"
            ]
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "punch_bolt",
      "bolt_settings": {
        "punchlet": "./AAPL.punch"
      },
      "storm_settings": {
        "component": "punch_bolt",
        "publish": [
          {
            "stream": "logs",
            "fields": [
              "log"
            ]
          }
        ],
        "subscribe": [
          {
            "component": "file_spout",
            "stream": "logs"
          }
        ]
      }
    },
    {
      "type": "kafka_bolt",
      "bolt_settings": {
        "topic": "mytenant.apache2",
        "brokers": "local",
        "encoding": "lumberjack",
        "producer.acks": "all",
        "producer.batch.size": 16384,
        "producer.linger.ms": 5
      },
      "storm_settings": {
        "component": "kafkawritter",
        "subscribe": [
          {
            "component": "punch_bolt",
            "stream": "logs",
            "grouping": "localOrShuffle"
          }
        ]
      }
    }
  ]
}

Scaling Up Considerations

Archiving topologies are designed to write batches of files in a safe and idempotent way, at scale. The question is: how do you achieve enough parallelism to write lots of data coming from multi-partitioned topics?

In all cases, a single file bolt instance must be in charge of handling a single batch. The reason is simply that a batch is nothing more than a continuous sequence of records from the same Kafka partition. Having a complete batch handled by a single instance of the file bolt guarantees that even after a failure/replay, the same batches will be regenerated with the same identifiers and the same content.

You have four ways to enforce that one-batch-per-instance condition.

  1. Use a single file bolt instance: this is of course the easiest solution to start with. That single instance will receive all the data from all the partitions. The file bolt is designed to achieve parallel, concurrent writing of several batches. This setup can thus be efficient enough for many use cases. Try it first.
  2. Use several instances of the file bolt on top of a single Kafka spout. This works, but you must use the grouping strategy to enforce that each bolt instance receives all the records from a given Kafka partition (see the sketch at the end of this section).
  3. Use one file bolt instance per partition. This is easy to configure: deploy as many instances of the Kafka spout as you have partitions, one dedicated to each partition, and set up a file bolt for each spout.
  4. If you ultimately need to deploy more bolts than you have partitions, use the grouping strategy.

Prefer the simplest solutions. A single-process topology deployed on a multi-core server can achieve a high throughput.
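
To illustrate option 2 above, here is a hedged sketch of the file bolt storm_settings, assuming that the "partitioning" grouping used in the minimal example routes all records of a given Kafka partition to the same bolt instance:

"storm_settings": {
  "executors": 2,
  "component": "file_bolt",
  "subscribe": [
    {
      "component": "kafka_spout",
      "stream": "logs",
      "grouping": "partitioning"
    }
  ]
}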