Sql

The sql node takes as input a list of subscribed Spark datasets. It then executes one or more SQL statements to produce, in turn, new datasets.

The execution steps are as follows:

  1. the subscribed datasets are registered as available tables, named <component>_<stream> (for example, the data stream of component node1 becomes the table node1_data)
  2. each statement is executed in declaration order
  3. each newly generated dataset is also added as an available table, so statements can be chained (see the sketch after the example below)
  4. the selected resulting datasets are published

The sql node only works with incoming datasets.

{
    type: sql
    component: sql
    settings: {
        statements: [
            output = SELECT * FROM node1_data FULL OUTER JOIN node2_data ON node1_data.id_1 = node2_data.id_2
        ]
    }
    subscribe: [
      {
        component: node1
        stream: data
      }
      {
        component: node2
        stream: data
      }
    ]
    publish: [
        // the name must correspond to a created table.
        { 
          stream: output
        }
    ]
}
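
Because each generated dataset is registered as a table in turn, statements can be chained and several results can be published from a single sql node. Here is a minimal sketch of such a configuration (the statement, table and column names are illustrative only):

{
    type: sql
    component: sql
    settings: {
        statements: [
            // the subscribed dataset is available as the table node1_data;
            // the 'filtered' dataset produced by the first statement can be
            // queried by the second one
            filtered = SELECT * FROM node1_data WHERE id_1 IS NOT NULL
            counted = SELECT COUNT(*) AS total FROM filtered
        ]
    }
    subscribe: [
      {
        component: node1
        stream: data
      }
    ]
    publish: [
        // any of the generated datasets can be published
        { stream: filtered }
        { stream: counted }
    ]
}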

Below is a full working example using the sql node with an incoming dataset as input, fetched from an Elasticsearch cluster.

{
  runtime_id: my-job-id
  tenant: job_tenant
  job:
  [
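    // read documents from an Elasticsearch index and publish them as the "data" stream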
    {
      type: elastic_batch_input
      component: input
      settings:
      {
        index: metricbeat-*
        cluster_name: es_search
        nodes:
        [
          localhost
        ]
        elastic_settings:
        {
          es.index.read.missing.as.empty: yes
        }
        column_id: id
        column_source: source
        output_fields:
        [
          {
            type: string
            field: platform.id
          }
          {
            type: string
            field: @timestamp
          }
        ]
      }
      publish:
      [
        {
          stream: data
        }
      ]
    }
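    // the subscribed dataset is available as the table "input_data" (<component>_<stream>)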
    {
       type: sql
       component: sql
       settings:
       {
         statements:
         [
           data = SELECT `@timestamp` as eT, platform.id as ePID FROM input_data
         ]
       }
       subscribe:
       [
         {
           component: input
           stream: data
         }
       ]
       publish:
       [
         {
           stream: data
         }
       ]
    }
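    // write the resulting dataset as CSV files into the ./extract folder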
    {
       type: file_output
       component: output
       settings:
       {
         folder_path: ./extract
         number_of_repartition: 1
         with_header: true
         save_mode: overwrite
         file_output_format: csv
       }
       subscribe:
       [
         {
           component: sql
           stream: data
         }
       ]
     }
  ]
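  // report job metrics to both Elasticsearch and the console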
  metrics:
  {
    reporters:
    [
      {
        type: elasticsearch
        hosts:
        [
          {
            host: localhost
            port: 9200
          }
        ]
      }
      {
        type: console
      }
    ]
  }
}