Extraction Reliability

We now know the basics of how to extract archives. However, a few practical issues can arise when running long extractions.

Fault Tolerance

A punchline may fail during an extraction for any number of reasons. When it restarts, all the documents are read again, which may be inconvenient when extracting a lot of batches.

To overcome this issue, two steps are required: creating a task index and using the archive reader's success stream. Before the extraction, transfer the targeted metadata to a "task index" with an additional boolean field (for example named processed) set to false. This flag allows us to mark metadata that has already been processed by the archive reader. We do this in a dedicated index to keep the source metadata intact.
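
If the source metadata already lives in an Elasticsearch index, one straightforward way to build this task index is the standard Elasticsearch _reindex API with a script that adds the flag. This is only a sketch: the source-metadata-index name is illustrative, and you may want to add a query to the source clause to restrict the reindex to the metadata you actually target.

POST _reindex
{
  "source": {
    "index": "source-metadata-index"
  },
  "dest": {
    "index": "task-index"
  },
  "script": {
    "lang": "painless",
    "source": "ctx._source.processed = false"
  }
}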

When getting metadata from the task index, we'll want to filter out the already processed metadata. This can easily be done using a query in the ExtractionInput settings:

type: extraction_input
settings:
  index: task-index
  id_column: id
  nodes:
    - localhost
  query:
    query:
      bool:
        must:
          term:
            processed: false
  publish:
    - stream: metadata
      fields:
        - metadata

Note that we also provide an alias for the id column using the id_column setting. This gives us the document id, which we will need later to overwrite the processed metadata.

Now, our archive reader needs to publish a success stream. To do so, use the reserved stream _ppf_successes. This stream publishes the metadata once a batch has been successfully extracted:

  - type: archive_reader_node
    settings: { }
    subscribe:
      - component: extraction_input
        stream: metadata
    publish:
      - stream: _ppf_successes
        fields:
          - metadata

The published metadata needs to go through a punchlet node, in order to mark it as processed and move the document id out of the metadata document.

- type: punchlet_node
  settings:
    punchlet_code: '{
      [_ppf_successes][metadata][processed] = true;
      [_ppf_successes][id] = [_ppf_successes][metadata][id];
      remove([_ppf_successes][metadata][id]);
      }'
  subscribe:
    - component: archive_reader_node
      stream: _ppf_successes
  publish:
    - stream: _ppf_successes
      fields:
        - metadata
        - id

Finally, we can overwrite the metadata in the task index to mark it as processed:

- type: elasticsearch_output
  settings:
    per_stream_settings:
      - stream: _ppf_successes
        index:
          type: constant
          value: task-index
        document_json_field: metadata
        document_id_field: id
    batch_size: 1
  subscribe:
    - component: punchlet_node
      stream: _ppf_successes

Now, every time an archive batch is read, its metadata is marked as processed. If any failure or restart were to happen, no metadata would be processed twice. Keep in mind that this does not apply to partially processed batches: if a failure happens during a batch extraction, that batch will be processed again.
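
A convenient side effect of this flag is that you can follow the extraction progress directly in Elasticsearch. For instance, a plain count query on the task index, reusing the same processed filter, tells you how many batches are still pending:

GET task-index/_count
{
  "query": {
    "term": {
      "processed": false
    }
  }
}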

Batch metrics

The success stream provides two reserved fields with information about the batch that has been extracted:

  • _ppf_lines_cnt: the number of lines actually extracted from this archive (may differ from the batch size when using filters)
  • _ppf_bytes_cnt: the number of bytes actually extracted from this archive (may differ from the batch size when using filters)

All you have to do is declare them in your stream settings:

  - type: archive_reader_node
    settings: { }
    subscribe:
      - component: extraction_input
        stream: metadata
    publish:
      - stream: _ppf_successes
        fields:
          - metadata
          - _ppf_lines_cnt
          - _ppf_bytes_cnt

These values should give you insight, at the batch level, into what has been extracted.
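
As an illustration, a tuple published on the success stream could then look like the following (other metadata fields are omitted and the counter values are purely made up):

{
  "metadata": {
    "processed": false
  },
  "_ppf_lines_cnt": 12000,
  "_ppf_bytes_cnt": 5242880
}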

Error handling

An error may occur when reading an archive. The ArchiveReader provides a reserved _ppf_errors stream to keep track of the failing metadata while continuing to extract the remaining data. Simply declare this stream among the node's published streams. Your failing metadata will automatically be published there, along with the exception message, and the extraction will continue.

- type: archive_reader_node
  component: reader
  settings: { }
  subscribe:
    - component: extraction_input
      stream: metadata
  publish:
    - stream: _ppf_errors
      fields:
        - _ppf_error_message
        - metadata
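
If you want to keep a persistent record of these failures, one option is to route the error stream to an output, mirroring the success chain above. The sketch below reuses the elasticsearch_output settings from the fault tolerance example; the extraction-errors index name is purely illustrative, and you may want to insert a punchlet first to merge _ppf_error_message into the metadata document before indexing it:

- type: elasticsearch_output
  settings:
    per_stream_settings:
      # store the failing metadata so it can be inspected or replayed later
      - stream: _ppf_errors
        index:
          type: constant
          value: extraction-errors
        document_json_field: metadata
    batch_size: 1
  subscribe:
    - component: reader
      stream: _ppf_errors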