SQL

Overview

The sql node lets you execute either a single SQL query statement against your incoming dataset(s), or a list of query statements against multiple datasets.

Tips

Your dataset name is the concatenation of the subscribed component name and its stream name. For instance, if you subscribe to a node whose component name is input and which publishes a stream named data, the resulting dataset name will be input_data.
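As a plain-Python sketch, the naming convention amounts to joining the component name and the stream name with an underscore:

```python
def dataset_name(component: str, stream: str) -> str:
    """Build the SQL table name for a subscribed (component, stream) pair."""
    return f"{component}_{stream}"

# A node whose component name is "input", publishing the "data" stream:
print(dataset_name("input", "data"))  # input_data
```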

Empty dataset(s)

If one of the publishers that the sql node is subscribed to returns an empty dataset, the sql node will by default skip all computation and immediately return an empty dataset to all of its subscribers.

statement vs statement_list

Exactly one of these two settings must be set: you cannot set both at the same time.

Runtime Compatibility

  • PySpark
  • Spark

Example

Hello World

{
    type: punchline
    version: "6.0"
    runtime: pyspark
    tenant: default
    dag: [
            {
                type: elastic_batch_input
                component: input
                settings: {
                    index: platform-metricbeat-*
                    es_cluster: es_search
                    nodes: [
                        localhost
                    ]
                    output_columns: [
                        {
                            type: string
                            field: metricset.name
                            alias: metricset.name
                        }
                    ]
                    query: {
                        size: 0
                        query: {
                            bool: {
                                must: [
                                    {
                                        exists: {
                                            field: metricset.name
                                        }
                                    }
                                    {
                                        range: {
                                            @timestamp: {
                                                gte: now-10m
                                                lte: now
                                            }
                                        }
                                    }
                                ]
                            }
                        }
                    }
                }
                publish: [
                    {
                        stream: data
                    }
                ]
            }
            {
                type: sql
                component: sql
                settings: {
                    statement: SELECT COUNT(`metricset.name`) AS TOP_5_mname, `metricset.name` AS NAME FROM input_data GROUP BY `metricset.name` ORDER BY TOP_5_mname DESC LIMIT 5
                }
                subscribe: [
                    {
                        component: input
                        stream: data
                    }
                ]
                publish: [
                    { 
                        stream: data
                    }
                ]
            }
            {
                type: show
                component: show
                settings: {
                    truncate: false
                    num_rows: 10
                }
                subscribe: [
                    {
                        component: sql
                        stream: data
                    }
                ]
            }
        ]
}
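To make the statement concrete, here is the same top-5 aggregation sketched in plain Python (the sample metricset names are made up for illustration):

```python
from collections import Counter

# Hypothetical values of the `metricset.name` column in input_data
rows = ["cpu", "memory", "cpu", "network", "cpu", "memory", "load",
        "diskio", "cpu", "network", "process", "memory"]

# COUNT(...) GROUP BY name, ORDER BY count DESC, LIMIT 5
top_5 = Counter(rows).most_common(5)
print(top_5)  # [('cpu', 4), ('memory', 3), ('network', 2), ...]
```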

What did we achieve?

We performed a single query statement on one dataset to get the top 5 most frequent values of a field that can contain different names.

## Multiple Queries

Notice how we split the SQL query into a chain of smaller statements. This helps produce compact and readable punchlines.

{
    type: punchline
    version: "6.0"
    runtime: spark
    tenant: default
    dag: [
            {
                type: elastic_batch_input
                component: input
                settings: {
                    index: platform-metricbeat-*
                    es_cluster: es_search
                    nodes: [
                        localhost
                    ]
                    output_columns: [
                        {
                            type: string
                            field: metricset.name
                            alias: metricset.name
                        }
                    ]
                    query: {
                        size: 0
                        query: {
                            bool: {
                                must: [
                                    {
                                        exists: {
                                            field: metricset.name
                                        }
                                    }
                                    {
                                        range: {
                                            @timestamp: {
                                                gte: now-10m
                                                lte: now
                                            }
                                        }
                                    }
                                ]
                            }
                        }
                    }
                }
                publish: [
                    {
                        stream: data
                    }
                ]
            }
            {
                type: sql
                component: sql
                settings: {
                    statement_list: [
                    {
                        output_table_name: dataset1
                        statement: SELECT COUNT(`metricset.name`) AS TOP_5_name, `metricset.name` AS NAME FROM input_data GROUP BY `metricset.name` ORDER BY TOP_5_name DESC LIMIT 5
                    }
                    {
                        output_table_name: dataset2
                        statement: SELECT TOP_5_name FROM dataset1 LIMIT 1
                    }
                    ]
                }
                subscribe: [
                    {
                        component: input
                        stream: data
                    }
                ]
                publish: [
                    { 
                        stream: dataset2
                    }
                ]
            }
            {
                type: show
                component: show
                settings: {
                    truncate: false
                    num_rows: 10
                }
                subscribe: [
                    {
                        component: sql
                        stream: dataset2
                    }
                ]
            }
        ]
}
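The chaining above can be sketched in plain Python: each statement consumes the table produced by the previous one (the sample values are made up for illustration):

```python
from collections import Counter

# dataset1: top 5 counts of `metricset.name` (hypothetical input values)
rows = ["cpu", "memory", "cpu", "network", "cpu", "memory"]
dataset1 = Counter(rows).most_common(5)  # [(NAME, TOP_5_name), ...]

# dataset2: SELECT TOP_5_name FROM dataset1 LIMIT 1
dataset2 = [count for name, count in dataset1][:1]
print(dataset2)  # [3]
```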

Custom UDF

Below is an example of a machine learning pipeline using Spark MLlib. We first load our generated model in memory, then perform a series of data processing operations on the predictions made by the model.

{
    tenant: default
    channel: default
    version: "6.0"
    runtime: spark
    dag: [
        {
            type: dataset_generator
            component: dummy
            settings: {
                input_data: [
                    { 
                        field1: "[1, 2, 3, 4]"
                    }
                ]
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: sql
            component: processor
            settings: {
                register_udf: [
                    {
                        class_name: org.thaleudf.starter.kit.StrToing
                        function_name: punch_convertor
                        schema_ddl: ARRAY<STRING>
                    }
                ]
                statement_list: [
                    {
                        output_table_name: processed
                        statement: SELECT punch_convertor(field1) FROM dummy_data
                    }
                ]
            }
            subscribe: [
                {
                    stream: data
                    component: dummy
                }
            ]
            publish: [
                {
                    stream: processed
                }
            ]
        }
        {
            type: show
            component: stdout
            settings: {
                truncate: false
            }
            subscribe: [
                {
                    component: processor
                    stream: processed
                }
            ]
        }
    ]
}

Tips

Notice how we register a custom-made UDF and then use it inside our query statement.

You can use our ready-made starter package to write your own UDFs and use them inside the PunchPlatform: https://github.com/punchplatform/samples
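To illustrate what a UDF like punch_convertor might compute (the exact behaviour of the sample class is an assumption, inferred from its ARRAY&lt;STRING&gt; schema_ddl and the input data above), here is the string-to-array conversion sketched in plain Python:

```python
import ast

def punch_convertor(value: str) -> list:
    """Hypothetical equivalent of the registered UDF: parse a string
    such as "[1, 2, 3, 4]" into an ARRAY<STRING>."""
    return [str(item) for item in ast.literal_eval(value)]

print(punch_convertor("[1, 2, 3, 4]"))  # ['1', '2', '3', '4']
```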

Parameters

Common Settings

| Name | Type | Mandatory | Default value | Description |
| ---- | ---- | --------- | ------------- | ----------- |
| statement | String | false | NONE | A valid Spark SQL query string. Table names are set as component_stream, where component is the name of the subscribed component and stream its stream name. |
| statement_list | List of JSON | false | NONE | A list of valid JSON documents, each containing two fields. See advanced settings. |
| register_udf | List of JSON | false | NONE | A list of valid JSON documents, each containing three fields. See advanced settings. |

Advanced Settings

Statement List

| Name | Type | Default value | Description |
| ---- | ---- | ------------- | ----------- |
| output_table_name | String | NONE | The table name assigned to your query result. |
| statement | String | NONE | A valid Spark SQL query statement. |

Register UDF

| Name | Type | Default value | Description |
| ---- | ---- | ------------- | ----------- |
| function_name | String | NONE | The name under which the UDF is registered for use in SQL statements. |
| class_name | String | NONE | The fully qualified class name of your UDF implementation. |
| schema_ddl | String | NONE | A valid Spark schema in DDL format describing the UDF return type. |