FileBolt

Overview

The FileBolt writes incoming tuples into archives. Archives can be plain files on a POSIX-like filesystem, or objects stored in an object storage solution such as an S3 or Ceph object store.

In all cases, the FileBolt can generate additional metadata stored in a separate store (typically Elasticsearch). This metadata gives you additional capabilities to efficiently extract your data. Without it, you are on your own to navigate through folders or object store pools to locate and extract the archives you are interested in.

Keep in mind that massive or long-term archiving (months or years) ends up producing hundreds of millions of files. Executing something as simple as a file or object listing then takes a considerable amount of time.

Activating metadata is thus strongly encouraged. It also lets you benefit from the punch housekeeping feature that periodically purges outdated archives.

The FileBolt supports writing to a POSIX file system (typically an NFS mount, to archive logs on a third-party storage solution), or to an S3 or Ceph distributed object store. The punch deployer lets you easily deploy a Ceph cluster on your on-premise infrastructure. S3 stores are typically provided by your cloud provider; if you do not have one, we strongly recommend the Minio open source S3 store.

Settings

Main Settings

  • destination (String, no default) : URL of the destination device. Required if destinations is not set.
  • destinations (String Array, no default) : URLs of the destination devices. Required if destination is not set.
  • access_key (String, no default) : Access key for the S3 cluster.
  • secret_key (String, no default) : Secret key for the S3 cluster.
  • strategy (String, default at_least_once) : Batching strategy. One of exactly_once, at_least_once, one_by_one.
  • streaming (Boolean, default false) : Filesystem only. If true, tuples are directly written into the destination file instead of a RAM buffer.
  • fields (String Array, no default) : Tuple fields to be stored. If not set, all fields are selected.
  • timestamp_field (String, no default) : Field used to get the tuple timestamp (used for indexing). If not set, the tuple processing time is used.
  • separator (String, default __|__) : Csv field separator.
  • pool (String, default: tenant name) : High-level isolated set of files. See Pools.
  • topic (String, default: channel name) : Allows grouping files under a common label. See File Naming.
  • compression_format (String, default NONE) : Format used for compression. One of "NONE", "GZIP".
  • batch_size (Integer, default 1 000) : Only for the "at_least_once" strategy. Maximum number of tuples added to a batch.
  • batch_expiration_timeout_ms (Integer, default 10 000) : Only for the "at_least_once" strategy. If no new tuples are added to a batch before this timeout, the batch is flushed.
  • minimum_writes (Integer, default 1) : Minimum number of destinations a batch must be successfully written to. See Successful Writes.
  • maximum_writes (Integer, default: number of destinations) : Maximum number of destinations a batch should be successfully written to. See Successful Writes.
  • file_prefix_pattern (String, default %{topic}/%{partition}/%{date}/puncharchive-%{bolt_id}-%{partition}-%{offset}-%{tags}) : Pattern used for the object prefix. See File Prefix.
  • batches_per_folder (Integer, default 1 000) : Number of batches contained in a set. See File Prefix.
  • tags (String Array, no default) : Fields used to group tuples based on the corresponding value. See Batch Naming.

Advanced Settings

  • create_root (Boolean, default false) : Create the root directory if it doesn't exist.
  • filename_field (String, no default) : Only for the "one_by_one" strategy. Field used to get the filename. Overrides the classic naming pattern.
  • capacity (Integer, default 67 108 864) : Capacity in bytes of the RAM buffer.
  • digest (Boolean, default true) : Create a digest for each batch.
  • encoding (String, default text) : How the data is encoded. Sets the file extension. One of "text", "binary", "java_serialization".
  • escape (Boolean, default true) : Set to false if you don't want a line break character appended after each written tuple.
  • bloom_filter_fields (String Array, no default) : Fields to use for the bloom filter. Setting this activates bloom filtering.
  • bloom_filter_expected_insertions (Integer, no default) : Required if using bloom filters. Should be batch_size * number of bloom filter fields.
  • bloom_filter_false_positive_probability (Double, default 0.01) : False positive probability used by the bloom filter.

Deprecated Settings

These settings are still available for compatibility but should no longer be used.

  • cluster : replace with the destination parameter.
  • batch_mode : replace with the matching strategy (false -> one_by_one, true -> at_least_once).
  • folder_hierachy_pattern : replace with file_prefix_pattern.
  • folder_hierarchy_pattern : replace with file_prefix_pattern.
  • filename_prefix : place the prefix directly in file_prefix_pattern.
  • ciphered : enciphering is no longer provided.

Destination Devices

The FileBolt supports three types of devices: plain POSIX filesystem, Ceph cluster, and S3 (e.g. Minio) cluster. The bolt identifies the client to use from the prefix of the provided URL. You must provide at least one URL. If only one URL is provided, you can use the destination parameter. If you have to provide multiple URLs, use the destinations parameter.

Tip

No matter your device, all the other parameters still apply!
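For instance, here is a minimal sketch of the two forms (paths are illustrative):

```json
{
  "bolt_settings": {
    "destination": "file:///tmp/storage"
  }
}
```

or, with several devices of the same type:

```json
{
  "bolt_settings": {
    "destinations": [
      "file:///tmp/storage-1",
      "file:///tmp/storage-2"
    ]
  }
}
```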

Filesystem

To use a Filesystem implementation, your url must match the pattern file://<path>.

ex : file:///tmp/storage

The path defines the root directory used for archiving. If this root directory doesn't exist on the processing node, an exception is raised and the writing fails.

Warning

For the sake of simplicity when testing, the create_root option can be activated. It automatically creates the root directory.
We do not recommend activating this option.
Setting it to true may create unwanted directories if destinations are mistyped. Furthermore, not creating the root directory lets you check that the destination cluster is reachable.

Ceph

To use a Ceph implementation, your url must match the pattern ceph_configuration://<configuration-file-path>.

ex : ceph_configuration:///etc/ceph/main-client.conf

The path provided must lead to the Ceph cluster configuration file, containing the settings required to connect to the Ceph cluster.

S3

To use an S3 implementation, your url must match the pattern http://<cluster-address> or https://<cluster-address>.

ex : http://192.168.0.5:9000/

The url must point to a valid cluster address. To authenticate to your cluster, you must use the access_key and secret_key parameters.

Archiving Strategy

Batching Strategy

The FileBolt can be configured to group incoming tuples in different ways. We call that the "batching strategy", set via the strategy parameter. These strategies are:

  • exactly_once: ensures that each tuple is processed only once (no duplicates). This mode requires a KafkaSpout configured in batch mode. The reason is that the KafkaSpout is in charge of repeating exactly the same Kafka topic partition reading in case of restart. In this mode the FileBolt is passive and waits for the end-of-batch message from the KafkaSpout.
  • at_least_once: ensures that each tuple is processed at least once. Some duplicated tuples may appear in case of a crash and restart of the punchline. This mode requires a KafkaSpout sending additional _ppf_partition_id and _ppf_partition_offset fields as part of the emitted tuple.
  • one_by_one: writes received tuples as files with no batching strategy. This very particular mode only makes sense for big chunks of data such as files or images, typically received on an HttpSpout.

Successful Writes

To enforce tuple acknowledgement, you can set a range of required successful writes.

Setting the minimum_writes parameter ensures that data is successfully written to at least this number of destinations. If the number of successful writes goes below this value, a fatal exception is raised and the processing stops. By default this parameter is set to 1, so a single successful write is enough.

Setting the maximum_writes parameter ensures that data is written to at most this number of destinations. Once this number of successful writes is reached, the FileBolt does not try to write to the remaining destinations. By default this parameter is set to the number of provided destinations, so that no destination is ignored.

Note

Devices are checked in a random order. This way, we won't always write to the same first destinations.
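As a sketch (paths illustrative), requiring at least one and at most two successful writes out of three destinations:

```json
{
  "bolt_settings": {
    "destinations": [
      "file:///tmp/storage-1",
      "file:///tmp/storage-2",
      "file:///tmp/storage-3"
    ],
    "minimum_writes": 1,
    "maximum_writes": 2
  }
}
```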

Streaming mode

A streaming mode is available when using filesystem device.

By default, the FileBolt stores tuples in RAM when creating batches. When the batch size or timeout is reached, the batch is written to a file. In streaming mode, the file is created at batch creation and tuples are written directly into it, so RAM is not used.

The maximum/minimum writes logic explained above changes slightly in streaming mode. On file creation, the logic is the same: we must be able to open at least minimum_writes destinations, and we stop when maximum_writes is reached. During batch writing, if a write fails, we cannot choose another destination; that destination is therefore considered a failure. When the number of remaining destinations goes below minimum_writes, a fatal exception is raised, incomplete files are deleted (when possible), and the processing stops.

To summarize, the maximum_writes limit is only used on file creation, and the minimum_writes limit is checked whenever a writing fails.
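As a sketch, enabling streaming mode on a filesystem destination only requires the streaming flag (path illustrative):

```json
{
  "bolt_settings": {
    "strategy": "at_least_once",
    "destination": "file:///tmp/archive-logs/storage",
    "streaming": true
  }
}
```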

Batch Naming

On top of the destination device, batches follow a specific naming pattern. This hierarchy has three levels: the pool, the prefix, and the unique ID.

Pools

A pool simply groups all your archive files into high-level isolated sets. Most often you have one pool per tenant; it provides isolation between your data.

The implementation of this concept depends on the device used:

  • Filesystem : the top folder (after the root).
  • Ceph : the pool used in the cluster.
  • S3 : the bucket used in the cluster.

Tip

By default the pool is the tenant, and the topic is the channel. Unless you have good reasons, we recommend you stick to that pattern.

File Prefix

The file prefix follows the pattern provided in the file_prefix_pattern parameter. This pattern lets you insert different placeholders into your prefix:

  • %{topic} : The topic parameter. Usually the channel, it groups data that makes sense for you to manipulate at once. For example, you can have one topic per type of log (apache_httpd, sourcefire, etc.), or instead one topic per usage (security, monitoring, etc.). You are free to choose.
  • %{date} : The earliest tuple date of this batch. The tuple timestamp is either the value of timestamp_field, if that parameter is provided, or the timestamp of the first tuple arrival, i.e. the batch creation. The date is formatted with the pattern yyyy.MM.dd (ex: 2020.04.10).
  • %{batches_set} : Index of the batch set. Batches can be organized in sets of a given size. To set the size, use the batches_per_folder parameter. When a set reaches this size, this index is incremented.
  • %{partition} : The Kafka partition id. This placeholder is ignored when using the one_by_one strategy.
  • %{offset} : Depends on the chosen strategy.
    • at_least_once : The Kafka partition offset.
    • one_by_one : The batch offset. Every time a batch is written, this offset is incremented.
    • exactly_once : The Kafka batch id.
  • %{tag:<field>} : Value of the given field. This field must be a tag field.
  • %{tags} : All tag values separated by a dash.
  • %{bolt_id} : A unique identifier for the component used.

Ex: %{topic}/%{date}/test/%{partition}/%{batches_set} with topic httpd, partition id 1 and a set index of 4 will give something like httpd/2020.04.10/test/1/4.
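For instance, a hypothetical configuration grouping tuples by a device_type field and using it in the prefix (the field name is illustrative):

```json
{
  "bolt_settings": {
    "tags": ["device_type"],
    "file_prefix_pattern": "%{topic}/%{date}/%{tag:device_type}/%{batches_set}"
  }
}
```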

Unique ID

In order to ensure uniqueness and avoid filename conflicts, the filename is composed of a customizable prefix and an imposed unique ID, following the pattern <prefix>-<uuid>, followed by the file extension. The uuid is generated at batch creation and is unique to each batch, preventing filename conflicts. The extension is always .csv, completed by the compression extension (like .gz). If the provided prefix is empty, only the uuid is used.

Archive indexing

To deal with long-duration and high-traffic archiving, the punch provides powerful indexed archiving that uses an Elasticsearch index to keep track of what is saved.

In turn, this lets you extract information more efficiently than by scanning a unix folder. In particular, you will be able to leverage the archive spout to reinject your data.

Bloom filtering

The FileBolt leverages the bloom filter concept, which allows exploring archives more efficiently. To activate bloom filtering, just set bloom_filter_fields. To configure your bloom filter, set these parameters to the desired values:

  • bloom_filter_fields : Fields to use for the bloom filter.
  • bloom_filter_expected_insertions : Expected insertions into this bloom filter.
  • bloom_filter_false_positive_probability : False positive probability.

Tip

Learn more about bloom filtering on the wikipedia page.
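As a sketch, with a batch_size of 1000 and a single bloom filter field, the expected insertions would be 1000 * 1 (the field name is illustrative):

```json
{
  "bolt_settings": {
    "batch_size": 1000,
    "bloom_filter_fields": ["_ppf_id"],
    "bloom_filter_expected_insertions": 1000,
    "bloom_filter_false_positive_probability": 0.01
  }
}
```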

ElasticSearch indexing

The FileBolt provides an output stream publishing the metadata of every archived batch. This metadata only concerns batches that have been successfully written to their destinations. To be able to explore your archives, you need to store this metadata in an ElasticSearch Index.

Tip

Even if you plan on storing this metadata somewhere else, we strongly recommend indexing it into ElasticSearch too. It is most likely you'll have to use the tools provided by the punch to inspect your archives, and all these tools rely on the ElasticSearch indexing of your metadata.

To use this, simply add an ElasticSearchBolt after your FileBolt and set the index prefix (usually "-archive"). Full examples are given in the Examples section.

Warning

Your prefix needs to be of form *-archive-* to apply the ElasticSearch template provided with the standalone. You can find this template in $PUNCHPLATFORM_CONF_DIR/resources/elasticsearch/templates/platform/mapping_archive.json. In particular, this template is necessary to use punchplatform-archive-client.sh tool presented later.

Batch Metadata

The metadata carries all the information you should have about your archived batch :

  • @timestamp : The timestamp indicating the metadata creation.
  • file.uncompressed_size : Size of content.
  • file.size : Size of file.
  • file.name : Object full name. (Combining prefix and ID)
  • file.compression.format : Compression format.
  • file.compression.ratio : Compression ratio ( >1 ).
  • debug.component_id : Storm bolt id.
  • debug.offset : Batch Offset. (See offset description in prefix section)
  • debug.input_partition : Kafka Partition.
  • batch.tuples_count : Number of tuples in this batch.
  • batch.latest_ts : Latest tuple timestamp. (See date description in prefix section)
  • batch.earliest_ts : Earliest tuple timestamp. (See date description in prefix section)
  • batch.initialization_ts : The timestamp roughly indicating the indexing time.
  • batch.digest : Batch digest.
  • fields.names : Names of archived fields.
  • fields.separator : Fields separator in csv content.
  • bloom_filter : Bloom filter hash value.
  • archive.devices_addresses : Addresses of the successfully written devices.
  • archive.pool : Archive pool. (See pools)
  • platform.channel : Channel used to archive this batch.
  • platform.tenant : Tenant used to archive this batch.
  • tags : Tags used, and their value. The topic is included in the tags.

Example :

{
    "@timestamp": "2020-04-14T09:06:50.775Z",
    "file": {
      "uncompressed_size": 184500,
      "size": 13317,
      "name": "httpd/2020.04.14/puncharchive-0-1003-httpd-h2GnX3IBWWcd3bmsvUmw.csv.gz",
      "compression": {
        "format": "GZIP",
        "ratio": 13.854471727866636
      }
    },
    "debug": {
      "component_id": "urzwd3EBNj3z-fMfK9vE",
      "offset": 1003,
      "input_partition": 0
    },
    "batch": {
      "tuples_count": 133,
      "latest_ts": "2020-04-14T11:06:35.93+02:00",
      "earliest_ts": "2020-04-14T11:06:33.692+02:00",
      "initialization_ts": 1586855193711,
      "digest": "kHmSO1a79el+IgI2O7Fynw+HIWKzDoJXzCouhdSiuNs=",
      "fields": {
        "names": [
          "_ppf_id",
          "_ppf_timestamp",
          "log"
        ],
        "separator": "__|__"
      },
      "bloom_filter": ""
    },
    "archive": {
      "devices_addresses": [
        "file:///tmp/archive-logs/storage"
      ],
      "pool": "mytenant"
    },
    "platform": {
      "channel": "apache_httpd",
      "tenant": "mytenant"
    },
    "tags": {
      "topic": "httpd"
    }
}

Examples

Here are some examples of configuration. We'll consider that the tenant is mytenant and the channel is apache_httpd.

Exactly Once

Here, the key parameter is the batch_size in the Kafka Spout. By setting a batch size, we force the Kafka spout to generate data by batches. Also, note that it is not possible to put any bolt between the Kafka spout and the File bolt (not even a Punch bolt).

{
  "spouts": [
    {
      "type": "kafka_spout",
      "spout_settings": {
        "topic": "<your_input_topic_name>",
        "start_offset_strategy": "last_committed",
        "brokers": "local",
        "batch_size": 1000
      },
      "storm_settings": {
        "executors": 1,
        "component": "kafka_reader",
        "publish": [
          {
            "stream": "logs",
            "fields": [
              "log",
              "_ppf_timestamp"
            ]
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "file_bolt",
      "bolt_settings": {
        "strategy": "exactly_once",
        "destination": "file:///tmp/archive-logs/storage",
        "compression_format" : "GZIP",
        "timestamp_field": "_ppf_timestamp",
        "fields": [
          "log"
        ]
      },
      "storm_settings": {
        "executors": 1,
        "component": "file_bolt",
        "subscribe": [
          {
            "component": "kafka_reader",
            "stream": "logs"
          }
        ],
        "publish": [
          {
            "stream": "metadatas",
            "fields": [
              "metadata"
            ]
          }
        ]
      }
    },
    {
      "type": "elasticsearch_bolt",
      "bolt_settings": {
        "cluster_name": "es_search",
        "index": {
          "type": "daily",
          "prefix": "mytenant-archive-"
        },
        "document_json_field": "metadata",
        "batch_size": 1,
        "reindex_failed_documents": true,
        "error_index": { "type": "daily", "prefix": "mytenant-archive-errors"}
      },
      "storm_settings": {
        "component": "elasticsearch_bolt",
        "subscribe": [
          {
            "component": "file_bolt",
            "stream": "metadatas"
          }
        ]
      }
    }
  ]
}

For the sake of illustration, this example adds two useful behaviors:

  • compression is activated. You can use "GZIP", or "NONE" to disable compression.
  • one of the tuple fields is used as the timestamp, as indicated by the timestamp_field property. This tells the FileBolt to use that incoming field as an indication of the record timestamp. This in turn is used to generate, for each batch file, earliest and latest timestamps, used for naming files or for indexing purposes. If you do not have such a ready-to-use field, the current time is used instead.

If you run this topology, you will obtain the following archived files, with an ID that may be different:

/tmp/archive-logs/storage
└── mytenant
    └── apache_httpd
        └── 0
            └── 2020.04.14
                ├── puncharchive-ooEjeHEBeuU3WQMkBTrm-0-1586858493604-apache_httpd-Yo-nX3IBWWcd3bmsgIz5.csv.gz
                ├── puncharchive-ooEjeHEBeuU3WQMkBTrm-0-1586858493605-apache_httpd-h2GnX3IBWWcd3bmsvUmw.csv.gz
                ...

Here is an extract of the first one:

31-05-2019 10:18:59 host 128.114.24.54 frank
31-05-2019 10:19:59 host 128.95.200.73 bob
31-05-2019 10:20:59 host 128.202.228.156 alice

Tip

Note that each line only contains the raw log value. This is what is configured in the FileBolt fields setting. If you select more fields, they are separated by a configurable separator.
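As an illustration, selecting several fields with the default separator would produce lines like <id>__|__<timestamp>__|__<log>; a sketch:

```json
{
  "bolt_settings": {
    "fields": ["_ppf_id", "_ppf_timestamp", "log"],
    "separator": "__|__"
  }
}
```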

Warning

Previous punch releases optionally included a header line. This has been deprecated since punch 5.6.2.

Here we used the default file_prefix_pattern. Check the advanced options to generate the layout you prefer.

At least once

Take a look at the following Kafka spout published fields. You can see two PunchPlatform reserved fields: "_ppf_partition_id" and "_ppf_partition_offset". With the at_least_once strategy, compared to exactly_once, the FileBolt deals with the batching logic itself instead of relying on the Kafka spout. This is why we must explicitly declare these two extra fields.

Here is an example of this configuration:

{
  "spouts": [
      {
        "type": "kafka_spout",
        "spout_settings": {
          "topic": "<your_input_topic_name>",
          "start_offset_strategy": "last_committed",
          "brokers": "local"
        },
        "storm_settings": {
          "executors": 1,
          "component": "kafka_reader",
          "publish": [
            {
              "stream": "logs",
              "fields": [
                "log",
                "_ppf_id",
                "_ppf_timestamp",
                "_ppf_partition_id",
                "_ppf_partition_offset"
              ]
            }
          ]
        }
      }
    ],
    "bolts": [
      {
        "type": "file_bolt",
        "bolt_settings": {
          "strategy": "at_least_once",
          "destination": "file:///tmp/archive-logs/storage",
          "timestamp_field": "_ppf_timestamp",
          "compression_format" : "GZIP",
          "batch_size": 1000,
          "batch_expiration_timeout_ms": 10000,
          "fields": [
            "_ppf_id",
            "_ppf_timestamp",
            "log"
          ]
        },
        "storm_settings": {
          "executors": 1,
          "component": "file_bolt",
          "subscribe": [
            {
              "component": "kafka_reader",
              "stream": "logs"
            }
          ],
          "publish": [
            {
              "stream": "metadatas",
              "fields": [
                "metadata"
              ]
            }
          ]
        }
      },
      {
        "type": "elasticsearch_bolt",
        "bolt_settings": {
          "cluster_name": "es_search",
          "index": {
            "type": "daily",
            "prefix": "mytenant-archive-"
          },
          "document_json_field": "metadata",
          "batch_size": 1,
          "reindex_failed_documents": true,
           "error_index": { "type": "daily", "prefix": "mytenant-archive-errors"}

        },
        "storm_settings": {
          "component": "elasticsearch_bolt",
          "subscribe": [
            {
              "component": "file_bolt",
              "stream": "metadatas"
            }
          ]
        }
      }
    ]
}

The result is now :

/tmp/archive-logs/storage
└── mytenant
    └── apache_httpd
        └── 0
            └── 2020.04.14
                ├── puncharchive-ooEjeHEBeuU3WQMkBTrm-0-1586858493604-apache_httpd-Yo-nX3IBWWcd3bmsgIz5.csv.gz
                ├── puncharchive-ooEjeHEBeuU3WQMkBTrm-0-1586858493605-apache_httpd-h2GnX3IBWWcd3bmsvUmw.csv.gz
                ...

You can control the folder layout using the file_prefix_pattern property.

One by one

In some situations, you may want the FileBolt to generate one file per incoming tuple; the one_by_one strategy covers that special case. The example below illustrates this use case.

Note that the Kafka spout can be in "batch" mode or not; both cases will work.

{
  "spouts": [
    {
      "type": "kafka_spout",
      "spout_settings": {
        "topic": "<topic_name>",
        "start_offset_strategy": "last_committed",
        "brokers": "local"
      },
      "storm_settings": {
        "executors": 1,
        "component": "kafka_reader",
        "publish": [
          {
            "stream": "logs",
            "fields": [
              "log",
              "_ppf_id",
              "_ppf_timestamp"
            ]
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "file_bolt",
      "bolt_settings": {
        "strategy": "one_by_one",
        "destination": "file:///tmp/archive-logs/storage",
        "timestamp_field": "_ppf_timestamp",
        "file_prefix_pattern": "%{topic}/%{partition}/%{date}",
        "fields": [
          "_ppf_id",
          "_ppf_timestamp",
          "log"
        ]
      },
      "storm_settings": {
        "executors": 1,
        "component": "file_bolt",
        "subscribe": [
          {
            "component": "kafka_reader",
            "stream": "logs"
          }
        ],
        "publish": [
          {
            "stream": "metadatas",
            "fields": [
              "metadata"
            ]
          }
        ]
      }
    },
    {
      "type": "elasticsearch_bolt",
      "bolt_settings": {
        "cluster_name": "es_search",
        "index": {
          "type": "daily",
          "prefix": "mytenant-archive-"
        },
        "document_json_field": "metadata",
        "batch_size": 1,
        "reindex_failed_documents": true,
        "error_index": { "type": "daily", "prefix": "mytenant-archive-errors"}
      },
      "storm_settings": {
        "component": "elasticsearch_bolt",
        "subscribe": [
          {
            "component": "file_bolt",
            "stream": "metadatas"
          }
        ]
      }
    }
  ]
}

In some situations, you may want to write incoming tuples to several files based on a Storm field value. In that case, you can use the filename_field option. When enabled, for each tuple the FileBolt reads this field value and uses it as the filename. The upstream spouts/bolts thus choose the final file name by setting the right value in this field.
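As a sketch, assuming an upstream component emits a stream field named filename (the field name is illustrative):

```json
{
  "bolt_settings": {
    "strategy": "one_by_one",
    "destination": "file:///tmp/archive-logs/storage",
    "filename_field": "filename"
  }
}
```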

Device types

Changing the device type from filesystem to Ceph or S3 only requires a small change in the destination and some credentials. Here are the different configurations:

  • Filesystem :
{
  "bolt_settings": {
    "destination" : "file:///tmp/archive-logs/storage"
  }
}
  • Ceph :
{
  "bolt_settings": {
    "destination" : "ceph_configuration:///etc/ceph/main-client.conf",
    "user":  "admin"
  }
}
  • S3 :
{
  "bolt_settings": {
    "destination" : "http://localhost:9000",
    "access_key": "admin",
    "secret_key": "adminSecret"
  }
}

Extraction Debug Tool

The punch ships with a companion tool punchplatform-archive-client.sh that lets you easily check your archive status. See punchplatform-archive-client documentation.

Scaling Up Considerations

Archiving topologies are designed to write batches of files in a safe and idempotent way. The question is: how do you scale to achieve enough parallelism to write lots of data coming from multi-partitioned topics?

In all cases, a single FileBolt instance must be in charge of handling a single batch. The reason is simply that a batch is nothing more than a continuous sequence of records from the same Kafka partition. Having a complete batch handled by a single FileBolt instance guarantees that even after a failure/replay, the same batches are regenerated with the same identifiers and the same content.

You have four ways to enforce that one-batch-per-instance condition.

  1. Use a single FileBolt instance: this is of course the easiest solution to start with. That single instance will receive all the data from all the partitions. The FileBolt is designed to achieve parallel concurrent writing of several batches, so this setup can be efficient enough for many use cases. Try it first.
  2. Use several FileBolt instances on top of a single Kafka spout. That works, but use the grouping strategy to ensure that each bolt receives all the records from the same Kafka partition.
  3. Use one FileBolt instance per partition. This is easy to configure: deploy as many Kafka spout instances as partitions, one dedicated to each partition, and set up a FileBolt for each spout.
  4. If you ultimately need to deploy more bolts than you have partitions, use the grouping strategy.

Prefer the simplest solutions. A single process topology deployed on a multi-core server can achieve a high throughput.

Java documentation

To see the more in-depth configuration, refer to the file bolt javadoc documentation.