
File Output

Overview

The FileOutput writes incoming tuples into archives. Archives can be plain files on a POSIX-like filesystem, or objects stored in an object storage solution such as an S3 or Ceph object store.


In all cases the FileOutput can generate additional metadata that can be stored in a separate store (typically Elasticsearch). This metadata gives you additional capabilities to extract your data efficiently. Without it, you are on your own to navigate through folders or object store pools to locate and extract the archives you are interested in.

Be aware that if you deal with massive or long-term archiving (months or years), you will end up with hundreds of millions of files. Executing something as simple as a file or object listing will then take a considerable amount of time.

Activating metadata is thus strongly encouraged. It also allows you to benefit from the punch housekeeping feature that periodically purges outdated archives.

The FileOutput supports writing to a POSIX file system (typically an NFS share, to archive logs on a third-party storage solution), or to an S3 or Ceph distributed object store. The punch deployer lets you easily deploy a Ceph cluster on your on-premise infrastructure. S3 stores are typically provided by your Cloud provider. If you do not have one, we strongly recommend the Minio open source S3 store.

Settings

Main Settings

| Name | Type | Default Value | Description |
|---|---|---|---|
| destination | String | - | Url of the destination device. Required if destinations is not set. |
| destinations | String Array | - | Urls of the destination devices. Required if destination is not set. |
| minio_cluster | String | - | Name of the Minio cluster used to resolve the destination URL(s) from the platform properties. |
| access_key | String | - | Access key for the S3 cluster. |
| secret_key | String | - | Secret key for the S3 cluster. |
| user | String | client | User for the Ceph cluster. |
| strategy | String | at_least_once | Batching strategy. One of at_least_once, one_by_one. |
| streaming | Boolean | false | Filesystem only. If true, tuples are written directly to the destination file instead of a RAM buffer. |
| encoding | String | csv | Content encoding. One of csv, json, avro, parquet. |
| fields | String Array | - | Tuple fields to be stored. If not set, all fields are selected. |
| timestamp_field | String | - | Field used to get the timestamp of a tuple (used for indexing). If not set, the tuple processing time is used. |
| separator | String | __\|__ | Csv fields separator. |
| pool | String | Tenant name | High-level isolated set of files. See Pools. |
| topic | String | Channel name | Allows grouping files under a common label. See File Prefix. |
| compression_format | String | NONE | Format used for compression. One of "NONE", "GZIP". |
| batch_size | Integer | 1 000 | Only for the "at_least_once" strategy. Maximum number of tuples added to a batch. |
| batch_expiration_timeout | String | 10s | Only for the "at_least_once" strategy. If no new tuples are added to a batch before this timeout, the batch is flushed. See Timeouts. |
| batch_life_timeout | String | 0 | Only for the "at_least_once" strategy. If a batch has been alive for longer than this timeout, it is flushed. See Timeouts. |
| maximum_writes | Integer | Number of destinations | Maximum number of destinations a batch should be successfully written to. See Successful Writes. |
| minimum_writes | Integer | 1 | Minimum number of destinations a batch should be successfully written to. See Successful Writes. |
| file_prefix_pattern | String | %{topic}/%{partition}/%{date}/puncharchive-%{bolt_id}-%{partition}-%{offset}-%{tags} | Pattern used for the object's prefix. See File Prefix. |
| batches_per_folder | Integer | 1 000 | Number of batches contained in a set. See File Prefix. |
| tags | String Array | - | Fields used to group tuples based on the corresponding value. See Batch Naming. |

Advanced Settings

| Name | Type | Default Value | Description |
|---|---|---|---|
| create_root | Boolean | false | Create the root directory if it doesn't exist. |
| add_header | Boolean | false | Add a header at the beginning of the csv file. Ignored if encoding is not csv or if fields are empty. |
| filename_field | String | - | Only for the "one_by_one" strategy. Field to use to get the filename. Overrides the classic naming pattern. |
| bloom_filter_fields | String Array | - | Fields to use for the bloom filter. Activates the use of the bloom filter. |
| bloom_filter_expected_insertions | Integer | - | Required if using bloom filters. Should be batch_size * number of bloom filter fields. |
| bloom_filter_false_positive_probability | Double | 0.01 | False positive probability used in the bloom filter. |
| hadoop_settings | Map | - | Additional hadoop fs settings, as a Map. |
| charset | String | utf-8 | Charset to use when encoding bytes to string. |

Deprecated Settings

These settings are still available for compatibility but should not be used anymore.

| Name | New strategy |
|---|---|
| cluster | Replace with the destination parameter. |
| batch_mode | Replace with the correct strategy: false -> one_by_one ; true -> at_least_once. |
| folder_hierarchy_pattern | Replace with file_prefix_pattern. |
| filename_prefix | Place the prefix directly in file_prefix_pattern. |
| batch_expiration_timeout_ms | Replace with batch_expiration_timeout. |

Destination Devices

The FileOutput supports three types of devices: a plain POSIX filesystem, a Ceph cluster, or an S3 (Minio) cluster. The FileOutput identifies the client to use based on the prefix of the provided url.

You must provide at least one url. If only one url is provided, you can use the destination parameter. If you have to provide multiple urls, you can use the destinations parameter.

If multiple destinations are configured, they should all have the same type.
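
If you need to write to several devices at once, a minimal sketch could look like this (the paths are illustrative):

  settings:
    destinations:
      - file:///data/archive-primary
      - file:///data/archive-secondary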

Tip

No matter your device, all the other parameters still apply!

Filesystem

To use a Filesystem implementation, your url must match the pattern file://<path>.

ex : file:///tmp/storage

The path defines the root directory used for archiving. If this root-directory doesn't exist on the processing node, an exception is raised, and the writing fails.

Warning

For the sake of simplicity when testing, the create_root option can be activated. This option allows the root directory to be created automatically.
We do not recommend activating this option.
Setting this option to true may create unwanted directories if destinations are mistyped. Furthermore, not creating the root directory makes it possible to check that the destination is reachable.
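
For a quick local test only, here is a sketch enabling automatic creation of the root directory (not recommended, as explained above):

  settings:
    destination: file:///tmp/archive-logs/storage
    # testing only: create the root directory if it does not exist
    create_root: true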

Ceph

To use a Ceph implementation, your url must match the pattern ceph_configuration://<configuration-file-path>.

ex : ceph_configuration:///etc/ceph/main-client.conf

The path provided must lead to the Ceph cluster configuration file, containing the configuration required to connect to the Ceph cluster.

S3

To use an S3 implementation, your url must match the pattern http://<cluster-address> or https://<cluster-address>.

ex : http://192.168.0.5:9000/

It is also possible to use the minio_cluster field to get URL(s) from platform properties.

The url must point to a valid cluster address. A bucket with the tenant name should be created before starting the topology.

To authenticate to your cluster, you have to provide an access_key and a secret_key.

Archiving Strategy

Batching Strategy

The FileOutput can be configured to group incoming tuples in different ways. We call that the "batching strategy", and it is set with the strategy parameter. The available strategies are:

  • at_least_once: ensures that each tuple is processed at least once. Some duplicated tuples can be found in case of a crash and restart of the punchline. This mode requires a KafkaInput sending the additional _ppf_partition_id and _ppf_partition_offset fields as part of the emitted tuple.
  • one_by_one: writes received tuples as files with no batching strategy. This very particular mode only makes sense to receive big chunks of data such as files or images, typically received on an HttpSpout.

Timeouts

A batch is only flushed when one of these conditions is met:

  • Size limit batch_size has been reached.
  • No tuple has been added to this batch for the duration set in batch_expiration_timeout.
  • The batch has been alive for the duration set in batch_life_timeout.

Both timeouts can be set using the elasticsearch time unit notation. Available units are d for days, h for hours, m for minutes and s for seconds (ex : "10s"). You can also set a number (as a string) representing milliseconds, but keep in mind that the precision of these timeouts is limited to seconds.

For example, setting batch_expiration_timeout to 5m ensures that a batch is flushed if no tuples have been added in the last 5 minutes. Setting batch_life_timeout to 2h ensures that a batch is flushed at most 2 hours after its creation.

You can disable these timeouts by setting a value of 0.
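
As a sketch, the two timeouts from the example above would be set as follows:

  settings:
    strategy: at_least_once
    batch_size: 1000
    # flush a batch if no tuple has been added for 5 minutes
    batch_expiration_timeout: 5m
    # flush a batch at most 2 hours after its creation
    batch_life_timeout: 2h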

Successful Writes

To enforce tuple acknowledgement, you can set a range of required successful writes.

Setting the minimum_writes parameter ensures that data is successfully written to at least this number of destinations. If the number of successful writes goes below this parameter, a fatal exception is raised, and the processing stops. By default, this parameter is set to 1, so that a single successful write ensures success.

Setting the maximum_writes parameter ensures that data is successfully written to at most this number of destinations. Once the number of successful writes reaches this parameter, the FileOutput does not try to write to the remaining destinations. By default, this parameter is set to the number of provided destinations, so that no destination is ignored.

Note

Devices are checked in a random order. This way, we won't always write to the same first destinations.
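
For instance, here is a sketch requiring at least two successful writes out of three filesystem destinations (the paths are illustrative):

  settings:
    destinations:
      - file:///data/archive-1
      - file:///data/archive-2
      - file:///data/archive-3
    # a batch is acknowledged only if written to at least 2 destinations
    minimum_writes: 2
    # stop once 3 destinations have been written to (the default: all of them)
    maximum_writes: 3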

Streaming mode

A streaming mode is available when using a filesystem device.

By default, the FileOutput stores tuples in RAM when creating batches. When the batch size or a timeout is reached, the batch is written to a file. When using streaming mode, the file is created at batch creation and tuples are written directly into it. This way, RAM is not used.

The maximum/minimum writes system explained above changes slightly when using streaming mode. On file creation, the logic is the same: we must be able to open at least minimum_writes destinations, and we stop when maximum_writes is reached. While the batch is being written, if a write fails, we are not able to choose another destination, so that destination is considered a failure. When the number of remaining destinations goes below minimum_writes, a fatal exception is raised, incomplete files are deleted (when possible), and the processing stops.

To summarize, the maximum_writes limit is only used on file creation, and the minimum_writes limit is checked whenever a write fails.
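
A minimal sketch of streaming mode on a filesystem destination (the path is illustrative):

  settings:
    destination: file:///tmp/archive-logs/storage
    strategy: at_least_once
    # write tuples directly into the destination file instead of a RAM buffer
    streaming: true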

Avro/Parquet Encoding

The FileOutput supports four encodings: csv, json, avro and parquet. While csv and json are the standard encodings described in this document, avro/parquet encoding works in a slightly different way.

First, you must provide the schema as a setting:

schema:
  type: record
  namespace: org.thales.punch
  name: example
  fields:
    - name: _ppf_id
      type: string
    - name: _ppf_timestamp
      type: string
    - name: log
      type: string

More information on Avro schemas can be found in the Apache documentation.

Also note that in this node, avro only supports snappy compression and parquet only supports gzip compression.

Finally, Avro/Parquet encoding only works in streaming mode on a filesystem, and in batching mode on S3.
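
Putting it together, here is a sketch of an avro configuration on a filesystem destination, reusing the schema above (the destination path is illustrative):

  settings:
    destination: file:///tmp/archive-logs/storage
    encoding: avro
    # on a filesystem, avro/parquet encoding requires streaming mode
    streaming: true
    schema:
      type: record
      namespace: org.thales.punch
      name: example
      fields:
        - name: _ppf_id
          type: string
        - name: _ppf_timestamp
          type: string
        - name: log
          type: string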

Batch Naming

On top of the destination device, batches follow a specific naming pattern. This hierarchy has 3 levels: the pool, the prefix, and the unique ID.

Pools

A pool simply groups all your archive files into high-level isolated sets. Most often you have one pool per tenant. It provides isolation between your data.

The implementation of this concept depends on the device used :

  • Filesystem : top folder (after root).
  • Ceph : pool used in the cluster.
  • S3 : bucket used in the cluster.

Tip

By default the pool is the tenant, and the topic is the channel. Unless you have good reasons, we recommend you stick to that pattern.

File Prefix

The file prefix follows the pattern provided in the file_prefix_pattern parameter. This pattern allows inserting different parameters inside your prefix :

  • %{topic} : The topic parameter. Usually the channel, it allows grouping data that makes sense for you to manipulate at once. For example, you can have one topic per type of logs (apache_httpd, sourcefire, etc...), or one topic per usage (security, monitoring, etc...). You are free to choose.
  • %{date:<pattern>} : The date of the first received tuple of this batch. The tuple timestamp is either the value of the timestamp_field, if this parameter is provided, or the timestamp of the first tuple arrival, i.e. the batch creation. The date is formatted with the provided pattern, or, if none is provided, with yyyy.MM.dd (ex : %{date} is replaced by 2020.04.10, %{date:MM} is replaced by 04, etc...).
  • %{batches_set} : Index of the batch set. Batches can be organized in sets of a given size. To set the size, use the batches_per_folder parameter. When a set reaches this size, this index is incremented.
  • %{partition} : The Kafka partition id. This parameter is ignored when using one_by_one strategy.
  • %{offset} : Depends on the chosen strategy.
    • at_least_once : The kafka partition offset.
    • one_by_one : The batch offset. Every time a batch is written, this offset is incremented.
  • %{tag:<field>} : Value for given field. This field must be a tag field.
  • %{tags} : All tags values separated by a dash.
  • %{bolt_id} : A unique identifier for the component used.

Ex : %{topic}/%{date}/test/%{partition}/%{batches_set} with topic httpd, partition id 1 and a set index at 4 will give something like httpd/2020.04.10/test/1/4.
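
As a further illustration, here is a sketch combining tags and a custom prefix pattern, assuming a hypothetical vendor field declared as a tag:

  settings:
    topic: httpd
    tags:
      - vendor
    # e.g. httpd/2020.04.10/apache/0 for a batch whose vendor tag value is "apache"
    file_prefix_pattern: "%{topic}/%{date}/%{tag:vendor}/%{partition}"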

Unique ID

In order to ensure uniqueness and avoid filename conflicts, the filename structure is composed of a customizable prefix and an imposed unique ID. The pattern used is <prefix>-<uuid> followed by the file extension. The uuid is generated at batch creation and is unique to each batch, preventing filename conflicts. The extension depends on the encoding: either .avro, .parquet, .csv or .json. For csv and json, if compression is enabled, .gz is appended. If the provided prefix is empty, only the uuid is used.

Archive indexing

To deal with long duration and high-traffic archiving, the punch provides powerful indexed archiving that uses an Elasticsearch index to keep track of what is saved.

In turn, this lets you extract information more efficiently than by scanning a unix folder.

Bloom filtering

The FileOutput leverages the bloom filter concept, which allows exploring archives in a more efficient way. To activate bloom filtering, just set bloom_filter_fields. To configure your bloom filter, set the following parameters to the desired values, as shown in the sketch after this list:

  • bloom_filter_fields : Fields to use for bloom filter.
  • bloom_filter_expected_insertions : Expected insertion into this bloom filter.
  • bloom_filter_false_positive_probability : False positive probability.
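
A minimal sketch, assuming you want to filter on a hypothetical src_ip field with a batch size of 1000:

  settings:
    batch_size: 1000
    bloom_filter_fields:
      - src_ip
    # batch_size * number of bloom filter fields
    bloom_filter_expected_insertions: 1000
    bloom_filter_false_positive_probability: 0.01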

Tip

Learn more about bloom filtering on the wikipedia page.

ElasticSearch indexing

The FileOutput provides an output stream publishing the metadata of every archived batch. This metadata only concerns batches that have been successfully written to their destinations. To be able to explore your archives, you need to store this metadata in an ElasticSearch Index.

Tip

Even if you plan on storing this metadata somewhere else, we strongly recommend indexing it into ElasticSearch too. You will most likely have to use the tools provided by the punch to inspect your archives, and all these tools rely on the ElasticSearch indexing of your metadata.

To use this, simply add an ElasticSearchOutput after your FileOutput and set the index prefix (usually <tenant>-archive). You have full examples in the example section.

Warning

Your prefix needs to be of the form *-archive-* to apply the ElasticSearch template provided with the standalone. You can find this template in $PUNCHPLATFORM_CONF_DIR/resources/elasticsearch/templates/platform/mapping_archive.json. In particular, this template is necessary to use the punchplatform-archive-client.sh tool presented later.

Batch Metadata

The metadata carries all the information you should have about your archived batch :

  • @timestamp : The timestamp indicating the metadata creation.
  • file.uncompressed_size : Size of content in bytes.
  • file.size : Size of file in bytes.
  • file.name : Object full name. (Combining prefix and ID)
  • file.encoding : File encoding.
  • file.compression.format : Compression format.
  • file.compression.ratio : Compression ratio ( >1 ).
  • debug.component_id : Node id.
  • debug.offset : Batch Offset. (See offset description in prefix section)
  • debug.input_partition : Kafka Partition.
  • batch.tuple_count : Number of tuples in this batch.
  • batch.latest_ts : Latest tuple timestamp. (See date description in prefix section)
  • batch.earliest_ts : Earliest tuple timestamp. (See date description in prefix section)
  • batch.initialization_ts : The timestamp roughly indicating the indexing time.
  • batch.digest : SHA256 digest of file when using Ceph
  • fields.names : Names of archived fields.
  • fields.separator : Fields separator in csv content.
  • bloom_filter : Bloom filter hash value.
  • archive.devices_addresses : Addresses of the successfully written devices.
  • archive.pool : Archive pool. (See pools)
  • platform.channel : Channel used to archive this batch.
  • platform.tenant : Tenant used to archive this batch.
  • tags : Tags used, and their value. The topic is included in the tags.

Example :

{
  "@timestamp": "2020-04-14T09:06:50.775Z",
  "file": {
    "uncompressed_size": 184500,
    "size": 13317,
    "name": "httpd/2020.04.14/puncharchive-0-1003-httpd-h2GnX3IBWWcd3bmsvUmw.csv.gz",
    "encoding": "CSV",
    "compression": {
      "format": "GZIP",
      "ratio": 13.854471727866636
    }
  },
  "debug": {
    "component_id": "urzwd3EBNj3z-fMfK9vE",
    "offset": 1003,
    "input_partition": 0
  },
  "batch": {
    "tuples_count": 133,
    "latest_ts": "2020-04-14T11:06:35.93+02:00",
    "earliest_ts": "2020-04-14T11:06:33.692+02:00",
    "initialization_ts": 1586855193711,
    "fields": {
      "names": [
        "_ppf_id",
        "_ppf_timestamp",
        "log"
      ],
      "separator": "__|__"
    },
    "bloom_filter": ""
  },
  "archive": {
    "devices_addresses": [
      "file:///tmp/archive-logs/storage"
    ],
    "pool": "mytenant"
  },
  "platform": {
    "channel": "apache_httpd",
    "tenant": "mytenant"
  },
  "tags": {
    "topic": "httpd"
  }
}

Examples

Here are some examples of configuration. We'll consider that the tenant is mytenant and the channel is apache_httpd.

At least once

Take a look at the fields published by the following KafkaInput. You can see two PunchPlatform reserved fields: "_ppf_partition_id" and "_ppf_partition_offset". When using the at_least_once strategy, compared to the exactly_once one, the FileOutput deals with the batching logic itself and does not rely on the KafkaInput for it. This is why we must explicitly declare these two extra fields.

Here is an example of this configuration:

  dag:
    - component: kafka_reader
      type: kafka_input
      settings:
        topic: <your_input_topic_name>
        start_offset_strategy: last_committed
        brokers: local
      publish:
        - stream: logs
          fields:
            - log
            - _ppf_id
            - _ppf_timestamp
            - _ppf_partition_id
            - _ppf_partition_offset
    - component: file_output
      type: file_output
      settings:
        strategy: at_least_once
        destination: file:///tmp/archive-logs/storage
        compression_format: GZIP
        timestamp_field: _ppf_timestamp
        batch_size: 1000
        batch_expiration_timeout: 10s
        fields:
          - _ppf_id
          - _ppf_timestamp
          - log
      subscribe:
        - component: kafka_reader
          stream: logs
      publish:
        - stream: metadatas
          fields:
            - metadata
    - component: elasticsearch_output
      type: elasticsearch_output
      settings:
        cluster_name: es_search
        index:
          type: daily
          prefix: mytenant-archive-
        document_json_field: metadata
        batch_size: 1
        reindex_failed_documents: true
        error_index:
          type: daily
          prefix: mytenant-archive-errors
      subscribe:
        - component: file_output
          stream: metadatas

For the sake of illustration, this example adds two useful behaviors:

  • compression is activated. You can use "GZIP" to enable compression or "NONE" to disable it.
  • one of the tuple fields is used as a timestamp, as indicated by the timestamp_field property. This tells the FileOutput to use that incoming field as an indication of the record timestamp. This in turn is useful to generate, for each batch file, an earliest and a latest timestamp, used for naming files or for indexing purposes. If you do not have such a ready-to-use field, the current time is used instead.

If you run this topology, you will obtain the following archived files, with an ID that may be different:

/tmp/archive-logs/storage
└── mytenant
    └── apache_httpd
        └── 0
            └── 2020.04.14
                ├── puncharchive-ooEjeHEBeuU3WQMkBTrm-0-1586858493604-apache_httpd-Yo-nX3IBWWcd3bmsgIz5.csv.gz
                ├── puncharchive-ooEjeHEBeuU3WQMkBTrm-0-1586858493605-apache_httpd-h2GnX3IBWWcd3bmsvUmw.csv.gz
                ...

You can control the folder layout using the file_prefix_pattern property.

Here is an extract of the first one:

31-05-2019 10:18:59 host 128.114.24.54 frank
31-05-2019 10:19:59 host 128.95.200.73 bob
31-05-2019 10:20:59 host 128.202.228.156 alice

Tip

Note that each line only contains the raw log value. This is what is configured in the FileOutput fields settings. If you select several fields, they are separated by a configurable separator.

One by one

In some situations, you may want to use the FileOutput to generate one file per incoming tuple; the one_by_one strategy can be used for that special case. The example below illustrates this use case.

Note that the KafkaInput can be in "batch" mode or not; both cases will work.

dag:
  - component: kafka_reader
    type: kafka_input
    settings:
      topic: "<your_input_topic_name>"
      start_offset_strategy: last_committed
      brokers: local
    publish:
      - stream: logs
        fields:
          - log
          - _ppf_id
          - _ppf_timestamp
  - component: file_output
    type: file_output
    settings:
      strategy: one_by_one
      destination: file:///tmp/archive-logs/storage
      timestamp_field: _ppf_timestamp
      file_prefix_pattern: "%{topic}/%{partition}/%{date}"
      fields:
        - _ppf_id
        - _ppf_timestamp
        - log
    subscribe:
      - component: kafka_reader
        stream: logs

In some situations, you may want to write incoming tuples to several files based on a Storm field value. In that case, you can use the filename_field option. When enabled, for each tuple, the FileOutput gets this field value and uses it as the filename. In this situation, the upstream input/nodes choose the final file name by setting the right value in this field.
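
A sketch of that setup, assuming the upstream node publishes a hypothetical filename field carrying the target file name:

  settings:
    strategy: one_by_one
    destination: file:///tmp/archive-logs/storage
    # use the value of the "filename" field as the file name instead of the classic naming pattern
    filename_field: filename
    fields:
      - log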

Device types

Changing the device type from a filesystem to Ceph or S3 only requires a small change in the destination and some credentials. Here are the different configurations:

  • Filesystem :
    settings:
      destination: file:///tmp/archive-logs/storage
    
  • Ceph :
    settings:
      destination: ceph_configuration:///etc/ceph/main-client.conf
      user: client
    
  • S3 :
    settings:
      destination: http://localhost:9000
      access_key: admin
      secret_key: adminSecret
    

Scaling Up Considerations

Archiving topologies are designed to scale so as to write batches of files in a safe and idempotent way. The question is: how do you scale to achieve enough parallelism to write lots of data coming from multi-partitioned topics?

In all cases, a single FileOutput instance must be in charge of handling a single batch. The reason is simply that a batch is nothing more than a continuous sequence of records from the same Kafka partition. Having a complete batch handled by a single instance of the FileOutput guarantees that even after a failure/replay, the same batches will be regenerated with the same identifiers and the same content.

You have four ways to enforce that one-batch-per-instance condition.

  1. Use a single FileOutput instance: this is of course the easiest solution to start with. That single instance will receive all the data from all the partitions. The FileOutput is designed to achieve parallel concurrent writing of several batches. This setup can thus be efficient enough for many use cases. Try it first.
  2. Use several instances of the FileOutput on top of a single KafkaInput. That works, but use the grouping strategy to enforce that each instance receives all the records from the same Kafka partition.
  3. Use one FileOutput instance per partition. This is easy to configure: deploy as many KafkaInput instances as there are partitions, one dedicated to each partition, and set up a FileOutput for each input.
  4. If you ultimately need to deploy more nodes than you have partitions, use the grouping strategy.

Prefer the simplest solutions. A single process topology deployed on a multi-core server can achieve a high throughput.

Java documentation

For a more in-depth configuration reference, refer to the FileOutput javadoc documentation.