# SQL

## Overview

With the sql node you can either execute a single query statement against your incoming dataset(s), or execute multiple query statements against multiple datasets.
**Tips**

Your dataset name is the concatenation of the subscribed component's name and its stream name. For instance, when subscribing to a node whose component name is input and which publishes a stream data, the resulting dataset name will be input_data.
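For instance, a minimal sql node subscribing to that publisher could reference the input_data table directly (a sketch; the component name and statement are placeholders):

```hjson
{
  type: sql
  component: my_sql
  settings: {
    # "input" (component) + "data" (stream) -> table "input_data"
    statement: SELECT * FROM input_data
  }
  subscribe: [
    {
      component: input
      stream: data
    }
  ]
}
```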
**Empty dataset(s)**

If one of the publishers that the sql node is subscribed to returns an empty dataset, the sql node will by default skip all computation and immediately return an empty dataset to all of its subscribers.
**statement vs statement_list**

At least one of these two settings must be set. You cannot set both at the same time.
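For illustration, here are the two mutually exclusive forms of the settings block (the queries are placeholders):

```hjson
# single statement form
settings: {
  statement: SELECT * FROM input_data
}

# chained statements form (see statement_list in the advanced settings)
settings: {
  statement_list: [
    {
      output_table_name: step1
      statement: SELECT * FROM input_data
    }
  ]
}
```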
## Runtime Compatibility

- PySpark: ✅
- Spark: ✅
## Example

### Hello World
```hjson
{
  type: punchline
  version: "6.0"
  runtime: pyspark
  tenant: default
  dag: [
    {
      type: elastic_input
      component: input
      settings: {
        index: platform-metricbeat-*
        nodes: [
          localhost
        ]
        output_columns: [
          {
            type: string
            field: metricset.name
            alias: metricset.name
          }
        ]
        query: {
          size: 0
          query: {
            bool: {
              must: [
                {
                  exists: {
                    field: metricset.name
                  }
                }
                {
                  range: {
                    "@timestamp": {
                      gte: now-10m
                      lte: now
                    }
                  }
                }
              ]
            }
          }
        }
      }
      publish: [
        {
          stream: data
        }
      ]
    }
    {
      type: sql
      component: sql
      settings: {
        statement: SELECT COUNT(`metricset.name`) AS TOP_5_mname, `metricset.name` AS NAME FROM input_data GROUP BY `metricset.name` ORDER BY TOP_5_mname DESC LIMIT 5
      }
      subscribe: [
        {
          component: input
          stream: data
        }
      ]
      publish: [
        {
          stream: data
        }
      ]
    }
    {
      type: show
      component: show
      settings: {
        truncate: false
        num_rows: 10
      }
      subscribe: [
        {
          component: sql
          stream: data
        }
      ]
    }
  ]
}
```
What did we achieve? We executed a single query statement on one dataset to get the top 5 values of a field that can contain different names.
## Multiple Queries

Notice how we split the SQL statement into a chain of smaller queries. This helps produce compact and readable punchlines.
```hjson
{
  type: punchline
  version: "6.0"
  runtime: spark
  tenant: default
  dag: [
    {
      type: elastic_input
      component: input
      settings: {
        index: platform-metricbeat-*
        nodes: [
          localhost
        ]
        output_columns: [
          {
            type: string
            field: metricset.name
            alias: metricset.name
          }
        ]
        query: {
          size: 0
          query: {
            bool: {
              must: [
                {
                  exists: {
                    field: metricset.name
                  }
                }
                {
                  range: {
                    "@timestamp": {
                      gte: now-10m
                      lte: now
                    }
                  }
                }
              ]
            }
          }
        }
      }
      publish: [
        {
          stream: data
        }
      ]
    }
    {
      type: sql
      component: sql
      settings: {
        statement_list: [
          {
            output_table_name: dataset1
            statement: SELECT COUNT(`metricset.name`) AS TOP_5_name, `metricset.name` AS NAME FROM input_data GROUP BY `metricset.name` ORDER BY TOP_5_name DESC LIMIT 5
          }
          {
            output_table_name: dataset2
            statement: SELECT TOP_5_name FROM dataset1 LIMIT 1
          }
        ]
      }
      subscribe: [
        {
          component: input
          stream: data
        }
      ]
      publish: [
        {
          stream: dataset2
        }
      ]
    }
    {
      type: show
      component: show
      settings: {
        truncate: false
        num_rows: 10
      }
      subscribe: [
        {
          component: sql
          stream: dataset2
        }
      ]
    }
  ]
}
```
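For comparison (assuming the statements above), the same result could be obtained with a single nested query, which is noticeably harder to read:

```sql
-- equivalent single-statement form of the two chained queries
SELECT TOP_5_name FROM (
  SELECT COUNT(`metricset.name`) AS TOP_5_name, `metricset.name` AS NAME
  FROM input_data
  GROUP BY `metricset.name`
  ORDER BY TOP_5_name DESC
  LIMIT 5
) AS dataset1
LIMIT 1
```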
## Custom UDF

Below is an example of a machine learning pipeline using Spark MLlib. We first load our generated model in memory, then perform a series of data processing operations on the predictions made by the model.
```hjson
{
  tenant: default
  channel: default
  version: "6.0"
  runtime: spark
  dag: [
    {
      type: dataset_generator
      component: dummy
      settings: {
        input_data: [
          {
            field1: "[1, 2, 3, 4]"
          }
        ]
      }
      publish: [
        {
          stream: data
        }
      ]
    }
    {
      type: sql
      component: processor
      settings: {
        register_udf: [
          {
            class_name: org.thales.udf.starter.kit.StrToArrayString
            function_name: punch_convertor
            schema_ddl: ARRAY<STRING>
          }
        ]
        statement_list: [
          {
            output_table_name: processed
            statement: SELECT punch_convertor(field1) FROM dummy_data
          }
        ]
      }
      subscribe: [
        {
          stream: data
          component: dummy
        }
      ]
      publish: [
        {
          stream: processed
        }
      ]
    }
    {
      type: show
      component: stdout
      settings: {
        truncate: false
      }
      subscribe: [
        {
          component: processor
          stream: processed
        }
      ]
    }
  ]
}
```
**Tips**

Notice how we register a custom-made UDF and then use it inside our query statement. You can use our already prepared package to start writing your own UDFs and use them inside the PunchPlatform: https://github.com/punchplatform/samples
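For illustration, a custom UDF is simply a class implementing one of Spark's UDFn interfaces. The class name below is a hypothetical placeholder, not necessarily the actual class from the samples repository; this is a minimal sketch of a string-to-array convertor matching the ARRAY<STRING> schema_ddl used above:

```scala
import org.apache.spark.sql.api.java.UDF1

// Hypothetical example: turns a string such as "[1, 2, 3, 4]"
// into an array of strings, matching the ARRAY<STRING> schema_ddl.
class StrToArrayString extends UDF1[String, Array[String]] {
  override def call(input: String): Array[String] =
    input.stripPrefix("[").stripSuffix("]").split(",").map(_.trim)
}
```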
## Parameters

### Common Settings

| Name | Type | Mandatory | Default value | Description |
|---|---|---|---|---|
| statement | String | false | NONE | A valid Spark SQL query string. Table names are set as follows: component_stream, where component is the name of the subscribed component and stream its stream name. |
| statement_list | List of Json | false | NONE | A list of valid JSON documents, each containing two fields. See advanced settings. |
| register_udf | List of Json | false | NONE | A list of valid JSON documents, each containing three fields. See advanced settings. |
### Advanced Settings

#### Statement List

| Statement List | Type | Default value | Description |
|---|---|---|---|
| output_table_name | String | NONE | The name given to your query result dataset. |
| statement | String | NONE | A valid Spark SQL query statement. |
#### Register UDF

| Register UDF | Type | Default value | Description |
|---|---|---|---|
| function_name | String | NONE | The name under which the UDF is registered, i.e. the name to use inside your query statements. |
| class_name | String | NONE | The fully qualified class name of your UDF implementation. |
| schema_ddl | String | NONE | The UDF return type, as a valid Spark schema in DDL format. |
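Putting the three fields together (the class and function names below are hypothetical placeholders):

```hjson
register_udf: [
  {
    # fully qualified name of the class implementing the UDF
    class_name: com.acme.udf.StrToArrayString
    # the name used to call the UDF inside SQL statements
    function_name: my_convertor
    # the declared return type, in Spark DDL format
    schema_ddl: ARRAY<STRING>
  }
]
```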