File Output

Overview

The FileBolt writes incoming tuples into archives. Archives can be plain files on a POSIX-like filesystem, or objects stored in an object storage solution such as an S3 or CEPH object store.

In all cases the FileBolt can generate additional metadata that can be stored in a separate store (typically Elasticsearch). This metadata provides you with additional capabilities to efficiently extract your data. Without it, you will be on your own to navigate through folders or object store pools to locate and extract the archives you are interested in.

Be aware that if you deal with massive or long-term archiving (months or years) you will end up with hundreds of millions of files. Executing something as simple as a file or object listing will take a considerable amount of time.

Activating the metadata is thus strongly encouraged. It will also allow you to benefit from the punch housekeeping feature that periodically purges outdated archives.

The FileBolt supports writing to a POSIX file system (typically an NFS share to archive logs on a third-party storage solution), or to an S3 or CEPH distributed object store. The punch deployer lets you easily deploy a CEPH cluster on your on-premise infrastructure. S3 stores are typically provided by your cloud provider. If you do not have one, we strongly recommend the MinIO open source S3 store.

Pools and Topics

The punch archiving service lets you think in terms of pools and topics.

  • A pool simply groups all your archive files into high-level isolated sets. Most often you have one pool per tenant. It provides isolation between your data.
  • Inside each pool, a topic groups data that makes sense for you to manipulate at once. For example you can have one topic per type of logs (apache_httpd, sourcefire, etc.), or instead one topic per usage (security, monitoring, etc.). You are free to choose.

Tip

By default the pool is the tenant, and the topic is the channel. Unless you have good reasons, we recommend you stick to that pattern.
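
For example, here is a minimal sketch of the FileBolt settings following that default pattern; the pool and topic values are purely illustrative:

{
  "type": "file_bolt",
  "settings": {
    "destination": "file:///tmp/archive-logs/storage",
    "pool": "mytenant",
    "topic": "httpd",
    ...
  },
  "storm_settings": {
    ...
  }
}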

Batching Strategy

The FileBolt works using a batch strategy. A batch is simply a collection of tuples. The FileBolt will accumulate a configured number of tuples so as to produce a single file, then flush it to the destination archive. This strategy reduces input/output costs, allows the data to be compressed, and lets you benefit from advanced indexing options. You can choose among three archiving strategies that will be covered in the next section.

Controlling the Archive Content

The FileBolt receives the incoming data as tuples. You decide which field you want to archive, which field should (optionally) be considered as a timestamp, and which field should serve to provide you with more indexing capability. This will become clear using the examples below.

Settings

Batching Strategy

The FileBolt can be configured to group incoming tuples in different ways; we call that the "batching strategy". These strategies are:

  • exactly_once: ensures that each tuple is processed only one time (no duplicates). It is only supported together with a batching KafkaInput. The reason is that the KafkaSpout is in charge of repeating exactly the same Kafka topic partition reads in case of restart. In this mode the FileBolt is passive and waits to receive the end-of-batch message from the KafkaSpout.
  • at_least_once: ensures that each tuple is processed at least once. Some duplicated tuples can be found in case of a crash and restart of the punchline. This mode requires a standard KafkaInput, yet configured in a way to send the additional _ppf_partition_id and _ppf_partition_offset fields as part of the emitted tuple. Refer to the example below.
  • one_by_one: this very particular mode only makes sense to receive big chunks of data such as files or images, typically received on an HttpSpout. It allows you to write these as files with no batching strategy, i.e. each received data item is saved as is to the archive.

These configuration values must be set in the strategy setting field:

{
  "type": "file_bolt",
  "settings": {
    "strategy": "at_least_once",
    ...
  },
  "storm_settings": {
    ...
  }
}

You can find a complete topology configuration for each of these use cases in the examples section.

Timestamp Selection

The FileBolt allows you to sort incoming events based on a timestamp field. This is especially useful to ensure that tuples are correctly ordered in each batch file. By default, if no timestamp field is set, the unix epoch of the tuple processing time is used.

To enable it, use the timestamp_field option:

{
  "type": "file_bolt",
  "settings": {
    "timestamp_field": "_ppf_timestamp",
    ...
  },
  "storm_settings": {
    ...
  }
}

The FileBolt lets you decide which field you want to archive, which field should (optionally) be considered as a timestamp, and which field should serve for indexing. This will become clear using the examples below.

Basic Filesystem Storage

The simplest mode consists in writing to a plain POSIX filesystem, nothing more. This mode is activated if you set a destination property value that starts with the file:// prefix. If you want to be able to search quickly through your events, go to the next section.

Here is an example configuration:

{
  "type": "file_bolt",
  "settings": {
    "destination": "file:///tmp/plain",
    "fields": [
      ...
    ]
  },
  "storm_settings": {
    ...
  }
}

Indexed Storage

The previous mode is too limited to deal with long-duration and high-traffic archiving. The punch provides powerful indexed archiving that uses an additional Elasticsearch index to keep track of what is saved. In turn this lets you extract information more efficiently than by scanning a unix folder.

To use this, simply add an ElasticSearchBolt after your FileBolt and set the index prefix to your pool name:

{
  "type": "file_bolt",
  "settings": {
    "destination": "file:///tmp/plain",
    "pool": "mytenant",
    ...
  },
  "storm_settings": {
    "component": "bolt",
    "publish": [
        {
            "stream": "logs",
            "fields": [
              "metadata"
            ]
        }
    ],
    ...
  }
},
{
  "type": "elasticsearch_bolt",
  "settings": {
   "per_stream_settings": [
     {
       "index": {
         "type": "daily",
         "prefix": "mytenant-archive"
       },
       "document_json_field": "metadata",
       "reindex_failed_documents": true,
       "batch_size": 1,
       "error_index": { "type": "daily", "prefix": "mytenant-archive-errors"}

       ...
     }
   ]
  },
  "storm_settings": {
    "subscribe": [
      {
        "component": "bolt",
        "stream": "logs"
      }
    ],
    ...
  }
}

Tip

Your prefix needs to be of the form *-archive-* to apply the Elasticsearch template provided with the standalone. You can find this template in $PUNCHPLATFORM_CONF_DIR/resources/elasticsearch/templates/platform/mapping_archive.json. In particular, this template is necessary to use the storectl tool presented later.

Root directory creation

For the sake of simplicity when testing, the option create_root can be activated. This option allows the root directory to be created automatically.
We do not recommend activating this option for anything other than tests. Setting it to true may create unwanted directories if destinations are mistyped. Furthermore, not creating the root directory allows you to check that the destination cluster is reachable.
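
For testing purposes only, here is a minimal sketch (assuming create_root simply sits alongside the other FileBolt settings):

{
  "type": "file_bolt",
  "settings": {
    "destination": "file:///tmp/plain",
    "create_root": true,
    ...
  },
  "storm_settings": {
    ...
  }
}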

Ceph Object Storage

To save the batch files to a CEPH cluster instead of a filesystem, update the destination field. Besides that change, all the other parameters still apply. Now, instead of folders you will deal with CEPH pools. The punch extraction tools will provide you with means to scan for pools, topics and files.

In the FileBolt settings, you can define the destination:
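
Here is a sketch; the ceph_configuration:// prefix is the one listed in the destination parameter below, and the configuration file path is purely illustrative:

{
  "type": "file_bolt",
  "settings": {
    "destination": "ceph_configuration:///etc/ceph/main-client.conf",
    "pool": "mytenant",
    "topic": "httpd",
    ...
  },
  "storm_settings": {
    ...
  }
}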

Warn

The path to the CEPH configuration file must be an absolute path.

S3 Storage

To save the batch files to an S3 store, simply provide the http or https S3 store endpoint together with the access and secret keys. Here is an example:

    {
      "type": "file_bolt",
      "settings": {
        "destination": "http://192.168.0.5:9000",
        "access_key" : "8WL8LA5MX4VQI0Z9PE0J",
        "secret_key" : "P9IRtbNDonGtThlqyjFwjbOU9O30olsApTZRXJ63",
        ...
      }
    }

Just like with any other archive store, you can additionally generate metadata and index it in Elasticsearch. You then end up with a powerful S3 archive with efficient search capabilities provided by the punch Elasticsearch metadata index.
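
As a sketch, reusing only settings shown earlier in this chapter, the S3 destination simply combines with the usual pool, topic and fields settings; the metadata indexing is then configured exactly as in the Indexed Storage section above:

    {
      "type": "file_bolt",
      "settings": {
        "destination": "http://192.168.0.5:9000",
        "access_key" : "8WL8LA5MX4VQI0Z9PE0J",
        "secret_key" : "P9IRtbNDonGtThlqyjFwjbOU9O30olsApTZRXJ63",
        "pool": "mytenant",
        "topic": "httpd",
        "fields": [
          "log"
        ],
        ...
      }
    }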

Examples

Exactly Once

Here, the key parameter is the "batch_size" in the Kafka spout. By setting a batch size, we force the Kafka spout to generate data in batches. Also note that it is not possible to put any bolt between the Kafka spout and the file bolt (not even a punch bolt).

{
  "dag": [
    {
      "type": "kafka_input",
      "settings": {
        "topic": "<your_input_topic_name>",
        "start_offset_strategy": "last_committed",
        "brokers": "local",
        "batch_size": 1000
      },
      "storm_settings": {
        "executors": 1,
        "component": "kafka_reader",
        "publish": [
          {
            "stream": "logs",
            "fields": [
              "log",
              "_ppf_timestamp"
            ]
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "file_bolt",
      "settings": {
        "destination": "file:///tmp/archive-logs/storage",
        "es_cluster_id": "es_search",
        "pool": "apache-httpd-archiving",
        "compression_format" : "GZIP",
        "topic": "httpd",
        "timestamp_field": "_ppf_timestamp",
        "folders_hierarchy_pattern": "%{topic}/%{partition}/%{date}",
        "fields": [
          "log"
        ]
      },
      "storm_settings": {
        "executors": 1,
        "component": "file_bolt",
        "subscribe": [
          {
            "component": "kafka_reader",
            "stream": "logs"
          }
        ]
      }
    }
  ]
}

For the sake of illustration this example adds two useful behaviors:

  • compression is activated. You can use "GZIP" or "SNAPPY" to compress, or "NONE" to disable compression.
  • one of the tuple fields is used as a timestamp, as indicated by the timestamp_field property. This tells the FileBolt to use that incoming field as an indication of the record timestamp. This in turn is useful to generate, for each batch file, the earliest and latest timestamps, used for naming files or for indexing purposes. If you do not have such a ready-to-use field, the current time will be used instead.

If you run this topology, you will obtain the following archived files:

[tmp]$ tree plain/
plain/
└── 2019
    └── 05
        └── 31
            ├── puncharchive-2019-05-31-10h18-2019-05-31-10h33-1559290779566-0.csv.gz
            ├── puncharchive-2019-05-31-10h34-2019-05-31-10h49-1559290779567-0.csv.gz
            ├── puncharchive-2019-05-31-10h50-2019-05-31-11h05-1559290779568-0.csv.gz
            └── puncharchive-2019-05-31-11h06-2019-05-31-11h21-1559290779569-0.csv.gz

3 directories, 4 files

Each of these files simply contains the archived records, one per line.

Tip

Note that each line contains only the raw log value. This is what is configured in the file bolt fields setting. If you select more fields, they are separated by a configurable separator.
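
For example, here is a sketch with several fields and an illustrative separator value (both settings are documented in the parameters section below):

{
  "type": "file_bolt",
  "settings": {
    "destination": "file:///tmp/plain",
    "separator": ";",
    "fields": [
      "_ppf_id",
      "_ppf_timestamp",
      "log"
    ]
  },
  "storm_settings": {
    ...
  }
}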

Warn

Previous punch releases optionally included a header line. It has been deprecated starting at punch 5.6.2.

As you can guess, the FileBolt uses a naming scheme that allows you to quickly locate ranges of time, and a folder organisation that you can control. Here we did not specify anything, so the default options were used to produce a time-based folder organisation.

<root_destination>/<year>/<month>/<day>/<filename>
Check the advanced options to generate the layout you prefer.

At least once

Take a look at the following Kafka spout published fields. You can see two PunchPlatform reserved fields: "_ppf_partition_id" and "_ppf_partition_offset". When using the at_least_once strategy, compared to exactly_once, the batching logic is dealt with by the file bolt itself and not by the Kafka spout anymore. This is why we must explicitly declare these two extra fields.

Here is an example of this configuration:

{
  "dag": [
    {
      "type": "kafka_input",
      "settings": {
        "topic": "<your_input_topic_name>",
        "start_offset_strategy": "last_committed",
        "brokers": "local"
      },
      "storm_settings": {
        "executors": 1,
        "component": "kafka_reader",
        "publish": [
          {
            "stream": "logs",
            "fields": [
              "log",
              "color",
              "_ppf_id",
              "_ppf_timestamp",
              "_ppf_partition_id",
              "_ppf_partition_offset"
            ]
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "file_bolt",
      "settings": {
        "strategy": "at_least_once",
        "destination": "file:///tmp/archive-logs/storage",
        "pool": "apache-httpd-archiving",
        "topic": "httpd",
        "timestamp_field": "_ppf_timestamp",
        "folders_hierarchy_pattern": "%{topic}/%{partition}/%{date}",
        "compression_format" : "GZIP",
        "fields": [
          "_ppf_id",
          "_ppf_timestamp",
          "log"
        ],
        "tags": ["color"],
        "batch_size": 1000,
        "batch_expiration_timeout_ms": 10000
      },
      "storm_settings": {
        "executors": 1,
        "component": "file_bolt",
        "subscribe": [
          {
            "component": "kafka_reader",
            "stream": "logs"
          }
        ]
      }
    }
  ]
}

The result is now :

[tmp]$ tree indexed/
indexed/
└── mytenant
    └── test_archive
        └── 0
            └── 2019.05.31
                └── 1559293276
                    ├── test_archive-0-1559293276905.data
                    ├── test_archive-0-1559293276906.data
                    ├── test_archive-0-1559293276907.data
                    └── test_archive-0-1559293276908.data

5 directories, 4 files

You can control the folder layout using the folders_hierarchy_pattern property. For example, using "folders_hierarchy_pattern": "%{topic}" you will get a single folder level named after the topic.

One by one

In some situations, you may want to use the FileBolt to generate one file per incoming tuple; the one_by_one strategy can be used for that special case. The example below illustrates this use case.

Note that the Kafka spout can be in "batch" mode or not; both cases will work.

{
  "dag": [
    {
      "type": "kafka_input",
      "settings": {
        "topic": "<topic_name>",
        "start_offset_strategy": "last_committed",
        "brokers": "local"
      },
      "storm_settings": {
        "executors": 1,
        "component": "kafka_reader",
        "publish": [
          {
            "stream": "logs",
            "fields": [
              "log",
              "_ppf_id",
              "_ppf_timestamp"
            ]
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "file_bolt",
      "settings": {
        "strategy": "one_by_one",
        "destination": "file:///tmp/archive-logs/storage",
        "pool": "apache-httpd-archiving",
        "topic": "httpd",
        "timestamp_field": "_ppf_timestamp",
        "folders_hierarchy_pattern": "%{topic}/%{partition}/%{date}",
        "fields": [
          "_ppf_id",
          "_ppf_timestamp",
          "log"
        ]
      },
      "storm_settings": {
        "executors": 1,
        "component": "file_bolt",
        "subscribe": [
          {
            "component": "kafka_reader",
            "stream": "logs"
          }
        ]
      }
    }
  ]
}

In some situations, you may want to write incoming tuples to several files based on a Storm field value. In that case, you can use the filename_field option. When enabled, for each tuple, the FileBolt gets this field value and uses it as the filename. In this situation, the previous spouts/bolts choose the final file name by setting the right value in this field.
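
Here is a minimal sketch, assuming the file name is carried by the subscribed _ppf_id field (the chosen field is purely illustrative):

{
  "type": "file_bolt",
  "settings": {
    "strategy": "one_by_one",
    "destination": "file:///tmp/archive-logs/storage",
    "filename_field": "_ppf_id",
    "fields": [
      "log"
    ]
  },
  "storm_settings": {
    ...
  }
}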

Parameters

  • strategy (string, mandatory)

    select the expected batching strategy

    values: "exactly_once", "at_least_once" or "one_by_one"

  • destination (string, mandatory)

    The URL of the destination archive store.
    Examples: 'file://', 'ceph_configuration://'. Multiple addresses can be specified if indexed_filestore is used; each address should be delimited by a comma. The bolt will store archives in the provided destinations hosted by the node where the topology containing the file bolt is running (see the sketches after this list).

  • fields (string array)

    List of the Storm fields from the subscribed stream that will be written to the file. This allows you to select only a subset of the available fields if necessary.

    example : ["log", "_ppf_id", "_ppf_timestamp"]

  • pool (string)

    name of the ceph pool of objects (usually, one per tenant).

  • topic (string)

    logical subset of data in the ceph pool. This in turn lets you extract data within that topic.

  • folders_hierarchy_pattern (string)

    If objects-storage is enabled on a filesystem, batch files are written following a specific folder structure. You can hard-code it with a pure static value such as my_archives/my_technology, but we also provide useful variables: %{topic} for the topic, %{partition} for the partition number, %{date} for the earliest tuple timestamp in the current batch (YYYY.mm.dd format) and %{batches_set} for the current batches set id. For example, you could use: my_archives/%{batches_set}-%{date}/abc/%{topic}.
    default: "%{topic}/%{date}/%{batches_set}"

  • batches_per_folder (integer)

    If objects-storage is implemented over a filesystem, batches of logs will be written to multiple folders and sub-folders. Each sub-folder, if the default folders hierarchy pattern is kept, will contain at most this number of batches.
    default: 1000

  • minimum_success_required (integer)

    Integer greater than 0. By default, this parameter is equal to 1. Use this parameter if you want to make tuple acknowledgement conditional on the data being successfully written to at least this number of destinations.

  • maximum_success (integer)

    Integer greater than 0. By default, this parameter is equal to the number of destinations provided in the "destination" parameter. Use this parameter if you want to limit the number of destinations used.

  • compression_format (string)

    Use "GZIP" or "SNAPPY" to compress; the corresponding file extension (.gz, .snappy) will be added to your batches if you write to a POSIX filesystem.
    default: "NONE"

  • required_ciphering_content (boolean)

    Set to true to enable ciphering. If true, you need an enciphering section as described by the next four properties (see the sketches after this list).
    default: false

  • [enciphering].keystore (string)

    Specify keystore absolute path.

  • [enciphering].keystore_pass (string)

    Specify the keystore password.

  • [enciphering].keys (dictionary)

    Specify list of key ids (aliases).

  • [enciphering].[keys].key_id (string)

    id (alias) of the public and private rsa key.

  • add_header [LEGACY] (boolean)

    This option has no effect since version 5.6.2.

    If true, a header line will be generated at the top of each file, to indicate the included fields and the earliest/latest timestamps.
    default: true

  • separator (string)

    the csv fields separator
    default: '__|__'

  • timestamp_field (string)

    The tuple field to be used as a timestamp reference. This field must be a valid timestamp (epoch or ISO). It will be used to generate the batch filename based on the first event timestamp.
    Example: _ppf_timestamp

  • credentials

    If you need basic auth, use a credentials dictionary to provide the user and password to use. For example: "credentials" : { "user" : "bob", "password" : "bob's password" }

    This setting can be combined with SSL. A token parameter can be specified like this: "credentials": { "token": "mytoken", "token_type": "ApiKey" }. Note that if user and password are specified, they will be ignored in favor of the token parameter. The token is the base64 encoded string "user:password" if token_type is set to Basic.

  • batch_size (integer)

    Only for "at_least_once", define the maximum number of events stored in a batch before being flushed.

    default: 1000

  • batch_expiration_timeout_ms (integer)

    Only for "at_least_once", set a batch timeout in milliseconds. If no new events are added to a batch for more than this period, the batch is flushed.

    default: 10000

  • tags (string array)

    Only works in "at_least_once" batching mode. This parameter modify the batch grouping strategy to gather events based on their "tags" values. To each tag must correspond a subscribed Storm field. Each batch will be written in a separate file.
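
To make the replication and enciphering parameters more concrete, here are two sketches. They reuse only the parameter names documented above; the destination addresses, the key alias and the exact nesting of the enciphering section are assumptions for illustration purposes.

A FileBolt writing each batch to two comma-delimited destinations, acknowledging as soon as one write succeeds:

{
  "type": "file_bolt",
  "settings": {
    "strategy": "at_least_once",
    "destination": "file:///data/archive-copy-1,file:///data/archive-copy-2",
    "minimum_success_required": 1,
    "maximum_success": 2,
    ...
  }
}

A FileBolt with ciphering enabled:

{
  "type": "file_bolt",
  "settings": {
    "destination": "file:///tmp/archive-logs/storage",
    "required_ciphering_content": true,
    "enciphering": {
      "keystore": "/opt/keys/archive-keystore.jks",
      "keystore_pass": "keystore_password",
      "keys": [
        { "key_id": "archive_key" }
      ]
    },
    ...
  }
}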

Extraction Debug Tool

The punch ships with a companion tool, storectl, that lets you easily check your archive status. Here are a few examples. The following command returns a summary of a pool whose metadata is in the index mytenant-archive-*.

storectl pool-status \
    --device-address file:///tmp/indexed \
    --pool mytenant --es-index-pattern mytenant-archive

Batch Count: 2
Cluster free: 722.0 GB
Pool usage: 99.2 kB
Cluster usage: 212.5 GB

Here is how to get details about topics:

storectl list-topics \
    --from-date -7d --to-date now \
    --pool mytenant --es-cluster-name es_search \
    --es-index-pattern mytenant-archive --details

topic_number: 1
topics: 
  httpd: 
   uncompressed_size: 1.4 MB
   batch_size: 1093
   compression_factor: 14.941518164812445
   size: 99.2 kB
   latest_date: 2020-02-21T15:58:08.324+01:00[Europe/Paris]
   batch_count: 2
   compression_ratio: 0.06692760327093258
   earliest_date: 2020-02-21T15:57:49.844+01:00[Europe/Paris]
pool: mytenant

Scaling Up Considerations

Archiving topologies are designed to scale so as to write batches of files in a safe and idempotent way. The question is: how do you scale to achieve enough parallelism to write lots of data coming from multi-partitioned topics?

In all cases, a single file bolt instance must be in charge of handling a single batch. The reason is simply that a batch is nothing more than a continuous sequence of records from the same Kafka partition. Having a complete batch handled by a single instance of the file bolt guarantees that even after a failure/replay, the same batches will be regenerated with the same identifiers and the same content.

You have four ways to enforce that one-batch-per-instance condition.

  1. Use a single file bolt instance: this is of course the easiest solution to start with. That single instance will receive all the data from all the partitions. The file bolt is designed to achieve parallel concurrent writing of several batches. This setup can thus be efficient enough for many use cases. Try it first.
  2. Use several instances of the file bolt on top of a single Kafka spout. That works, but use the grouping strategy to enforce that each bolt receives all the records from the same Kafka partition.
  3. Use one file bolt instance per partition. This is easy to configure: deploy as many instances of the Kafka spout as there are partitions, one dedicated to each partition, and set up a file bolt for each of the spouts.
  4. If you ultimately need to deploy more bolts than you have partitions, use the grouping strategy.

Prefer the simplest solutions. A single process topology deployed on a multi-core server can achieve a high throughput.

Java documentation

For more in-depth configuration, refer to the file bolt javadoc documentation.