Skip to content

Archiving

We've seen how the archiving service is useful, let's see now how it is executed. This page will give you insight and examples on how to create archives of your logs, with the proper indexing method to be able to explore and extract it later.

Components

Archiving logs is done using a Storm Punchline. Three key elements are required :

  • A Kafka Input : The logs you want to archive needs to come from a kafka topic. That's because we use the kafka commit system to ensure all your logs will be processed "at least once". Let's explain.
    Logs are archived in batches. If your punchline fails before the end of a batch, meaning before logs are successfully written to your back-end, you'll want your punchline to re-process these non-written logs when it'll start again. To do that, we'll commit the logs offsets to the kafka partition only after their batch has been successfully written. Therefore, when restarting the punchline, we'll start from the "last committed" offset, replaying the logs not successfully written.

  • A File Output : This is where the archiving truly happens. The File Output receives the logs, select the fields to archive, encodes them in the correct format, compresses the results if needed, stores them in batches and writes these batches to your back end. Every aspect of this process can be customized by providing the correct settings in your configuration. We'll show a few examples here, but you can take a look at the full component documentation for more information.

  • An Elasticsearch Output : When the File Output successfully writes a batch, it publishes this batch's metadata, containing all the information needed in the future to explore and extract this archive. This metadata needs to be stored in an elasticsearch index in order to be processed or explored later.

Configuration

Kafka Input

As mentioned before, Kafka is the key to make sure you won't lose any data. Two things are required here. First of all, you must set your start_offset_strategy to last_committed to make sure your punchline will replay non-committed logs in case of failure. Next, you need to publish two reserved fields : _ppf_partition_id and _ppf_partition_offset. These two are generated by the Kafka Input and will be used by the File Output to group logs into batches and name these batches. Note that logs are grouped by partition, meaning a batch will never contain logs from different partitions.

Here's a simple example of a configuration :

- type: kafka_input
  settings:
    topic: my_input_topic
    start_offset_strategy: last_committed
  publish:
    - stream: logs
      fields:
        - log
        - _ppf_partition_id
        - _ppf_partition_offset

File Output

Here's where you configure your archiving process. We'll go step by step, going through the main settings.

Back end

Let's start with the basics : the destination back end. The File Output can write to three different type of back-end : Filesystem, Ceph and S3. Now, the interesting thing is that you'll only need to provide a few settings to configure your back-end. All other settings remain the same.

The destination setting will contain the address to your back end. The prefix you use will define the type of back-end you want. For a filesystem, you'll use the file:// followed by the absolute path of your destination folder. For Ceph, you'll use ceph_configuration:// followed by the absolute path to your ceph configuration file. Finally, for S3, you'll use http:// follow by the endpoint of your S3 cluster.

The credentials needed yor your back-end must also appear in the settings. Ceph's user key is user and S3 credentials are access_key and secret_key.

Here are some examples :

# Filesystem
destination: file:///tmp/archive-logs/storage
# S3 
destination: http://127.0.0.1:9000
access_key: minioadmin
secret_key: minioadmin
# Ceph
destination: ceph_configuration:///etc/ceph/main-client.conf
user: client

You even have the possibility to set multiple destinations. Simply list your back-ends addresses in the destinations setting :

You can put any kind of back-end but keep in mind that all the destinations listed must be of the same type.

The File Output handle load balancing using the maximum_write and minimum_write settings. This way, you'll be sure to write your data to at least minimum_write destinations but to at most maximum_write destinations. For each batch, destinations are selected randomly to ensure archives are spread over all the destinations. See FileOutput documentation for more information.

destinations:
  - file:///tmp/archive-logs/storage1
  - file:///tmp/archive-logs/storage2
  - file:///tmp/archive-logs/storage3
  - file:///tmp/archive-logs/storage4
minimum_writes: 2
maximum_writes: 3

Destination

We now need to set where we want our data to be stored. We've already configured our backend, it's now time to name our file.

First of all, let's understand the concept of pool. This setting allows you to select the top folder (filesystem), bucket (S3) or pool (Ceph) you want to write your file into. By default, this pool will be set with your tenant , and it is recommended to keep it that way for multi-tenancy purpose.

