
FileBolt

The FileBolt writes incoming tuples into files or archives. It supports writing to a POSIX file system (typically an NFS mount used to archive logs on a third-party storage solution), or to a CEPH distributed object store. A CEPH cluster is an optional PunchPlatform component that provides users with a robust, scalable and compact storage solution.

Batching strategy

The FileBolt can be configured to group incoming tuples in different ways; we call this the "batching strategy". The available strategies are:

  • "exactly_once": ensure that each tuple is proceed only one time (no duplicates)
  • "at_least_once": ensure that each tuple is at least proceed once but duplicated tuple can be found
  • "one_by_one": equivalent to a batch size of one, generate one file by tuple

The chosen value must be set in the strategy field of the bolt settings:

{
  "type": "file_bolt",
  "bolt_settings": {
    "strategy": "exaclty_once",
    ...
  },
  "storm_settings": {
    ...
  }
}

Basics

Understanding punch archiving is easier with an example. Consider that you receive tuples from a Kafka topic. For the sake of our example, each tuple consists of four fields: "ip", "raw", "user" and "timestamp". Here is an example using a JSON representation:

{
  "ip": "128.245.162.123",
  "raw": "31-05-2019 11:18:46 host 128.245.162.123 jerome",
  "user": "jerome",
  "timestamp": "31-05-2019 11:18:46"
}

The FileBolt lets you decide which field(s) you want to archive, which field should (optionally) be considered as a timestamp, and
which field should serve to provide you with more indexing capability. This will become clear with the examples below.

By default, the FileBolt works using a batch strategy. A batch is simply a collection of tuples. The FileBolt accumulates a configured number of tuples so as to produce a single file, then flushes it to the destination archive. This strategy reduces input/output costs, allows the data to be compressed, and benefits from advanced indexing options. A typical configuration produces files containing several tens of thousands of tuples. Here is a typical Kafka spout configuration to read such records from Kafka, using a batch size of (say) 16:

     {
      "type": "kafka_spout",
      "spout_settings": {
        "topic": "test_timed_archive1",
        "start_offset_strategy": "last_committed",
        "brokers": "local",
        "batch_size": 16
      },
      "storm_settings": {
        "component": "spout",
        "publish": [
          {
            "stream": "logs",
            "fields": [ "raw", "ip", "user", "timestamp" ]
          }
        ]
      }
    }

Note

For the batch strategy to work, the FileBolt expects to receive end-of-batch markers so as to flush its content to the destination archive. This mode only works if you use a batch Kafka spout as the emitting spout, as illustrated here.

You can choose among three archiving strategies. We will cover each using our simple example.

Plain Filesystem Archiving

The simplest mode consists in writing CSV files to a plain POSIX filesystem. This mode is activated if you set a destination property value that starts with the file:// prefix. Here is a complete example configuration.

{
  "tenant": "mytenant",
  "channel": "test_archive",
  "name": "test_archive_topology",
  "spouts": [
    {
      "type": "kafka_spout",
      "spout_settings": {
        "topic": "test_timed_archive1",
        "start_offset_strategy": "earliest",
        "brokers": "local",
        "batch_size": 16
      },
      "storm_settings": {
        "component": "spout",
        "publish": [
          {
            "stream": "logs",
            "fields": [ "raw", "ip", "user", "timestamp" ]
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "file_bolt",
      "bolt_settings": {
        "destination" : "file:///tmp/plain",
        "compression_format" : "GZIP",
        "fields": [
          "raw"
        ],
        "timestamp_field" : "timestamp"
      },
      "storm_settings" : {
        "component": "bolt",
        "subscribe": [
          {
            "component": "spout",
            "stream": "logs"
          }
        ]
      }
    }
  ],
  "storm_settings": {
    "topology.worker.childopts": "-server -Xms256m -Xmx256m"
  }
}

For the sake of the illustration, this example adds two useful behaviors:

  • compression is activated. You can use "GZIP" or "SNAPPY" to compress, or "NONE" to disable compression
  • one of the tuple fields is used as a timestamp, as indicated by the timestamp_field property. This tells the FileBolt to use that incoming field as an indication of the record timestamp. This in turn is useful to generate, for each batch file, earliest and latest timestamps, used for naming files or for indexing purposes. If you do not have such a ready-to-use field, the current time will be used instead.

If you run this topology, you will obtain the following archived files:

[tmp]$ tree plain/
plain/
└── 2019
    └── 05
        └── 31
            ├── puncharchive-2019-05-31-10h18-2019-05-31-10h33-1559290779566-0.csv.gz
            ├── puncharchive-2019-05-31-10h34-2019-05-31-10h49-1559290779567-0.csv.gz
            ├── puncharchive-2019-05-31-10h50-2019-05-31-11h05-1559290779568-0.csv.gz
            └── puncharchive-2019-05-31-11h06-2019-05-31-11h21-1559290779569-0.csv.gz

3 directories, 4 files

Here is an extract of the first one:

[31]$ gunzip puncharchive-2019-05-31-10h18-2019-05-31-10h33-1559291972387-0.csv.gz
[31]$ more puncharchive-2019-05-31-10h18-2019-05-31-10h33-1559291972387-0.csv
# earliest="2019-05-31 10:18:59:000" latest="2019-05-31 10:33:59:000" fields=raw separator="__|__"
31-05-2019 10:18:59 host 128.114.24.54 frank
31-05-2019 10:19:59 host 128.95.200.73 bob
31-05-2019 10:20:59 host 128.202.228.156 alice

Tip

Each file contains a header plus one line per tuple. Here it contains only the raw value; the other fields (ip, user, timestamp) are not included. If you select more fields, they are separated by a configurable separator.
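
For instance, if you wanted to archive several fields with a custom separator, the bolt settings could look like the following sketch (the separator value here is purely illustrative; the default is '__|__'):

{
  "type": "file_bolt",
  "bolt_settings": {
    "destination" : "file:///tmp/plain",
    "compression_format" : "GZIP",
    # archive three fields per line instead of one
    "fields": [ "raw", "ip", "user" ],
    # illustrative custom separator
    "separator": ";",
    "timestamp_field" : "timestamp"
  },
  "storm_settings" : {
    ...
  }
}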

As you can guess, the FileBolt uses a naming scheme that allows you to quickly locate ranges of time, and a folder organisation that you can control. Here we did not specify anything, so the default options were used to produce a time-based folder organisation.

<root_destination>/<year>/<month>/<day>/<filename>

Check the advanced options to generate the layout you prefer.

Indexed Archiving

The previous mode is too limited to deal with long-duration and high-traffic archiving. The punch provides a more powerful indexed archiving that uses an additional Elasticsearch index to keep track of what is saved. In turn, this lets you extract information more efficiently than by scanning a unix folder. In particular, you will be able to leverage the archive spout to reinject your data.

Pools and Topics

The punch archiving service lets you think in terms of pools and topics.

  • A pool simply groups all your archive files into high-level isolated sets. Most often you have one pool per tenant. It provides isolation between your data.
  • Inside each pool, a topic groups data that makes sense for you to manipulate at once. For example you can have one topic per type of logs (apache_httpd, sourcefire, etc.), or instead one topic per usage (security, monitoring, etc.). You are free to choose.

Tip

By default the pool is the tenant, and the topic is the channel. Unless you have good reasons, we recommend you stick to that pattern.
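
Should you need different values, the pool and topic bolt settings (documented in the Parameters section below) let you override these defaults. Here is a minimal sketch with illustrative values:

{
  "type": "file_bolt",
  "bolt_settings": {
    "destination" : "indexed_filestore:///tmp/indexed",
    "es_cluster_id": "es_search",
    # illustrative overrides; by default pool = tenant and topic = channel
    "pool": "mytenant",
    "topic": "apache_httpd",
    "fields": [ "raw" ]
  },
  "storm_settings" : {
    ...
  }
}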

Indexed FileSystem Archiving

Here is our example revisited to use this mode:

{
  "tenant": "mytenant",
  "channel": "test_archive",
  "name": "test_archive_topology",
  "spouts": [
    {
      "type": "kafka_spout",
      "spout_settings": {
        "topic": "test_timed_archive1",
        "start_offset_strategy": "earliest",
        "brokers": "local",
        "batch_size": 16
      },
      "storm_settings": {
        "component": "spout",
        "publish": [
          {
            "stream": "logs",
            "fields": [ "raw", "ip", "user", "timestamp" ]
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "file_bolt",
      "bolt_settings": {
        "destination" : "indexed_filestore:///tmp/indexed",
        "es_cluster_id": "es_search",
        "folders_hierachy_pattern": "%{topic}",
        "compression_format" : "GZIP",
        "fields": [
          "raw"
        ],
        "timestamp_field" : "timestamp"
      },
      "storm_settings" : {
        "component": "bolt",
        "subscribe": [
          {
            "component": "spout",
            "stream": "logs"
          }
        ]
      }
    }
  ],
  "storm_settings": {
    "topology.worker.childopts": "-server -Xms256m -Xmx256m"
  }
}

Note

Only one property was added: the es_cluster_id property, which specifies the Elasticsearch cluster where the index metadata is saved.
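
With the default es_index_prefix and es_index_suffix (see the Parameters section), the metadata documents are stored in a time-stamped index whose name combines the pool name, the suffix and the current date. For our example it would look something like the following (the exact date formatting is an assumption):

mytenant-objects-storage-2019.05.31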

The result is now:

[tmp]$ tree indexed/
indexed/
└── mytenant
    └── test_archive
        └── 0
            └── 2019.05.31
                └── 1559293276
                    ├── test_archive-0-1559293276905.data
                    ├── test_archive-0-1559293276905.metadata
                    ├── test_archive-0-1559293276906.data
                    ├── test_archive-0-1559293276906.metadata
                    ├── test_archive-0-1559293276907.data
                    ├── test_archive-0-1559293276907.metadata
                    ├── test_archive-0-1559293276908.data
                    └── test_archive-0-1559293276908.metadata

5 directories, 8 files

As you can see the file organisation is slightly different, and you have more files. The *.data files contain your raw logs, while the *.metadata files contain the metadata associated with each data file. These metadata are also saved into an Elasticsearch index. You can control the folder layout using the folders_hierachy_pattern property. For example, using "folders_hierachy_pattern": "%{topic}" you will get:

[tmp]$ tree indexed/
indexed/
└── mytenant
    └── test_archive
        ├── test_archive-0-1559292899572.data
        ├── test_archive-0-1559292899572.metadata
        ├── test_archive-0-1559292899573.data
        ├── test_archive-0-1559292899573.metadata
        ├── test_archive-0-1559292899574.data
        ├── test_archive-0-1559292899574.metadata
        ├── test_archive-0-1559292899575.data
        └── test_archive-0-1559292899575.metadata

2 directories, 8 files

This parameter will be explained in detail below.
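
For reference, here is where that property fits in the bolt settings of the indexed example above (a sketch):

{
  "type": "file_bolt",
  "bolt_settings": {
    "destination" : "indexed_filestore:///tmp/indexed",
    "es_cluster_id": "es_search",
    # produces the flat layout shown above instead of the
    # default "%{topic}/%{date}/%{batches_set}" hierarchy
    "folders_hierachy_pattern": "%{topic}",
    "fields": [ "raw" ],
    "timestamp_field" : "timestamp"
  },
  "storm_settings" : {
    ...
  }
}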

Ceph Object Storage

If instead of:

   "destination" : "indexed_filestore:///tmp/indexed"

you use:

    "destination" : "ceph:<cluster_name>"

or

    "destination" : "ceph_configuration://<your ceph configuration file path>"

The FileBolt will save the batch files to a CEPH cluster. Besides that change, all the other parameters still apply. Instead of folders you will now deal with CEPH pools. The punch extraction tools provide you with means to scan for pools, topics and files.
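
For instance, a CEPH variant of our example bolt could look like the following sketch (the cluster name is illustrative, and pool/topic simply follow the default tenant/channel convention):

{
  "type": "file_bolt",
  "bolt_settings": {
    # illustrative ceph cluster name
    "destination" : "ceph:main",
    "es_cluster_id": "es_search",
    "pool": "mytenant",
    "topic": "test_archive",
    "fields": [ "raw" ],
    "timestamp_field" : "timestamp"
  },
  "storm_settings" : {
    ...
  }
}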

Streaming Mode

In some situations, you may need to use the FileBolt to generate one file per incoming tuple. Here is an example:

   {
      "type": "file_bolt",
      "bolt_settings": {
        "destination": "file:///tmp/data",
        "batch_mode": false,

        # Using the streaming mode you must provide the file bolt
        # with the name of the file to be used to store each incoming tuple.
        "filename_field": "filename",
        "fields": [
          "raw"
        ]
      },
      "storm_settings": {
        "component": "bolt",
        "subscribe": [
          {
            "component": "spout",
            "stream": "logs"
          }
        ]
      }
    }

Notice the batch_mode parameter now set to false, and the additional filename_field parameter that is now required to name each file. With this configuration you will find the exact same folder tree as before, but with many more files created. That is the point: using the streaming mode you get one file per incoming event.

Tip

This is typically used to ingest big HTTP bodies and save them as files.
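
In that setup the subscribed stream must carry the field named by filename_field. For our example, the spout publish declaration could look like this sketch:

"publish": [
  {
    "stream": "logs",
    "fields": [ "raw", "filename" ]
  }
]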

Parameters

To see the more in-depth configuration, refer to the file bolt javadoc documentation.

  • destination (string)

    The url of the destination archive store.
    Examples: 'file://', 'indexed_filestore://', 'ceph_configuration://', 'ceph:'. Multiple addresses can be specified if indexed_filestore is used. Each address must be delimited by a comma, and only the first address needs the 'indexed_filestore://' prefix.
    Example: 'indexed_filestore://addr1,addr2'. The bolt will store archives in the provided destinations, hosted by the node where the topology containing the file bolt is running.

  • es_cluster_id (string)

    If a ceph or indexed_filestore (indexed shared-filesystem object storage) cluster is configured, you must specify an Elasticsearch cluster name. This cluster is used by the FileBolt to store the batches index. The cluster name must match one of the Elasticsearch cluster names referenced in your punchplatform.properties. Example: "es_search".

  • pool (string)

    Name of the ceph pool of objects (usually, one per tenant).

  • topic (string)

    Logical subset of data in the ceph pool. This in turn lets you extract data within that topic.

  • es_index_prefix (string)

    By default, object indexation documents are stored in a dedicated time-stamped index. This index name is the concatenation of an optional es_index_prefix, the pool name, an optional es_index_suffix and the current date.

  • es_index_suffix (string)

    By default, object indexation documents are stored in a dedicated time-stamped index. This index name is the concatenation of an optional es_index_prefix, the pool name, an optional es_index_suffix and the current date.
    default: "-objects-storage"

  • es_timeout (string)

    If an ES indexation request hits this timeout, the FileBolt will raise an exception.
    default: "10s"

  • folders_hierachy_pattern (string)

    If objects-storage is enabled on a filesystem, batch files are written following a specific folder structure. You can hard-code it with a pure static value such as my_archives/my_technology, but we also provide useful variables: %{topic} for the topic, %{partition} for the partition number, %{date} for the earliest tuple timestamp in the current batch (YYYY.mm.dd format) and %{batches_set} for the current batches set id. For example, you could use: my_archives/%{batches_set}-%{date}/abc/%{topic}.
    default: "%{topic}/%{date}/%{batches_set}"

  • batches_per_folder (integer)

    If objects-storage is implemented over a filesystem, batches of logs will be written to multiple folders and sub-folders. If the default folders hierarchy pattern is kept, each sub-folder will contain at most this number of batches.
    default: 1000

  • minimum_success_required (integer)

    Integer greater than 0. By default, this parameter is equal to 1. Use this parameter if you want tuples to be acknowledged only when the data has been successfully written to at least this number of destinations.

  • maximum_success (integer)

    Integer greater than 0. By default, this parameter is equal to the number of destinations provided in the "destination" parameter. Use this parameter if you want to limit the number of destinations used.

  • compression_format (string)

    Use one of "GZIP" or "SNAPPY". The corresponding file extension (.gz, .snappy) will be added to your batches if you write to a posix filesystem.
    default: "NONE"

  • required_ciphering_content (boolean)

    True to enable ciphering. If true, you need an enciphering section as described by the next 4 properties.
    default: false

  • [enciphering].keystore (string)

    Specify the keystore absolute path.

  • [enciphering].keystore_pass (string)

    Specify the keystore password.

  • [enciphering].keys (dictionary)

    Specify the list of key ids (aliases).

  • [enciphering].[keys].key_id (string)

    The id (alias) of the public and private RSA key.

  • add_header (boolean)

    If true, a header line will be generated at the top of each file, to indicate the included fields and the earliest/latest timestamps.
    default: true

  • separator (string)

    The CSV field separator.
    default: '__|__'

  • batch_mode (boolean)

    If set to false, the FileBolt will write each incoming tuple to its own file.
    default: true

  • timestamp_field (string)

    The tuple field to be used as a timestamp reference. This field must be a valid timestamp (epoch or ISO). It will be used to generate the batch filename based on the first event timestamp.
    Example: "_ppf_timestamp"

  • credentials

    If you need basic auth, use a credentials dictionary to provide the user and password to use. For example: "credentials" : { "user" : "bob", "password" : "bob's password" }

    This setting can be combined with ssl. A token parameter can be specified like this: "credentials": { "token": "mytoken", "token_type": "ApiKey" }. Note that if user and password are specified, they will be ignored in favor of the token parameter. The token is the base64-encoded string "user:password" when token_type is set to "Basic".
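
As a recap, here is a sketch of a bolt_settings block combining several of these parameters (all values are illustrative, not recommendations):

{
  "type": "file_bolt",
  "bolt_settings": {
    "destination" : "indexed_filestore:///tmp/indexed",
    "es_cluster_id": "es_search",
    "compression_format" : "GZIP",
    "fields": [ "raw", "ip", "user" ],
    "separator": "__|__",
    "add_header": true,
    "timestamp_field" : "timestamp",
    "folders_hierachy_pattern": "%{topic}/%{date}/%{batches_set}",
    "batches_per_folder": 1000
  },
  "storm_settings" : {
    ...
  }
}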

Ceph Debug Tool

The punch ships with a companion tool punchplatform-objects-storage.sh that lets you easily check your archive status. Here are a few examples. The following command returns a summary of a pool.

[~]$ punchplatform-objects-storage.sh \
        pool-status                   \
        --pool mytenant               \
        --destination indexed_filestore:///tmp/indexed

Pool                          :mytenant
Object                        :4
Pool usage                    :4.6 kB
Cluster usage                 :182.5 GB
Cluster free                  :50.9 GB

Here is how to get details about topics:

[~]$ punchplatform-objects-storage.sh               \
        list-topics                                 \
        --destination indexed_filestore:///tmp/indexed  \
        --from-date -7d                             \
        --to-date now                               \
        --pool mytenant                             \
        --elasticsearch-cluster-url localhost       \
        --details
Pool                          : mytenant
Topics Number                 : 1

Topic                         : test_archive
Batch Size                    : 16
Tuple Count                   : 256
Earliest                      : 2019-05-31T08:10:47.940+02:00[Europe/Paris]
Latest                        : 2019-05-31T11:21:59+02:00[Europe/Paris]
Uncompressed Size             : 8.4 kB
Effective Stored Size         : 12.3 kB
Compression Ratio             : 1.466466
1/Compression Ratio           : 0.681912

Scaling Up Considerations

Archiving topologies are designed to scale so as to write batches of files in a safe and idempotent way. The question is: how do you scale to achieve enough parallelism to write lots of data coming from multi-partitioned topics?

In all cases, a single file bolt instance must be in charge of handling a single batch. The reason is simply that a batch is nothing more than a continuous sequence of records from the same Kafka partition. Having a complete batch handled by a single instance of the file bolt guarantees that even after a failure/replay, the same batches will be regenerated with the same identifiers and the same content.

You have four ways to enforce that one-batch-per-instance condition.

  1. Use a single file bolt instance: this is of course the easiest solution to start with. That single instance will receive all the data from all the partitions. The file bolt is designed to achieve parallel concurrent writing of several batches. This setup can thus be efficient enough for many use cases. Try it first.
  2. Use several instances of the file bolt on top of a single Kafka spout. That works, but use the grouping strategy to enforce that each bolt receives all the records from the same Kafka partition.
  3. Use one file bolt instance per partition. This is easy to configure: deploy as many instances of the Kafka spout as there are partitions, one dedicated to each partition, and set up a file bolt for each spout.
  4. If you ultimately need to deploy more bolts than you have partitions, use the grouping strategy.

Prefer the simplest solutions. A single process topology deployed on a multi-core server can achieve a high throughput.

Java documentation

To see the more in-depth configuration, refer to the file bolt javadoc documentation.