Elasticsearch Spout

The PunchPlatform ElasticsearchSpout reads a subset of data from an Elasticsearch index, selected by a time scope and an optional filtering request.

The data is extracted in “time slices”, ordered based on a chosen timestamp field from Elasticsearch documents. Within each time slice, documents are extracted without any defined ordering.
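
For instance, with the default timeslice_size_ms of 900000 ms (15 minutes), a 30-minute time scope such as the one used in the first example below is extracted as two consecutive 15-minute slices.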

The Elasticsearch Spout saves its progress in ZooKeeper so that, in case of failure, extraction will not replay time slices that have already been fully processed and acknowledged. The ZooKeeper cluster used is the one configured for the PunchPlatform admin service in punchplatform.properties.

There are two quite different usages of the spout:

  • Finite time scope, in the case of a “job” in charge of extracting documents from this scope
  • Infinite time scope, where the task reads new documents at the end of the time scope, in “near real-time”

In “finite time scope extraction mode”, when a topology including this spout is started, it automatically creates a job record that enables tracking the progress of the data extraction from the PunchPlatform Admin service API and Graphical User Interface (cf. PunchPlatform Admin Presentation). When the end of the time scope has been extracted and processed successfully by the topology, the job will report its end, and the topology will automatically be killed.

In “unbounded (infinite) time scope mode”, the Elasticsearch Spout is NOT run as part of a job, and will never stop on its own.
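
The mode is selected by the time-scope settings: providing to_datetime yields a finite extraction job, while providing fetch_documents_older_than_seconds instead keeps the extraction unbounded. The fragments below are only an illustrative sketch (cluster connection and output-field settings are omitted, and the 60-second delay is an arbitrary example value):

    # Finite time scope: the job stops once the scope has been fully extracted and acknowledged
    "spout_settings" : {
      "index_name" : "events-mytenant-2016.02.09",
      "timestamp_field" : "mytenant.input.ts",
      "from_datetime" : "2016-02-09T14:40:34+0100",
      "to_datetime" : "2016-02-09T15:10:34+0100"
    }

    # Unbounded near-real-time mode: no 'to_datetime'; the spout keeps fetching new timeslices
    # once they are at least 60 seconds old, and never stops on its own
    "spout_settings" : {
      "index_name" : "tasks",
      "timestamp_field" : "job.creation_ts",
      "from_datetime" : "2015-02-21T14:40:34+0100",
      "fetch_documents_older_than_seconds" : 60
    }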

Note

IMPORTANT: this spout does not support multiple executors or component instances. (Risks: multiple extractions of the same Elasticsearch documents, and premature ending of local topologies, since the first component to finish its task kills the topology even if other components still have remaining tasks in their lists.)

The Elasticsearch spout has two implemented error-management behaviours you can choose from:

  • Default behaviour :

    If any emitted tuple fails, the Elasticsearch extraction timeslices are “rewound” to the timeslice that generated the failed tuple, and the extraction starts again at this timeslice (it will also replay the following timeslices, even if they had been successfully processed and acknowledged).

    With this behaviour, the topology is “stuck” if any single tuple causes a systematic failure in the topology. You will then have to fix your topology (a faulty parser, or Kafka/Elasticsearch unavailability, for instance) and restart it so that it can resume from the faulty timeslice.
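
    For instance, if the time scope yields timeslices T1 to T5 and a tuple extracted from T3 fails, the spout rewinds to T3 and re-extracts T3, T4 and T5, even if tuples from T4 and T5 had already been acknowledged.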

    The logic of the Elasticsearch spout in this mode is illustrated next:

    (see figure: ElasticSearchSpoutContract.png)

    This is an example of an extraction job topology that extracts error logs (for instance, to re-parse and re-inject them so as to remove the errors):

    {
        "tenant" : "myTenant",
        "channel" : "jobs",
        "name" : "add_field",
        "spouts" : [
             {
                "type" : "elasticsearch_spout",
                "spout_settings" : {
                  "es_client_type" : "client",
                  "es_cluster_name" : "es_search",
                  "es_cluster_nodes_and_ports" : "localhost:9300",
                  "index_name" : "events-mytenant-2016.02.09",
                  "from_datetime" : "2016-02-09T14:40:34+0100",
                  "to_datetime" : "2016-02-09T15:10:34+0100",
                  "timestamp_field" : "mytenant.input.ts",
    
                  # in this example, we fetch log documents with "error" type, and field "channel" starting with an "a"
                  "filtering_request" : "_type:\"log\" AND _type:\"error\" AND channel:\"/a.*/\"",
    
                  "es_id_storm_output_field_name" : "log_id",
                  "es_type_storm_output_field_name" : "es_type",
                  "es_document_storm_output_field_name" : "log",
                  "es_index_storm_output_field_name" : "es_index"
    
                },
                "storm_settings" : {
                  "executors": 1,
                  "component" : "extractor_spout",
                  "publish" : [
                    {
                      "stream" : "logs" ,
    
                      "fields" : ["log","es_type", "log_id", "es_index"]
                    }
                  ]
                }
            }
        ]
        # the "bolts" section of the topology is omitted here
    }
    
  • Non-blocking behaviour, with report :

    This behaviour is used when the ElasticsearchSpout serves as a tasks spout and you prefer to have tasks flagged “FAILED” in a safe manner while continuing to process other tasks. In this case, you define a reporting stream that will receive a tuple each time an extracted document (a task) has been successfully acknowledged or notified as failed. Using these tuples, you can for example persist the success or failure of the task by overwriting the original task document. This allows you to replay failed tasks using a new job, with a filter set on the task record status in Elasticsearch (see the replay filter sketch after the example below).

    This is an example of a near-real-time unbounded extraction topology that manages tasks:

    {
        "type" : "elasticsearch_spout",
        "spout_settings" : {
          "es_client_type" : "client",
          "es_cluster_nodes_and_ports" : "localhost:9300",
          "es_cluster_name" : "es_search",
          "index_name" : "tasks",
          "from_datetime" : "2015-02-21T14:40:34+0100",
          "to_datetime" : "2019-02-28T15:10:34+0100",
          "xfiltering_request" : "job.id:\"e1\"  AND NOT status.value:\"DONE\"",
          "timeslice_size_ms" : 60000,
          "timestamp_field" : "job.creation_ts",
          "es_client_type" : "client",
    
    
          "es_id_storm_output_field_name" : "task_id",
          "es_type_storm_output_field_name" : "task_type",
          "es_document_storm_output_field_name" : "task",
          "es_index_storm_output_field_name" : "es_index",
    
          "fields_extraction" : {
              "file_name" : {
                  "es_field" : "[file][path]"
              },
              "file_timestamp" : {
                  "es_field" : "[file][creation_ts]"
              },
              "topic":{
                  "es_field" : "[target_topic]"
              }
          },
    
          "acknowledgement_reporting" : {
            "storm_stream_id" : "tasks_updates",
            "boolean_ack_storm_field_name" : "acked",
            "replay_failed_timeslices" : false
          }
    
        },
        "storm_settings" : {
          "executors": 1,
          "component" : "tasks_spout",
          "publish" : [
            {
              "stream" : "tasks" ,
              "fields" : ["task","task_type", "task_id", "es_index", "file_name", "file_timestamp","topic"]
            },
            {
              "stream" : "tasks_updates" ,
              "fields" : ["task","task_type", "task_id", "es_index", "acked", "file_timestamp"]
            }
    
          ]
        }
    }
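
    To replay only the tasks that previously failed, a new job can reuse the same spout with a filter on the task status. The fragment below is only a sketch: it assumes the reporting stream is consumed by a bolt that overwrites each task document with a status.value field (for instance "DONE" or "FAILED"); adapt the field name and values to your own task schema. The cluster connection and output-field settings are the same as in the example above and are omitted here.

    "spout_settings" : {
      "index_name" : "tasks",
      "timestamp_field" : "job.creation_ts",
      "from_datetime" : "2015-02-21T14:40:34+0100",
      "to_datetime" : "2019-02-28T15:10:34+0100",

      # hypothetical convention: the reporting bolt sets status.value to "FAILED" on failure
      "filtering_request" : "job.id:\"e1\" AND status.value:\"FAILED\""
    }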
    

Parameters

es_client_type (optional, String, default: "transport")
    Use "node" to use the embedded node client, where the spout includes itself as a member of the source Elasticsearch cluster, or "transport" to use the transport client through the cluster API. The node client is more efficient, but requires communicating in both directions with the cluster nodes. Use "transport" if you are not on the same network, or if you only need a spout-to-cluster connection direction.

es_cluster_nodes_and_ports (mandatory, String)
    Comma-separated list of <host/address>:<port> of the Elasticsearch servers.

es_cluster_name (mandatory, String)
    Internal name of the Elasticsearch cluster (see the punchplatform.properties file).

index_name (mandatory, String)
    The name of the index (or Elasticsearch alias) from which documents must be fetched by the spout.

from_datetime (mandatory, Date string)
    The start of the time scope to extract, in ISO 8601 format with seconds and timezone, e.g. 2016-02-09T14:40:34+0100.

to_datetime (optional, Date string)
    The end of the time scope to extract, in the same format as the from_datetime parameter. Mandatory if the fetch_documents_older_than_seconds setting is not used. The presence of this parameter triggers the "finite time scope extraction job" mode of the spout, including the automatic stop of the job at the end of the extraction.

fetch_documents_older_than_seconds (optional, Integer, seconds)
    Mutually exclusive with to_datetime. Its presence triggers the "infinite time scope near-real-time extraction mode" of the spout. The spout waits for the next timeslice to be fully older than this setting before actually fetching the timeslice documents. The purpose is to allow some time for the documents to arrive in the source Elasticsearch before the timeslice is acquired, because a timeslice is extracted only ONCE (except in failure scenarios).

timeslice_size_ms (optional, Integer, ms, default: 900000 ms, i.e. 15 min)
    The width of each "scroll" request to Elasticsearch. This is the unit of processing that will be retried in case of failure. Inside this slice, the logs are extracted with no defined ordering.

scroll_size (optional, Integer, default: 10000)
    The number of logs that each individual request will retrieve, within a timeslice.

timestamp_field (mandatory, String)
    The name of the field of the Elasticsearch documents on which the time scope selection is applied.

sorted (optional, boolean, default: true)
    If true, documents are fully sorted in ascending order of the timestamp field. If false, documents are extracted by timestamp-ordered time slices, but within each timeslice documents are extracted without timestamp ordering; this is much less CPU consuming when time slices may contain large amounts of documents.

filtering_request (optional, String)
    An optional filter that can be provided to extract only some of the Elasticsearch documents from the designated time scope. This filter string must be a request expressed in the query-string syntax of the Elasticsearch "query DSL" - cf https://www.elastic.co/guide/en/elasticsearch/reference/1.4/query-dsl-query-string-query.html#query-string-syntax. Example of value, if you want to extract/replay apache logs that have encountered a processing problem: channel:"apache" AND _type:"error".

job_id (optional, String, default: <topologyId>)
    This job id is used for persisting the job progress, so that it is not entirely re-done in case of restart. If not specified, the internal Storm topology name is used as the job id. This means that if a job topology running in Storm restarts due to a failure, the job goes on from the persisted state anyway; BUT if the USER restarts the same job by stopping this topology and starting a new one with the same configuration, the persisted state will only be found and reused if a job_id has been set here.
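
For instance, to make an extraction job resumable when the USER stops and restarts the topology with the same configuration, a stable job id can be set explicitly (the id value below is purely illustrative):

    "spout_settings" : {
      "index_name" : "events-mytenant-2016.02.09",
      "timestamp_field" : "mytenant.input.ts",
      "from_datetime" : "2016-02-09T14:40:34+0100",
      "to_datetime" : "2016-02-09T15:10:34+0100",

      # with an explicit job_id, a restarted topology resumes from the timeslices
      # already recorded as processed in ZooKeeper
      "job_id" : "replay-errors-2016.02.09"
    }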

Optional remote-controlled on/off regulation.

See Optional : External inhibition/enabling through UDP commands

Streams and fields

The ElasticsearchSpout emits 4-field tuples into the topology over a single stream. The emitted fields (listed below with their default names, which can be overridden with the corresponding *_storm_output_field_name settings) are:

log (String)
    The JSON document, as contained in the _source field of the Elasticsearch source document.

es_type (String)
    The Elasticsearch document type (the _type field of the Elasticsearch source document).

es_id (String)
    The document id, unique within an Elasticsearch index; this can be used if necessary to re-index a modified document after the topology has amended it (for example to add a new field, or to recompute some field value).

es_index (String)
    The Elasticsearch index name; this is useful to know from which index a document was extracted (for example when extracting from an Elasticsearch alias that may encompass multiple indexes), and it can also be used to send new or corrected documents back to the source Elasticsearch index.
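
For reference, if the output field names are left at their defaults, the spout's publish declaration could look like the following minimal sketch (the stream and component names are illustrative):

    "storm_settings" : {
      "executors": 1,
      "component" : "extractor_spout",
      "publish" : [
        {
          "stream" : "logs",
          "fields" : ["log", "es_type", "es_id", "es_index"]
        }
      ]
    }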