Skip to content

HOWTO extract logs from Elasticsearch and write them in files

Why do that

This is a typical user use case: extracting data from elasticsearch makes it easy to work on the data, or to provide a detailed report to third-party analysts.

Prerequisites

  • The data in your Elasticsearch cluster must be available. (state GREEN in / _cat/indices).
  • The storage on all devices where you perform the extraction must be adequate.

What to do

To write logs in files you can use the file bolt, but because the file bolt is working using a batch strategy, the easiest way to do this is first store the extracted logs in a kafka topic, then use a second job to transfer these logs to files.

Besides, going through a Kafka topic is robust and scalable should you need to extract lots of logs. In this howto we will explain how to do this.

Configure your Elasticsearch-to-Kafka topology

Start from the following example topology to extract logs and write them to Kafka. Adapt it for your need. For the sake of the example we will work on the metric index. If you have a standalone at hand, you shoud have 'events-mytenant-*' indices with ready to use data.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
{
    "tenant" : "mytenant",
    "name" : "extract_job_es_to_kafka",
    "spouts" : [
        { 
            "type" : "elasticsearch_spout",
            "spout_settings" : {
              "cluster_id" : "es_search",
              "index" : "mytenant-metrics-*",
              "type": "doc",
              # this is convenient to detect your target 
              # index indeed exists. If not your topology
              # will then exit with a clear error message.
              "es.index.read.missing.as.empty": false,
              # your are likely to be interested in the document
              # unique id. 
              "es.read.metadata" : true
            },
            "storm_settings" : {
              "component" : "extractor_spout"
            }
        }
    ],
    "bolts" : [
      {
        # This punch bolt will extract interesting parts from the elastic
        # document and emit these to the next (kafka) bolt.
        "type" : "punch_bolt",
        "bolt_settings" : {
          # checkout the punchlet described hereafter
          "punchlet" : "jobs/get_timestamp.punch"
        },
        "storm_settings" : {
          "executors": 1,
          "component" : "punch",
          "publish" : [
            {
              "stream" : "logs",
              "fields" : ["id", "@timestamp"]
            }
          ],
          "subscribe" : [
            {
              "component" : "extractor_spout"
              # this first bolt subscribes to the default stream 
            }
          ]
        }
      },
      {
           "type" : "kafka_bolt",
           "bolt_settings" : {
              "topic" : "mytenant_es_to_kafka_extraction",
              "producer.acks": "all",
              "producer.batch.size": 16384,
              "producer.type" : "sync",
              "producer.linger.ms": 0,
              "brokers" : "local",
              "encoding" : "lumberjack"
           },
           "storm_settings" : {
              "component" : "kafka",
              "subscribe" : [
                {
                  "component" : "punch",
                  "stream" : "logs"
                }
              ]
            }
        }
    ],
    "storm_settings" : {
        "topology.max.spout.pending" : 6000,
        "topology.enable.message.timeouts": true,
        "topology.worker.childopts": "-Xms512m -Xmx512m"
    }
}

The topology just illustrated uses a specific punchlet to extract the timestamp from the logs. In turn this timestamp will be used to name files. The Here is the punchlet:

1
2
3
4
5
6
7
8
9
{
  // In this very simple example we only extract the id snd the timestamp 
  // of each elasticseaerch (metric) document). 
  [logs][id]         = [default][doc][_metadata][_id];
  [logs][@timestamp] = [default][doc][@timestamp];

  // To debug and check what you have you can include this:
  // print(root);
}

To perform the extraction, run the following command:

1
$ punchplatform-topology.sh <YOUR_TOPOLOGY>

You now have a Kafka topic filled with the logs you want.

Hint

You can check your topic content using the punchplatform log injector;

sh $ punchplatform-log-injector.sh --kafka-consumer -brokers local -topic mytenant_es_to_kafka_extraction --earliest -v

Configure your Kafka-to-Files topology

Start next from the following example topology:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
{
    "tenant" : "mytenant",
    "channel" : "jobs",
    "name" : "extract_kafka_to_file",
    "spouts" : [
      {
            "type" : "kafka_spout",
            "spout_settings" : {
              "topic" : "mytenant_es_to_kafka_extraction",
              "start_offset_strategy": "last_committed",
              "brokers": "local",
              # Watchout the batch_size and batch_interval are the two 
              # essential settings. Basically we require here to have 10000
              # logs per file. 
              "batch_size": 10000,

              # .. and to flush the last file after 5 minutes whenever there are
              # no more logs arriving.
              "batch_interval": "1m"
              "load_control" : "none",
              "brokers" : "local"
            },
            "storm_settings" : {
              "executors": 1,
              "component" : "kafka_spout",
              "publish" : [ 
                { 
                  "stream" : "logs", 
                  "fields" : ["id", "@timestamp"] 
                } 
              ]
            }
        }
    ],
    "bolts" : [
      {
           "type" : "file_bolt",
           "bolt_settings" : {
              # make sure the target folder exists.
              "destination": "file:///tmp/archive-logs"
              "separator" : ";;;",
              "add_header" : true
           },
           "storm_settings" : {
              "executors": 1,
              "component" : "file",
              "subscribe" : [ 
                { 
                  "component" : "kafka_spout", 
                  "stream" : "logs", 
                  "grouping": "localOrShuffle"
                }
              ] 
            }
      }
    ],
    "storm_settings" : {
        # whenever you use the file bolt, use at least a few hundredsMb
        # as it relies on direct memory allocated memory.  
        "topology.worker.childopts": "-Xms512m -Xmx512m",
    }
}

To perform the extraction, simply run the following command:

1
$ punchplatform-topology.sh <YOUR_TOPOLOGY>

You now have your logs in the '/tmp/archive-logs'.