
Elastic Stream Output


Like elastic_batch_output, this node writes data to an Elasticsearch cluster, but in a streaming fashion.

Runtime Compatibility

  • PySpark
  • Spark


The following example shows how to continuously insert data from a Kafka broker into an Elasticsearch cluster:

type: punchline
version: '6.0'
runtime: spark
dag:
- type: kafka_stream_input
  component: input
  settings:
    topic: mytenant_apache_httpd_archiving
    start_offset_strategy: latest
  publish:
  - stream: data
- type: sql
  component: sql
  settings:
    statement: SELECT CAST(value AS STRING) AS tmp FROM input_data
  subscribe:
  - component: input
    stream: data
  publish:
  - stream: data
- type: elastic_stream_output
  component: elastic
  settings:
    index: myesstream
    checkpoint_location: "/tmp/my_es_checkpoint"
  subscribe:
  - stream: data
    component: sql


Name Type mandatory Default value Description
index String true NONE The name of the Elasticsearch index where data will be written. To add a document type, simply append /<type> to your index name.
port Integer false 9200 Your Elasticsearch server port.
query String - Json false match all A valid Elasticsearch query.
nodes List of String true NONE Hostnames of your Elasticsearch nodes. In general, only one hostname is needed.
output_columns List of Json false NONE A list of Json objects, each of which can contain 3 fields: field: String [Required] the document field to include in the resulting dataset; alias: String [Optional] a new name for the retrieved field; type: String [Optional] cast the column to a type (string, date, timestamp, long, double, float, boolean).
elastic_settings str(K)-str(V) false NONE Key-value arguments controlling the Elasticsearch client. See the official Elasticsearch documentation.
checkpoint_location String true NONE Absolute path where checkpoint data will be stored.
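As a sketch of how these parameters combine (the index name, hostname, column names and setting values below are illustrative, not taken from this page), the optional parameters can all be set in the node's settings section:

```yaml
- type: elastic_stream_output
  component: elastic
  settings:
    index: myesstream                   # mandatory: target index
    nodes:
    - localhost                         # mandatory: one hostname is usually enough
    port: 9200                          # optional, defaults to 9200
    checkpoint_location: "/tmp/my_es_checkpoint"   # mandatory
    output_columns:                     # optional: shape the written documents
    - field: tmp                        # required: field taken from the stream
      alias: raw_log                    # optional: rename the column
      type: string                      # optional: cast the column
    elastic_settings:
      es.http.timeout: 2m               # key-value pairs passed to the client
  subscribe:
  - stream: data
    component: sql
```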

Elastic Settings

Elastic settings Type Default value Description
es.nodes.path.prefix String NONE /something/to/append in case your Elasticsearch servers are behind a proxy
es.size String 50 size of the Elasticsearch query, or of each scroll query
es.scroll String false enable scrolling requests
es.scroll.keepalive String 10m how long each scroll query should be kept alive, e.g. 1m, 1d, 1y
(Several further rows of this table — covering SSL activation, authentication credentials used in pairs, the keystore and truststore (jks, pkcs12 or p12 stores holding the node's keys, its CA chain and trusted certificates, with optional passwords) and hostname resolution — lost their setting names during extraction; refer to the official Elasticsearch client documentation for the exact keys.)
es.max_concurrent_shard_requests String NONE sets the maximum number of shards this node can request at a time
es.nodes.resolve.hostname String false resolve hostnames: be sure /etc/hosts references the proper IP addresses
es.doc_type String NONE adds a doc_type to the requested URI; this feature is deprecated by Elastic
es.http.timeout String 1m overrides the HTTP connection timeout
socket_timeout_ms String 2m timeout before receiving data from ES; increase this if the request uses heavy filtering or many indices
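For instance, a node behind a proxy with slow responses could tune its client through elastic_settings as below; the keys come from the table above, but the values are purely illustrative:

```yaml
elastic_settings:
  es.nodes.path.prefix: /es-proxy   # illustrative prefix for a proxied cluster
  es.http.timeout: 2m               # raise the HTTP connection timeout
  socket_timeout_ms: 3m             # allow slower responses before timing out
```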