Class FileInput

  • All Implemented Interfaces:
    com.github.punch.api.node.PunchBaseNode, com.github.punch.api.storm.nodes.InputNode, com.github.punch.api.storm.nodes.StormNode, Serializable

    public class FileInput
    extends com.github.punch.api.storm.nodes.PunchInputNode
    The original implementation of the File Spout. By default it is NOT able to detect new files added to a directory path after the spout initialisation; see the watcher setting below for that use case.

    The file spout feeds messages into Storm from one or several files. You can set a directory, in which case all the files in it will be continuously read, or specify a single file.

    Overview

    This spout launches a single dedicated thread per scanned file. If a file is closed, rotated or recreated, it is continuously reopened and read.

    Spout level settings

    property mandatory type default comment
    path yes string - The path to the file to read. It can be a directory, in which case all the files it contains will be read.
    codec no string - The codec to use to read files. Accepted values are: "line", "multiline", "json_array", "xml". If empty, FileSpout will try to pick the correct codec based on the file extension (*.json, *.xml).
    path.regex no string - In case of a directory path, you can define a regex to only consume specific files based on their file names. Watch out: the regex must be Java-compatible, and do not forget to escape JSON reserved characters.
    read_file_from_start no boolean true By default the files will be read from the start. You can request the files to be tailed only by setting this property to false.
    reader_timeout no long 1000 When no more lines are appended to the file(s), each thread sleeps and periodically rechecks whether something has been appended. This property sets that sleep timeout, in milliseconds.
    queue_size no int 100000 The maximum number of lines stored in memory across all files. When this value is reached, file reading slows down. If you deal with very long lines, feel free to set a much lower value (e.g. 1000 or even 100), at the cost of processing speed.
    load_control no string "none" If set to "rate", the spout will limit the rate to a specified value. You can use this to limit the incoming or outgoing rate of data from/to external systems. Check the related "load_control.rate" and "load_control.adaptative" properties, and see the load control example below.
    load_control.rate no long 10000 Only relevant if load_control is set to "rate". Limits the rate to this number of messages per second.
    load_control.adaptative no boolean false If true, the load control will not cap the traffic at load_control.rate messages per second, but will allow less or more as long as the topology is not overloaded. Overload is detected by monitoring the Storm tuple traversal time: as soon as that traversal time increases (which happens quickly once the topology is overloaded), the load controller lowers the allowed rate. This option makes it easy to protect your topology without capping it at an overly conservative rate, as the maximum sustainable rate is difficult if not impossible to estimate.
    ordered no boolean false If true, the spout will sequentially read all files from your directory one by one, from the oldest to the newest (based on the update timestamp). No parallelism issues, no tailing. See the ordered replay example below.

    This is useful to replay log files you extracted using a rolling logger strategy, the oldest files containing the oldest data. Setting the ordered parameter to true implicitly sets the tail parameter to false.

    tail no boolean true If false, the spout will read your files just once, without tailing them forever. Watch out: your files will be processed in parallel. If you need an ordered read, use the ordered setting instead.
    multiline no boolean false If true, the spout will aggregate subsequent log lines, detected using a prefix such as "\t". Refer to the other multiline.* properties, and see the multiline example below. When set to true, at least one of multiline.regex and multiline.startswith must be defined.
    multiline.regex no String - Sets the regex used to determine whether a log line is an initial or a subsequent line.
    multiline.startswith no String - Sets the prefix used to determine whether a log line is an initial or a subsequent line.
    multiline.delimiter no String \n Once complete, the aggregated log line is built from the sequence of its lines. You can insert a line delimiter to make further parsing easier.
    multiline.timeout no long 1000 If a log line receives no subsequent line, it is forwarded downstream after this timeout, expressed in milliseconds.
    watcher no boolean false Activates the watcher functionality to scrutinize a folder and process any new file. With this parameter, even files added after the topology initialisation will be read. See the watcher example below.
    watcher.process_existing_files no boolean true Only used in "watcher" mode. If set to true, any file already present in the folder will be processed before the watcher starts. Otherwise, only new files will be read.

    Configuration example


    
     {
       "type": "file_spout",
       "spout_settings": {
         "codec": "line",
         "ordered": false,
         "tail": true,
         "read_file_from_start": true,
         "path.regex":".*\\.log",
         "path": "/path/to/directory"
       },
       "storm_settings": {
         "component": "filespout",
         "publish": [
           {
             "stream": "logs",
             "fields": [
               "log",
               "path",
               "last_modified"
             ]
           }
         ]
       }
     }
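
    More configuration examples

    The sketches below illustrate the other modes described in the settings table. They only use settings documented above; the paths are placeholders, and the "storm_settings" section, omitted for brevity, would be the same as in the example above.

    Load control: the rate is capped at 5000 messages per second, and the adaptive mode lets the load controller lower the allowed rate whenever the topology is overloaded:

     {
       "type": "file_spout",
       "spout_settings": {
         "path": "/path/to/directory",
         "load_control": "rate",
         "load_control.rate": 5000,
         "load_control.adaptative": true
       }
     }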
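
    Ordered replay: rolled log files are read once, from the oldest to the newest, with no tailing. Remember that setting "ordered" to true implicitly sets "tail" to false:

     {
       "type": "file_spout",
       "spout_settings": {
         "codec": "line",
         "path": "/path/to/directory",
         "ordered": true,
         "read_file_from_start": true
       }
     }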
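
    Multiline aggregation: a sketch for logs whose continuation lines start with a tab character. The "\t" prefix is an assumption about the log format, not a default, and at least one of multiline.startswith and multiline.regex must be set:

     {
       "type": "file_spout",
       "spout_settings": {
         "path": "/path/to/directory",
         "multiline": true,
         "multiline.startswith": "\t",
         "multiline.delimiter": "\n",
         "multiline.timeout": 1000
       }
     }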
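
    Watcher mode: the folder is scrutinized so that files added after the topology initialisation are also read; files already present are processed first, because watcher.process_existing_files defaults to true:

     {
       "type": "file_spout",
       "spout_settings": {
         "path": "/path/to/directory",
         "watcher": true,
         "watcher.process_existing_files": true
       }
     }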
     
    See Also:
    Serialized Form
    • Nested Class Summary

      Nested Classes 
      Modifier and Type Class Description
      protected static class  FileInput.Item
      Whenever something is read, it is enqueued so that Storm can pick it up in nextTuple().
    • Field Summary

      Fields 
      Modifier and Type Field Description
      protected LinkedBlockingQueue<FileInput.Item> failed
      This queue holds failed tuples, i.e. the failed lines of data that must be re-emitted.
      protected LinkedBlockingQueue<FileInput.Item> inputQueue
      Internal FileSpout queue to put read items before doing the actual processing.
      • Fields inherited from class com.github.punch.api.storm.nodes.PunchInputNode

        exitCondition, monitoringActivation, monitoringPeriod
    • Constructor Summary

      Constructors 
      Constructor Description
      FileInput()  
    • Method Summary

      All Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      void ack​(Object o)  
      void declare​(com.github.punch.api.storm.streams.StormNodePubSub declarer)  
      protected FileInput.Item drainNextLogs()  
      void emitNextLine​(String line, Map<String,​String> properties)  
      void fail​(Object o)  
      List<File> getListFiles​(File file)
      This method constructs the list of files to process from various formats: plain files, files in subfolders, or compressed archives.
      void nextTuple()
      Callback to feed the topology with tuples.
      void onOpen()
      Setup this spout before it starts receiving data and emitting tuples into the topology.
      void readOneFile​(File f)
      Read a file in a single-threaded or multi-threaded manner.
      void validate()  
      • Methods inherited from class com.github.punch.api.storm.nodes.PunchInputNode

        getCollector, getExitCondition, getLoadController, getMonitoringPeriod, getNextTupleCallback, isMonitoringActivation, registerNextTupleCallback, setExitCondition, setLoadController, setMonitoringActivation, setMonitoringPeriod, setNextTupleCallback
      • Methods inherited from class com.github.punch.api.storm.nodes.PunchStormNode

        getContext, getLogger, getMetricContext, getMySelf, getOriginatorName, getPlatform, getPublishedStreams, getSettings, getStormSettings, getTopologySettings, getType, setCollector, setContext, setLogger, setMetricContext, setMyPoint, setMySelf, setOriginatorName, setPlatform, setPublishedStreams, setSettings, setStormSettings, setTopologySettings, setType
      • Methods inherited from interface com.github.punch.api.storm.nodes.StormNode

        getComponentConfiguration, getContext, getDataStreams, getErrorStream, getLogger, getMetricContext, getMetricStream, getMySelf, getOriginatorName, getPlatform, getPublishedStreams, getSettings, getStormSettings, getTopologySettings, getType, hasErrorStream, onClose, sendLatencyRecord
    • Field Detail

      • failed

        protected transient LinkedBlockingQueue<FileInput.Item> failed
        This queue holds failed tuples, i.e. the failed lines of data that must be re-emitted. It is unbounded, but its effective size is limited by the max pending tuples allowed in the Storm topology.
    • Constructor Detail

      • FileInput

        public FileInput()
    • Method Detail

      • declare

        public void declare​(com.github.punch.api.storm.streams.StormNodePubSub declarer)
      • onOpen

        public void onOpen()
        Setup this spout before it starts receiving data and emitting tuples into the topology.

        This is called by the Storm runtime at topology activation.

      • nextTuple

        public void nextTuple()
        Callback to feed the topology with tuples. This is called by Storm. A single tuple is emitted per call. If none is available in the spout queue, this method sleeps for a short delay, in order not to keep Storm in a busy loop.
      • ack

        public void ack​(Object o)
      • fail

        public void fail​(Object o)
      • readOneFile

        public void readOneFile​(File f)
        Read a file in a single-threaded or multi-threaded manner. If "ordered" is set to true, the file is processed in the current thread. Otherwise, a dedicated reading thread is used.
        Parameters:
        f - the file to read
      • drainNextLogs

        protected FileInput.Item drainNextLogs()
        Returns:
        the next log, if any is available from one of our queues
      • getListFiles

        public List<File> getListFiles​(File file)
        This method constructs the list of files to process from various formats: plain files, files in subfolders, or compressed archives.
        Parameters:
        file - the file from the configuration (can be a folder or an archive)
        Returns:
        the list of files