
Extraction Input

Introduction

This node lets you perform large extractions from an Elasticsearch index.

This node replaces elasticsearch_input, which was removed in releases DAVE_6.4.X and later.

Documents returned by each request (hits.hits) are converted into tuples before being emitted. Each tuple can have its document metadata appended; currently, only the document id can be selected.

When this node runs with scrolling enabled, a scroll context is created in your Elasticsearch cluster. The scroll context reflects the state of the queried index at the time it was created. In other words, documents inserted into the queried index after the scroll context was created will not be retrieved.
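Scrolling is switched on through elastic_settings. The fragment below is a minimal sketch of an extraction_input node with scrolling enabled, using only keys from the Elastic Settings table; the index, node and values are illustrative, and quoting the boolean is an assumption based on that table typing every setting as a String. The rest of the punchline is omitted.

```yaml
dag:
  - type: extraction_input
    component: input
    settings:
      index: mytenant-events-*
      nodes:
        - localhost
      elastic_settings:
        # enable scroll requests (disabled by default)
        # ASSUMPTION: quoted because the settings table types it as a String
        es.scroll: "true"
        # number of documents returned by each scroll request
        es.scroll.size: 10000
        # how long the scroll context is kept alive
        es.scroll.keepalive: 10m
    # publish section omitted for brevity
```

Because the scroll context reflects the index at the moment it was created, start the extraction only once the data you need has been indexed.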

The topology containing this node exits with code 0 when all documents are retrieved and acknowledged successfully, or with code 1 if at least one document was not processed as expected (not acknowledged).

Mode(s)

Because of the way this node works, aggregation queries are not supported.

This node is intended to be used as an extraction node.

The available extraction modes are:

  • raw: streams the whole result without any selection.
  • hits_hits (default): works in conjunction with output_columns to select only the fields you wish to stream. By default, all columns are selected.
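As a counterpart to the hits_hits example in the Example(s) section, the fragment below sketches an input node in raw mode. No output_columns are given, since raw streams the whole result without any selection. The index and node values are illustrative, and the publish section is left out because the name of the field carrying the raw result is not described here.

```yaml
dag:
  - type: extraction_input
    component: input
    settings:
      index: mytenant-events-*
      nodes:
        - localhost
      # stream the whole result without any field selection
      mode: raw
    # publish section omitted: adapt stream and field names to your punchline
```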

Example(s)

Below is an example of using this node together with a file_output node.

runtime_id: generated_by_kibana
tenant: mytenant
version: "6.0"
platform_id: myplatformid
runtime: storm
dag:
  - type: extraction_input
    component: input
    settings:
      index: mytenant-events-*
      id_column: myid
      nodes:
        - localhost
      elastic_settings:
        es.scroll.size: 10000
      output_columns:
        - field: channel
    publish:
      - stream: logs
        fields:
          - log

  - type: punchlet_node
    component: processor
    settings:
      punchlet_code: "{ print(root); }"
    subscribe:
      - component: input
        stream: logs
    publish:
      - stream: logs
        fields:
          - log

  - type: file_output
    component: todisk
    settings:
      destination: file:///tmp/test_elastic_wrapper
      streaming: true
      strategy: at_least_once
      fields:
        - log
      batch_size: 10000
    subscribe:
      - component: processor
        stream: logs
        fields:
          - log
metrics:
  reporters:
    - type: console

Another usage example can be found in the Archive Extraction documentation.

Metrics

This section describes the additional metrics published by this node to help you monitor your punchline. These metrics use the same reporters as those defined at the root level of your punchline configuration.

On Start Success

Metric published when the punchline is initialized and started successfully.

{
  "init": {
    "process": {
      "name": "elastic_input",
      "id": "933@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "application.deploy.mode": "foreground",
  "extraction.input.event.timestamp": "2020-03-31T08:52:12.901Z",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
  "type": "punch",
  "content": {
    "event_type": "application_start_cmd",
    "level": "INFO",
    "total_ram_memory_free_mb": 281,
    "message": "application started",
    "total_ram_memory_total_mb": 401
  },
  "platform": {
    "application": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "@timestamp": "2020-03-31T08:52:12.713Z",
  "vendor": "thales",
  "name": "extraction.input.application.start",
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}

On Stop Success

Metric published when all tuples emitted by this spout are acknowledged (exit code 0).

{
  "init": {
    "process": {
      "name": "extraction_input",
      "id": "1631@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "application.deploy.mode": "foreground",
  "extraction.input.event.timestamp": "2020-03-31T08:59:54.420Z",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "8e532628-d35d-4660-9418-244580c1dc05",
  "type": "punch",
  "content": {
    "event_type": "application_stop_cmd",
    "level": "INFO",
    "total_ram_memory_free_mb": 885,
    "message": "application stopped",
    "total_ram_memory_total_mb": 1344
  },
  "platform": {
    "application": "8e532628-d35d-4660-9418-244580c1dc05",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "@timestamp": "2020-03-31T08:59:54.419Z",
  "vendor": "thales",
  "name": "extraction.input.application.end",
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}

Processing Start

Metric published when this spout starts its data-cleaning phase, before emitting any tuple.

{
  "init": {
    "process": {
      "name": "extraction_input",
      "id": "933@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "application.deploy.mode": "foreground",
  "extraction.input.event.timestamp": "2020-03-31T08:52:13.072Z",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
  "type": "punch",
  "content": {
    "event_type": "application_start_cmd",
    "level": "INFO",
    "total_ram_memory_free_mb": 472,
    "message": "application_start_processing",
    "total_ram_memory_total_mb": 610
  },
  "platform": {
    "application": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "@timestamp": "2020-03-31T08:52:13.070Z",
  "vendor": "thales",
  "name": "extraction.input.processing.start",
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}

Processing End

Metric published when data cleaning is complete and the spout is ready to start emitting tuples.

{
  "init": {
    "process": {
      "name": "extraction_input",
      "id": "933@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "application.deploy.mode": "foreground",
  "extraction.input.event.timestamp": "2020-03-31T08:52:15.770Z",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
  "type": "punch",
  "content": {
    "event_type": "application_start_cmd",
    "level": "INFO",
    "total_ram_memory_free_mb": 485,
    "message": "application_end_processing",
    "total_ram_memory_total_mb": 865
  },
  "platform": {
    "application": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "@timestamp": "2020-03-31T08:52:15.767Z",
  "vendor": "thales",
  "name": "extraction.input.processing.stop",
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}

On Failure

Metric published when at least one tuple is not acknowledged (exit code 1).

{
  "init": {
    "process": {
      "name": "extraction_input",
      "id": "2414@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "reason": "an example exception",
  "application.deploy.mode": "foreground",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "af696c02-c367-4866-8f1f-a0e447c3d4f6",
  "type": "punchline",
  "content": {
    "event_type": "application_stop_cmd_failure",
    "level": "ERROR",
    "total_ram_memory_free_mb": 499,
    "message": "application stop failure",
    "total_ram_memory_total_mb": 554
  },
  "platform": {
    "application": "af696c02-c367-4866-8f1f-a0e447c3d4f6",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "@timestamp": "2020-03-31T09:07:19.618Z",
  "vendor": "thales",
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}

Statistics

Metric published every second so that you can monitor the spout's progress when fetching many documents.

{
  "init": {
    "process": {
      "name": "extraction_input",
      "id": "933@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "number_of_request_to_es": 1,
  "application.deploy.mode": "foreground",
  "extraction.input.event.timestamp": "2020-03-31T08:52:15.774Z",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
  "type": "punch",
  "content": {
    "event_type": "STATUS",
    "level": "INFO",
    "ram_memory_free_mb": 484,
    "message": "extraction statistics",
    "ram_memory_total_mb": 865
  },
  "platform": {
    "application": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "total_documents_to_fetch": 14202,
  "@timestamp": "2020-03-31T08:52:15.772Z",
  "vendor": "thales",
  "name": "extraction.input.statistics",
  "total_documents_fetched": 0,
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}

Parameters

Common Settings

| Name | Type | Mandatory | Default value | Description |
|---|---|---|---|---|
| index | String | true | NONE | The name of the Elasticsearch index from which data will be fetched. |
| port | Integer | false | 9200 | The port of your Elasticsearch server. |
| query | String (JSON) | false | match all | A valid Elasticsearch query. |
| nodes | List of String | true | NONE | Hostnames of your Elasticsearch nodes. In general, only one hostname is needed. |
| elastic_settings | Map (String key, String value) | false | NONE | Key-value settings that control the Elasticsearch client. |
| id_column | String | false | id | Name of the column that receives each document's id. |
| mode | String | false | hits_hits | Extraction mode: either hits_hits (field selection is possible, see the output_columns parameter) or raw. |
| output_columns | List of Map | false | all columns | In hits_hits mode, selects fields and optionally gives them aliases. |
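The fragment below sketches how the common settings fit together. The query line is the least certain part: the table only says it is a JSON string, so passing a full request body with a top-level query key is an assumption, as is the @timestamp field used in the range filter. Avoid aggregations here, since this node does not support them.

```yaml
dag:
  - type: extraction_input
    component: input
    settings:
      index: mytenant-events-*
      # in general, only one hostname is needed
      nodes:
        - localhost
      # default port, shown explicitly
      port: 9200
      # ASSUMPTION: query given as a JSON string holding a full request body;
      # the @timestamp field is also assumed to exist in your index
      query: '{"query": {"range": {"@timestamp": {"gte": "now-1d"}}}}'
      # tuple column that receives each document's id
      id_column: myid
      mode: hits_hits
    # publish section omitted for brevity
```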

Elastic Settings

| Elastic setting | Type | Default value | Description |
|---|---|---|---|
| es.nodes.path.prefix | String | NONE | Path to append (e.g. /something/to/append) when your Elasticsearch servers are behind a proxy. |
| es.scroll | String | false | Enables scroll requests. |
| es.scroll.size | String | 50 | Number of documents returned by the query, or by each scroll request when scrolling is enabled. |
| es.scroll.keepalive | String | 10m | How long each scroll context is kept alive, e.g. 1m, 1d, 1y. |
| es.net.http.auth.pass | String | NONE | Must be used with es.net.http.auth.user. |
| es.net.http.auth.user | String | NONE | Must be used with es.net.http.auth.pass. |
| es.net.http.auth.token | String | NONE | Must be used with es.net.http.auth.token_type. |
| es.net.http.auth.token_type | String | NONE | Must be used with es.net.http.auth.token. |
| es.net.ssl | String | false | Enables SSL. |
| es.net.ssl.keystore.location | String | NONE | Must be a jks, pkcs12 or p12 store containing the private and public key of the node. |
| es.net.ssl.keystore.pass | String | NONE | Do not provide if the keystore is not protected with a password. |
| es.net.ssl.truststore.location | String | NONE | Must be a jks, pkcs12 or p12 store containing at least the node certificate, its CA chain, and every other certificate this node should trust. |
| es.net.ssl.truststore.pass | String | NONE | Do not provide if the truststore is not protected with a password. |
| es.net.ssl.hostname.verification | String | true | Whether the server hostname is verified against its SSL certificate. |
| es.max_concurrent_shard_requests | String | NONE | Maximum number of concurrent shard requests this node can issue. |
| es.nodes.resolve.hostname | String | false | Resolves node hostnames to IP addresses; make sure /etc/hosts references the correct IP addresses. |
| es.doc_type | String | NONE | Adds a doc_type to the requested URI. This feature is deprecated by Elastic. |
| es.http.timeout | String | 1m | Overrides the HTTP connection timeout. |
| socket_timeout_ms | String | 2m | Timeout for receiving data from Elasticsearch; increase it if the request uses many filters or targets many indices. |
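The fragment below sketches a secured connection using settings from the table above: basic authentication (user and pass set together) and SSL with a truststore. The user name, password and truststore path are hypothetical placeholders, and quoting es.net.ssl as a string is an assumption based on the String type listed in the table.

```yaml
dag:
  - type: extraction_input
    component: input
    settings:
      index: mytenant-events-*
      nodes:
        - localhost
      elastic_settings:
        # basic authentication: user and pass must be set together
        # (hypothetical credentials)
        es.net.http.auth.user: myuser
        es.net.http.auth.pass: mypassword
        # ASSUMPTION: quoted because the table types it as a String
        es.net.ssl: "true"
        # truststore with the node certificate and its CA chain (hypothetical path)
        es.net.ssl.truststore.location: /path/to/truststore.jks
        # omit if the truststore has no password
        es.net.ssl.truststore.pass: mytruststorepassword
        # allow slower responses than the 1m default
        es.http.timeout: 5m
    # publish section omitted for brevity
```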

Output Columns

A list of JSON objects, each describing one field to include in the output. This parameter should be used with mode: hits_hits.

| Output column | Type | Default value | Description |
|---|---|---|---|
| field | String | NONE | The field of your document that you want to include in the resulting stream. |
| alias | String | NONE | The alias you want to give this field in the output tuple. |
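A minimal sketch of output_columns in hits_hits mode: channel is kept under its own name while vendor is renamed through alias. The vendor field and the device_vendor alias are hypothetical; use field names that actually exist in your documents.

```yaml
dag:
  - type: extraction_input
    component: input
    settings:
      index: mytenant-events-*
      nodes:
        - localhost
      mode: hits_hits
      output_columns:
        # keep the channel field under its own name
        - field: channel
        # keep the (hypothetical) vendor field, renamed in the output tuple
        - field: vendor
          alias: device_vendor
    # publish section omitted for brevity
```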