Extraction Reliability
We now know the basics of how to extract archives. However, a few practical issues may arise during long-running extractions.
Fault Tolerance
A punchline may fail during an extraction for any reason. It then starts again and reads all the documents over, which can be quite costly when extracting a large number of batches.
To overcome this issue, two steps are required: creating a task index and using the archive reader's success stream.
Before the extraction, transfer the targeted metadata to a "task index" with an additional boolean field (for example named processed) set to false. This flag allows us to mark metadata that has already been processed by the archive reader. We use a dedicated index to keep the source metadata intact.
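One convenient way to build this task index is Elasticsearch's reindex API, which copies the metadata and adds the flag on the fly. Below is a minimal sketch, assuming the source metadata lives in an index named archive-metadata (adapt the index names to your platform):
POST _reindex
{
  "source": {
    "index": "archive-metadata"
  },
  "dest": {
    "index": "task-index"
  },
  "script": {
    "source": "ctx._source.processed = false"
  }
}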
When fetching metadata from the task index, we want to filter out the already processed entries. This is easily done with a query in the ExtractionInput settings:
type: extraction_input
settings:
  index: task-index
  id_column: id
  nodes:
    - localhost
  query:
    query:
      bool:
        must:
          term:
            processed: false
publish:
  - stream: metadata
    fields:
      - metadata
Note that we also provide an alias for the id column using the id_column setting. This gives us the document id, which we need later to overwrite the processed metadata.
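With this setting, each record published on the metadata stream carries the Elasticsearch document id of its task-index entry as a regular field. The exact content depends on your archiving settings, but a record looks roughly like this (archive-specific fields elided):
{
  "id": "<elasticsearch document id of the task-index entry>",
  "processed": false,
  ...
}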
Now, our archive reader needs to publish a success stream. To do so, use the reserved stream _ppf_successes. This stream publishes the metadata once a batch has been successfully extracted:
- type: archive_reader_node
  settings: { }
  subscribe:
    - component: extraction_input
      stream: metadata
  publish:
    - stream: _ppf_successes
      fields:
        - metadata
The published metadata needs to go through a punchlet node in order to mark it as processed and move the document id out of the metadata document.
- type: punchlet_node
  settings:
    punchlet_code: '{
      [_ppf_successes][metadata][processed] = true;
      [_ppf_successes][id] = [_ppf_successes][metadata][id];
      remove([_ppf_successes][metadata][id]);
      }'
  subscribe:
    - component: archive_reader_node
      stream: _ppf_successes
  publish:
    - stream: _ppf_successes
      fields:
        - metadata
        - id
Finally, we can overwrite the metadata in the task index to mark it as processed:
- type: elasticsearch_output
  settings:
    per_stream_settings:
      - stream: _ppf_successes
        index:
          type: constant
          value: task-index
        document_json_field: metadata
        document_id_field: id
        batch_size: 1
  subscribe:
    - component: punchlet_node
      stream: _ppf_successes
Now, every time an archive batch is read, its metadata is marked as processed. If any failure or restart were to happen, no metadata would be processed twice. Keep in mind that this does not apply to partially processed metadata: if a failure happens during a batch extraction, that batch will be processed again.
Batch metrics
The success stream provides two reserved fields with information about the batch that has been extracted:
- _ppf_lines_cnt: the number of lines actually extracted from this archive (may differ from the batch size when using filters)
- _ppf_bytes_cnt: the size in bytes actually extracted from this archive (may differ from the batch size when using filters)
All you have to do is declare them in your stream settings:
- type: archive_reader_node
  settings: { }
  subscribe:
    - component: extraction_input
      stream: metadata
  publish:
    - stream: _ppf_successes
      fields:
        - metadata
        - _ppf_lines_cnt
        - _ppf_bytes_cnt
These values give you batch-level insight into what has been extracted.
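If you want to keep these counters with the task metadata, one option is to extend the punchlet shown earlier so that they are written to the task index alongside the processed flag. This is only a sketch; the extracted_lines and extracted_bytes field names are arbitrary choices, not reserved fields:
- type: punchlet_node
  settings:
    punchlet_code: '{
      [_ppf_successes][metadata][processed] = true;
      [_ppf_successes][metadata][extracted_lines] = [_ppf_successes][_ppf_lines_cnt];
      [_ppf_successes][metadata][extracted_bytes] = [_ppf_successes][_ppf_bytes_cnt];
      [_ppf_successes][id] = [_ppf_successes][metadata][id];
      remove([_ppf_successes][metadata][id]);
      }'
  subscribe:
    - component: archive_reader_node
      stream: _ppf_successes
  publish:
    - stream: _ppf_successes
      fields:
        - metadata
        - id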
Error handling
An error may occur while reading an archive. The ArchiveReader provides a reserved _ppf_errors stream to keep track of the failing metadata while continuing to extract the remaining data. Simply declare this stream in your node's publish section: the failing metadata will automatically be published there, along with the exception message, and the extraction will continue.
- type: archive_reader_node
  component: reader
  settings: { }
  subscribe:
    - component: extraction_input
      stream: metadata
  publish:
    - stream: _ppf_errors
      fields:
        - _ppf_error_message
        - metadata
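If you want to keep a trace of these failures for later investigation or replay, one option is to route the _ppf_errors stream to an elasticsearch_output, just like the success stream. The sketch below is only an illustration: the extraction-errors index name and the error_message field are arbitrary choices, and a small punchlet first folds the exception message into the metadata document:
- type: punchlet_node
  settings:
    punchlet_code: '{
      [_ppf_errors][metadata][error_message] = [_ppf_errors][_ppf_error_message];
      }'
  subscribe:
    - component: reader
      stream: _ppf_errors
  publish:
    - stream: _ppf_errors
      fields:
        - metadata
- type: elasticsearch_output
  settings:
    per_stream_settings:
      - stream: _ppf_errors
        index:
          type: constant
          value: extraction-errors
        document_json_field: metadata
        batch_size: 1
  subscribe:
    - component: punchlet_node
      stream: _ppf_errors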