
Sql


With the sql node you can either execute a single query statement against your incoming dataset(s), or multiple query statements against multiple datasets.

Tips

Your dataset name is the concatenation of the subscribed component name and its published stream. For instance, if you subscribe to a node that publishes a stream data and whose component name is input, the resulting dataset name will be input_data.
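A minimal sketch of a sql node using that naming (the component and stream names are the ones from the tip above, not defaults):

{
    type: sql
    component: sql
    settings: {
        # "input" publishes the stream "data", hence the table name input_data
        statement: SELECT * FROM input_data
    }
    subscribe: [
        {
            component: input
            stream: data
        }
    ]
    publish: [
        {
            stream: data
        }
    ]
}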

Empty dataset(s)

If one of the publishers that the sql node subscribes to returns an empty dataset, the sql node will by default skip all computation and immediately return an empty dataset to all of its subscribers.

statement vs statement_list

One of these two settings must be set, but you cannot set both at the same time.
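As a sketch, the two forms look like this (the queries are placeholders):

settings: {
    # single query form
    statement: SELECT * FROM input_data
}

or:

settings: {
    # multiple queries form, each producing a named dataset
    statement_list: [
        {
            output_table_name: result
            statement: SELECT * FROM input_data
        }
    ]
}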


Examples

Use-cases

Our "hello world" punchline configuration.

beginner_use_case.punchline

{
    type: punchline
    version: "6.0"
    runtime: pyspark
    tenant: default
    dag: [
            {
                type: elastic_batch_input
                component: input
                settings: {
                    index: platform-metricbeat-*
                    es_cluster: es_search
                    nodes: [
                        localhost
                    ]
                    output_columns: [
                        {
                            type: string
                            field: metricset.name
                            alias: metricset.name
                        }
                    ]
                    query: {
                        size: 0
                        query: {
                            bool: {
                                must: [
                                    {
                                        exists: {
                                            field: metricset.name
                                        }
                                    }
                                    {
                                        range: {
                                            @timestamp : {
                                                gte : now-10m
                                                lt :  now
                                            }
                                        }
                                    }
                                ]
                            }
                        }
                    }
                }
                publish: [
                    {
                        stream: data
                    }
                ]
            }
            {
                type: sql
                component: sql
                settings: {
                    statement: SELECT COUNT(`metricset.name`) AS TOP_5_mname, `metricset.name` AS NAME FROM input_data GROUP BY `metricset.name` ORDER BY TOP_5_mname DESC LIMIT 5
                }
                subscribe: [
                    {
                        component: input
                        stream: data
                    }
                ]
                publish: [
                    { 
                        stream: data
                    }
                ]
            }
            {
                type: show
                component: show
                settings: {
                    truncate: false
                    num_rows: 10
                }
                subscribe: [
                    {
                        component: sql
                        stream: data
                    }
                ]
            }
        ]
}

Run beginner_use_case.punchline using the command below:

CONF=beginner_use_case.punchline
punchlinectl start -p $CONF
What did we achieve?

We performed a single query statement on one dataset to get the top 5 values of a field that can contain different names.

Executing multiple query statements at a time

Tips

Notice how we chain the SQL queries by splitting them into smaller statements. This greatly improves readability!

intermediate_use_case.punchline

{
    type: punchline
    version: "6.0"
    runtime: spark
    tenant: default
    dag: [
            {
                type: elastic_batch_input
                component: input
                settings: {
                    index: platform-metricbeat-*
                    es_cluster: es_search
                    nodes: [
                        localhost
                    ]
                    output_columns: [
                        {
                            type: string
                            field: metricset.name
                            alias: metricset.name
                        }
                    ]
                    query: {
                        size: 0
                        query: {
                            bool: {
                                must: [
                                    {
                                        exists: {
                                            field: metricset.name
                                        }
                                    }
                                    {
                                        range: {
                                            @timestamp : {
                                                gte : now-10m
                                                lt :  now
                                            }
                                        }
                                    }
                                ]
                            }
                        }
                    }
                }
                publish: [
                    {
                        stream: data
                    }
                ]
            }
            {
                type: sql
                component: sql
                settings: {
                    statement_list: [
                    {
                        output_table_name: dataset1
                        statement: SELECT COUNT(`metricset.name`) AS TOP_5_name, `metricset.name` AS NAME FROM input_data GROUP BY `metricset.name` ORDER BY TOP_5_name DESC LIMIT 5
                    }
                    {
                        output_table_name: dataset2
                        statement: SELECT TOP_5_name FROM dataset1 LIMIT 1
                    }
                    ]
                }
                subscribe: [
                    {
                        component: input
                        stream: data
                    }
                ]
                publish: [
                    { 
                        stream: dataset2
                    }
                ]
            }
            {
                type: show
                component: show
                settings: {
                    truncate: false
                    num_rows: 10
                }
                subscribe: [
                    {
                        component: sql
                        stream: dataset2
                    }
                ]
            }
        ]
}

Let's execute it with the command below:

CONF=intermediate_use_case.punchline
punchlinectl start -p $CONF

Using a custom UDF

Below is an example of a machine learning pipeline using Spark MLlib. We first load our generated model in memory, then perform a series of data processing operations on the predictions made by the model.

Tips

Notice how we register a custom-made UDF and how we then use it inside our query statement.

Tips

You can use our ready-made starter package to write your own UDFs and use them inside the PunchPlatform: https://github.com/punchplatform/samples

{
    tenant: default
    channel: default
    version: "6.0"
    runtime: spark
    dag: [
        {
            type: dataset_generator
            component: dummy
            settings: {
                input_data: [
                    { 
                        field1: "[1, 2, 3, 4]"
                    }
                ]
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: sql
            component: processor
            settings: {
                register_udf: [
                    {
                        class_name: org.thales.punch.udf.starter.kit.StrToArrayString
                        function_name: punch_convertor
                        schema_ddl: ARRAY<STRING>
                    }
                ]
                statement_list: [
                    {
                        output_table_name: processed
                        statement: SELECT punch_convertor(field1) from dummy_data
                    }
                ]
            }
            subscribe: [
                {
                    stream: data
                    component: dummy
                }
            ]
            publish: [
                {
                    stream: processed
                }
            ]
        }
        {
            type: show
            component: stdout
            settings: {
                truncate: false
            }
            subscribe: [
                {
                    component: processor
                    stream: processed
                }
            ]
        }
    ]
}

Parameters

Common Settings

| Name | Type | Mandatory | Default value | Description |
|------|------|-----------|---------------|-------------|
| statement | String | false | NONE | A valid Spark SQL query string. Table names follow the pattern component_stream, where component is the name of the subscribed component and stream the name of its subscribed stream. |
| statement_list | List of Json | false | NONE | A list of valid JSON documents, each containing two fields. See advanced settings. |
| register_udf | List of Json | false | NONE | A list of valid JSON documents, each containing three fields. See advanced settings. |

Advanced Settings

Statement List

| Name | Type | Default value | Description |
|------|------|---------------|-------------|
| output_table_name | String | NONE | The table name under which the result of the query is registered. |
| statement | String | NONE | A valid Spark SQL query statement. |
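Each output_table_name is registered as a table for the following statements, which is what allows the chaining shown in the intermediate example above. A minimal sketch, with illustrative names:

statement_list: [
    {
        output_table_name: step1
        statement: SELECT name, COUNT(*) AS total FROM input_data GROUP BY name
    }
    {
        # step1 is now usable as a regular table by later statements
        output_table_name: step2
        statement: SELECT total FROM step1 LIMIT 1
    }
]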

Register UDF

| Name | Type | Default value | Description |
|------|------|---------------|-------------|
| function_name | String | NONE | The name under which the UDF is registered and called inside your query statements. |
| class_name | String | NONE | The fully qualified class name of the UDF implementation. |
| schema_ddl | String | NONE | A valid Spark schema in DDL format describing the UDF return type. |
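Putting it together, a minimal register_udf sketch based on the custom UDF example above (the class name, function name and schema are the ones from that example):

settings: {
    register_udf: [
        {
            # fully qualified class implementing the UDF
            class_name: org.thales.punch.udf.starter.kit.StrToArrayString
            # name used to call the UDF in SQL statements
            function_name: punch_convertor
            # DDL schema of the value returned by the UDF
            schema_ddl: ARRAY<STRING>
        }
    ]
    statement_list: [
        {
            output_table_name: processed
            statement: SELECT punch_convertor(field1) FROM dummy_data
        }
    ]
}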