
Archiving and Extracting

Abstract

Archiving is simple to set up but offers several subtle options. This guide walks you through the various operations, from archiving data to extracting it.

Requirements

Make sure you successfully followed the Punchlets Getting Started guide first. This guide assumes you are familiar with topologies and punch injection files.

Generate Logs

All the configuration files used in this guide are located under the conf/samples/archiving folder. Start by injecting logs into a Kafka topic. Simply use the provided injector file.

punchplatform-log-injector.sh -c archiving_guide_injector.hjson

This will generate 1024 records into a mytenant_archiving_guide topic. Each record looks like this:

{"ip":"128.78.0.8","raw":"20-06-2019 01:18:00 host 128.78.0.8 bob","user":"bob","timestamp":"20-06-2019 01:18:00"}
{"ip":"128.78.3.42","raw":"20-06-2019 01:19:00 host 128.78.3.42 alice","user":"alice","timestamp":"20-06-2019 01:19:00"}
{"ip":"128.78.0.30","raw":"20-06-2019 01:20:00 host 128.78.0.30 ted","user":"ted","timestamp":"20-06-2019 01:20:00"}
...

In order to produce interesting data, the injector file generates 1K records, with IP addresses drawn from 16K possible values. This will be useful to illustrate how efficiently you can locate a given IP address in a batch file.
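
If you want to double-check that the records landed in Kafka, you can peek at the topic with the standard Kafka console consumer. This is only a sketch: the localhost:9092 broker address is the usual standalone default and may differ on your platform; adjust the topic name if needed.

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic mytenant_archiving_guide --from-beginning --max-messages 3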

Tip

Check out the injection file, it is self-explanatory.

Archive the Logs

Now that you have your (1024) logs in Kafka, archive them into an indexed file store. Before executing the archiving topology, let us have a look at its settings. The Kafka spout simply reads our topic. The file bolt is in charge of archiving; it is configured with an additional bloom filtering capability. Finally, we catch the indexing metadata generated by the file bolt and index it into Elasticsearch. That last step will be used later to efficiently query our archived data.

{
  "tenant": "mytenant",
  "channel": "demo",
  "name": "test_archive_topology",
  "spouts": [
    {
      "type": "kafka_spout",
      "spout_settings": {
        "topic": "mytenant_archiving_guide_1",
        "start_offset_strategy": "earliest",
        "brokers": "local",
        "batch_size": 8
      },
      "storm_settings": {
        "component": "spout",
        "publish": [
          {
            "stream": "logs",
            "fields": [ "raw", "ip", "user", "timestamp" ]
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "file_bolt",
      "bolt_settings": {
        "destination" : "file:///tmp/storage",
        "pool" : "mytenant",
        "topic" : "demo",
        "file_prefix_pattern" : "%{partition}/%{date}",
        "compression_format" : "NONE",
        "create_root" : true,
        "fields": [
          "raw"
        ],
        "bloom_filter_fields": [
          "ip"
        ],
        "bloom_filter_expected_insertions" : 10000,
        "bloom_filter_false_positive_probability" :  0.1,
        "timestamp_field" : "timestamp"
      },
      "storm_settings" : {
        "component": "bolt",
        "subscribe": [
          {
            "component": "spout",
            "stream": "logs"
          }
        ],
        "publish": [
          {
            "stream": "logs",
            "fields": [
              "metadata"
            ]
          }
        ]
      }
    },
    {
      "type": "elasticsearch_bolt",
      "bolt_settings": {
        "cluster_name": "es_search",
        "per_stream_settings": [
          {
            "stream" : "logs",
            "index": {
              "type": "daily",
              "prefix": "mytenant-archive-"
            },
            "document_json_field": "metadata"
          }
        ]
      },
      "storm_settings": {
        "component" : "elastic",
        "subscribe": [
          {
            "component": "bolt",
            "stream": "logs"
          }
        ]
      }
    }
  ],
  "storm_settings": {
    "topology.worker.childopts": "-server -Xms256m -Xmx256m"
  }
}

In particular, note the following settings:

  • timestamp_field : we use the incoming timestamp field to index the data for future searches. You may not always have such a timestamp at hand, but using the punch you typically do. It is extremely useful to perform searches based on the real event timestamp.
  • bloom_filter_fields : bloom filtering is activated on the "ip" field
  • bloom_filter_expected_insertions : an estimate of the number of ip addresses to be inserted
  • bloom_filter_false_positive_probability : we accept a 10 % false positive rate (see the sizing note below)
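
To get an idea of the cost of those last two settings, the textbook Bloom filter sizing formula (the actual implementation may differ slightly) gives the filter size in bits as m = -n * ln(p) / (ln 2)^2. With n = 10000 expected insertions and p = 0.1, that is roughly 48 000 bits, i.e. about 6 kB of filter data.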

For the sake of simplicity, the create_root option has been activated in this topology. This option allows the root directory (here /tmp/storage/) to be created automatically. We do not recommend activating it for anything other than tests: setting it to true may create unwanted directories if destinations are mistyped. Furthermore, not creating the root directory automatically lets you check that the destination cluster is reachable.
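
Outside of tests, you would typically leave create_root deactivated and create the root directory yourself beforehand, for example:

mkdir -p /tmp/storage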

To start the topology, simply run the following command:

punchplatform-topology.sh --start-foreground -m light -t archiving_guide_topology.hjson

For the sake of the example, it is configured to generate batch files containing 8 records. Of course this is a very small number, but it helps illustrate the various settings without filling your laptop with huge files. Since we processed 1024 logs, we thus generated 128 batch files. Check out the resulting archive folder. You will have the following structure:

/tmp/storage/
└── mytenant
    └── 0
        └── 2019.12.05
            ├── puncharchive-demo-ZgCV1W4BT3gZiUK0mKWY-0-1575541381475.csv
            ├── ...
            └── puncharchive-demo-ZgCV1W4BT3gZiUK0mKWY-0-1575541381602.csv

3 directories, 128 files

The "%{partition}/%{date}" settings is the one responsible for that structure. Here we have only a single Kafka partition (0).

Query the Archive

In the following, we go through the essential commands provided by the punchplatform-archive-client.sh CLI tool. It allows you to query and extract your archived data.

!!! warning This tool may not work as expected if the metadata stored in Elasticsearch does not comply with the archive mapping. You can find this template in $PUNCHPLATFORM_CONF_DIR/resources/elasticsearch/templates/platform/mapping_archive.json. The prefix provided in the elasticsearch_bolt needs to match *-archive-* for this template to apply. If you want to use a custom prefix, you need to update the template.
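
You can also verify that the metadata documents were indexed by querying Elasticsearch directly. This is only a sketch: localhost:9200 is the usual standalone Elasticsearch endpoint and may differ on your platform.

curl 'localhost:9200/mytenant-archive-*/_search?size=1&pretty'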

Topic Status

Start by querying the status of your topic. The time filter here selects data from the last 20 days. Notice the short date notation; you can provide ISO dates as well.

punchplatform-archive-client.sh topic-status \
    --destination file:///tmp/storage  \
    --from-date -20d --to-date now \
    --pool mytenant --topic demo \
    --es-index mytenant-archive
Topic: demo
Batches Count: 128
Tuples Count: 1024
Earliest Tuple: 2019-12-05T12:01:22+01:00[Europe/Paris]
Latest Tuple: 2019-12-05T12:01:23+01:00[Europe/Paris]
Uncompressed Size: 44.3 kB
Effective Stored Size: 44.3 kB
Compression Ratio: 1.0
1/Compression Ratio: 1.0

List Topics

List the topics from the last 20 days, together with some detailed info:

punchplatform-archive-client.sh list-topics \
    --destination file:///tmp/storage  \
    --from-date -20d --to-date now \
    --pool mytenant --details \
    --es-index mytenant-archive
Pool                          : mytenant
Topics Number                 : 1

Topic                         : demo
Batch Size                    : 128
Tuple Count                   : 1024
Earliest                      : 2019-05-20T00:05+02:00[Europe/Paris]
Latest                        : 2019-05-23T13:20+02:00[Europe/Paris]
Uncompressed Size             : 56.7 kB
Effective Stored Size         : 56.7 kB
Compression Ratio             : 1.000000
1/Compression Ratio           : 1.000000

List Batch Files

List the last 20 days of batch files for a given topic:

punchplatform-archive-client.sh list-objects \
    --destination file:///tmp/storage \
    --from-date -20d --to-date now \
    --pool mytenant --topic demo \
    --es-index mytenant-archive
mytenant/0/2019.12.05/puncharchive-demo-fYi41W4Btd7z87AblKC6-0-1575543674519.csv
mytenant/0/2019.12.05/puncharchive-demo-fYi41W4Btd7z87AblKC6-0-1575543674520.csv
mytenant/0/2019.12.05/puncharchive-demo-fYi41W4Btd7z87AblKC6-0-1575543674521.csv
mytenant/0/2019.12.05/puncharchive-demo-fYi41W4Btd7z87AblKC6-0-1575543674522.csv
...
mytenant/0/2019.12.05/puncharchive-demo-fYi41W4Btd7z87AblKC6-0-1575543674646.csv

Using Bloom Filters

If you activated bloom filters on one or several fields, you can efficiently extract only those batches that may contain a target value. In our example, the bloom filter was set on the incoming ip tuple field. Here is how you can request the batches that contain a given IP:

punchplatform-archive-client.sh list-objects  \
    --destination file:///tmp/storage \
    --from-date -30d --to-date now \
    --pool mytenant --topic demo \
    --match 128.78.18.47 \
    --es-index mytenant-archive
mytenant/0/2019.12.05/puncharchive-demo-fYi41W4Btd7z87AblKC6-0-1575543674595.csv

In this case only a single batch contains the target IP. This in turn allows you to perform efficient data extractions.
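
Since compression_format is NONE, the returned batch file is a plain CSV file, so you can check its content directly with standard tools. The file name below is the one returned in the example above; use the path returned on your own platform.

grep 128.78.18.47 /tmp/storage/mytenant/0/2019.12.05/puncharchive-demo-fYi41W4Btd7z87AblKC6-0-1575543674595.csv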

Tip

If the result is empty, it is because the IPs are randomly generated for this example. You can list the generated IPs with the command below, then copy one of them into the --match argument:

grep -r 128.78.18 /tmp/storage/