Elastic Input Wrapper

Introduction

This node is an alternative to our elasticsearch_input node. It lets you perform large extractions from an Elasticsearch index.

The documents returned by each request (hits.hits) are transformed into tuples before being emitted. Meta information can be appended to each tuple (for now, only the document id can be selected).

When this node runs with scrolling enabled, a scroll context is created within your Elasticsearch cluster. A scroll context holds basic meta information about your query as executed at a given point in time. In other words, documents added to the queried index after the scroll context was created will not be retrieved.
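
As a rough sketch, scrolling could be enabled through the elastic_settings section, using the keys listed in the Elastic Settings table further down. The values (page size, keep-alive duration) are illustrative assumptions rather than recommendations, and they are written unquoted, as in the example on this page:

```punchline
settings:
{
  index: mytenant-events-*
  nodes:
  [
    localhost
  ]
  elastic_settings:
  {
    # enable scrolling so that large result sets are fetched page by page
    es.scroll: true
    # number of documents per scroll request (illustrative value)
    es.size: 1000
    # how long the scroll context is kept alive between two requests
    es.scroll.keepalive: 10m
  }
}
```

The keep-alive only needs to cover the time between two consecutive scroll requests, so a long extraction does not necessarily require a long keep-alive.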

The topology containing this node exits with code 0 when all documents are retrieved successfully (acknowledged), or with code 1 if at least one document was not processed as expected (not acknowledged).

Mode(s)

Because of the way this node works, aggregation queries are not supported.

This node is intended to be used as an extraction node!

Available extraction modes are:

  • raw: streams the whole result without any selection
  • hits_hits: works in conjunction with output_columns to select only certain fields you wish to stream...
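
For illustration, a settings fragment that streams whole results might look like the sketch below. Only the mode key and its two possible values come from this page; the index and node values are reused from the example that follows and are assumptions about your setup:

```punchline
settings:
{
  index: mytenant-events-*
  nodes:
  [
    localhost
  ]
  # raw: stream the whole result, without any field selection
  mode: raw
}
```

Since hits_hits is the default, the mode key only needs to be set when raw output is wanted.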

Example(s)

Below is an example of this node coupled with a file_output.

{
  runtime_id: generated_by_kibana
  tenant: mytenant
  version: "6.0"
  platform_id: myplatformid
  runtime: storm
  dag:
  [
    {
      type: extraction_input
      component: input
      settings:
      {
        index: mytenant-events-*
        id_column: myid
        nodes:
        [
          localhost
        ]
        elastic_settings:
        {
          es.scroll.size: 10000
        }
        output_columns:
        [
          {
            field: channel
          }
        ]
      }
      publish:
      [
        {
          stream: logs
          fields:
          [
            log
          ]
        }
      ]
    }
    {
      type: punchlet_node
      component: processor
      settings:
      {
        punchlet_code: "{ print(root); }"
      }
      subscribe:
      [
        {
          component: input
          stream: logs
        }
      ]
      publish:
      [
        {
          stream: logs
          fields:
          [
            log
          ]
        }
      ]
    }
    {
      type: file_output
      component: todisk
      settings:
      {
        destination: file:///tmp/test_elastic_wrapper
        streaming: true
        strategy: at_least_once
        fields:
        [
          log
        ]
        batch_size: 10000
      }
      subscribe:
      [
        {
          component: processor
          stream: logs
          fields:
          [
            log
          ]
        }
      ]
    }
  ]
  metrics:
  {
    reporters:
    [
      {
        type: console
      }
    ]
  }
}

Metrics

This section describes the additional metrics that are published to help you monitor your punchline. These metrics use the same reporters as the ones defined at the root level of your punchline configuration.

On Start Success

Metric published if the punchline is initialized and started properly

```punchline
{
  init:
  {
    process:
    {
      name: elastic_input
      id: 933@PUNCH
    }
    host:
    {
      name: PUNCH
    }
    user:
    {
      name: jonathan
    }
  }
  application.deploy.mode: foreground
  extraction.input.event.timestamp: 2020-03-31T08:52:12.901Z
  platform.id: my-unique-platform-id
  job.runtime.id: 5b6a4df0-3747-4c7a-a468-8cf93e979e29
  type: punch
  content:
  {
    event_type: application_start_cmd
    level: INFO
    total_ram_memory_free_mb: 281
    message: application started
    total_ram_memory_total_mb: 401
  }
  platform:
  {
    application: 5b6a4df0-3747-4c7a-a468-8cf93e979e29
    channel: default
    id: my-unique-platform-id
    tenant: mytenant
  }
  target:
  {
    cluster: foreground
    type: punchline
  }
  @timestamp: 2020-03-31T08:52:12.713Z
  vendor: thales
  name: extraction.input.application.start
  platform.channel: default
  platform.tenant: mytenant
}
```

On Stop Success

Metric published if all tuples published by this spout are acknowledged - Exit code: 0

```punchline
{
  init:
  {
    process:
    {
      name: extraction_input
      id: 1631@PUNCH
    }
    host:
    {
      name: PUNCH
    }
    user:
    {
      name: jonathan
    }
  }
  application.deploy.mode: foreground
  extraction.input.event.timestamp: 2020-03-31T08:59:54.420Z
  platform.id: my-unique-platform-id
  job.runtime.id: 8e532628-d35d-4660-9418-244580c1dc05
  type: punch
  content:
  {
    event_type: application_stop_cmd
    level: INFO
    total_ram_memory_free_mb: 885
    message: application stopped
    total_ram_memory_total_mb: 1344
  }
  platform:
  {
    application: 8e532628-d35d-4660-9418-244580c1dc05
    channel: default
    id: my-unique-platform-id
    tenant: mytenant
  }
  target:
  {
    cluster: foreground
    type: punchline
  }
  @timestamp: 2020-03-31T08:59:54.419Z
  vendor: thales
  name: extraction.input.application.end
  platform.channel: default
  platform.tenant: mytenant
}
```

Processing Start

Metric published when this spout starts cleaning data before emitting tuple(s)

```punchline
{
  init:
  {
    process:
    {
      name: extraction_input
      id: 933@PUNCH
    }
    host:
    {
      name: PUNCH
    }
    user:
    {
      name: jonathan
    }
  }
  application.deploy.mode: foreground
  extraction.input.event.timestamp: 2020-03-31T08:52:13.072Z
  platform.id: my-unique-platform-id
  job.runtime.id: 5b6a4df0-3747-4c7a-a468-8cf93e979e29
  type: punch
  content:
  {
    event_type: application_start_cmd
    level: INFO
    total_ram_memory_free_mb: 472
    message: application_start_processing
    total_ram_memory_total_mb: 610
  }
  platform:
  {
    application: 5b6a4df0-3747-4c7a-a468-8cf93e979e29
    channel: default
    id: my-unique-platform-id
    tenant: mytenant
  }
  target:
  {
    cluster: foreground
    type: punchline
  }
  @timestamp: 2020-03-31T08:52:13.070Z
  vendor: thales
  name: extraction.input.processing.start
  platform.channel: default
  platform.tenant: mytenant
}
```

Processing End

Metric published when data cleaning is over and the spout is ready to start emitting tuple(s)

```punchline
{
  init:
  {
    process:
    {
      name: extraction_input
      id: 933@PUNCH
    }
    host:
    {
      name: PUNCH
    }
    user:
    {
      name: jonathan
    }
  }
  application.deploy.mode: foreground
  extraction.input.event.timestamp: 2020-03-31T08:52:15.770Z
  platform.id: my-unique-platform-id
  job.runtime.id: 5b6a4df0-3747-4c7a-a468-8cf93e979e29
  type: punch
  content:
  {
    event_type: application_start_cmd
    level: INFO
    total_ram_memory_free_mb: 485
    message: application_end_processing
    total_ram_memory_total_mb: 865
  }
  platform:
  {
    application: 5b6a4df0-3747-4c7a-a468-8cf93e979e29
    channel: default
    id: my-unique-platform-id
    tenant: mytenant
  }
  target:
  {
    cluster: foreground
    type: punchline
  }
  @timestamp: 2020-03-31T08:52:15.767Z
  vendor: thales
  name: extraction.input.processing.stop
  platform.channel: default
  platform.tenant: mytenant
}
```

On Failure

Metric published when at least one tuple is not acknowledged - Exit code: 1

```punchline
{
  init:
  {
    process:
    {
      name: extraction_input
      id: 2414@PUNCH
    }
    host:
    {
      name: PUNCH
    }
    user:
    {
      name: jonathan
    }
  }
  reason: an example exception
  application.deploy.mode: foreground
  platform.id: my-unique-platform-id
  job.runtime.id: af696c02-c367-4866-8f1f-a0e447c3d4f6
  type: punchline
  content:
  {
    event_type: application_stop_cmd_failure
    level: ERROR
    total_ram_memory_free_mb: 499
    message: application stop failure
    total_ram_memory_total_mb: 554
  }
  platform:
  {
    application: af696c02-c367-4866-8f1f-a0e447c3d4f6
    channel: default
    id: my-unique-platform-id
    tenant: mytenant
  }
  target:
  {
    cluster: foreground
    type: punchline
  }
  @timestamp: 2020-03-31T09:07:19.618Z
  vendor: thales
  platform.channel: default
  platform.tenant: mytenant
}
```

Statistics

Metric published every second so that users can monitor the spout's progress when fetching many documents.

```punchline
{
  init:
  {
    process:
    {
      name: extraction_input
      id: 933@PUNCH
    }
    host:
    {
      name: PUNCH
    }
    user:
    {
      name: jonathan
    }
  }
  number_of_request_to_es: 1
  application.deploy.mode: foreground
  extraction.input.event.timestamp: 2020-03-31T08:52:15.774Z
  platform.id: my-unique-platform-id
  job.runtime.id: 5b6a4df0-3747-4c7a-a468-8cf93e979e29
  type: punch
  content:
  {
    event_type: STATUS
    level: INFO
    ram_memory_free_mb: 484
    message: extraction statistics
    ram_memory_total_mb: 865
  }
  platform:
  {
    application: 5b6a4df0-3747-4c7a-a468-8cf93e979e29
    channel: default
    id: my-unique-platform-id
    tenant: mytenant
  }
  target:
  {
    cluster: foreground
    type: punchline
  }
  total_documents_to_fetch: 14202
  @timestamp: 2020-03-31T08:52:15.772Z
  vendor: thales
  name: extraction.input.statistics
  total_documents_fetched: 0
  platform.channel: default
  platform.tenant: mytenant
}
```

Parameters

Common Settings

| Name | Type | Mandatory | Default value | Description |
|---|---|---|---|---|
| index | String | true | NONE | Name of the Elasticsearch index from which data is fetched. |
| port | Integer | false | 9200 | Port of your Elasticsearch server. |
| query | String - Json | false | match all | A valid Elasticsearch query. |
| nodes | List of String | true | NONE | Hostnames of your Elasticsearch nodes. In general, only one hostname is needed. |
| elastic_settings | str(K)-str(V) | false | NONE | Key-value arguments that control the Elasticsearch client. |
| id_column | String | false | id | Name of the column that holds the id of each document. |
| mode | String | false | hits_hits | Extraction mode: either hits_hits (field selection is possible, see the output_columns parameter) or raw. |
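
The sketch below combines several of these common settings. It assumes that the query can be written inline as nested JSON (the Type column suggests that a JSON string may be accepted as well); the term query, the channel field and its value are hypothetical and only serve as an illustration:

```punchline
settings:
{
  index: mytenant-events-*
  nodes:
  [
    localhost
  ]
  # only needed when your cluster does not listen on the default 9200
  port: 9200
  # hypothetical query: keep only documents whose channel is "default"
  query:
  {
    query:
    {
      term:
      {
        channel: default
      }
    }
  }
}
```

When query is omitted, the node falls back to a match all query, as stated in the table.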

Elastic Settings

| Elastic settings | Type | Default value | Description |
|---|---|---|---|
| es.nodes.path.prefix | String | NONE | /something/to/append, in case your Elasticsearch servers are behind a proxy |
| es.size | String | 50 | Size of the Elasticsearch query, or size of each scroll query |
| es.scroll | String | false | Enable scrolling requests |
| es.scroll.keepalive | String | 10m | How long each scroll query should be kept alive, for example 1m, 1d, 1y |
| es.net.ssl | String | false | Enable SSL |
| es.net.http.auth.pass | String | NONE | Must be used with es.net.http.auth.user |
| es.net.http.auth.user | String | NONE | Must be used with es.net.http.auth.pass |
| es.net.http.auth.token | String | NONE | Must be used with es.net.http.auth.token_type |
| es.net.http.auth.token_type | String | NONE | Must be used with es.net.http.auth.token |
| es.max_concurrent_shard_requests | String | NONE | Maximum number of shards the elastic_input node can request at a time |
| es.nodes.resolve.hostname | String | false | Resolve a hostname; make sure /etc/hosts references the proper IP address |
| es.doc_type | String | NONE | Add doc_type to the requested URI; this feature is deprecated by Elastic |
| es.http.timeout | String | 1m | Overrides the HTTP connection timeout |
| socket_timeout_ms | String | 2m | Timeout while waiting for data from ES; increase it if the request uses heavy filtering or many indices |
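
As an illustration of how the paired settings fit together, here is a minimal sketch of a secured connection through a proxy. The user name, password and path prefix are placeholder values, not defaults of the node:

```punchline
elastic_settings:
{
  # encrypt traffic between the node and the cluster
  es.net.ssl: true
  # basic authentication: user and pass must be set together
  es.net.http.auth.user: myuser
  es.net.http.auth.pass: mypassword
  # path appended to requests when the cluster sits behind a proxy (placeholder)
  es.nodes.path.prefix: /elastic
}
```

Token-based authentication would presumably follow the same pattern, with es.net.http.auth.token and es.net.http.auth.token_type set together instead of the user and pass pair.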

Output Columns

A list of JSON documents, each of which contains a field entry naming a document field to select. This parameter should be used with mode: hits_hits.

| Output columns | Type | Default value | Description |
|---|---|---|---|
| field | String | NONE | The field found in your document that you want to include in the resulting stream |
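
A sketch of selecting several fields at once could look as follows. The channel field and the myid column name are taken from the example earlier on this page, while message is a hypothetical field name used for illustration:

```punchline
settings:
{
  index: mytenant-events-*
  nodes:
  [
    localhost
  ]
  mode: hits_hits
  # column that receives the id of each document
  id_column: myid
  output_columns:
  [
    {
      field: channel
    }
    {
      # hypothetical second field
      field: message
    }
  ]
}
```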