
FileReaderBolt

The files reader bolt receives tuples containing file names; for each one, it reads the corresponding file line by line and emits a tuple per line. At the end of each file extraction (or if an error is encountered while reading a file), a report tuple is emitted, if the bolt is configured to do so. The emitted line tuples are anchored to the 'file' tuple, so that if any emitted line tuple fails further down the topology, the file tuple is reported as failed to the topology spout. The same applies to the extraction report tuples themselves.

The files reader bolt supports GZIP-compressed input files. Each line is expected to end with a newline character, except possibly the last line of the file.
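The per-file reading loop can be sketched as follows. This is a minimal Python illustration of the behaviour described above, not the bolt's actual implementation; the `read_lines` helper name is ours:

```python
import gzip


def read_lines(path, uncompress=False):
    """Yield the lines of a (possibly GZIP-compressed) file,
    stripped of their trailing newline, as the bolt does."""
    # When 'uncompress' is set, the file is read through gzip;
    # otherwise it is read as plain text.
    opener = gzip.open if uncompress else open
    with opener(path, "rt", encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")
```

Each yielded line would then be emitted as one tuple, anchored to the incoming file tuple.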

Here is a sample configuration (for a full topology sample, refer to /samples/jobs/inject_log_files_job_topology.json):

{
          "type" : "logs_files_reader_bolt",
          "bolt_settings" : {
              "filename_input_storm_field" : "file_name",
              "fileid_input_storm_field" : "task_id",
              "uncompress" : true,
              "data_output_stream_id" : "logs",
              "line_data_output_field" : "log",
              "line_id_output_field" : "local_uuid",
              "line_id_pattern" : "%f-%l",

              "extraction_reporting" : {
                 "storm_stream" : "extraction_reports",
                 "lines_count_field" : "lines",
                 "error_message_field" : "error_message"
              }

          },
          "storm_settings" : {
            "executors": 1,
            "component" : "files_reader",
            "publish" : [ 
            { 
                "stream" : "logs" , 
                "fields" : ["log","local_uuid","local_timestamp"]
              },
             { 
                "stream" : "extraction_reports" , 
                "fields" : ["file_name","lines","linenum", "task_id", "error_message"]
              }
            ],
            "subscribe" : [ 
              { 
                "component" : "tasks_spout", 
                "stream" : "tasks", 
                "grouping": "localOrShuffle"
              }
            ] 
          }
  }

Streams and fields

The FileReader bolt subscribes to a stream which provides 'file' records, each with a name and an id.

It publishes one or two streams:

  • a mandatory stream that will receive 'lines' tuples

    This stream can publish the following fields:

    • the extracted line
    • a repeatable 'unique line id' built from the input file id and the line number
    • any other field coming from the file tuple (you just have to publish them in the storm settings of the bolt)
  • an optional stream that will receive success/failure reports on extracted files.

    This stream can publish the following fields:

    • the count of lines extracted from the file (at the moment of the error, in case of an error; at the end of extraction, in case of success)
    • an error message (if an error caused the report)
    • any other field coming from the file tuple (you just have to publish them in the storm settings of the bolt)
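The field-forwarding behaviour described above, generated fields plus any published field copied from the incoming file tuple, can be sketched as follows. The `build_output_tuple` helper and the dict-based tuple representation are our own illustration, not the bolt's API; the field names come from the sample configuration:

```python
def build_output_tuple(file_tuple, generated_fields, published_fields):
    """Assemble an output tuple from the fields generated by the bolt
    (line content, line id, report counters...) plus every published
    field that is forwarded as-is from the incoming file tuple."""
    out = dict(generated_fields)
    for name in published_fields:
        if name not in out and name in file_tuple:
            out[name] = file_tuple[name]  # forwarded from the file tuple
    return out


# With the sample configuration above, a tuple on the "logs" stream carries
# the extracted line, the generated id, and the forwarded local_timestamp:
file_tuple = {"file_name": "/var/log/app.log",
              "task_id": "42",
              "local_timestamp": "2019-01-01T12:00:00Z"}
line_tuple = build_output_tuple(
    file_tuple,
    {"log": "some log line", "local_uuid": "42-0"},
    ["log", "local_uuid", "local_timestamp"],
)
```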

Parameters

  • filename_input_storm_field: String

    name of the storm input field that conveys the absolute path of the file to extract (interpreted on the machine running the storm bolt)

  • fileid_input_storm_field: String

    name of the storm input field that conveys the unique file id used to construct the unique line tuple id. If not provided, the value of 'filename_input_storm_field' is used

  • uncompress: Boolean (default: false)

    If true, indicates that the input file is in GZIP format.

  • data_output_stream_id: String

    name of the published storm stream used to emit "line" tuples

  • line_data_output_field: String

    name of the storm output field, in the data stream, that receives the extracted content of the line

  • line_id_output_field: String

    name of the storm output field, in the data stream, that receives the generated unique identifier of the line. Using this as a unique id for documents allows the extraction process to be replayed in an idempotent way, avoiding duplicates in the final storage

  • line_id_pattern: String (default: "%f-%l")

    pattern for the automatic generation of unique line ids in the output data tuples. '%f' is replaced with the input file id, and '%l' with the line number (starting at 0 in the file)

  • extraction_reporting: settings dictionary

    If present, activates the 'extraction reports' feature (see above).

  • extraction_reporting.storm_stream: String (if settings section is present)

    name of the published storm stream used to emit "extraction report" tuples

  • extraction_reporting.lines_count_field: String (if settings section is present)

    name of the published storm field that receives the count of extracted lines in "extraction report" tuples

  • extraction_reporting.error_message_field: String (if settings section is present)

    name of the published storm field that receives the error message in "extraction report" tuples
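The line_id_pattern expansion described above can be illustrated with a small sketch; the `make_line_id` helper name is ours, the actual bolt performs the equivalent substitution internally:

```python
def make_line_id(pattern, file_id, line_number):
    """Expand a line id pattern: '%f' is replaced by the input file id,
    '%l' by the 0-based line number within the file."""
    return pattern.replace("%f", str(file_id)).replace("%l", str(line_number))


# With the default pattern "%f-%l", the lines of a file whose id is
# "task_42" get the repeatable ids "task_42-0", "task_42-1", ...
```

Because the ids depend only on the file id and the line position, re-extracting the same file after a failure regenerates identical ids, which is what makes replay idempotent in the final storage.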

Metrics

See metrics_filereader_bolt