
Archive Reader Node

Overview

The archive_reader node extracts archived data from S3, Ceph or a filesystem, using metadata stored in Elasticsearch.

Warning

The subscribed input stream MUST contain a "metadata" field. Subscribing to a stream without a metadata field will raise an exception and make your punchline fail.

Filters

The Archive Reader reads one batch for every metadata document received as input. Therefore, filters and selections on dates, pools, tags, etc. must be applied in the Elasticsearch query, as sketched after the configuration below.

type: extraction_input
component: input
settings:
  index: mytenant-archive-*
  nodes:
  - localhost
publish:
  - stream: metadata
    fields:
    - metadata
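
For instance, a time-bounded, pool-specific selection could be expressed directly in the extraction_input query. The query setting and the metadata field names used here (archive.pool, batch.latest_ts) are assumptions for illustration only; check the extraction_input reference documentation and your mytenant-archive-* mapping for the exact names.

type: extraction_input
component: input
settings:
  index: mytenant-archive-*
  nodes:
  - localhost
  # Hypothetical query: keep only metadata for the 'apache' pool archived
  # during the last 24 hours. Field names are illustrative.
  query:
    query:
      bool:
        filter:
        - term:
            archive.pool: apache
        - range:
            batch.latest_ts:
              gte: now-24h
publish:
  - stream: metadata
    fields:
    - metadata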

Once your metadata is filtered, you can apply additional filters to the data itself.

Two options are available: match_bloom and match_string.

match_bloom keeps only the archives whose bloom filter matches the provided value. Of course, your data must have been archived with a bloom filter to use this option. This option filters out whole files, not the lines inside them.

match_string keeps only the lines containing a specific string. This filter is applied after the bloom filter: each line of the opened archives is processed, and only the lines containing the provided string are kept in the output dataset. A configuration sketch follows.
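
As a minimal sketch, both filters are plain Archive Reader settings (the values below are placeholders):

- type: archive_reader_node
  component: reader
  settings:
    # Keep only the archives whose bloom filter matches this value
    # (requires bloom filters to have been set at archiving time).
    match_bloom: 128.114.24.54
    # Among the opened archives, keep only the lines containing this string.
    match_string: GET
  subscribe:
  - component: input
    stream: metadata
  publish:
  - stream: data
    fields:
    - data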

Credentials

When using the archive reader, most of the settings needed for the extraction are contained in the metadata. However, the credentials are obviously not part of it. You therefore need to provide them in your node settings (user for Ceph, access_key and secret_key for S3). You can also provide some hadoop_settings if you're using avro/parquet encoding.
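
As a sketch, credentials for a MinIO/S3 backend go straight into the node settings block. The key names below follow the Parameters list at the bottom of this page, and the values are placeholders.

settings:
  # S3/MinIO credentials (placeholder values): credentials are never
  # carried by the metadata, so they must be provided here.
  access-key: minioadmin
  secret-key: minioadmin
  # For a Ceph backend, provide the Ceph user instead.
  # user: client.admin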

Note

The device address used is the first one found in the metadata. You can force a device_address by providing it in the node settings, but it is your responsibility to ensure the archives you're reading are present on the provided address.

Avro/Parquet

The Avro schema of your archives is provided in the metadata. Therefore, no additional settings are required for avro/parquet encoding. However, you can provide a schema in the node settings to force its use, as in the sketch below. You can also set some hadoop_settings for your extraction.
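
A minimal sketch, assuming you want to force a schema and a charset (the schema below is a placeholder, not a real archive schema):

settings:
  # Placeholder Avro schema forced instead of the one carried by the metadata.
  schema: '{"type": "record", "name": "log", "fields": [{"name": "raw_log", "type": "string"}]}'
  # Charset used when turning avro/parquet bytes into strings.
  charset: UTF-8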

Error handling

To keep track of failing metadata while continuing to extract the remaining data, you need to define a _ppf_errors publish stream. The failing metadata will automatically be published there, and the extraction will continue. If no such stream is defined, the tuple is failed.

Fault tolerance

A Success stream is provided to keep track of already processed metadata in case of punchline failure. You can find a full description of this feature in the extraction documentation.

Subscribe and Publish

The archive reader expects and provides some reserved fields and streams.

While you can subscribe to any stream from any component, a metadata field is expected in the input. You can choose another field name using the metadata_field setting.
On every publish stream, the archive reader handles the declared fields with a simple logic: if the field is one of the input fields, the corresponding input value is provided, allowing input values to be propagated along the pipeline. If the field is a reserved field, the corresponding value is set. For any other field, the main value is set.

Here are the different streams and their reserved fields:

- Line stream: this stream can take any name and will output the extracted lines. It allows an optional _ppf_metadata field containing the batch metadata for each line.
- Error stream: this stream uses the name _ppf_errors and will emit the metadata when an exception occurs. It allows a _ppf_error_message field containing the exception message.
- Success stream: this stream uses the name _ppf_successes and will emit the metadata when an extraction successfully ends. It allows a _ppf_lines_cnt field containing the number of lines read and a _ppf_bytes_cnt field containing the number of bytes read.
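
For illustration, an Archive Reader publishing all three streams with their reserved fields could look like the sketch below (the data stream and component names are arbitrary; only the reserved names and fields are meaningful):

- type: archive_reader_node
  component: reader
  settings:
    device_address: file:///tmp/archive-logs/storage
  subscribe:
  - component: input
    stream: metadata
  publish:
  # Line stream: any name works; _ppf_metadata is optional.
  - stream: data
    fields:
    - data
    - _ppf_metadata
  # Error stream: reserved name; _ppf_error_message is optional.
  - stream: _ppf_errors
    fields:
    - metadata
    - _ppf_error_message
  # Success stream: reserved name; the counters are optional.
  - stream: _ppf_successes
    fields:
    - metadata
    - _ppf_lines_cnt
    - _ppf_bytes_cnt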

Example

version: 6.0
type: punchline
runtime: storm
meta:
  tenant: mytenant
  channel: archive_reader
  vendor: archive_reader
dag:

- type: extraction_input
  component: input
  settings:
    index: mytenant-archive-*
    nodes:
    - localhost
  publish:
  - stream: metadata
    fields:
    - metadata

- type: archive_reader_node
  component: reader
  settings:
    device_address: file:///tmp/archive-logs/storage
  subscribe:
  - component: input
    stream: metadata
  publish:
  - stream: data
    fields:
    - data
  - stream: _ppf_errors
    fields:
    - metadata

- type: kafka_output
  settings:
    topic: extraction_results
    encoding: lumberjack
    producer.acks: all
  subscribe:
  - component: reader
    stream: data

- type: elasticsearch_output
  settings:
    per_stream_settings:
    - stream: _ppf_errors
      index:
        type: daily
        prefix: mytenant-archive-extraction-errors-
      document_json_field: metadata
  subscribe:
  - component: reader
    stream: _ppf_errors
settings: {}

This example can be used to read files archived using the apache_httpd channel. It is composed of four elements:

- An Extraction Input sending a query to Elasticsearch to get the metadata.
- An Archive Reader reading files from the provided device, using the metadata information.
- A Kafka Output sending the resulting lines to a Kafka topic.
- An Elasticsearch Output indexing the failed metadata.

Parameters

  • device_address: String
    Description: [Optional] Address of the device where the data is stored. Starts with file://, ceph_configuration://, http://...
  • metadata_field: String
    Description: [Optional] Metadata field name in the input tuple. Defaults to "metadata".
  • user: String
    Description: [Optional] The user used to access the Ceph cluster.
  • access-key: String
    Description: [Optional] Access key for the MinIO cluster.
  • secret-key: String
    Description: [Optional] Secret key for the MinIO cluster.
  • match_string: String
    Description: [Optional] String to be looked for in the extracted lines.
  • match_bloom: String
    Description: [Optional] Value to be looked for in the bloom filter fields set when archiving.
  • schema: String
    Description: [Optional] Avro schema to be used instead of the one provided in the metadata.
  • hadoop_settings: String
    Description: [Optional] Additional Hadoop settings to apply when extracting avro/parquet.
  • charset: String
    Description: [Optional] Charset to use when extracting bytes from avro/parquet.