Extraction Input¶
Introduction¶
This node lets you run large extractions from an Elasticsearch index.
This is the replacement node for elasticsearch_input, which was removed in releases >= DAVE_6.4.X.
Documents returned by each request (hits.hits) are transformed into tuples before being emitted. Each tuple can have its meta information appended (for now, only the id metadata can be selected).
When this node is executed with scrolling enabled, a scroll context is created within your Elasticsearch cluster. The scroll context holds basic meta information about your query as of the time it was executed. In other words, documents inserted into the queried index after the scroll context was created will not be retrieved.
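As a minimal sketch of how scrolling might be switched on, the fragment below sets the scroll-related keys listed under Elastic Settings on this page. The index pattern and host are borrowed from the example on this page; the values chosen for es.scroll.size and es.scroll.keepalive are illustrative assumptions, not recommendations.

```yaml
# Sketch only: the values below are illustrative assumptions.
settings:
  index: mytenant-events-*
  nodes:
    - localhost
  elastic_settings:
    # Create a scroll context instead of issuing a single request.
    es.scroll: true
    # Number of documents returned by each scroll request.
    es.scroll.size: 1000
    # How long the scroll context is kept alive.
    es.scroll.keepalive: 10m
```

Documents indexed after the scroll context is created are not part of the extraction, so a run that must include them has to be started again.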
The topology containing this node exits with code 0 when all documents are retrieved successfully (acknowledged), or with code 1 if at least one document was not processed as expected (not acknowledged).
Mode(s)¶
Because of the way this node works, aggregation queries are not supported.
This node is intended to be used as an extraction node.
The available extraction modes are:
- raw: streams the whole result without any selection
- hits_hits: works in conjunction with output_columns to select only the fields you wish to stream. By default, all columns are selected.
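To make the difference between the two modes concrete, here is a hedged sketch of a raw-mode configuration. It reuses the index and host from the example on this page, and the mode key comes from the Common Settings table; nothing else is assumed.

```yaml
# Sketch only: streams each result as-is, with no field selection.
settings:
  index: mytenant-events-*
  nodes:
    - localhost
  mode: raw
```

Leaving mode out falls back to hits_hits, the documented default.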
Example(s)¶
Below is an example of this node coupled with a file_output.
runtime_id: generated_by_kibana
tenant: mytenant
version: "6.0"
platform_id: myplatformid
runtime: storm
dag:
  - type: extraction_input
    component: input
    settings:
      index: mytenant-events-*
      id_column: myid
      nodes:
        - localhost
      elastic_settings:
        es.scroll.size: 10000
      output_columns:
        - field: channel
    publish:
      - stream: logs
        fields:
          - log
  - type: punchlet_node
    component: processor
    settings:
      punchlet_code: "{ print(root); }"
    subscribe:
      - component: input
        stream: logs
    publish:
      - stream: logs
        fields:
          - log
  - type: file_output
    component: todisk
    settings:
      destination: file:///tmp/test_elastic_wrapper
      streaming: true
      strategy: at_least_once
      fields:
        - log
      batch_size: 10000
    subscribe:
      - component: processor
        stream: logs
        fields:
          - log
metrics:
  reporters:
    - type: console
Another usage example can be found in the Archive Extraction documentation.
Metrics¶
This section describes the additional metrics that are published to help you monitor your punchline. These metrics use the same reporters as the ones defined at the root level of your punchline configuration.
On Start Success¶
Metric published if the punchline is initialized and started properly
{
  "init": {
    "process": {
      "name": "elastic_input",
      "id": "933@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "application.deploy.mode": "foreground",
  "extraction.input.event.timestamp": "2020-03-31T08:52:12.901Z",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
  "type": "punch",
  "content": {
    "event_type": "application_start_cmd",
    "level": "INFO",
    "total_ram_memory_free_mb": 281,
    "message": "application started",
    "total_ram_memory_total_mb": 401
  },
  "platform": {
    "application": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "@timestamp": "2020-03-31T08:52:12.713Z",
  "vendor": "thales",
  "name": "extraction.input.application.start",
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}
On Stop Success¶
Metric published if all tuples published by this spout are acknowledged - Exit code: 0
{
  "init": {
    "process": {
      "name": "extraction_input",
      "id": "1631@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "application.deploy.mode": "foreground",
  "extraction.input.event.timestamp": "2020-03-31T08:59:54.420Z",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "8e532628-d35d-4660-9418-244580c1dc05",
  "type": "punch",
  "content": {
    "event_type": "application_stop_cmd",
    "level": "INFO",
    "total_ram_memory_free_mb": 885,
    "message": "application stopped",
    "total_ram_memory_total_mb": 1344
  },
  "platform": {
    "application": "8e532628-d35d-4660-9418-244580c1dc05",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "@timestamp": "2020-03-31T08:59:54.419Z",
  "vendor": "thales",
  "name": "extraction.input.application.end",
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}
Processing Start¶
Metric published when this spout starts cleaning data before emitting tuple(s)
{
  "init": {
    "process": {
      "name": "extraction_input",
      "id": "933@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "application.deploy.mode": "foreground",
  "extraction.input.event.timestamp": "2020-03-31T08:52:13.072Z",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
  "type": "punch",
  "content": {
    "event_type": "application_start_cmd",
    "level": "INFO",
    "total_ram_memory_free_mb": 472,
    "message": "application_start_processing",
    "total_ram_memory_total_mb": 610
  },
  "platform": {
    "application": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "@timestamp": "2020-03-31T08:52:13.070Z",
  "vendor": "thales",
  "name": "extraction.input.processing.start",
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}
Processing End¶
Metric published when data cleaning is over, and the spout is ready to start emitting tuple(s)
{
  "init": {
    "process": {
      "name": "extraction_input",
      "id": "933@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "application.deploy.mode": "foreground",
  "extraction.input.event.timestamp": "2020-03-31T08:52:15.770Z",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
  "type": "punch",
  "content": {
    "event_type": "application_start_cmd",
    "level": "INFO",
    "total_ram_memory_free_mb": 485,
    "message": "application_end_processing",
    "total_ram_memory_total_mb": 865
  },
  "platform": {
    "application": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "@timestamp": "2020-03-31T08:52:15.767Z",
  "vendor": "thales",
  "name": "extraction.input.processing.stop",
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}
On Failure¶
Metric published when at least one tuple is not acknowledged - Exit code: 1
{
  "init": {
    "process": {
      "name": "extraction_input",
      "id": "2414@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "reason": "an example exception",
  "application.deploy.mode": "foreground",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "af696c02-c367-4866-8f1f-a0e447c3d4f6",
  "type": "punchline",
  "content": {
    "event_type": "application_stop_cmd_failure",
    "level": "ERROR",
    "total_ram_memory_free_mb": 499,
    "message": "application stop failure",
    "total_ram_memory_total_mb": 554
  },
  "platform": {
    "application": "af696c02-c367-4866-8f1f-a0e447c3d4f6",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "@timestamp": "2020-03-31T09:07:19.618Z",
  "vendor": "thales",
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}
Statistics¶
Metric published every second so that you can monitor the spout's progress when fetching many documents.
{
  "init": {
    "process": {
      "name": "extraction_input",
      "id": "933@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "number_of_request_to_es": 1,
  "application.deploy.mode": "foreground",
  "extraction.input.event.timestamp": "2020-03-31T08:52:15.774Z",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
  "type": "punch",
  "content": {
    "event_type": "STATUS",
    "level": "INFO",
    "ram_memory_free_mb": 484,
    "message": "extraction statistics",
    "ram_memory_total_mb": 865
  },
  "platform": {
    "application": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "total_documents_to_fetch": 14202,
  "@timestamp": "2020-03-31T08:52:15.772Z",
  "vendor": "thales",
  "name": "extraction.input.statistics",
  "total_documents_fetched": 0,
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}
Parameters¶
Common Settings¶
| Name | Type | Mandatory | Default value | Description |
|---|---|---|---|---|
| index | String | true | NONE | The name of the Elasticsearch index from which data will be fetched. |
| port | Integer | false | 9200 | Your Elasticsearch server Port. | 
| query | String - Json | false | match all | A valid Elasticsearch query. | 
| nodes | List of String | true | NONE | Hostnames of your elasticsearch nodes. In general, only one hostname is needed. | 
| elastic_settings | str(K)-str(V) | false | NONE | Key-value arguments that control the Elasticsearch client. |
| id_column | String | false | id | Name of the column that holds the id of each document. |
| mode | String | false | hits_hits | Extraction mode: either hits_hits (field selection is possible, see the output_columns parameter) or raw. |
| output_columns | List of Map | false | all columns | When using hits_hits mode, selects fields and gives them aliases. |
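The settings above can be combined as in the following sketch. The table documents query as "String - Json", so the fragment passes it as a single-quoted JSON string; whether the node expects the full request body (as written here) or only the inner clause is an assumption that should be checked against your release. The match_all clause itself is standard Elasticsearch query DSL and mirrors the documented default.

```yaml
# Sketch only: the exact shape expected for `query` is an assumption.
settings:
  index: mytenant-events-*
  nodes:
    - localhost
  port: 9200
  # Equivalent to the documented default (match all documents).
  query: '{"query": {"match_all": {}}}'
  # Column that receives the id of each document.
  id_column: myid
```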
Elastic Settings¶
| Elastic settings | Type | Default value | Description | 
|---|---|---|---|
| es.nodes.path.prefix | String | NONE | /something/to/append in case your elastic servers are behind a proxy | 
| es.scroll | String | false | enable scrolling request | 
| es.scroll.size | String | 50 | size of elastic query or size of each scroll query | 
| es.scroll.keepalive | String | 10m | how long each scroll query should be kept alive, can be: 1m, 1d, 1y etc... | 
| es.net.http.auth.pass | String | NONE | must be used with es.net.http.auth.user | 
| es.net.http.auth.user | String | NONE | must be used with es.net.http.auth.pass | 
| es.net.http.auth.token | String | NONE | must be used with es.net.http.auth.token_type | 
| es.net.http.auth.token_type | String | NONE | must be used with es.net.http.auth.token | 
| es.net.ssl | String | false | enable ssl | 
| es.net.ssl.keystore.location | String | NONE | must be a jks, pkcs12 or p12 store and must contain the private and the public key of the node |
| es.net.ssl.keystore.pass | String | NONE | do not provide if the keystore is not protected with a password | 
| es.net.ssl.truststore.location | String | NONE | must be a jks, pkcs12 or p12 store and must contain at least the node certificate and its CA chain, and every other certificate this node should trust |
| es.net.ssl.truststore.pass | String | NONE | do not provide if the truststore is not protected with a password | 
| es.net.ssl.hostname.verification | String | true | Whether the node client should resolve the nodes hostnames to IP addresses or not | 
| es.max_concurrent_shard_requests | String | NONE | sets the maximum number of shards this node can request at a time |
| es.nodes.resolve.hostname | String | false | resolve a hostname: be sure that /etc/hosts references the proper IP address |
| es.doc_type | String | NONE | adds doc_type to the requested URI; this feature is deprecated by Elastic |
| es.http.timeout | String | 1m | overrides the HTTP connection timeout |
| socket_timeout_ms | String | 2m | timeout while waiting to receive data from ES; increase this if the request uses many filters or many indices |
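For a cluster secured with TLS and basic authentication, the keys from the table above might be combined as in this sketch. The truststore path, passwords and user name are hypothetical placeholders, and the timeout value is only an illustration.

```yaml
# Sketch only: paths and credentials below are hypothetical placeholders.
elastic_settings:
  # Enable TLS towards the Elasticsearch nodes.
  es.net.ssl: true
  es.net.ssl.truststore.location: /path/to/truststore.jks
  # Omit this key if the truststore is not password protected.
  es.net.ssl.truststore.pass: my-truststore-password
  # Basic authentication: user and pass must be provided together.
  es.net.http.auth.user: my-user
  es.net.http.auth.pass: my-password
  # Longer HTTP connection timeout than the documented 1m default.
  es.http.timeout: 2m
```

Token-based authentication follows the same pairing rule: es.net.http.auth.token and es.net.http.auth.token_type are set together in place of the user and pass keys.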
Output Columns¶
A list of JSON documents where each element names a field to select and, optionally, its alias. This parameter should be used with mode: hits_hits.
| Output columns | Type | Default value | Description | 
|---|---|---|---|
| field | String | NONE | the field found in your document that you want to include in the resulting stream | 
| alias | String | NONE | the alias you want to give this field in the output tuple. |
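A short sketch of field selection with aliases follows. The channel field is taken from the example on this page; the vendor field and the source_channel alias are hypothetical names used only for illustration.

```yaml
# Sketch only: `vendor` and `source_channel` are hypothetical names.
settings:
  index: mytenant-events-*
  nodes:
    - localhost
  mode: hits_hits
  output_columns:
    # Emit the `channel` field under a different name in the tuple.
    - field: channel
      alias: source_channel
    # No alias given for this field.
    - field: vendor
```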