Elastic Input
Compatible: Spark/PySpark
The elastic_input node lets you fetch data from your Elasticsearch cluster and load it into your Spark cluster. Two modes are currently available:
- hits_hits
- aggregation
Warning
Only one of these modes should be set to true at a time. When either aggregation or hits_hits is set to true, the result is a Spark dataset with an automatically inferred schema.
Info
With only the minimum configuration (index and nodes), the node returns the full raw JSON data received from Elasticsearch within a Spark dataset. The amount of data returned depends on the number of matching hits and on the size specified in your Elasticsearch query, which defaults to 10.
Examples
Basic configuration
This configuration outputs a dataframe with a single column in which each row contains the response from Elasticsearch.
```hjson
{
  job: [
    {
      type: elastic_input
      component: input
      settings: {
        index: mytenant-events-*
        nodes: [
          localhost
        ]
      }
      publish: [
        {
          stream: data
        }
      ]
    }
  ]
}
```
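As a sketch of the hits_hits mode, the configuration below reuses the basic example and adds the hits_hits and query settings documented in the Configuration(s) section. The match_all query and the size of 100 are illustrative values, not defaults. With hits_hits set to true, only the hits of the query should be returned, in a dataset with an inferred schema.

```hjson
{
  job: [
    {
      type: elastic_input
      component: input
      settings: {
        index: mytenant-events-*
        nodes: [
          localhost
        ]
        // Return only the hits of the query instead of the raw response.
        hits_hits: true
        query: {
          // Illustrative value: retrieve up to 100 hits instead of the default 10.
          size: 100
          query: {
            match_all: {}
          }
        }
      }
      publish: [
        {
          stream: data
        }
      ]
    }
  ]
}
```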
Aggregation Query
Below is an aggregation configuration.
```hjson
{
  job: [
    {
      type: elastic_input
      component: input
      settings: {
        index: city_offices
        nodes: [
          localhost
        ]
        // The query part corresponds to the body of the '_search'
        // request. See the equivalent curl request below.
        query: {
          // Set a zero size to avoid retrieving the matching documents:
          // what you want here are the aggregation buckets.
          size: 0
          aggregations: {
            cities: {
              terms: {
                field: city
                size: 50
              }
              aggregations: {
                office_types: {
                  terms: {
                    field: office_types
                  }
                }
              }
            }
          }
        }
      }
      publish: [
        {
          stream: default
          field: aggregates
        }
      ]
      verbose: true
    }
  ]
}
```
For reference, here is the equivalent Elasticsearch request:
```sh
curl -XGET "http://localhost:9200/city_offices/_search" -H 'Content-Type: application/json' -d'
{
  "size": 0,
  "aggregations": {
    "cities": {
      "terms": {
        "field": "city",
        "size": 50
      },
      "aggregations": {
        "office_types": {
          "terms": {
            "field": "office_types"
          }
        }
      }
    }
  }
}'
```
Several Aggregations
You can run several aggregations in a single query. Here is an example:
```hjson
query: {
  // Set a zero size to avoid retrieving the matching documents:
  // what you want here are the aggregation buckets.
  size: 0
  aggregations: {
    cities: {
      terms: {
        field: city
        size: 50
      }
      aggregations: {
        office_types: {
          terms: {
            field: office_types
          }
        }
      }
    }
    citizens: {
      nested: {
        path: citizens
      }
      aggs: {
        occupations: {
          terms: {
            field: citizens.occupation
            size: 50
          }
        }
      }
    }
  }
}
```
Range Query
You will often want to aggregate over a limited range of documents. The following query restricts the aggregations to a one-minute time window:
```hjson
query: {
  size: 0
  query: {
    range: {
      @timestamp: {
        gte: 2019-01-23T15:00:00.000Z
        lt: 2019-01-23T15:01:00.000Z
      }
    }
  }
  aggregations: {
    by_channel: {
      terms: {
        field: channel
      }
      aggregations: {
        max_size: {
          max: {
            field: size
          }
        }
        total_size: {
          sum: {
            field: size
          }
        }
      }
    }
  }
}
```
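The time window can also be expressed relative to the current time with Elasticsearch date math, as the full configuration example below does with now-100h. The sketch that follows uses an arbitrary 15-minute window and places the range in a bool filter clause: filter context skips relevance scoring, which aggregations do not need. Both the window and the use of filter instead of must are illustrative choices, not requirements of the node.

```hjson
query: {
  size: 0
  query: {
    bool: {
      // Filter context: documents are matched but not scored.
      filter: [
        {
          range: {
            @timestamp: {
              // Illustrative window: the last 15 minutes.
              gte: now-15m
              lt: now
            }
          }
        }
      ]
    }
  }
  aggregations: {
    by_channel: {
      terms: {
        field: channel
      }
      aggregations: {
        max_size: {
          max: {
            field: size
          }
        }
      }
    }
  }
}
```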
Full configuration example
```hjson
{
  job: [
    {
      component: input
      publish: [
        {
          stream: data
        }
      ]
      settings: {
        aggregation: true
        index: mytenant-events*
        nodes: [
          localhost
        ]
        query: {
          aggregations: {
            by_channel: {
              aggregations: {
                max_size: {
                  max: {
                    field: size
                  }
                }
                total_size: {
                  sum: {
                    field: size
                  }
                }
              }
              terms: {
                field: vendor
              }
            }
          }
          query: {
            bool: {
              must: [
                {
                  range: {
                    @timestamp: {
                      gte: now-100h
                      lt: now
                    }
                  }
                }
              ]
            }
          }
          size: 0
        }
      }
      type: elastic_input
    }
    {
      component: show
      subscribe: [
        {
          component: input
          stream: data
        }
      ]
      type: show
    }
  ]
}
```
Configuration(s)
- column: String
  Description: [Optional] Defaults to "source". This parameter is only taken into account when both aggregation and hits_hits are set to false. In that case, a dataset of N rows (N being an integer) is returned, containing the source and aggregation results in JSON format.
- hits_hits: Boolean
  Description: [Optional] Defaults to false. When set to true, only the hits of your Elasticsearch query are returned.
- aggregation: Boolean
  Description: [Optional] Defaults to false. Set to true to use only the data returned by an aggregation query.
- index: String
  Description: [Required] Name of the Elasticsearch index from which data is fetched. Wildcard patterns such as mytenant-events-* can be used, as in the examples above.
- port: Integer
  Description: [Optional] Port of your Elasticsearch server.
- type: String
  Description: [Optional] Document type to retrieve from your Elasticsearch index.
- query: Json
  Description: [Optional] A valid Elasticsearch query, i.e. the body of a _search request.
- nodes: List
  Description: [Required] Hostnames of your Elasticsearch nodes. In most cases, a single hostname is enough.
- timestamp: Map
  Description: [Optional] Takes a valid timestamp string (UTC), with the following keys:
    field_name: String [Required] name of the timestamp column
    field_value: String [Required] timestamp value
- output_columns: List
  Description: [Optional] A list of JSON objects, each of which can contain 3 fields:
    field: String [Required] the field of your documents to include in the resulting dataset
    alias: String [Optional] a new name for the retrieved field
    type: String [Optional] the type to cast the column to (string, date, timestamp, long, double, float, boolean)
- elastic_settings: Map
  Description: [Optional] Key-value arguments controlling the Elasticsearch client:
    es.nodes.path.prefix: /something/to/append, in case your Elasticsearch servers are behind a proxy
    es.size: 50 (default) | a_number
    es.scroll: true | false (default), enables scrolling for the current request
    es.scroll.keepalive: 10m (default) | 1m, 1d, 1y, etc.
    es.net.ssl: true | false (default)
    es.net.http.auth.pass: a_password (NULL by default), must be used with es.net.http.auth.user
    es.net.http.auth.user: a_user (NULL by default), must be used with es.net.http.auth.pass
    es.net.http.auth.token: a_token (NULL by default), must be used with es.net.http.auth.token_type
    es.net.http.auth.token_type: tokenApi (NULL by default), must be used with es.net.http.auth.token
    es.max_concurrent_shard_requests: a_number (NULL by default), maximum number of shards the elastic_input node can request at a time
    es.nodes.resolve.hostname: true (default) | false, resolves hostnames; make sure /etc/hosts references the proper IP addresses
    es.http.timeout: 1m (default), overrides the HTTP connection timeout
    socket_timeout_ms: 2m (default), timeout before receiving data from Elasticsearch; increase it if the request uses many filters or targets many indices
- id_column: String
  Description: [Optional] Name of the ID column.
- empty_dataset_columns: List of Json
  Description: [Optional] Returns a dataset with a schema and data instead of an empty one.
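To illustrate how several of the parameters above fit together, here is a sketch of a settings section in hits_hits mode that selects, renames, and casts columns with output_columns and tunes the client with elastic_settings. The field names come from the earlier examples; the alias event_time, the es.size value, and the credentials are placeholders, and combining output_columns with hits_hits is an assumption rather than a documented requirement.

```hjson
settings: {
  index: mytenant-events-*
  nodes: [
    localhost
  ]
  hits_hits: true
  // Assumption: output_columns applies to the fields of the returned hits.
  output_columns: [
    {
      // Keep the field under its original name.
      field: vendor
    }
    {
      // Rename @timestamp to event_time (placeholder alias) and cast it.
      field: @timestamp
      alias: event_time
      type: timestamp
    }
    {
      field: size
      type: long
    }
  ]
  elastic_settings: {
    // Placeholder values: adapt them to your cluster.
    es.size: 500
    es.scroll: true
    es.scroll.keepalive: 1m
    es.net.ssl: true
    // es.net.http.auth.user and es.net.http.auth.pass must be set together.
    es.net.http.auth.user: a_user
    es.net.http.auth.pass: a_password
  }
}
```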