
Sql Node

Overview

Compatible with Spark/PySpark

With the sql node you can either execute a single query statement against your incoming dataset(s), or execute multiple query statements against multiple datasets.

Tips

Your dataset name is the concatenation of the subscribed node's component name and stream name. For instance, when subscribing to a node whose component name is input and which publishes a stream named data, the resulting dataset name will be input_data.
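As an illustration, here is a minimal, hypothetical sql node fragment: it subscribes to the data stream published by the input component, so the corresponding table is referenced as input_data in the query statement.

{
    type: sql
    component: sql
    settings: {
        # the subscribed component is "input" and its stream is "data",
        # hence the table name "input_data" below
        statement: SELECT * FROM input_data LIMIT 10
    }
    subscribe: [
        {
            component: input
            stream: data
        }
    ]
    publish: [
        {
            stream: data
        }
    ]
}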

Example(s)

Example of performing a single query statement on one dataset

Here is a simple example that computes a top 5 on a field that can contain different names.

{
  job: [
        {
            type: elastic_batch_input
            component: input
            settings: {
                index: metricbeat-*
                cluster_name: es_search
                nodes: [
                    localhost
                ]
                output_columns: [
                    {
                        type: string
                        field: metricset.name
                    }
                ]
                query: {
                    size: 0
                    query: {
                        bool: {
                            must: [
                                {
                                    exists: {
                                        field: metricset.name
                                    }
                                }
                                {
                                    range: {
                                        @timestamp : {
                                            gte : now-10m
                                            lt :  now
                                        }
                                    }
                                }
                            ]
                        }
                    }
                }
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: sql
            component: sql
            settings: {
                statement: SELECT COUNT(`metricset.name`) AS TOP_5_mname, `metricset.name` AS NAME FROM input_data GROUP BY `metricset.name` ORDER BY TOP_5_mname DESC LIMIT 5
            }
            subscribe: [
                {
                    component: input
                    stream: data
                }
            ]
            publish: [
                { 
                    stream: data
                }
            ]
        }
        {
            type: show
            component: show
            settings: {
                truncate: false
                num_rows: 10
            }
            subscribe: [
                {
                component: sql
                stream: data
                }
            ]
        }
    ]
}

Example of executing multiple query statements at a time

Tips

Notice how we chain the SQL query as a sequence of smaller statements: this greatly improves readability!

{
  job: [
        {
            type: elastic_batch_input
            component: input
            settings: {
                index: metricbeat-*
                cluster_name: es_search
                nodes: [
                    localhost
                ]
                output_columns: [
                    {
                        type: string
                        field: metricset.name
                    }
                ]
                query: {
                    size: 0
                    query: {
                        bool: {
                            must: [
                                {
                                    exists: {
                                        field: metricset.name
                                    }
                                }
                                {
                                    range: {
                                        @timestamp : {
                                            gte : now-10m
                                            lt :  now
                                        }
                                    }
                                }
                            ]
                        }
                    }
                }
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: sql
            component: sql
            settings: {
                statement_list: [
                  {
                    output_table_name: dataset1
                    statement: SELECT COUNT(`metricset.name`) AS TOP_5_mname, `metricset.name` AS NAME FROM input_data GROUP BY `metricset.name` ORDER BY TOP_5_mname DESC LIMIT 5
                  }
                  {
                    output_table_name: dataset2
                    statement: SELECT COUNT(`metricset.name`) AS TOP_5_mname, `metricset.name` AS NAME FROM dataset1 LIMIT 1
                  }
                ]
            }
            subscribe: [
                {
                    component: input
                    stream: data
                }
            ]
            publish: [
                { 
                    stream: dataset2
                }
            ]
        }
        {
            type: show
            component: show
            settings: {
                truncate: false
                num_rows: 10
            }
            subscribe: [
                {
                component: sql
                stream: dataset2
                }
            ]
        }
    ]
}

Example of using custom UDF

Below is an example of a machine learning pipeline using Spark MLlib. We first load our generated model in memory, then perform a series of data processing operations on the predictions made by the model.

Tips

Notice how we register a custom-made UDF and how we then use it inside our query statements.

Tips

You can use our ready-made package to start writing your own UDF and use it inside the PunchPlatform: https://github.com/punchplatform/samples

{
  job:
  [
      {
          type: elastic_batch_input
          component: input
          settings: 
          {
              cluster_name: es_search
              nodes: [
                  localhost
              ]
              index: mytenant-events-*
              output_columns : [
                {
                  field: init.host.ip
                  alias : init_host_ip
                  type : string
                }
                {
                  field: session.out.byte
                  alias : session_out_byte
                  type : integer
                }
              ]
          }
          publish: [
              {
                  stream: data
              }
          ]
      }
      {
          type: file_model_input
          component: model_loader
          settings: 
          {
               file_path: model.bin
          }
          publish: [ 
            { 
              stream: model
            } 
          ]
          subscribe: []
      }
      {
          type: mllib_model
          component: executor
          settings: {}
          publish: [
            { 
              stream: data 
            }
          ]
          subscribe: [
          {
            component: input
            stream: data
            alias: input_data
          }
          {
            component: model_loader
            stream: model
            alias: model
          }
          ]
      }
      {
          type: sql
          component: sql
          settings: {
              register_udf: [
                {
                  function_name: myOwnFunc
                  class_name: org.thales.punch.pl.plugins.sql.udf.KmeansSqdist
                  schema_ddl: Double
                }
              ]
              statement_list: [
                {
                  output_table_name: extract_analytics_json
                  statement: SELECT json_tuple(output,'analytics') AS output_analytics, init_host_ip, session_out_byte, init_host_ip_tokenized, init_host_ip_hashed, full_vector, features AS source_features, prediction AS source_prediction FROM executor_data
                }
                {
                  output_table_name: features_prediction_as_string
                  statement: SELECT json_tuple(output_analytics, 'features', 'prediction'), init_host_ip, init_host_ip_tokenized, session_out_byte, init_host_ip_hashed, source_features, source_prediction FROM extract_analytics_json
                }
                {
                  output_table_name: cast_columns_to_proper_types
                  statement: SELECT punch_str_to_array_double(c0) AS output_features, CAST(c1 AS INTEGER) AS output_prediction, init_host_ip, init_host_ip_tokenized, session_out_byte, init_host_ip_hashed, source_features, source_prediction FROM features_prediction_as_string
                }
                {
                  output_table_name: calculate_kmeans_sqdist
                  statement: SELECT myOwnFunc(output_features, output_prediction, 'model.bin', 4) AS kmeans_sqdist, init_host_ip, session_out_byte FROM cast_columns_to_proper_types                  
                }
              ]
          }
          subscribe: [
              {
                  component: executor
                  stream: data
              }
          ]
          publish: [
            {
                stream: extract_analytics_json
            }
            {
                stream: features_prediction_as_string
            }
            {
                stream: cast_columns_to_proper_types
            }
            { 
                stream: calculate_kmeans_sqdist
            }
          ]
      }
      {
          type: show
          component: show
          settings: {
              show_schema: true
              truncate: false
          }
          subscribe: [
            {
                component: sql
                stream: calculate_kmeans_sqdist
            }
          ]
      }
  ]
  spark_settings:
    {
      spark.files: ./model.bin
      spark.executor.memory: 1g
      spark.executor.cores: "2"
      spark.executor.instances: "2"
    }
}

Warning

At least one of statement or statement_list must be set, but you cannot set both at the same time.
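For instance, the settings block takes one of the two following forms (a minimal, hypothetical sketch; the surrounding subscribe and publish blocks are omitted):

# single statement form
settings: {
    statement: SELECT NAME, COUNT(*) AS total FROM input_data GROUP BY NAME
}

# statement_list form
settings: {
    statement_list: [
        {
            output_table_name: dataset1
            statement: SELECT NAME, COUNT(*) AS total FROM input_data GROUP BY NAME
        }
        {
            output_table_name: dataset2
            statement: SELECT * FROM dataset1 LIMIT 1
        }
    ]
}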

Configuration(s)

  • statement: String

    Description: [NOT REQUIRED] A valid Spark SQL query string. Table names are set as follows: component_stream, where component is the name of the subscribed component and stream is the name of its published stream.

  • statement_list: List(Map(output_table_name: String, statement: String))

    Description: [NOT REQUIRED] A list of valid JSON documents, each containing the following two fields:

    • output_table_name: [REQUIRED] the name given to your query result; this name can be referenced in subsequent statements and published as a stream
    • statement: [REQUIRED] a valid Spark SQL query statement
  • register_udf: List(Map(function_name: String, class_name: String, schema_ddl: String))

    Description: [NOT REQUIRED] A list of valid JSON documents containing the following three fields (see the minimal sketch after this list):

    • function_name: [REQUIRED] the name under which the UDF is registered, i.e. the name used to call it inside your query statements
    • class_name: [REQUIRED] the fully qualified class name of your UDF implementation
    • schema_ddl: [REQUIRED] a valid Spark schema in DDL format
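As an illustration, here is a minimal, hypothetical settings fragment that registers the UDF used in the example above and calls it from a statement (the column names and the input_data table are illustrative only):

settings: {
    register_udf: [
        {
            # name used to call the UDF inside the SQL statements below
            function_name: myOwnFunc
            # fully qualified class implementing the UDF
            class_name: org.thales.punch.pl.plugins.sql.udf.KmeansSqdist
            # Spark schema in DDL format (Double, as in the example above)
            schema_ddl: Double
        }
    ]
    statement_list: [
        {
            output_table_name: scored
            statement: SELECT myOwnFunc(output_features, output_prediction, 'model.bin', 4) AS kmeans_sqdist FROM input_data
        }
    ]
}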