Extracting
Once archives are stored, with metadata indexed as described in the previous steps, you may want to replay this data. This page explains how to do it.
Prerequisites¶
You need to have metadata stored somewhere. The extraction process is based on that metadata, which is why indexing it is so important, if not necessary. It's the metadata that tells you where the archive is stored and how it is encoded.
Components¶
Extracting logs is done using a simple Punchline. Two key elements are required:
- An Extraction Input: first, we need to retrieve the metadata from Elasticsearch. This is where we set the filters selecting the batches we're interested in. We're using Elasticsearch here, but any source can be used to store your metadata; Elasticsearch is recommended because it provides a great query interface, allowing us to select exactly the metadata we're interested in.
- An Archive Reader: this is where the extraction truly happens. The archive reader uses the metadata provided by the Extraction Input and reads each file, line by line, emitting a tuple for each line. The data can be of any encoding, any compression, any backend: everything archived by the FileOutput can be extracted by the Archive Reader. All it needs is correctly indexed metadata.
Configuration¶
Extraction Input¶
The configuration of this node is pretty basic. The trickiest part is filtering your data, but it's actually nothing more than a classic Elasticsearch query. All you need is an understanding of the Metadata fields and the ES query language.
Let's start with the basic settings:
type: extraction_input
component: input
settings:
  index: mytenant-archive-*
publish:
  - stream: metadata
    fields:
      - metadata
These are the minimum settings you'll need: just the index containing your metadata. With this configuration, the node will retrieve every metadata document you have indexed.
Now, you may want to retrieve only metadata from a specific period, or a specific topic. To do that, we'll use the query setting. Even though every field of the metadata can be used to narrow your selection, we'll only present the main ones here.
The most basic selection is to choose a period. To do that, you need to filter on two fields: batch.latest_ts and batch.earliest_ts. These fields represent the time period of the logs contained in each batch. They are linked either to the processing time of each log or to the value of the timestamp_field, if that setting was set during archiving.
For example, to get all logs from the last seven days:
query:
  query:
    range:
      batch.latest_ts:
        gte: now-7d/d
        lt: now/d
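The example above only constrains batch.latest_ts. If you want only the batches that fall entirely inside the window, one possible sketch (an interpretation choice, not part of the reference example) is to bound both fields in a bool query:

query:
  query:
    bool:
      must:
        - range:
            batch.earliest_ts:
              gte: now-7d/d
        - range:
            batch.latest_ts:
              lt: now/d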
You may want to filter on pool, topic or tags:
query:
  query:
    bool:
      must:
        - term:
            archive.pool: mytenant
        - term:
            tags.topic: httpd
        - term:
            tags.color: red
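Both kinds of filters can of course be combined in a single bool query. As a sketch using only the fields already shown on this page, the following selects httpd batches from the last seven days:

query:
  query:
    bool:
      must:
        - term:
            tags.topic: httpd
        - range:
            batch.latest_ts:
              gte: now-7d/d
              lt: now/d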
Check the Metadata fields for more.
Archive Reader¶
Dealing with the Archive Reader configuration is quite simple. Since all the information about your batch comes from the corresponding metadata, all you need to do is provide the device to be used, along with the corresponding credentials if required. If you do not provide a device address, the first one listed in the metadata will be used. For each batch, the archive reader will try to open the file on this device, decode its content and return a dataset of strings corresponding to the content's lines.
For example, to read from the filesystem:
type: archive_reader_node
settings:
  device_address: file:///tmp/archive-logs/storage
subscribe:
  - component: input
    stream: metadata
publish:
  - stream: data
    fields:
      - line
Or to read from S3:
type: archive_reader_node
settings:
  device_address: http://127.0.0.1:9000
  access_key: minioadmin
  secret_key: minioadmin
subscribe:
  - component: input
    stream: metadata
publish:
  - stream: data
    fields:
      - line
Note that for Avro/Parquet decoding, the schema used when archiving is stored in the metadata, but you can override it with your own schema. Just set it in the schema setting:
type: archive_reader_node
settings:
  device_address: http://127.0.0.1:9000
  access_key: minioadmin
  secret_key: minioadmin
  schema:
    type: record
    namespace: org.thales.punch
    name: example
    fields:
      - name: _ppf_id
        type: string
      - name: _ppf_timestamp
        type: string
      - name: ip
        type: string
subscribe:
  - component: input
    stream: metadata
publish:
  - stream: data
    fields:
      - line
The Archive Reader provides a match_string option. This option filters the lines while reading them, meaning you don't have to load the whole file and use a punch node afterwards to filter out the ones you really want.
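As a sketch, assuming the option takes a plain string to match against each line (the value below is purely illustrative), it sits alongside the other reader settings:

type: archive_reader_node
settings:
  device_address: file:///tmp/archive-logs/storage
  # illustrative value: only lines matching this string are emitted
  match_string: "GET /login"
subscribe:
  - component: input
    stream: metadata
publish:
  - stream: data
    fields:
      - line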
Full configuration example¶
This configuration allows you to extract logs archived using the apache_httpd channel.
version: "6.0"
type: punchline
runtime: storm
dag:
  - type: extraction_input
    component: input
    settings:
      index: mytenant-archive-*
      query:
        query:
          range:
            batch.latest_ts:
              gte: now-7d/d
              lt: now/d
    publish:
      - stream: metadata
        fields:
          - metadata
  - type: archive_reader_node
    component: reader
    settings: {}
    subscribe:
      - component: input
        stream: metadata
    publish:
      - stream: data
        fields:
          - line
      - stream: _ppf_errors
        fields:
          - metadata
  - type: kafka_output
    settings:
      topic: mytenant_apache_httpd_extraction_results
      encoding: lumberjack
    subscribe:
      - component: reader
        stream: data
  - type: elasticsearch_output
    settings:
      per_stream_settings:
        - stream: _ppf_errors
          index:
            type: daily
            prefix: mytenant-archive-extraction-errors-
          document_json_field: metadata
    subscribe:
      - component: reader
        stream: _ppf_errors
metrics:
  reporters:
    - type: kafka
settings:
  topology.worker.childopts: -server -Xms256m -Xmx256m
We're basically reading last week's metadata from Elasticsearch, reading the corresponding archives with the archive reader (no credentials here, so no settings are necessary), and sending the results to a new Kafka topic. If an error occurs, the failed metadata is indexed in Elasticsearch. The whole process is monitored using a Kafka reporter. You should be able to see metrics in your tenant metric index, such as how many lines or batches have been successfully read, how much data that represents, and how many failures you encountered. Archive Reader metrics can be identified by their name starting with archive.reader.
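As a quick sanity check, you can look those metrics up in Elasticsearch. The sketch below assumes the metric name is stored in a name field of the metric documents, which may differ on your platform; only the archive.reader prefix comes from this page:

query:
  prefix:
    # assumption: the metric name is stored in a field called "name"
    name: archive.reader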
You're now ready to replay your data! You can apply a punch node to extract the fields you're interested in, or you can send the data to Kafka and process it in another punchline. It's up to you!