
HOWTO archive then extract data

Why do that

On a Punchplatform you can leverage the storage capability of Elasticsearch to store your data, and retrieve it later. But indexing and storing the data into Elasticsearch entails higher CPU and storage costs than leveraging the punchplatform archiving service.

Besides, retrieving lots of data from Elasticsearch is not a recommended pattern. Doing that is likely to put your real-time ingestion at risk.

Here comes the punchplatform archiving service: it requires little storage and CPU resources, yet provides you with flexible ways to retrieve your data should you need to replay it, provide it to third-party apps or customers, or simply search through past data.

This HOWTO explains the typical workflow to first archive, then extract your data.

What to do

We explain this workflow using a likely and general use-case: you want to take data from a source Kafka topic and archive it into a filesystem or object-storage backend. Later on, you want to extract part of that archived data and put it back somewhere.

If that example is clear to you, you will be at ease inventing many variants.

Archiving

Because archiving should (i.e. must) be reliable, idempotent and scalable, you should arrange for the data you want to archive to be first stored in a Kafka topic.

The component responsible for archiving is the FileBolt. It works in combination with a KafkaSpout that reads that topic.

The FileBolt actually relies on similar concepts: topics and partitions. The archived data is organized into topics (sets of events, often grouped by business logic) and partitions (shards of a topic).
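
As a mental model only (this is a toy sketch, not the FileBolt's actual on-disk layout or API), the topic/partition/batch organization can be illustrated in Python:

```python
from collections import defaultdict

# Toy model: events are grouped first by topic (business logic),
# then by partition (shard), then cut into batches of batch_size.
# The event fields and the hashing scheme are illustrative assumptions.
def organize(events, num_partitions=2, batch_size=3):
    archive = defaultdict(list)  # (topic, partition) -> list of batches
    buffers = defaultdict(list)
    for event in events:
        key = (event["topic"], hash(event["id"]) % num_partitions)
        buffers[key].append(event)
        if len(buffers[key]) == batch_size:
            archive[key].append(buffers.pop(key))
    # flush incomplete batches
    for key, buf in buffers.items():
        archive[key].append(buf)
    return dict(archive)

events = [{"topic": "apache", "id": str(i), "log": "..."} for i in range(7)]
layout = organize(events)
```

Every event lands in exactly one batch of one (topic, partition) pair, and no batch exceeds the configured batch size.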

Info

Starting from the Craig 5.x version, you need to dedicate Kafka topics if you want distinct archive topics. For example, you will have one topic for nominal events and another topic for error events. Data is split into different topics in Kafka and in the archiving system the same way.

Configuring your KafkaSpout

In your KafkaSpout configuration you must use the batch_size parameter to indicate the maximum number of events stored in a single batch. It impacts both performance (a larger value yields better throughput) and resources (a larger value consumes more memory). A typical production value is 10000.
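
For instance, a KafkaSpout spout_settings block could look like the following. Only batch_size is the point here; the other keys and values are illustrative and must match your platform:

```json
"spout_settings": {
  "topic": "mytenant-apache-output",
  "start_offset_strategy": "last_committed",
  "batch_size": 10000
}
```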

Configuring your FileBolt

Here are typical settings:

  • destination : String
    • it can be a Ceph cluster (then use the ceph:// syntax)
    • or an emulation of an object-storage system over a classic filesystem if you don't have a Ceph cluster (then use the ceph_emulator:// syntax)
  • pool : String
    • a pool is an isolated space; its name is usually the tenant name suffixed with "-data" (e.g. "mytenant-data")
  • topic : String
    • name of the topic, usually a business name
  • fields : array of String
    • Storm fields containing data to archive
  • es_cluster_id : String
    • The FileBolt needs an Elasticsearch cluster ID because it writes metadata about archives during archiving operations. This ID must match a cluster name specified in your punchplatform.properties.

Make sure to read the FileBolt documentation to understand each of these parameters.

In summary, bolt_settings of your FileBolt looks like:

"bolt_settings": {
  "destination": "indexed_filestore:///tmp/archiving",
  "pool": "mytenant-data",
  "topic": "apache",
  "fields": [
    "log",
    "_ppf_id",
    "_ppf_timestamp"
  ],
  "es_cluster_id": "es_search"
}

Once your topology is launched, check Kibana to see the archiving progress in real time. Archiving metadata is written into an objects-storage Elasticsearch index; the content of these documents is straightforward to understand.

Extraction from an archive

Once your data has successfully been archived, you have the ability to extract it. Two ways can be used:

  • Short-term extraction : use the punchplatform-objects-storage.sh CLI
  • Long-term resilient extraction : use the ArchiveSpout in a topology

The command-line interpreter is only adequate for short-term or tiny time-scope extractions (see its man page).

This command is meant for administrators and does not allow resilient, large time-scope extractions: should it stop or be interrupted, you will have to restart from scratch.

The recommended extraction process is to rely on a topology using the ArchiveSpout. The ArchiveSpout configuration is very similar to the FileBolt configuration, with an additional extraction time scope setting using the from and to properties. Here is an example:

"spout_settings": {
  "cluster": "indexed_filestore:///tmp/archiving",
  "pool": "mytenant-data",
  "topic": "apache",
  "from": "2018-01-01T00:00:00+01:00",
  "to": "2018-12-31T23:59:59+01:00",
  "elasticsearch_cluster": "es_search"
}
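
The effect of the from/to settings can be illustrated with a small sketch. The assumed semantics, that a batch is selected when its timestamp falls inside the extraction scope, is an illustration; the real selection logic lives in the ArchiveSpout:

```python
from datetime import datetime

def in_scope(batch_ts, scope_from, scope_to):
    # ISO 8601 timestamps with UTC offsets, as in the spout settings above
    ts = datetime.fromisoformat(batch_ts)
    return datetime.fromisoformat(scope_from) <= ts <= datetime.fromisoformat(scope_to)
```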

The ArchiveSpout emits a single tuple for each archived batch, containing multiple events and archive metadata. This tuple can be used by a downstream bolt in various ways.

For example, to extract each line from a batch, you could insert a punchlet like the following:

{
  Tuple file = root.getSingleChild();

  String[] lines = file:[data].split("\n");

  // Do not process header (if there is one)
  int payloadStartIndex = 0;
  if (file:[meta][content_has_header]) {
      payloadStartIndex = 1;
  }

  // We have to emit an array of tuples now, so
  // define root:stream as an array.
  root.empty();

  // The field separator is the same for every line of the batch
  String separator = file:[meta][fields_separator];

  // Process each line
  for (int i = payloadStartIndex; i < lines.length; i++) {

    // Split each field
    String[] fieldValues = lines[i].split(separator);

    String log = fieldValues[0];
    String local_uuid = fieldValues[1];
    String local_timestamp = fieldValues[2];

    // Build a new tuple with this information
    Tuple event;
    event:[logs][log] = log;
    event:[logs][local_uuid] = local_uuid;
    event:[logs][local_timestamp] = local_timestamp;

    // Compute the elasticsearch index name using the local timestamp
    String elasticsearch_date = date("yyyy.MM.dd").on(local_timestamp).get();
    event:[logs][es_index] = world:[meta][tenant] + "-events-" + elasticsearch_date;
    root.append(event);
  }
}
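
For readers less familiar with the punch language, here is a plain Python transcription of the same logic. The separator, tenant name and ISO 8601 timestamp format are assumptions for the sake of the example:

```python
from datetime import datetime

def extract_batch(data, has_header=False, separator=";", tenant="mytenant"):
    """Split an archived batch into one event dict per line."""
    lines = data.split("\n")
    start = 1 if has_header else 0  # skip the header line if present
    events = []
    for line in lines[start:]:
        log, local_uuid, local_timestamp = line.split(separator)
        # same index naming scheme as the punchlet: <tenant>-events-yyyy.MM.dd
        es_date = datetime.fromisoformat(local_timestamp).strftime("%Y.%m.%d")
        events.append({
            "log": log,
            "local_uuid": local_uuid,
            "local_timestamp": local_timestamp,
            "es_index": tenant + "-events-" + es_date,
        })
    return events

batch = "log;uuid;ts\nGET /index.html;42;2018-06-15T12:00:00+01:00"
events = extract_batch(batch, has_header=True)
```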