The main setting that defines your filename will be the file_prefix_pattern. It allows you to set the filename of your choice, with some parameters specific to each batch. The list of these parameters can be found in the FileOutput documentation. Of course, you can insert folder delimiter / in your pattern to create a folder hierarchy.

One very useful setting is the tags one. You can provide a list of fields to group logs on the corresponding values. These fields may not be listed in the ones you want to archive and only used to group logs. You can use these fields in your filename, using %{tag:<name>}. By default, the topic tag already exists. Its default value is your channel name.

To ensure uniqueness of each filename, a unique ID is generated and put right before the extension of the file.

Here's an example to S3 bucket mytenant sorted by topic, date and ip field :

pool: mytenant
topic: ip_detection
tags:
  - ip
file_prefix_pattern: %{topic}/%{tag:ip}/%{date:YYYY-MM}/puncharchive-%{partition}-%{offset}

Content

Now let's see how you can configure the content of your batch. The File Output gives you control on what you want to archive and how you want it to be encoded.

To set how many logs will be in your batch, set the batch_size setting. When this limit will be reached, batch will be written to destination. Two more settings allow you to control when your batch will be written. The batch_expiration_timeout allows to force batch writing after some time of inactivity, even if the batch size is not reached. The batch_life_timeout allows to force batch writing some time after batch creation, even if the batch size is not reached. Both these timeouts can be set to 0 to deactivate them. They are quite useful when you want to set big batch size to limit the number of archives but you might face some inactivity periods and don't want to wait a long time for the batch to get filled.

batch_size: 10000
batch_expiration_timeout: 30s
batch_life_timeout: 10m
You may want to select only a few fields to archive. To do this, provide the fields' names in the fields settings. The provided fields must exist, or an exception will be raised, and the punchline will stop.
fields:
  - ip
  - _ppf_id
  - _ppf_timestamp
The FileOutput handle different kinds of encoding : csv, json, avro and parquet. You can pick one with the encoding setting. Note that each encoding has its own set of parameters. Csv encoding allows you to set the field separator used with the separator settings. Avro and Parquet require an Avro Schema in the schema setting. See FileOutput documentation for more information. Most of the time, you'll want to compress your data. This can be done using the compression_format setting. For csv, json and parquet, gzip value is handled, while Avro uses snappy compression.

Here is a csv example :

encoding: csv
compression_format: gzip
separator: __|__
Here is an avro example :
encoding: avro
compression_format: snappy
schema:
  type: record
  namespace: org.thales.punch
  name: example
  fields:
    - name: _ppf_id
      type: string
    - name: _ppf_timestamp
      type: string
    - name: ip
      type: string

Additional settings

In case you're doing filesystem, you can set the streaming setting to true to avoid loading a huge amount of data in RAM and directly write it in the target file as it comes. See FileOutput documentation for more information. Note that Avro and Parquet automatically works in streaming mode.

A timestamp_field setting allows you to provide a timestamp field to index the earliest timestamp and latest timestamp for each batch. This information will be used later for extraction.

Example

Here is an example on how to write a csv to Minio :

- type: file_output
  settings:
    # Back End
    destination: http://127.0.0.1:9000
    access_key: minioadmin
    secret_key: minioadmin

    # Destination
    pool: mytenant
    topic: ip_detection
    tags:
      - ip
    file_prefix_pattern: %{topic}/%{tag:ip}/%{date:YYYY-MM}/puncharchive-%{partition}-%{offset}  

    # Content
    batch_size: 10000
    batch_expiration_timeout: 30s
    batch_life_timeout: 10m
    fields:
      - ip
      - _ppf_id
      - _ppf_timestamp
    encoding: csv
    compression_format: gzip
    separator: __|__

    # Additional settings
    timestamp_field: _ppf_timestamp
  subscribe:
    - component: kafka_input
      stream: logs
  publish:
    - stream: logs
      fields: 
        - metadata

ElasticSearch Output

