Extraction Input¶
Introduction¶
This node lets you extract large volumes of documents from an Elasticsearch index.
It replaces the elasticsearch_input node, which was removed in releases >= DAVE_6.4.X.
The documents returned by each request (hits.hits) are transformed into tuples before being emitted. Meta information can be appended to each tuple; for now, only the document id can be selected.
When this node runs with scrolling enabled, a scroll context is created in your Elasticsearch cluster. The scroll context holds basic meta information about your query as executed at a given point in time. In other words, documents inserted into the queried index after the scroll context was created will not be retrieved.
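As a minimal sketch of a scrolling setup, the fragment below uses only keys listed in the Elastic Settings table. The index pattern, page size and keepalive are illustrative assumptions, and the values are quoted on the assumption that they must be strings, as the table declares.

```yaml
# Illustrative settings block for an extraction_input node (values are assumptions)
settings:
  index: mytenant-events-*
  nodes:
    - localhost
  elastic_settings:
    # enable scrolling requests (disabled by default)
    es.scroll: "true"
    # size of each scroll query
    es.scroll.size: "1000"
    # how long each scroll query is kept alive
    es.scroll.keepalive: "10m"
```

Since the scroll context reflects the index at the time it was created, a longer keepalive keeps that same view alive longer; it does not make documents inserted afterwards visible.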
The topology containing this node exits with code 0 when all documents are retrieved successfully (acknowledged), or with code 1 if at least one document was not processed as expected (not acknowledged).
Mode(s)¶
Because of how this node works, aggregation queries are not supported.
This node is intended for document extraction.
Available extraction modes are:
- raw: streams the whole result without any selection
- hits_hits: works in conjunction with output_columns to select only the fields you wish to stream. By default, all columns are selected.
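The two modes can be contrasted with a pair of settings blocks. This is a sketch under stated assumptions: the index pattern, node and field name are placeholders, and the two YAML documents (separated by `---`) are alternatives, not meant to be combined in one node.

```yaml
# Alternative 1: raw mode, the whole result is streamed without selection
settings:
  index: mytenant-events-*
  nodes:
    - localhost
  mode: raw
---
# Alternative 2: hits_hits mode (the default), restricted to one field
settings:
  index: mytenant-events-*
  nodes:
    - localhost
  mode: hits_hits
  output_columns:
    - field: channel
```

Omitting output_columns in hits_hits mode selects all columns.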
Example(s)¶
Below is an example of this node coupled with a file_output node.
runtime_id: generated_by_kibana
tenant: mytenant
version: "6.0"
platform_id: myplatformid
runtime: storm
dag:
  - type: extraction_input
    component: input
    settings:
      index: mytenant-events-*
      id_column: myid
      nodes:
        - localhost
      elastic_settings:
        es.scroll.size: 10000
      output_columns:
        - field: channel
    publish:
      - stream: logs
        fields:
          - log
  - type: punchlet_node
    component: processor
    settings:
      punchlet_code: "{ print(root); }"
    subscribe:
      - component: input
        stream: logs
    publish:
      - stream: logs
        fields:
          - log
  - type: file_output
    component: todisk
    settings:
      destination: file:///tmp/test_elastic_wrapper
      streaming: true
      strategy: at_least_once
      fields:
        - log
      batch_size: 10000
    subscribe:
      - component: processor
        stream: logs
        fields:
          - log
metrics:
  reporters:
    - type: console
Another usage example can be found in the Archive Extraction documentation.
Metrics¶
This section describes the additional metrics published by this node, which can help you monitor your punchline. These metrics use the same reporters as the ones defined at the root level of your punchline configuration.
On Start Success¶
Metric published if the punchline is initialized and started properly
{
  "init": {
    "process": {
      "name": "elastic_input",
      "id": "933@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "application.deploy.mode": "foreground",
  "extraction.input.event.timestamp": "2020-03-31T08:52:12.901Z",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
  "type": "punch",
  "content": {
    "event_type": "application_start_cmd",
    "level": "INFO",
    "total_ram_memory_free_mb": 281,
    "message": "application started",
    "total_ram_memory_total_mb": 401
  },
  "platform": {
    "application": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "@timestamp": "2020-03-31T08:52:12.713Z",
  "vendor": "thales",
  "name": "extraction.input.application.start",
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}
On Stop Success¶
Metric published if all tuples published by this spout are acknowledged - Exit code: 0
{
  "init": {
    "process": {
      "name": "extraction_input",
      "id": "1631@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "application.deploy.mode": "foreground",
  "extraction.input.event.timestamp": "2020-03-31T08:59:54.420Z",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "8e532628-d35d-4660-9418-244580c1dc05",
  "type": "punch",
  "content": {
    "event_type": "application_stop_cmd",
    "level": "INFO",
    "total_ram_memory_free_mb": 885,
    "message": "application stopped",
    "total_ram_memory_total_mb": 1344
  },
  "platform": {
    "application": "8e532628-d35d-4660-9418-244580c1dc05",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "@timestamp": "2020-03-31T08:59:54.419Z",
  "vendor": "thales",
  "name": "extraction.input.application.end",
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}
Processing Start¶
Metric published when this spout starts cleaning data before emitting tuple(s)
{
  "init": {
    "process": {
      "name": "extraction_input",
      "id": "933@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "application.deploy.mode": "foreground",
  "extraction.input.event.timestamp": "2020-03-31T08:52:13.072Z",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
  "type": "punch",
  "content": {
    "event_type": "application_start_cmd",
    "level": "INFO",
    "total_ram_memory_free_mb": 472,
    "message": "application_start_processing",
    "total_ram_memory_total_mb": 610
  },
  "platform": {
    "application": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "@timestamp": "2020-03-31T08:52:13.070Z",
  "vendor": "thales",
  "name": "extraction.input.processing.start",
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}
Processing End¶
Metric published when data cleaning is over, and the spout is ready to start emitting tuple(s)
{
  "init": {
    "process": {
      "name": "extraction_input",
      "id": "933@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "application.deploy.mode": "foreground",
  "extraction.input.event.timestamp": "2020-03-31T08:52:15.770Z",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
  "type": "punch",
  "content": {
    "event_type": "application_start_cmd",
    "level": "INFO",
    "total_ram_memory_free_mb": 485,
    "message": "application_end_processing",
    "total_ram_memory_total_mb": 865
  },
  "platform": {
    "application": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "@timestamp": "2020-03-31T08:52:15.767Z",
  "vendor": "thales",
  "name": "extraction.input.processing.stop",
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}
On Failure¶
Metric published when at least one tuple is not acknowledged - Exit code: 1
{
  "init": {
    "process": {
      "name": "extraction_input",
      "id": "2414@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "reason": "an example exception",
  "application.deploy.mode": "foreground",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "af696c02-c367-4866-8f1f-a0e447c3d4f6",
  "type": "punchline",
  "content": {
    "event_type": "application_stop_cmd_failure",
    "level": "ERROR",
    "total_ram_memory_free_mb": 499,
    "message": "application stop failure",
    "total_ram_memory_total_mb": 554
  },
  "platform": {
    "application": "af696c02-c367-4866-8f1f-a0e447c3d4f6",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "@timestamp": "2020-03-31T09:07:19.618Z",
  "vendor": "thales",
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}
Statistics¶
Metric published every second so that you can monitor the spout's progress when fetching many documents.
{
  "init": {
    "process": {
      "name": "extraction_input",
      "id": "933@PUNCH"
    },
    "host": {
      "name": "PUNCH"
    },
    "user": {
      "name": "jonathan"
    }
  },
  "number_of_request_to_es": 1,
  "application.deploy.mode": "foreground",
  "extraction.input.event.timestamp": "2020-03-31T08:52:15.774Z",
  "platform.id": "my-unique-platform-id",
  "job.runtime.id": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
  "type": "punch",
  "content": {
    "event_type": "STATUS",
    "level": "INFO",
    "ram_memory_free_mb": 484,
    "message": "extraction statistics",
    "ram_memory_total_mb": 865
  },
  "platform": {
    "application": "5b6a4df0-3747-4c7a-a468-8cf93e979e29",
    "channel": "default",
    "id": "my-unique-platform-id",
    "tenant": "mytenant"
  },
  "target": {
    "cluster": "foreground",
    "type": "punchline"
  },
  "total_documents_to_fetch": 14202,
  "@timestamp": "2020-03-31T08:52:15.772Z",
  "vendor": "thales",
  "name": "extraction.input.statistics",
  "total_documents_fetched": 0,
  "platform.channel": "default",
  "platform.tenant": "mytenant"
}
Parameters¶
Common Settings¶
Name | Type | Mandatory | Default value | Description |
---|---|---|---|---|
index | String | true | NONE | Name of the Elasticsearch index to fetch data from. |
port | Integer | false | 9200 | Your Elasticsearch server Port. |
query | String - Json | false | match all | A valid Elasticsearch query. |
nodes | List of String | true | NONE | Hostnames of your elasticsearch nodes. In general, only one hostname is needed. |
elastic_settings | str(K)-str(V) | false | NONE | Key-value arguments that control the Elasticsearch client. |
id_column | String | false | id | Name of the column that holds the id of each document. |
mode | String | false | hits_hits | Extraction mode: either hits_hits (fields can be selected, see the output_columns parameter) or raw. |
output_columns | List of Map | false | all columns | When using hits_hits mode, selects fields and optionally gives them aliases. |
Elastic Settings¶
Elastic settings | Type | Default value | Description |
---|---|---|---|
es.nodes.path.prefix | String | NONE | /something/to/append in case your elastic servers are behind a proxy |
es.scroll | String | false | enable scrolling request |
es.scroll.size | String | 50 | size of elastic query or size of each scroll query |
es.scroll.keepalive | String | 10m | how long each scroll query should be kept alive, can be: 1m, 1d, 1y etc... |
es.net.http.auth.pass | String | NONE | must be used with es.net.http.auth.user |
es.net.http.auth.user | String | NONE | must be used with es.net.http.auth.pass |
es.net.http.auth.token | String | NONE | must be used with es.net.http.auth.token_type |
es.net.http.auth.token_type | String | NONE | must be used with es.net.http.auth.token |
es.net.ssl | String | false | enable ssl |
es.net.ssl.keystore.location | String | NONE | must be a jks, pkcs12 or p12 store and must contain the private and the public key of the node |
es.net.ssl.keystore.pass | String | NONE | do not provide if the keystore is not protected with a password |
es.net.ssl.truststore.location | String | NONE | must be a jks, pkcs12 or p12 store and must contain at least the node certificate and its CA chain, and every other certificate this node should trust |
es.net.ssl.truststore.pass | String | NONE | do not provide if the truststore is not protected with a password |
es.net.ssl.hostname.verification | String | true | Whether the node client should resolve the nodes hostnames to IP addresses or not |
es.max_concurrent_shard_requests | String | NONE | maximum number of shards this node can request at a time |
es.nodes.resolve.hostname | String | false | resolve node hostnames; make sure /etc/hosts references the proper IP address |
es.doc_type | String | NONE | add doc_type to the requested URI; this feature is deprecated by Elastic |
es.http.timeout | String | 1m | overrides the HTTP connection timeout |
socket_timeout_ms | String | 2m | timeout while waiting to receive data from ES; increase this if the request uses many filters or many indices |
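To illustrate how the paired settings in this table fit together, here is a sketch of a secured connection. It uses only keys from the table; the user name, password, truststore path and truststore password are hypothetical placeholders, and the values are quoted on the assumption that they must be strings.

```yaml
# Illustrative secured connection (credentials and paths are hypothetical)
settings:
  index: mytenant-events-*
  nodes:
    - localhost
  port: 9200
  elastic_settings:
    # enable SSL and point to a store holding the node certificate and its CA chain
    es.net.ssl: "true"
    es.net.ssl.truststore.location: /path/to/truststore.jks
    # omit if the truststore is not protected with a password
    es.net.ssl.truststore.pass: "my-truststore-password"
    # user and pass must be provided together
    es.net.http.auth.user: "my-user"
    es.net.http.auth.pass: "my-password"
    # override the default 1m HTTP connection timeout
    es.http.timeout: "2m"
```

Token-based authentication follows the same pairing rule: es.net.http.auth.token and es.net.http.auth.token_type are set together, in place of the user and pass entries.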
Output Columns¶
A list of JSON documents, each describing one field to select. This parameter should be used with mode: hits_hits.
Output columns | Type | Default value | Description |
---|---|---|---|
field | String | NONE | the field found in your document that you want to include in the resulting stream |
alias | String | NONE | the alias you want to give this field in the output tuple. |
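A short sketch of field selection with aliases follows. The field names channel and log are taken from the example earlier on this page; the alias source_channel is a hypothetical name chosen for illustration.

```yaml
# Illustrative field selection (the alias name is hypothetical)
settings:
  index: mytenant-events-*
  nodes:
    - localhost
  id_column: myid
  mode: hits_hits
  output_columns:
    # emitted under the alias instead of the original field name
    - field: channel
      alias: source_channel
    # no alias given for this field
    - field: log
```

With these settings, the channel value would be expected to appear in the output tuple under source_channel.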