Elasticsearch Stream Output
Compatible with Spark/PySpark
Overview
As with our elastic_batch_output node, you can use this node to write data continuously to your Elasticsearch cluster...
Example(s)
Below, we illustrate an example that continuously inserts data from a Kafka broker into an Elasticsearch cluster:
```hocon
{
  job: [
    {
      type: kafka_stream_input
      component: input
      settings: {
        topic: mytenant_apache_httpd_archiving
        brokers: [
          {
            host: localhost
            port: 9092
          }
        ]
        start_offset_strategy: latest
      }
      publish: [
        {
          stream: data
        }
      ]
    }
    {
      type: sql
      component: sql
      settings: {
        statement: SELECT CAST(value AS STRING) AS tmp FROM input_data
      }
      subscribe: [
        {
          component: input
          stream: data
        }
      ]
      publish: [
        {
          stream: data
        }
      ]
    }
    {
      type: elastic_stream_output
      component: elastic
      settings: {
        nodes: [ localhost ]
        port: 9200
        index: myesstream
        type: _doc
        checkpoint_location: /tmp/my_es_checkpoint
      }
      subscribe: [
        {
          stream: data
          component: sql
        }
      ]
    }
  ]
}
```
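To make the job above more concrete, here is a minimal PySpark Structured Streaming sketch of what an equivalent pipeline looks like in plain Spark code. It is only an illustration, not the node's actual implementation: it assumes the elasticsearch-hadoop connector is available on the Spark classpath, and it reuses the broker, topic, index, and checkpoint values from the example.

```python
from pyspark.sql import SparkSession

# The elasticsearch-hadoop connector must be on the classpath,
# e.g. via --packages org.elasticsearch:elasticsearch-spark-20_2.11:<version>.
spark = SparkSession.builder.appName("kafka_to_elasticsearch").getOrCreate()

# 1. Read the Kafka topic as a streaming DataFrame (kafka_stream_input equivalent).
raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "mytenant_apache_httpd_archiving")
    .option("startingOffsets", "latest")
    .load()
)

# 2. Cast the Kafka value to a string column, as the sql node does.
rows = raw.selectExpr("CAST(value AS STRING) AS tmp")

# 3. Write the stream to Elasticsearch (elastic_stream_output equivalent).
query = (
    rows.writeStream.format("es")
    .option("es.nodes", "localhost")
    .option("es.port", "9200")
    .option("checkpointLocation", "/tmp/my_es_checkpoint")
    .start("myesstream/_doc")
)

query.awaitTermination()
```

The checkpointLocation option plays the same role as the checkpoint_location parameter described below: it lets the stream resume from where it stopped after a failure.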
Configuration(s)
- nodes: List
  Description: [REQUIRED] a list of hostnames.

- port: Integer
  Description: [REQUIRED] the port to be used on your Elasticsearch cluster.

- index: String
  Description: [REQUIRED] name of the index where data will be inserted...

- type: String
  Description: [REQUIRED] each inserted document will be tagged with this document type.

- checkpoint_location: String
  Description: [REQUIRED] path where recovery data will be stored, in case you want to resume from a failure...

- elastic_settings: Map
  Description: [Optional] specify ES-Hadoop options as key-value pairs: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html. Some of them are disabled by default because they are used by our internal code. A short sketch of how such options are typically applied follows this list.
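As a rough illustration of the elastic_settings parameter, the sketch below passes two standard ES-Hadoop options (es.batch.size.entries and es.mapping.id, both documented on the configuration page linked above) to a PySpark Elasticsearch writer. Which keys the node actually accepts or overrides is decided by its internal code, so treat the option names and values here as examples only.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("es_settings_example").getOrCreate()

# Same Kafka source and cast as in the previous sketch, condensed.
rows = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "mytenant_apache_httpd_archiving")
    .load()
    .selectExpr("CAST(value AS STRING) AS tmp")
)

# Each elastic_settings key/value pair corresponds to an ES-Hadoop option on the writer.
query = (
    rows.writeStream.format("es")
    .option("es.nodes", "localhost")
    .option("es.port", "9200")
    .option("checkpointLocation", "/tmp/my_es_checkpoint")
    .option("es.batch.size.entries", "500")  # flush the bulk write buffer every 500 documents
    .option("es.mapping.id", "tmp")          # example only: use the `tmp` field as the document id
    .start("myesstream/_doc")
)

query.awaitTermination()
```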