Archive Reader Node¶
Overview¶
The archive_reader node extracts archived data from S3, Ceph or a filesystem, using the metadata stored in Elasticsearch.
Warning
The subscribed input stream MUST contain a "metadata" field. Subscribing to a stream without a metadata field will raise an exception and make your punchline fail.
Filters¶
The Archive Reader reads one batch for every metadata document received as input. Therefore, filters and selections on dates, pools, tags, etc. must be applied in the Elasticsearch query, for instance through an extraction_input node:
type: extraction_input
component: input
settings:
  index: mytenant-archive-*
  nodes:
    - localhost
publish:
  - stream: metadata
    fields:
      - metadata
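As a sketch, assuming the extraction_input node accepts an Elasticsearch query in its settings (the query key, its exact nesting and the field names pool and timestamp below are illustrative and must be adapted to your metadata mapping), such a filter could look like:

type: extraction_input
component: input
settings:
  index: mytenant-archive-*
  nodes:
    - localhost
  # Illustrative filter: keep only the metadata of one pool over the last 7 days
  query:
    bool:
      must:
        - term:
            pool: mytenant
        - range:
            timestamp:
              gte: now-7d
publish:
  - stream: metadata
    fields:
      - metadata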
Once you have filtered your metadata, you can apply additional filters on the data itself. Two options are available: match_bloom and match_string.

match_bloom selects the archives matching a bloom filter value. Of course, you must have archived your data using bloom filters to use this option. This option filters out whole files, not the lines inside those files.

match_string keeps only the lines containing a specific string. It is applied after the bloom filter: each line of the opened archives is processed, and only the lines containing the provided string are kept in the output dataset.
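For example, here is a minimal sketch of the reader settings combining both filters (the values 192.168.1.1 and GET are placeholders):

type: archive_reader_node
component: reader
settings:
  # Keep only the archive files whose bloom filter matches this value
  match_bloom: 192.168.1.1
  # Then keep only the extracted lines containing this string
  match_string: GET
subscribe:
  - component: input
    stream: metadata
publish:
  - stream: data
    fields:
      - data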
Credentials¶
When using the archive reader, most of the settings needed for the extraction are contained in the metadata. However, the credentials are obviously not provided in the metadata. Therefore, you need to provide them in your node settings (user for Ceph, access_key and secret_key for S3). You can also provide some hadoop_settings if you are using avro/parquet encoding.
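For example, a minimal settings sketch for an S3 (MinIO) backend, with placeholder credentials (for a Ceph backend you would provide user instead):

type: archive_reader_node
component: reader
settings:
  # Placeholder S3/MinIO credentials, not provided by the metadata
  access_key: my-access-key
  secret_key: my-secret-key
subscribe:
  - component: input
    stream: metadata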
Note
The device address used is the first one found in the metadata. You can force a device_address by providing it in the node settings, but it is your responsibility to ensure the archives you're reading are present at the provided address.
Avro/Parquet¶
The Avro schema of your archives is provided in the metadata. Therefore, no additional settings are required for avro/parquet encoding. However, you can provide a schema in the node settings to force its use. You can also set some hadoop_settings for your extraction.
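For instance, a minimal sketch forcing an illustrative schema (passed here as a JSON string, matching the String type declared in the parameters below):

type: archive_reader_node
component: reader
settings:
  # Illustrative Avro schema, used instead of the one provided in the metadata
  schema: '{"type": "record", "name": "log", "fields": [{"name": "log", "type": "string"}]}'
subscribe:
  - component: input
    stream: metadata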
Error handling¶
To keep track of the failing metadata while continuing to extract the remaining data, you need to define a _ppf_errors publish stream. Your failing metadata will automatically be published there, and the extraction will continue. If no such stream is defined, the tuple is failed.
Fault tolerance¶
A Success stream is provided to keep track of already processed metadata in case of punchline failure. You can find a full description of this feature in the extraction documentation.
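For example, a sketch of a reader publishing both the error and success streams, using the reserved names described in the next section:

type: archive_reader_node
component: reader
settings: {}
subscribe:
  - component: input
    stream: metadata
publish:
  - stream: data
    fields:
      - data
  # Failing metadata is published here instead of failing the tuple
  - stream: _ppf_errors
    fields:
      - metadata
      - _ppf_error_message
  # Successfully processed metadata is published here for fault tolerance
  - stream: _ppf_successes
    fields:
      - metadata
      - _ppf_lines_cnt
      - _ppf_bytes_cnt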
Subscribe and Publish¶
The archive reader expects and provides some reserved fields and streams.

While you can subscribe to any stream from any component, containing any fields, a metadata field is expected in the input. You can choose another field name using the metadata_field setting.

On every publish stream, the archive reader handles the declared fields with a simple logic: if the field is one of the input fields, the corresponding input value is provided, allowing input values to propagate along the pipeline. If the field is a reserved field, the corresponding value is set. For any other field, the main value is set.
Here are the different streams and their reserved fields:
- Line stream: this stream can take any name and will output the extracted line. It allows an optional _ppf_metadata field containing the batch metadata for this line.
- Error stream: this stream uses the name _ppf_errors and will emit the metadata when an exception occurs. It allows a _ppf_error_message field containing the exception message.
- Success stream: this stream uses the name _ppf_successes and will emit the metadata when an extraction successfully ends. It allows a _ppf_lines_cnt field containing the number of lines read and a _ppf_bytes_cnt field containing the number of bytes read.
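As an illustration of this field logic, assuming the input stream carries a metadata field, a line stream (here named logs, with an illustrative log field) could be declared as follows:

publish:
  - stream: logs
    fields:
      # Neither an input field nor a reserved field: receives the main value, i.e. the extracted line
      - log
      # Input field: the input value is propagated along the pipeline
      - metadata
      # Reserved field: receives the batch metadata for this line
      - _ppf_metadata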
Example¶
version: 6.0
type: punchline
runtime: storm
meta:
  tenant: mytenant
  channel: archive_reader
  vendor: archive_reader
dag:
  - type: extraction_input
    component: input
    settings:
      index: mytenant-archive-*
      nodes:
        - localhost
    publish:
      - stream: metadata
        fields:
          - metadata
  - type: archive_reader_node
    component: reader
    settings:
      device_address: file:///tmp/archive-logs/storage
    subscribe:
      - component: input
        stream: metadata
    publish:
      - stream: data
        fields:
          - data
      - stream: _ppf_errors
        fields:
          - metadata
  - type: kafka_output
    settings:
      topic: extraction_results
      encoding: lumberjack
      producer.acks: all
    subscribe:
      - component: reader
        stream: data
  - type: elasticsearch_output
    settings:
      per_stream_settings:
        - stream: _ppf_errors
          index:
            type: daily
            prefix: mytenant-archive-extraction-errors-
          document_json_field: metadata
    subscribe:
      - component: reader
        stream: _ppf_errors
settings: {}
This example can be used to read files archived by the apache_httpd channel. It is composed of four elements:
- An extraction input sending a query to Elasticsearch to get the metadata.
- An archive reader reading files from the provided device, using the metadata information.
- A Kafka output sending the resulting lines to a Kafka topic.
- An Elasticsearch output indexing the failed metadata.
Parameters¶
- device_address: String. [Optional] Address of the device where the data is stored. Starts with file://, ceph_configuration://, http://...
- metadata_field: String. [Optional] Metadata field name in the input tuple. Defaults to "metadata".
- user: String. [Optional] The user used to access the Ceph cluster.
- access-key: String. [Optional] Access key for the MinIO cluster.
- secret-key: String. [Optional] Secret key for the MinIO cluster.
- match_string: String. [Optional] String to look for in the extracted lines.
- match_bloom: String. [Optional] Value to look for in the bloom filter fields set when archiving.
- schema: String. [Optional] Avro schema to be used instead of the one provided in the metadata.
- hadoop_settings: String. [Optional] Additional Hadoop settings when extracting avro/parquet.
- charset: String. [Optional] Charset to use when extracting bytes from avro/parquet.