HOWTO replay logs from files to elasticsearch
Why do that¶
This document describes how to import JSON files easily to elasticsearch.
The method has a light approach: Job approach (one shot).
What to do¶
Uncompress data¶
First uncompressed data with the following command:
gzip -d *.gz
Create a topology with the following example¶
The settings to update are:
- path: path to read files
- index: elasticsearch index name
Example: FilesToES_topology.json
{
"spouts" : [
{
"type" : "file_spout",
"settings" : {
"read_file_from_start" : true,
"path" : "/tmp/extraction_files/",
"load_control" : "rate",
"load_control.rate" : 500,
"load_control.adaptative" : true
},
"storm_settings" :
{
"executors": 1,
"component" : "file_spout",
"publish" :
[
{
"stream" : "logs" ,
"fields" :
[
"log"
]
}
]
}
}
],
"bolts" : [
{
"type": "elasticsearch_bolt",
"settings": {
"cluster_id": "es_search",
"per_stream_settings" : [
{
"stream" : "logs",
"index" : { "type" : "constant" , "value" : "mytenant-events" }
}
]
},
"storm_settings": {
"component": "ES_bolt",
"subscribe": [ { "component": "file_spout", "stream": "logs" } ]
}
}
],
"exception_catcher_bolt" : {
"punchlet" : "standard/common/exception_handler.punch",
"executors" : 1
},
"storm_settings" : {
"metrics_consumers": [ "org.apache.storm.metric.LoggingMetricsConsumer" ],
"topology.builtin.metrics.bucket.size.secs": 30,
"supervisor.monitor.frequency.secs" : 60,
"topology.max.spout.pending" : 50000,
"topology.enable.message.timeouts": true,
"topology.message.timeout.secs" : 30,
"topology.worker.childopts" : "-server -Xms2048m -Xmx2048m",
"topology.receiver.buffer.size": 32,
"topology.executor.receive.buffer.size": 16384,
"topology.executor.send.buffer.size": 16384,
"topology.transfer.buffer.size": 32,
"topology.worker.shared.thread.pool.size": 4,
"topology.disruptor.wait.strategy": "com.lmax.disruptor.BlockingWaitStrategy",
"topology.spout.wait.strategy": "org.apache.storm.spout.SleepSpoutWaitStrategy",
"topology.sleep.spout.wait.strategy.time.ms": 50,
"topology.workers" : 1
}
}
Start the extraction¶
Simply, run the following command:
punchlinectl <topology_name>.json