Elastic Input¶
Overview¶
The elastic_input node enables you to fetch data from your Elasticsearch cluster as a typed or untyped Spark dataset. Two modes are currently available:
- hits_hits
- aggregation
warning
Only one of these modes should be set to true at a time. If aggregation or hits_hits is set to true, the result is a Spark dataset with an automatically inferred schema.
info
If only the minimum configuration is set, this node returns the full raw JSON data (as received from Elasticsearch) within a Spark dataset. The number of rows returned is determined by the number of hits and by the size you specified in your Elasticsearch query. By default the size is set to 50.
Runtime Compatibility¶
- PySpark : ✅
- Spark : ✅
Examples¶
```yaml
---
type: punchline
version: '6.0'
runtime: spark
tenant: default
dag:
- type: elastic_input
  component: input
  settings:
    index: mytenant-events-*
  publish:
  - stream: data
```
The resulting output will be a set of rows, where each row is a string containing valid JSON.
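For illustration, a single returned row could look like the following (a purely hypothetical document: the field names are assumptions, not a fixed schema):

```json
{"@timestamp": "2024-01-01T12:00:00.000Z", "vendor": "acme", "size": 1024}
```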
Aggregation¶
Say you want to query Elasticsearch using:

```sh
curl -XGET "http://localhost:9200/city_offices/_search" -H 'Content-Type: application/json' -d'
{
  "aggregations": {
    "cities": {
      "terms": {
        "field": "city",
        "size": 50
      },
      "aggregations": {
        "office_types": {
          "terms": {
            "field": "office_type"
          }
        }
      }
    }
  }
}'
```
Here is how it is done in a punchline:
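A sketch of the equivalent punchline, modeled on the complete configuration shown further below (the aggregation flag and the publish stream name are assumptions):

```yaml
---
type: punchline
version: '6.0'
runtime: spark
dag:
- type: elastic_input
  component: input
  settings:
    # Assumption: aggregation mode, so only the aggregation results are returned.
    aggregation: true
    index: city_offices
    query:
      size: 0
      aggregations:
        cities:
          terms:
            field: city
            size: 50
          aggregations:
            office_types:
              terms:
                field: office_type
  publish:
  - stream: data
```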
Several Aggregations¶
Let us now execute several aggregations. You probably want to perform aggregations on a limited range of documents. Here is an example query that achieves that:
```yaml
---
type: punchline
version: '6.0'
runtime: spark
dag:
- type: elastic_input
  component: input
  settings:
    index: city_offices
    query:
      size: 0
      aggregations:
        cities:
          terms:
            field: city
            size: 50
          aggregations:
            office_types:
              terms:
                field: office_type
  publish:
  - stream: default
```
Below is the complete configuration file that lets you do that:
```yaml
---
type: punchline
version: '6.0'
runtime: spark
dag:
- type: elastic_input
  component: input
  settings:
    aggregation: true
    index: mytenant-events*
    query:
      size: 0
      query:
        bool:
          must:
          - range:
              "@timestamp":
                gte: now-100h
                lt: now
      aggregations:
        by_channel:
          terms:
            field: vendor
          aggregations:
            max_size:
              max:
                field: size
            total_size:
              sum:
                field: size
  publish:
  - stream: data
- type: show
  component: show
  subscribe:
  - component: input
    stream: data
```
Parameters¶
Common Settings¶
Name | Type | Mandatory | Default value | Description |
---|---|---|---|---|
column | String | false | source | Name of the output column. Taken into account only when both aggregation and hits_hits are set to false; a dataset of N rows is then returned with the source and aggregation results in JSON format. |
hits_hits | Boolean | false | false | Returns only the hits result of your elasticsearch query. |
aggregation | Boolean | false | false | Set to true to use only data returned by an aggregation query. |
index | String | true | NONE | The name of your elasticsearch index where data will be fetched. To add a document type, simply append /<type> to your index name. |
port | Integer | false | 9200 | Your Elasticsearch server Port. |
query | String - Json | false | match all | A valid Elasticsearch query. |
nodes | List of String | true | NONE | Hostnames of your elasticsearch nodes. In general, only one hostname is needed. |
timestamp | str(K)-str(V) | false | NONE | Takes a valid UTC timestamp string: field_name: String [Required] the name of the timestamp column, and field_value: String [Required] the timestamp value. |
output_columns | List of Json | false | NONE | A list of JSON objects, each of which can contain 3 fields: field: String [Required] the document field to include in the resulting dataset; alias: String [Optional] a new name for the retrieved field; type: String [Optional] cast the column to a type (string, date, timestamp, long, double, float, boolean). |
elastic_settings | str(K)-str(V) | false | NONE | Key-value pairs controlling the Elasticsearch client (see Elastic Settings below). |
id_column | String | false | id | Name of the column containing the id of each document. |
empty_dataset_columns | List of Json | false | NONE | List of columns used to return a dataset with a schema and data instead of an empty dataset. |
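As an illustration of output_columns, the fragment below selects two fields, renames one, and casts it (a sketch: the index, field names, and alias are assumptions):

```yaml
- type: elastic_input
  component: input
  settings:
    index: mytenant-events-*
    hits_hits: true
    output_columns:
    # Keep the field under its original name.
    - field: vendor
    # Rename and cast this field in the resulting dataset.
    - field: size
      alias: payload_size
      type: long
  publish:
  - stream: data
```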
Elastic Settings¶
Elastic settings | Type | Default value | Description |
---|---|---|---|
es.nodes.path.prefix | String | NONE | /something/to/append in case your elastic servers are behind a proxy |
es.size | String | 50 | size of elastic query or size of each scroll query |
es.scroll | String | false | enable scrolling request |
es.scroll.keepalive | String | 10m | how long each scroll query should be kept alive, can be: 1m, 1d, 1y etc... |
es.net.http.auth.pass | String | NONE | must be used with es.net.http.auth.user |
es.net.http.auth.user | String | NONE | must be used with es.net.http.auth.pass |
es.net.http.auth.token | String | NONE | must be used with es.net.http.auth.token_type |
es.net.http.auth.token_type | String | NONE | must be used with es.net.http.auth.token |
es.net.ssl | String | false | enable ssl |
es.net.ssl.keystore.location | String | NONE | must be a jks , pkcs12 or p12 store and must contain the private and the public key of the node |
es.net.ssl.keystore.pass | String | NONE | do not provide if the keystore is not protected with a password |
es.net.ssl.truststore.location | String | NONE | must be a jks , pkcs12 or p12 store and must contain at least the node certificate and its CA chain, and every other certificate this node should trust |
es.net.ssl.truststore.pass | String | NONE | do not provide if the truststore is not protected with a password |
es.net.ssl.hostname.verification | String | true | whether the node client should resolve the node hostnames to IP addresses or not |
es.max_concurrent_shard_requests | String | NONE | maximum number of shards the elastic_input node can request at a time |
es.nodes.resolve.hostname | String | false | resolve hostnames: be sure that /etc/hosts references the proper IP address |
es.doc_type | String | NONE | add doc_type to the requested URI (a feature deprecated by Elastic) |
es.http.timeout | String | 1m | overrides the HTTP connection timeout |
socket_timeout_ms | String | 2m | timeout before receiving data from Elasticsearch; increase it if your request uses heavy filtering or many indices |
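For example, scrolling and basic authentication can be combined through elastic_settings (a sketch: the index name and credentials are placeholders):

```yaml
- type: elastic_input
  component: input
  settings:
    index: mytenant-events-*
    elastic_settings:
      # Fetch 500 documents per scroll request instead of the default 50.
      es.size: '500'
      es.scroll: 'true'
      es.scroll.keepalive: 10m
      # Basic auth: user and pass must be provided together.
      es.net.http.auth.user: admin
      es.net.http.auth.pass: secret
  publish:
  - stream: data
```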