public abstract class AbstractFileInput
extends org.thales.punch.libraries.storm.api.BaseInputNode
This spout launches a single dedicated thread per file to scan. If the file is closed/rotated/recreated, it will be continuously reopened and read.
property | mandatory | type | default | comment |
---|---|---|---|---|
path | yes | string | - | the path to the file to read. It can be a directory in which case all the files will be read. |
codec | no | string | - | The codec used to read files. Accepted values are "line", "multiline", "json_array" and "xml". If empty, FileSpout tries to pick the correct codec from the file extension (*.json, *.xml). |
path.regex | no | string | - | When the path is a directory, a regex that restricts consumption to the files whose names match. Note: the regex must use Java syntax, and JSON reserved characters must be escaped. |
read_file_from_start | no | boolean | true | By default the files are read from the start. Set this property to false to only tail the files. |
reader_timeout | no | long | 1000 | when no more lines are appended to the file(s), each thread will sleep and periodically recheck if something has been appended. You can set the sleeping timeout in milliseconds. |
queue_size | no | int | 100000 | The maximum number of lines held in memory across all files. When this value is reached, file reading is slowed down. If you deal with very long lines, you can set a much lower value (e.g. 1000 or even 100), at the cost of processing speed. |
load_control | no | string | "none" | If set to "rate", the spout will limit the rate to a specified value. You can use this to limit the incoming or outgoing rate of data from/to external systems. Check the "load_control.adaptative" and "load_control.rate" related properties |
load_control.rate | no | long | 10000 | Only relevant if load_control is set to "rate". Limits the rate to this number of messages per second. |
load_control.adaptative | no | boolean | false | If true, the load control does not strictly limit the traffic to load_control.rate messages per second: it allows less or more, as long as the topology is not overloaded. Overload is detected by monitoring the Storm tuple traversal time. If that traversal time increases (which happens quickly once the topology is overloaded), the load controller lowers the allowed rate. This option protects your topology without capping it at an overly conservative rate, since the maximum sustainable rate is difficult if not impossible to estimate. |
ordered | no | boolean | false | If true, the spout sequentially reads all files from your directory, one by one, from the oldest to the newest (based on the update timestamp). No parallelism issues, no tailing. This is useful to replay log files produced by a rolling logger strategy, where the oldest files contain the oldest data. Setting ordered to true implicitly sets the tail parameter to false. |
tail | no | boolean | true | If false, the spout reads your files just once, without tailing them forever. Note that your files are then processed in parallel. If you need an ordered read, use the ordered setting instead. |
multiline | no | boolean | false | If true, the spout aggregates subsequent log lines, identified by a prefix such as "\t". Refer to the other multiline properties. When set to true, at least one of regex and delimiter must be defined. |
multiline.regex | no | String | - | Set the regex used to determine if a log line is an initial or subsequent line. |
multiline.startswith | no | String | - | Set the regex used to determine if a log line is an initial or subsequent line. |
multiline.delimiter | no | String | \n | Once completed the aggregated log line is made up from the sequence of each line. You can insert a line delimiter to make further parsing easier. |
multiline.timeout | no | long | 1000 | If a single log line is received, it will be forwarded downstream after this timeout, expressed in milliseconds. |
watcher | no | boolean | false | Activates the watcher functionality, which monitors a folder and processes any new file. With this parameter, even files added after the topology initialisation are read. |
watcher.process_existing_files | no | boolean | true | Only used in "watcher" mode. If set to true, any file already existing in the folder is processed before the watcher starts. Otherwise, only new files are read. |
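
The multiline properties above can be pictured with a small sketch. This is not FileSpout's actual implementation: it assumes that a line matching a continuation regex (here a leading tab) is appended to the previous line, joined with the configured delimiter. The class `MultilineSketch` and the method `aggregate` are hypothetical names, and `multiline.timeout` is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class MultilineSketch {
    /** Groups each initial line with the continuation lines that follow it. */
    static List<String> aggregate(List<String> lines, Pattern continuation, String delimiter) {
        List<String> out = new ArrayList<>();
        StringBuilder current = null;
        for (String line : lines) {
            if (current != null && continuation.matcher(line).matches()) {
                // Subsequent line: append it to the event being built.
                current.append(delimiter).append(line);
            } else {
                // Initial line: flush the previous event and start a new one.
                if (current != null) {
                    out.add(current.toString());
                }
                current = new StringBuilder(line);
            }
        }
        if (current != null) {
            out.add(current.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> raw = List.of(
                "Exception in thread main",
                "\tat Foo.bar(Foo.java:1)",
                "\tat Foo.main(Foo.java:9)",
                "next event");
        // Assumed continuation rule: the line starts with a tab.
        Pattern continuation = Pattern.compile("^\\t.*");
        for (String event : aggregate(raw, continuation, "\n")) {
            System.out.println("[" + event + "]");
        }
    }
}
```

With this input, the three stack-trace lines end up in one aggregated event and "next event" in a second one.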
{
  "type": "file_spout",
  "spout_settings": {
    "codec": "line",
    "ordered": false,
    "tail": true,
    "read_file_from_start": true,
    "path.regex": ".*\\.log",
    "path": "/path/to/directory"
  },
  "storm_settings": {
    "component": "filespout",
    "publish": [
      {
        "stream": "logs",
        "fields": [
          "log",
          "path",
          "last_modified"
        ]
      }
    ]
  }
}
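In the configuration above, `path.regex` is written `".*\\.log"` in JSON, which decodes to the regex `.*\.log`. The sketch below shows how such a pattern could filter file names with `java.util.regex`. It assumes the pattern is matched against the bare file name and must match it entirely; the property table only says the selection is "based on their file names", so the exact matching semantics are an assumption. `PathRegexSketch` and `filter` are hypothetical names.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class PathRegexSketch {
    /** Keeps only the file names that fully match the given Java regex. */
    static List<String> filter(List<String> fileNames, String regex) {
        Pattern pattern = Pattern.compile(regex);
        return fileNames.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The Java literal ".*\\.log" is the same regex as the JSON value ".*\\.log".
        List<String> kept = filter(List.of("app.log", "app.log.1", "notes.txt"), ".*\\.log");
        System.out.println(kept);
    }
}
```

Under the full-match assumption, a rotated file such as `app.log.1` is not selected by `.*\.log`; a pattern like `.*\.log(\.\d+)?` would be needed to include it.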
Modifier and Type | Class and Description |
---|---|
protected static class |
AbstractFileInput.Item
Whenever something is read, it is enqueued so that Storm can come and pick it up in
nextTuple().
|
Modifier and Type | Field and Description |
---|---|
protected LinkedBlockingQueue<AbstractFileInput.Item> |
failed
This queue holds failed tuples, i.e. the failed lines of data that must be
re-emitted.
|
protected LinkedBlockingQueue<AbstractFileInput.Item> |
inputQueue
Internal FileSpout queue to put read items before doing the actual
processing.
|
protected static Map<String,LinkedBlockingQueue<AbstractFileInput.Item>> |
inputQueues
Keep track of internal queues based on the Storm component ID.
|
protected boolean |
orderedRead
If set to true, all files are processed one by one in a single thread,
starting with the oldest files.
|
protected String |
path
The file or folder path to consume.
|
protected Pattern |
pathRegexPattern
The regular expression pattern that a file must match to be processed.
|
protected boolean |
processAlreadyExisting
If true, the watcher starts by processing any file already existing in the
path.
|
protected int |
queueSize
The maximum amount of input lines we accept to accumulate in our server
queues.
|
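The relationship between `queueSize` and `inputQueue` resembles a bounded producer/consumer queue. The following is a minimal sketch under assumptions: it supposes the queue is created with a capacity equal to `queueSize` and that reader threads wait when it is full. The documentation only states that file reading is slowed down, so the exact mechanism is not confirmed. `BoundedQueueSketch` and `tryEnqueue` are hypothetical names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueueSketch {
    /** Tries to enqueue a line, waiting at most timeoutMs if the queue is full. */
    static boolean tryEnqueue(LinkedBlockingQueue<String> queue, String line, long timeoutMs) {
        try {
            return queue.offer(line, timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and report that nothing was enqueued.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed: capacity plays the role of queue_size (2 here to keep the demo short).
        LinkedBlockingQueue<String> inputQueue = new LinkedBlockingQueue<>(2);
        System.out.println(tryEnqueue(inputQueue, "line 1", 50));
        System.out.println(tryEnqueue(inputQueue, "line 2", 50));
        // The queue is full: the reader has to wait, which slows file reading down.
        System.out.println(tryEnqueue(inputQueue, "line 3", 50));
        // Once the consumer drains one item, the reader can proceed again.
        inputQueue.poll();
        System.out.println(tryEnqueue(inputQueue, "line 3", 50));
    }
}
```

A small capacity bounds memory when lines are very long, but it also makes readers wait more often, which matches the trade-off described for `queue_size`.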
Constructor and Description |
---|
AbstractFileInput(org.thales.punch.libraries.storm.api.NodeSettings spoutSettings,
org.apache.logging.log4j.Logger subLogger)
Constructor.
|
Modifier and Type | Method and Description |
---|---|
void |
ack(Object o) |
protected AbstractFileInput.Item |
drainNextLogs() |
void |
emitNextLine(String line,
Map<String,String> properties) |
void |
fail(Object o) |
List<File> |
getListFiles(File file)
This method constructs the list of files to process from different formats, in
subfolders or compressed.
|
void |
nextTuple()
Callback to feed the topology with tuples.
|
void |
open(Map stormSettings,
org.apache.storm.task.TopologyContext topologyContext,
org.apache.storm.spout.SpoutOutputCollector collector) |
void |
readOneFile(File f)
Read a file in a single-threaded or multi-threaded manner.
|
Methods inherited from class org.thales.punch.libraries.storm.api.BaseInputNode: close, deactivate, declareOutputFields, getPublishedStreams, regulate, sendLatencyRecord
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
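The ordered mode (`orderedRead`) processes files from the oldest to the newest, based on the update timestamp. A minimal sketch of that ordering follows. It is not the actual `getListFiles` implementation, and `OrderedReadSketch`, `oldestFirst` and `tempFile` are hypothetical names used only for illustration.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class OrderedReadSketch {
    /** Returns a copy of the list sorted by last-modified time, oldest first. */
    static List<File> oldestFirst(List<File> files) {
        List<File> sorted = new ArrayList<>(files);
        sorted.sort(Comparator.comparingLong(File::lastModified));
        return sorted;
    }

    /** Demo helper: creates a temporary file with a chosen modification time. */
    static File tempFile(String prefix, long lastModifiedMillis) {
        try {
            File f = File.createTempFile(prefix, ".log");
            f.deleteOnExit();
            f.setLastModified(lastModifiedMillis);
            return f;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        File newer = tempFile("newer", 1_600_000_000_000L);
        File older = tempFile("older", 1_000_000_000_000L);
        // The file with the older timestamp is listed first.
        for (File f : oldestFirst(List.of(newer, older))) {
            System.out.println(f.getName());
        }
    }
}
```

Sorting once up front and then reading in a single thread is what makes this mode free of parallelism issues, at the cost of giving up tailing.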
protected static Map<String,LinkedBlockingQueue<AbstractFileInput.Item>> inputQueues
protected transient LinkedBlockingQueue<AbstractFileInput.Item> inputQueue
protected transient LinkedBlockingQueue<AbstractFileInput.Item> failed
protected int queueSize
protected String path
protected Pattern pathRegexPattern
protected final boolean orderedRead
protected final boolean processAlreadyExisting
Only used when "watcher" is set to true.
public AbstractFileInput(org.thales.punch.libraries.storm.api.NodeSettings spoutSettings, org.apache.logging.log4j.Logger subLogger)
spoutSettings - the spout settings
subLogger - a logger
public void open(Map stormSettings, org.apache.storm.task.TopologyContext topologyContext, org.apache.storm.spout.SpoutOutputCollector collector)
Specified by: open in interface org.apache.storm.spout.ISpout
Overrides: open in class org.thales.punch.libraries.storm.api.BaseInputNode
public void readOneFile(File f)
f - the file to read
protected AbstractFileInput.Item drainNextLogs()
public void nextTuple()
public void ack(Object o)
Specified by: ack in interface org.apache.storm.spout.ISpout
Overrides: ack in class org.thales.punch.libraries.storm.api.BaseInputNode
public void fail(Object o)
Specified by: fail in interface org.apache.storm.spout.ISpout
Overrides: fail in class org.thales.punch.libraries.storm.api.BaseInputNode
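Storm calls `ack` and `fail` with the message id that was supplied when the tuple was emitted. The sketch below illustrates how a `failed` queue could be used to replay lines. It rests on assumptions: the real class may track pending items differently, and serving the `failed` queue before `inputQueue` is a guess, not something this page confirms. `ReplaySketch` and its methods are hypothetical, and plain strings stand in for `AbstractFileInput.Item`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

class ReplaySketch {
    private final LinkedBlockingQueue<String> inputQueue = new LinkedBlockingQueue<>();
    private final LinkedBlockingQueue<String> failed = new LinkedBlockingQueue<>();
    // Lines emitted but not yet acknowledged, keyed by message id.
    private final Map<Object, String> pending = new ConcurrentHashMap<>();
    private long nextId = 0;

    void enqueue(String line) {
        inputQueue.add(line);
    }

    /** Emits the next line (failed lines first, by assumption); returns its id or null. */
    Long emitNext() {
        String line = failed.poll();
        if (line == null) {
            line = inputQueue.poll();
        }
        if (line == null) {
            return null;
        }
        long id = nextId++;
        pending.put(id, line);
        return id;
    }

    String lineFor(Object id) {
        return pending.get(id);
    }

    /** The tuple was fully processed: forget it. */
    void ack(Object id) {
        pending.remove(id);
    }

    /** The tuple failed: schedule its line for re-emission. */
    void fail(Object id) {
        String line = pending.remove(id);
        if (line != null) {
            failed.add(line);
        }
    }

    public static void main(String[] args) {
        ReplaySketch spout = new ReplaySketch();
        spout.enqueue("a");
        spout.enqueue("b");
        Long first = spout.emitNext();
        spout.fail(first);
        // The failed line "a" is replayed before "b" is read from the input queue.
        System.out.println(spout.lineFor(spout.emitNext()));
    }
}
```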
Copyright © 2023. All rights reserved.