
Python Elastic Input

Overview

This node is intended for simple use cases where you do not need to work with the DataFrame API.

This node outputs a list of Python dictionaries. Each dictionary is the result of a query executed against an Elasticsearch cluster.

Unlike classic input nodes, this node can subscribe to a node that publishes a list of strings. Each element of the list is used as a query against Elasticsearch.

If a query is defined on this node and the node also subscribes to another node publishing a list of queries, Elasticsearch receives the subscribed queries and the query configured on this node.
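The combination rule described above can be sketched in Python. This is a hypothetical helper, not the node's actual code. The name `collect_queries`, the execution order (upstream queries first, then the node's own query) and the match-all fallback when no query is available at all are all assumptions.

```python
import json
from typing import List, Optional


def collect_queries(node_query: Optional[dict], subscribed: List[str]) -> List[dict]:
    """Hypothetical helper: gather every query the node would execute.

    node_query: the node's own `query` setting, or None when it is not set.
    subscribed: strings published by an upstream node, one JSON query per element.
    """
    # Each upstream string is parsed into a query dictionary.
    queries = [json.loads(raw) for raw in subscribed]
    # The query configured on the node itself is executed as well.
    if node_query is not None:
        queries.append(node_query)
    # Assumed fallback: with neither source available, run a match-all query.
    if not queries:
        queries.append({"query": {"match_all": {}}})
    return queries
```

Each returned dictionary would then be sent to the cluster, producing one result dictionary per query.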

Runtime Compatibility

  • PySpark :
  • Spark :

Examples

Basic

{
    type: punchline
    version: "6.0"
    runtime: pyspark
    tenant: default
    dag: [
        {
            type: python_elastic_input
            component: python_elastic_input
            settings: {
                nodes: [
                    localhost
                ]
                index: platform-metricbeat-*
                query: {
                    query: {
                        match_all: {}
                    }
                }
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: python_elastic_output
            component: python_elastic_output
            settings: {
                nodes: [
                    localhost
                ]
                index: singlequery
            }
            subscribe: [
                {
                    stream: data
                    component: python_elastic_input
                }
            ]
        }
    ]
}

Multiple Queries

In the example below, the Python Elastic Input node subscribes to a node that publishes a list of strings. Each element of the list is a query executed against an Elasticsearch cluster, and the results are then written to a new index.

Note

Each line of your file must be a valid Elasticsearch query.

{
    type: punchline
    version: "6.0"
    runtime: pyspark
    tenant: default
    dag: [
        {
            type: python_file_input
            component: queries
            publish: [
                {
                    stream: data
                }
            ]
            settings: {
                file_path: /full/path/to/file/query
            }
        }
        {
            type: python_elastic_input
            component: python_elastic_input
            settings: {
                index: mydata
                nodes: [
                    localhost
                ]
            }
            subscribe: [
                {
                    stream: data
                    component: queries
                }
            ]
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: python_elastic_output
            component: python_elastic_output
            settings: {
                nodes: [
                    localhost
                ]
                index: multiquerytest
            }
            subscribe: [
                {
                    stream: data
                    component: python_elastic_input
                }
            ]
        }
    ]
}
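Because the query file must contain exactly one query per line, you may want to check it locally before starting the punchline. The following Python sketch does that check. The helpers `parse_query_lines` and `read_query_file` are hypothetical and not part of the node.

The sketch only verifies that every non-blank line is a JSON object. It cannot tell whether Elasticsearch will accept the query. A pretty-printed query spread over several lines is rejected, which matches the one-query-per-line rule. Skipping blank lines is an assumption about how the node reads the file, so avoid blank lines in the real file.

```python
import json
from typing import Iterable, List


def parse_query_lines(lines: Iterable[str]) -> List[dict]:
    """Parse lines in which every non-blank line must hold one JSON query object."""
    queries = []
    for line_number, line in enumerate(lines, start=1):
        text = line.strip()
        if not text:
            continue  # assumption: blank lines are ignored
        try:
            query = json.loads(text)
        except json.JSONDecodeError as err:
            raise ValueError(f"line {line_number} is not valid JSON: {err.msg}") from err
        if not isinstance(query, dict):
            raise ValueError(f"line {line_number} is not a JSON object")
        queries.append(query)
    return queries


def read_query_file(path: str) -> List[dict]:
    """Read and check a query file such as the one given in file_path."""
    with open(path, encoding="utf-8") as handle:
        return parse_query_lines(handle)
```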

Custom Fields Selection

This node can also be configured to retrieve only specific fields from the Elasticsearch response. Additional fields, such as a timestamp or the number of documents in the requested index, can also be retrieved. This feature is useful for benchmarking Elasticsearch.

{
    type: punchline
    version: "6.0"
    runtime: pyspark
    tenant: default
    dag: [
        {
            type: python_elastic_input
            component: python_elastic_input
            settings: {
                nodes: [
                    localhost
                ]
                output_fields: [
                    took
                    hits.total.value
                ]
                timestamp_field: true
                count_field: true
                publish_query: true
                node_id: mynode
                index: platform-metricbeat-*
                query: {
                    query: {
                        match_all: {}
                    }
                }
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: python_show
            component: python_show
            subscribe: [
                {
                    stream: data
                    component: python_elastic_input
                }
            ]
        }
    ]
}
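To make the effect of `output_fields`, `timestamp_field`, `count_field` and `node_id` concrete, here is a hypothetical Python sketch of the post-processing they describe. None of these details come from the node's implementation. The sketch assumes the following:

- each dotted path is resolved inside the raw search response and stored under a flat key;
- the extra fields are named `timestamp`, `count` and `node_id`;
- the document count is obtained separately (for example from Elasticsearch's `_count` endpoint) and passed in.

```python
import time
from typing import Any, Dict, List, Optional


def get_dotted(doc: Dict[str, Any], path: str) -> Any:
    """Follow a dotted path such as "hits.total.value"; return None if it is missing."""
    current: Any = doc
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def select_fields(
    response: Dict[str, Any],
    output_fields: List[str],
    timestamp_field: bool = False,
    count: Optional[int] = None,
    node_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: keep only the requested fields and add the extra ones."""
    result = {path: get_dotted(response, path) for path in output_fields}
    if timestamp_field:
        result["timestamp"] = time.time()  # assumed: seconds since the epoch
    if count is not None:
        result["count"] = count  # assumed: total documents in the index
    if node_id is not None:
        result["node_id"] = node_id  # lets visualizations tell nodes apart
    return result
```

For a response such as `{"took": 3, "hits": {"total": {"value": 42}}}`, calling `select_fields(response, ["took", "hits.total.value"], node_id="mynode")` returns `{"took": 3, "hits.total.value": 42, "node_id": "mynode"}`.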

Parameters

| Name | Type | Mandatory | Default value | Description |
|---|---|---|---|---|
| index | String | true | NONE | Name of the Elasticsearch index from which data is fetched. |
| port | Integer | false | 9200 | Port of your Elasticsearch server. |
| query | String (JSON) | false | match_all | A valid Elasticsearch query. |
| nodes | List of String | true | NONE | Hostnames of your Elasticsearch nodes. Usually a single hostname is enough. |
| type | String | false | NONE | Document type to retrieve from the Elasticsearch index. |
| timestamp_field | Boolean | false | false | If true, adds a timestamp field to your JSON document. |
| output_fields | List of String | false | NONE | Fields to retrieve from the Elasticsearch response, e.g. hits.hits. |
| count_field | Boolean | false | false | If true, adds a count field to the response holding the total number of documents in the index. |
| node_id | String | false | NONE | If set, adds a node_id field to the response that identifies the current node. Must be unique so that requests can be told apart in visualizations. |
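The mandatory flags and defaults in the table above can be read as a small settings-resolution step. The Python sketch below shows one way to do that. The `resolve_settings` helper and the use of `None` for "NONE" are hypothetical choices for illustration, not the node's code. `query` is deliberately left out of the defaults, because its match-all default is assumed to apply only when no queries arrive from an upstream node.

```python
from typing import Any, Dict

# Defaults from the parameter table; None stands for "NONE".
# `query` is deliberately absent (see the lead-in above).
DEFAULTS: Dict[str, Any] = {
    "port": 9200,
    "type": None,
    "timestamp_field": False,
    "output_fields": None,
    "count_field": False,
    "node_id": None,
}
MANDATORY = ("index", "nodes")


def resolve_settings(settings: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: reject missing mandatory settings, then fill in defaults."""
    missing = [name for name in MANDATORY if name not in settings]
    if missing:
        raise ValueError(f"missing mandatory settings: {', '.join(missing)}")
    # Values given explicitly in the punchline override the defaults.
    return {**DEFAULTS, **settings}
```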