Elastic Batch Input¶

Compatible Spark/PySpark

The elastic_batch_input node fetches data from an elasticsearch index. The data it reads is transformed into a Dataset<Row>. Its default behavior is simply to turn each input document into a row: you only have to define the fields you are interested in, together with their types.
Say you have an index "academy-example" with documents like:
```json
{
  "address": {
    "street": "clemenceau"
  },
  "name": "phil",
  "age": 21,
  "friends": [
    "alice"
  ]
}
```
Here is how to generate rows containing one column for the address.street json field and one for the age json field:
```hjson
{
  type: elastic_batch_input
  component: input
  settings: {
    index: academy-example
    es_cluster: es_search
    nodes: [
      localhost
    ]
    output_columns: [
      {
        type: string
        field: address.street
        alias: address_street
      }
      {
        type: integer
        field: age
      }
    ]
  }
  publish: [
    {
      stream: data
    }
  ]
}
```
The generated dataset will look like this:

```text
+--------------------+--------------+---+
|                  id|address_street|age|
+--------------------+--------------+---+
|eQeYvWkB3c-USBLvf06M|    clemenceau| 21|
|egeYvWkB3c-USBLvf06M|    clemenceau| 23|
|eweYvWkB3c-USBLvf06M|    clemenceau| 53|
+--------------------+--------------+---+
```
Info

Notice the id column. By default, the id of each document is provided in that column.
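If you prefer another column name, you can set it through the id_column setting documented below. A minimal sketch:

```hjson
settings: {
  # store each document id in a column named "doc_id" instead of the default "id"
  id_column: doc_id
  ...
}
```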
Specifying a query¶
You may want to read only the documents matching a query. Use the query property:
```hjson
settings: {
  ...
  query: {
    bool: {
      must: [
        {
          range: {
            age: {
              gte: 20,
              lte: 30
            }
          }
        }
      ]
    }
  }
}
```
You then get:

```text
+--------------------+--------------+---+
|                  id|address_street|age|
+--------------------+--------------+---+
|eQeYvWkB3c-USBLvf06M|    clemenceau| 21|
|egeYvWkB3c-USBLvf06M|    clemenceau| 23|
+--------------------+--------------+---+
```
Warning

By default, ES-Hadoop names a column after the last key of your nested json. For instance, { "key1": { "key2": 1 } } will produce a dataset with a column named "key2" instead of "key1.key2". To bypass this issue, use the output_columns parameter and set an alias to the desired column name, for instance "key1_key2".
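As a sketch, reusing the hypothetical key1/key2 document above, the following output_columns entry makes the nested field appear under the column name key1_key2:

```hjson
output_columns: [
  {
    # fully qualified path of the nested json field
    field: key1.key2
    # alias used as the resulting column name
    alias: key1_key2
  }
]
```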
Advanced Options¶
```hjson
settings: {
  // you can control the elasticsearch listening port
  port: 9200

  // Should your document not use the default '_doc' type you can
  // set it explicitly
  type: doc

  // Pass in some elastic properties:
  elastic_settings: {
    es.index.read.missing.as.empty: no
  }
}
```
Below is a complete example of how to use this PML node:
```hjson
{
  runtime_id: my-job-test
  tenant: job_tenant_test
  job: [
    {
      description:
      '''
      read all metricbeat documents from local elasticsearch
      and generate a Dataset<Row> out of it
      '''
      type: elastic_batch_input
      component: input
      settings: {
        index: platform-metricbeat-*
        type: doc
        nodes: [
          localhost
        ]
        port: 9200
        elastic_settings: {
          es.index.read.missing.as.empty: yes
        }
        id_column: id
        output_columns: [
          {
            field: beat.version
          }
        ]
        query: {
          query: {
            bool: {
              must: [
                {
                  term: {
                    metricset.module: system
                  }
                }
                {
                  range: {
                    @timestamp: {
                      gte: now-10m
                      lt: now
                    }
                  }
                }
              ]
            }
          }
        }
      }
      publish: [
        {
          stream: default
        }
      ]
    }
    {
      description:
      '''
      store all data into an elasticsearch index
      '''
      type: elastic_batch_output
      component: output
      settings: {
        # the target index name
        index: example
        # the target elasticsearch cluster name. This name is the one defined
        # in the elasticsearch configuration file.
        es_cluster: es_search
        # the document type
        type: doc
        # the name of the column holding the document id
        id_column: id
        # Optional: the elasticsearch nodes listening port
        # port: 9200
        # Optional: insertion mode, defaults to overwrite
        output_mode: overwrite
      }
      subscribe: [
        {
          component: input
          stream: default
        }
      ]
    }
  ]
}
```
Configuration(s)¶

- index: String

  Description: [Required] The name of the elasticsearch index from which data is fetched.

- port: Integer

  Description: [Optional] Your elasticsearch server port.

- type: String

  Description: [Optional] The document type that will be retrieved from your elasticsearch index.

- query: Json

  Description: [Optional] A valid elasticsearch query.

- nodes: List

  Description: [Required] Hostnames of your elasticsearch nodes. In general, only one hostname is needed.

- elastic_settings: Map

  Description: [Optional] ES-Hadoop options specified as key-value pairs: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html. Some of them are disabled by default as they are used by our internal code.

- es_cluster: String

  Description: [Optional] Name of your elasticsearch cluster as defined in your punchplatform.properties.

- id_column: String

  Description: [Optional] Name of the column used to store each document id. Note that _id is a reserved name in Elasticsearch, so refrain from using it. By default, the field is named id.

- output_columns: List

  Description: [Optional] A list of Json objects, each of which can contain 3 fields:

    - field: String [Required] the field found in your document that you want to include in the resulting dataset
    - alias: String [Optional] give a new name to the field you want to retrieve
    - type: String [Optional] cast the column to a type (string, date, timestamp, long, double, float, boolean)

  If the output_columns field is not set, all data will be published with their default type.

- with_null_values: Boolean

  Description: [Optional] Remove columns whose values are null.

- empty_dataset_columns: List of Json

  Description: [Optional] Returns a dataset with a schema and data instead of an empty one.