Skip to content

Extracting

Once archives are stored, with metadata indexed has described in previous steps, we may want to replay this data. This page explains how to do it.

Prerequisites

You need to have metadata stored somewhere. The extraction processed if based on that, this is why this indexation is so important, if not necessary. It's the metadata that contains the information of where is the archive stored and how it is encoded.

Components

Extracting logs is done using a simple Punchline. Two key elements are required :

  • An Extraction Input : First, we need to extract the Metadata from Elasticsearch. This is where we'll be able to set filters to select the batches we're interested in. We're using Elasticsearch here, but any source can be used to store your metadata. Elasticsearch is here recommended as it provides a great query interface, allowing us to select the metadata we're interested in.

  • An Archive Reader : This is where the extraction truly happens. The archive reader will use the metadata provided by the Elasticsearch Input and read each file, line by line, emitting a tuple for each line. The Data can be of any encoding, any compression, any back end. Everything archived by the FileOutput can be extracted using the Archive Reader. All it needs is correctly indexed metadata.

Configuration

Extraction Input

The configuration of this node is pretty basic. The most tricky part is filtering your data, but it's actually nothing more than a classic Elasticsearch query. All you need is an understanding of the Metadata fields and the ES query language.

Let's start with the basic settings :

  type: extraction_input
  component: input
  settings:
    index: mytenant-archive-*
  publish:
  - stream: metadata
    fields:
    - metadata

This is the minimum settings you'll need. The index containing your metadata. This node here will retrieve every metadata you have indexed.

Now, you may want to retrieve only metadata from a specific period, of a specific topic. To do that, we'll use the query setting. Even though every field of the metadata can be used to narrow your selection, we'll only present here the main ones.

The most basic selection is to choose a period. To do that, you need to filter on two fields : batch.latest_ts and batch.earliest_ts. These fields represent the time period of logs contained in this batch. They either linked to the processing time of each log or to the value of the timestamp_field is this one has been set during archiving.

For example to get all logs from last seven days :

query:
  range:
    batch.latest_ts:
      gte: now-7d/d
      lt: now/d

You may want to filter on pool, topic or tags :

query:
  bool:
    must:
      - term:
          archive.pool: mytenant
      - term:
          tags.topic: httpd
      - term:
          tags.color: red

Check the Metadata fields for more.

Archive Reader

Dealing with the Archive Reader configuration is quite simple. Since all the information about your batch comes from the corresponding metadata, all you need to do is provide the device to be used (or let the archive reader use the first one provided in the metadata), and the corresponding credentials if required. For each batch the archive reader will try to open the file in this device, will decode the content and return a dataset of string corresponding to the content's lines. If you do not provide a device address, the first one provided in the metadata will be used.

For example, to read into filesystem :

type: archive_reader_node
settings:
  device_address: file:///tmp/archive-logs/storage
subscribe:
- component: input
  stream: metadata
publish:
- stream: data
  fields:
  - line

Or to read into S3 :

type: archive_reader_node
settings:
  device_address: http://127.0.0.1:9000
  access_key: minioadmin
  secret_key: minioadmin
subscribe:
- component: input
  stream: metadata
publish:
- stream: data
  fields:
  - line

Note that for Avro/Parquet decoding, the schema used when archiving is stored in the metadata, but you can override it with your own schema. Just set it in the schema setting :

type: archive_reader_node
settings:
  device_address: http://127.0.0.1:9000
  access_key: minioadmin
  secret_key: minioadmin
  schema:
    type: record
    namespace: org.thales.punch
    name: example
    fields:
    - name: _ppf_id
      type: string
    - name: _ppf_timestamp
      type: string
    - name: ip
      type: string
subscribe:
- component: input
  stream: metadata
publish:
- stream: data
  fields:
  - line

The Archive Reader provides a match_string option. This option will filter the lines when reading them, meaning you don't have to load the whole file and use a punch node afterwards to filter out the one you really want.

Full configuration example

This configuration allows you to extract logs archived using the apache_httpd channel.

version: "6.0"
type: punchline
runtime: storm
dag:

- type: extraction_input
  component: input
  settings:
    index: mytenant-archive-*
    query:
      range:
        batch.latest_ts:
          gte: now-7d/d
          lt: now/d
  publish:
  - stream: metadata
    fields:
    - metadata

- type: archive_reader_node
  component: reader
  settings: {}
  subscribe:
  - component: input
    stream: metadata
  publish:
  - stream: data
    fields:
    - line
  - stream: _ppf_errors
    fields:
    - metadata

- type: kafka_output
  settings:
    topic: mytenant_apache_httpd_extraction_results
    encoding: lumberjack
  subscribe:
  - component: reader
    stream: data


- type: elasticsearch_output
  settings:
    per_stream_settings:
    - stream: _ppf_errors
      index:
        type: daily
        prefix: mytenant-archive-extraction-errors-
      document_json_field: metadata
  subscribe:
  - component: reader
    stream: _ppf_errors

metrics:
  reporters:
  - type: kafka
settings:
  topology.worker.childopts: -server -Xms256m -Xmx256m

We're basically reading last week's metadata from elasticsearch, then read it using the archive reader (no credentials here so no settings necessary), and send the results to a new kafka topic. If an error occurs, the failed metadata are indexed in Elasticsearch. The whole process is monitored using a kafka reporter. You should be able to see metrics, like how many lines or batches has been successfully read, how much size does that represent, and how many failures you encountered, in your tenant metric index. Archive Reader metrics can be identified with their name starting wit archive.reader.

You're now ready to replay your data ! You can apply a punch node to extract the fields you're interested in, or you can send them to kafka and process them in another punchline. It's up to you !