Python Elastic Input

Before you start...

This node is intended for pipelines that do not require Spark features, for instance when you are not manipulating Spark dataframes at all.

The output of this node is a list of Python dictionaries. Each dictionary is a result returned by a query executed against an Elasticsearch cluster.
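To illustrate the shape of that output, here is a minimal sketch that turns a standard Elasticsearch response into a list of plain dictionaries. The `sample_response` below is a hypothetical, hard-coded example of the usual Elasticsearch reply structure; no cluster is involved.

```python
# Hypothetical Elasticsearch response, following the standard reply shape.
sample_response = {
    "took": 3,
    "hits": {
        "total": {"value": 2, "relation": "eq"},
        "hits": [
            {"_index": "platform-metricbeat-2024", "_id": "1",
             "_source": {"metric": "cpu", "value": 0.42}},
            {"_index": "platform-metricbeat-2024", "_id": "2",
             "_source": {"metric": "mem", "value": 0.77}},
        ],
    },
}

def hits_as_dicts(response):
    """Extract each hit's _source as a plain Python dictionary."""
    return [hit["_source"] for hit in response["hits"]["hits"]]

print(hits_as_dicts(sample_response))
# [{'metric': 'cpu', 'value': 0.42}, {'metric': 'mem', 'value': 0.77}]
```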

In contrast to the classic input node type, this one can subscribe to a node publishing a list of strings. Each element of the list is used as a query against Elasticsearch.

If a query is defined on this node while it is also subscribed to a node publishing a list of queries, both the published queries and the query configured on this node are executed against your Elasticsearch cluster.
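A small sketch of that merge rule, assuming the published queries and the node's own query are simply gathered into one list before execution (the ordering here is an assumption, not documented behavior):

```python
# Query configured in this node's settings (hypothetical example).
node_query = '{"query": {"match_all": {}}}'

# Queries received from the subscribed node (hypothetical examples).
subscribed_queries = [
    '{"query": {"term": {"status": "ok"}}}',
    '{"query": {"range": {"bytes": {"gt": 100}}}}',
]

# Both sources are executed: the node runs all three queries.
queries_to_run = subscribed_queries + [node_query]
print(len(queries_to_run))  # 3
```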

Examples

Use-cases

Our "hello world" punchline configuration.

beginner_use_case.punchline

{
    type: punchline
    version: "6.0"
    runtime: pyspark
    tenant: default
    dag: [
        {
            type: python_elastic_input
            component: python_elastic_input
            settings: {
                nodes: [
                    localhost
                ]
                index: platform-metricbeat-*
                query: {
                    query: {
                        match_all: {}
                    }
                }
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: python_elastic_output
            component: python_elastic_output
            settings: {
                nodes: [
                    localhost
                ]
                index: singlequery
            }
            subscribe: [
                {
                    stream: data
                    component: python_elastic_input
                }
            ]
        }
    ]
}

When this configuration is executed, only the parameters set in the settings key are taken into account.

Check it out with the instructions below:

CONF=beginner_use_case.punchline
punchlinectl start -p $CONF

Multiple query requests

Below is an example where our Python Elastic Input node subscribes to a node publishing a list of strings; each element of the list is a query executed against an Elasticsearch cluster. The results are then saved in a new index.

Note

Each line of your file must be a valid Elasticsearch query.
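Since every line of the file must be a valid Elasticsearch query, the file effectively holds one JSON object per line. The sketch below shows a hypothetical file content and checks the one-valid-JSON-query-per-line rule with the standard library:

```python
import json

# Hypothetical content of the query file: one Elasticsearch query per line.
query_file_content = """\
{"query": {"match_all": {}}}
{"query": {"term": {"vendor": "apache"}}}
"""

# Each line must parse as valid JSON, otherwise the node cannot execute it.
queries = [json.loads(line)
           for line in query_file_content.splitlines()
           if line.strip()]

print(len(queries))  # 2
```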

intermediate_use_case.punchline

{
    type: punchline
    version: "6.0"
    runtime: pyspark
    tenant: default
    dag: [
        {
            type: python_file_input
            component: queries
            publish: [
                {
                    stream: data
                }
            ]
            settings: {
                file_path: /full/path/to/file/query
            }
        }
        {
            type: python_elastic_input
            component: python_elastic_input
            settings: {
                index: mydata
                nodes: [
                    localhost
                ]
            }
            subscribe: [
                {
                    stream: data
                    component: queries
                }
            ]
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: python_elastic_output
            component: python_elastic_output
            settings: {
                nodes: [
                    localhost
                ]
                index: multiquerytest
            }
            subscribe: [
                {
                    stream: data
                    component: python_elastic_input
                }
            ]
        }
    ]
}

Let's execute it with the command below:

CONF=intermediate_use_case.punchline
punchlinectl start -p $CONF

Select custom fields

This node can also be configured to retrieve only specific fields from the Elasticsearch response. Additional fields, such as a timestamp or the number of documents in the requested index, can also be retrieved. This feature is useful if you want to benchmark Elasticsearch.
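To make the `output_fields` setting concrete, here is a sketch of how dotted paths such as `hits.total.value` can be resolved against an Elasticsearch response. The response and the `get_field` helper are both illustrative assumptions, not the node's actual implementation:

```python
from functools import reduce

# Hypothetical Elasticsearch response (hard-coded sample).
sample_response = {"took": 5, "hits": {"total": {"value": 1234, "relation": "eq"}}}

def get_field(response, dotted_path):
    """Walk the response dict following a dotted path like 'hits.total.value'."""
    return reduce(lambda d, key: d[key], dotted_path.split("."), response)

# Keep only the fields listed in output_fields.
selected = {path: get_field(sample_response, path)
            for path in ["took", "hits.total.value"]}
print(selected)  # {'took': 5, 'hits.total.value': 1234}
```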

advanced_use_case.punchline

{
    type: punchline
    version: "6.0"
    runtime: pyspark
    tenant: default
    dag: [
        {
            type: python_elastic_input
            component: python_elastic_input
            settings: {
                nodes: [
                    localhost
                ]
                output_fields: [
                    took
                    hits.total.value
                ]
                timestamp_field: true
                count_field: true
                publish_query: true
                node_id: mynode
                index: platform-metricbeat-*
                query: {
                    query: {
                        match_all: {}
                    }
                }
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: python_show
            component: python_show
            subscribe: [
                {
                    stream: data
                    component: python_elastic_input
                }
            ]
        }
    ]
}

Parameters

Common Settings

| Name | Type | Mandatory | Default value | Description |
|---|---|---|---|---|
| index | String | true | NONE | The name of the Elasticsearch index from which data is fetched. |
| port | Integer | false | 9200 | Your Elasticsearch server port. |
| query | String or Json | false | match all | A valid Elasticsearch query. |
| nodes | List of String | true | NONE | Hostnames of your Elasticsearch nodes. In general, a single hostname is enough. |
| type | String | false | NONE | Document type retrieved from the Elasticsearch index. |
| timestamp_field | Boolean | false | false | Adds a timestamp field to each JSON document. |
| output_fields | List of String | false | NONE | List of fields retrieved from the Elasticsearch response, e.g. hits.hits. |
| count_field | Boolean | false | false | If true, adds a count field holding the total number of documents in the index. |
| node_id | String | false | NONE | If set, adds a node_id field identifying the current node. Must be unique in order to differentiate requests in visualizations. |

Advanced Settings

No advanced settings for this node.