
HOWTO replay logs from files to Elasticsearch

Why do that

This document describes how to import JSON log files into Elasticsearch by replaying them through a topology.

The approach is lightweight: the topology runs as a one-shot job.

What to do

Uncompress data

First, uncompress the data with the following command:

gzip -d *.gz
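gzip -d replaces each .gz file with its decompressed version. If you would rather keep the compressed originals and write the output straight into the directory the topology reads, a minimal Python sketch using the standard gzip and shutil modules could look like this. The function name gunzip_dir and the source directory in the usage line are hypothetical.

```python
import gzip
import shutil
from pathlib import Path


def gunzip_dir(src_dir: str, dst_dir: str) -> list[Path]:
    """Decompress every *.gz file in src_dir into dst_dir, keeping the originals."""
    src = Path(src_dir)
    dst = Path(dst_dir)
    dst.mkdir(parents=True, exist_ok=True)
    written = []
    for gz_path in sorted(src.glob("*.gz")):
        # "events.json.gz" -> "events.json"
        out_path = dst / gz_path.stem
        # Stream the data so large archives are not loaded into memory at once.
        with gzip.open(gz_path, "rb") as fin, open(out_path, "wb") as fout:
            shutil.copyfileobj(fin, fout)
        written.append(out_path)
    return written
```

For example, gunzip_dir("/tmp/compressed_files", "/tmp/extraction_files") fills the directory used as path in the example topology.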

Create a topology based on the following example

The settings to update are:

  • path: directory from which the file spout reads the files
  • index: name of the Elasticsearch index to write to (the value field of the bolt's index setting)
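To change these two settings without hand-editing the JSON, a small Python sketch can update a parsed topology that follows the layout of the example topology in this section (a file_spout spout and an elasticsearch_bolt with per_stream_settings). The helpers set_replay_settings and update_topology_file are hypothetical and not part of any Punch tooling; they also assume a constant index name, as in the example.

```python
import json
from pathlib import Path


def set_replay_settings(topology: dict, path: str, index: str) -> dict:
    """Set the file spout "path" and the constant Elasticsearch index name.

    Assumes the layout of FilesToES_topology.json.
    """
    for spout in topology.get("spouts", []):
        if spout.get("type") == "file_spout":
            spout["settings"]["path"] = path
    for bolt in topology.get("bolts", []):
        if bolt.get("type") == "elasticsearch_bolt":
            for stream_settings in bolt["settings"].get("per_stream_settings", []):
                # Every stream written by the bolt goes to the same index.
                stream_settings["index"] = {"type": "constant", "value": index}
    return topology


def update_topology_file(filename: str, path: str, index: str) -> None:
    """Rewrite a topology file in place with the new path and index."""
    file = Path(filename)
    topology = json.loads(file.read_text())
    set_replay_settings(topology, path, index)
    file.write_text(json.dumps(topology, indent=2) + "\n")
```

For instance, update_topology_file("FilesToES_topology.json", "/data/replay/", "mytenant-replay") would point the spout at /data/replay/ and write into mytenant-replay (both values hypothetical).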

Example: FilesToES_topology.json

{
  "spouts": [
    {
      "type": "file_spout",
      "settings": {
        "read_file_from_start": true,
        "path": "/tmp/extraction_files/",
        "load_control": "rate",
        "load_control.rate": 500,
        "load_control.adaptative": true
      },
      "storm_settings": {
        "executors": 1,
        "component": "file_spout",
        "publish": [
          {
            "stream": "logs",
            "fields": [ "log" ]
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "elasticsearch_bolt",
      "settings": {
        "cluster_id": "es_search",
        "per_stream_settings": [
          {
            "stream": "logs",
            "index": { "type": "constant", "value": "mytenant-events" }
          }
        ]
      },
      "storm_settings": {
        "component": "ES_bolt",
        "subscribe": [ { "component": "file_spout", "stream": "logs" } ]
      }
    }
  ],
  "exception_catcher_bolt": {
    "punchlet": "standard/common/exception_handler.punch",
    "executors": 1
  },

  "storm_settings" : {
    "metrics_consumers": [ "org.apache.storm.metric.LoggingMetricsConsumer" ],
    "topology.builtin.metrics.bucket.size.secs": 30,
    "supervisor.monitor.frequency.secs" : 60,
    "topology.max.spout.pending" : 50000,
    "topology.enable.message.timeouts": true,
    "topology.message.timeout.secs" : 30,
    "topology.worker.childopts" : "-server -Xms2048m -Xmx2048m", 
    "topology.receiver.buffer.size": 32,
    "topology.executor.receive.buffer.size": 16384,
    "topology.executor.send.buffer.size": 16384,
    "topology.transfer.buffer.size": 32,
    "topology.worker.shared.thread.pool.size": 4,
    "topology.disruptor.wait.strategy": "com.lmax.disruptor.BlockingWaitStrategy",
    "topology.spout.wait.strategy": "org.apache.storm.spout.SleepSpoutWaitStrategy",
    "topology.sleep.spout.wait.strategy.time.ms": 50,
    "topology.workers" : 1
  }
}
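In this topology the spout and the bolt are linked by stream names: file_spout publishes a logs stream carrying a single log field, and ES_bolt subscribes to logs from file_spout, with a matching per_stream_settings entry that selects the index. A mismatch in these names is an easy mistake when adapting the example. The following Python sketch is a hypothetical pre-flight check (not a Punch tool) that reads only the publish, subscribe and per_stream_settings fields shown above.

```python
def check_stream_wiring(topology: dict) -> list[str]:
    """Return a list of problems in the spout/bolt stream wiring (empty if none).

    Every bolt subscription must name a (component, stream) pair that some
    spout or bolt publishes, and an elasticsearch_bolt needs a
    per_stream_settings entry for each stream it subscribes to.
    """
    published = set()
    for node in topology.get("spouts", []) + topology.get("bolts", []):
        storm = node.get("storm_settings", {})
        for pub in storm.get("publish", []):
            published.add((storm.get("component"), pub["stream"]))

    problems = []
    for bolt in topology.get("bolts", []):
        storm = bolt.get("storm_settings", {})
        name = storm.get("component")
        for sub in storm.get("subscribe", []):
            key = (sub["component"], sub["stream"])
            if key not in published:
                problems.append(f"{name} subscribes to unpublished stream {key}")
            if bolt.get("type") == "elasticsearch_bolt":
                configured = {s["stream"] for s in bolt["settings"].get("per_stream_settings", [])}
                if sub["stream"] not in configured:
                    problems.append(f"{name} has no per_stream_settings for {sub['stream']!r}")
    return problems
```

Passing it the parsed example (for instance json.load on FilesToES_topology.json) returns an empty list, since file_spout publishes logs and ES_bolt both subscribes to and configures logs.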

Start the extraction

Run the following command, replacing <topology_name> with the name of your topology file (for example FilesToES_topology):

punchlinectl <topology_name>.json
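Because the spout is throttled by load_control (here load_control.rate is 500), replaying a large dataset can take a while. The Python sketch below estimates a lower bound on the replay time. It assumes that the rate is expressed in logs per second, that each line of the uncompressed files is one log, and that the files sit directly under path with no subdirectories; count_lines and estimated_replay_seconds are hypothetical helpers.

```python
from pathlib import Path


def count_lines(path: str) -> int:
    """Count lines in all regular files directly under path (assumed: one log per line)."""
    total = 0
    for f in Path(path).iterdir():
        if f.is_file():
            with open(f, "rb") as fh:
                total += sum(1 for _ in fh)
    return total


def estimated_replay_seconds(lines: int, rate: float) -> float:
    """Lower bound on replay time when the spout emits at most `rate` logs per second."""
    if rate <= 0:
        raise ValueError("rate must be positive")
    return lines / rate
```

For example, 1,800,000 lines at 500 logs per second give estimated_replay_seconds(1_800_000, 500) == 3600.0, so at least one hour. The real duration can only be longer, since the configured rate is a cap.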