The final step is to index metadata published by the File Output into an Elasticsearch index.

The only indication for this step is to choose an index name matching pattern *-archive-*, in order for elasticsearch to use the metadata mapping provided by the punch.

For a full metadata example and description, see the File Output documentation.

- type: elasticsearch_output
  settings:
    index:
      type: daily
      prefix: mytenant-archive-
    document_json_field: metadata
    batch_size: 1
    reindex_failed_documents: true
    error_index:
      type: daily
      prefix: mytenant-archive-errors
  subscribe:
    - component: file_output
      stream: metadatas

We can only insist on how important this step is. Indexing metadata is mandatory for extraction and exploration. Getting this metadata back is extremely complicated, if not impossible, and extracting non-indexed archives always leads to trouble.

Full Example

Here's an example of a punchline storing csv with gzip compression into minio. Data extracted from kafka contains three fields ip, _ppf_id and _ppf_timestamp :

version: "6.0"
type: punchline
runtime: storm
tenant: mytenant
channel: ip_detection
dag:
  - type: kafka_input
    settings:
      topic: ip_adresses_topic
      start_offset_strategy: last_committed
    publish:
      - stream: logs
        fields:
          - ip
          - _ppf_id
          - _ppf_timestamp
          - _ppf_partition_id
          - _ppf_partition_offset
  - type: file_output
    settings:
      destination: http://127.0.0.1:9000
      access_key: minioadmin
      secret_key: minioadmin
      pool: mytenant
      topic: ip_detection
      tags:
        - ip
      file_prefix_pattern: %{topic}/%{tag:ip}/%{date:YYYY-MM}/puncharchive-%{partition}-%{offset}
      batch_size: 10000
      batch_expiration_timeout: 30s
      batch_life_timeout: 10m
      fields:
        - ip
        - _ppf_id
        - _ppf_timestamp
      encoding: csv
      compression_format: gzip
      separator: __|__
      timestamp_field: _ppf_timestamp
    subscribe:
      - component: kafka_input
        stream: logs
    publish: 
      - stream: metadatas
        fields: 
          - metadata
  - type: elasticsearch_output
    settings:
      index:
        type: daily
        prefix: mytenant-archive-
      document_json_field: metadata
      batch_size: 1
      reindex_failed_documents: true
      error_index:
        type: daily
        prefix: mytenant-archive-errors
    subscribe:
      - component: file_output
        stream: metadatas
metrics:
  reporters:
    - type: kafka

Monitoring

Once our punchline configured and started, we may want to have some insight on how it's running. The File Output node provides a few key metrics to understand how the archiving is doing :

  • The amount of Batches flushed
  • The amount of Tuple processed. Note that this metric increase for every tuple processed, even if its batch has not been flushed yet.
  • The amount of data written. This indicates the bytes flushed.
  • The amount of tuple per batch. This allows you to monitor if your batch filling is optimized, or if you need to set a different timeout or batch size to reach the batch filling you want.
  • The tuple processing time. A quick insight on the time require to process a tuple.
  • The batch flushing time. A quick insight on the time require to flush a batch. This can highlight some I/O issues when writing to your device, for example.
  • The compression rate. Allow you to check the compression efficiency. Of course, this is only updated when flushing a batch.

To use these metrics, make sure you include a Kafka reporter in your punchline configuration, as shown in the previous example :

metrics:
  reporters:
    - type: kafka
For these metrics to reach elasticsearch, you must start your platform monitoring channel :
channelctl --tenant platform start --channel monitoring

To check on these metrics, a dashboard is provided with the punch resources. You can import it using the punchplatform-setup-kibana.sh command :

punchplatform-setup-kibana.sh --import --pattern archiving_monitoring.ndjson

Now, you should find this dashboard in kibana.

image

You can select a Pool and Topic to have more precise information.

Exploring

You may want more detailed information about your pools, topics or object. The punch provides a tool for that.

The storectl command, allows you to : check your pool/topic status, to list existing topics in a pool or existing objects in a topic, dump objects or even full topics. You can find full examples in the storectl documentation.