
Aggregations

Abstract

Performing continuous aggregations is a common and key way to compute consolidated indicators and metrics. It reduces data cardinality and continuously feeds elasticsearch indexes with ready-to-be-visualised data that would otherwise require complex and resource-intensive queries.

Overview

The punch pml and plan features are a great fit to design and deploy such aggregation jobs on top of your elasticsearch (or any other data store) data. Here is what a complete production system looks like:

(figure: overall architecture combining stream pipelines and batch pipelines)

You typically have stream pipelines in charge of ingesting your data and (if you need it) performing real-time alerting. You also have batch pipelines, illustrated here in yellow and red, to perform the many useful batch processing tasks, from computing ML models to this chapter's topic of aggregating data.

Here is a more focused illustration of the aggregation pipeline:

(figure: focus on the aggregation pipeline)

There are two possible strategies to aggregate elasticsearch data:

  • By periodically fetching a complete (filtered or not) range of primary data. We will refer to this strategy as the batch strategy. You basically fetch a large number of elasticsearch documents, aggregate them in your pipeline, and write the result back to elasticsearch.
  • By periodically issuing elasticsearch aggregation requests instead of plain search requests. The result of each aggregation request is a single document containing possibly many aggregation buckets. These buckets are in turn processed and consolidated into the aggregated index.

Let us see examples of both variants in action.

Tip

These examples work on data produced by the punch standalone example pipelines. Do not hesitate to execute them on your own.

Batch Strategy

Here is a complete example. The steps are:

  1. a pml elastic_batch_input node reads a timeslice of data from the mytenant-events* index.
  2. an aggregation is performed using the spark sql node.
  3. the resulting data is written back to elasticsearch, into the apache_aggregation_metrics index.

Note

This aggregation, and in fact most aggregations, does not require any coding. It only relies on the power of spark sql.
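
For reference, here is what a source document in the mytenant-events index may look like (the field values below are purely illustrative):

{
  "@timestamp": "2020-01-01T10:00:00.000Z",
  "vendor": "apache",
  "size": 1234,
  "target": { "uri": { "urn": "/index.html" } },
  "web": { "header": { "referer": "http://example.com" } }
}

Only the fields listed in output_columns are loaded into the spark dataframe, the nested ones being exposed under their alias.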

{
  job: [
        {
            type: elastic_batch_input
            component: input
            settings: {
                index: mytenant-events*
                cluster_name: es_search
                nodes: [
                    localhost
                ]
                output_columns: [
                    {
                        type: string
                        field: target.uri.urn
                        alias: target_uri_urn
                    }
                    {
                        type: string
                        field: vendor
                    }
                    {
                        type: string
                        field: web.header.referer
                        alias: web_header_referer
                    }
                ]
                query: {
                    size: 0
                    query: {
                        bool: {
                            must: [
                                {
                                    range: {
                                        @timestamp : {
                                            gte : now-1180m
                                            lt :  now
                                        }
                                    }
                                }
                            ]
                        }
                    }
                }
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: sql
            component: sql
            settings: {
                statement: (SELECT COUNT(*) AS TOP_5_uri, a.target_uri_urn, current_timestamp() AS timestamp, a.vendor, a.web_header_referer FROM input_data AS a GROUP BY a.target_uri_urn, a.vendor, a.web_header_referer ORDER BY TOP_5_uri DESC LIMIT 5)
            }
            subscribe: [
                {
                    component: input
                    stream: data
                }
            ]
            publish: [
                { 
                    stream: data
                }
            ]
        }
        {
            type: elastic_batch_output
            component: output
            settings: {
                index: apache_aggregation_metrics
                cluster_name: es_search
                nodes: [ "localhost" ]
            }
            subscribe: [
                {
                    component: sql 
                    stream: data 
                }
            ]
        }
    ]
}
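
With this job, each document written to the apache_aggregation_metrics index simply carries the columns computed by the sql node above. An illustrative result document (values are made up):

{
  "TOP_5_uri": 42,
  "target_uri_urn": "/index.html",
  "timestamp": "2020-01-01T10:30:00.000Z",
  "vendor": "apache",
  "web_header_referer": "http://example.com"
}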

Aggregation Strategy

Here is how you can leverage elasticsearch aggregation queries directly.

  1. a pml elastic_input node performs an aggregation request.
  2. the resulting document actually contains an array of buckets; it is processed using a punch snippet of code.
  3. the final consolidation is performed using the spark sql node.
  4. the resulting data is written back to elasticsearch, into the apache_aggregation_metrics index.

This strategy is the one most often used, as it benefits from the power of elasticsearch to perform the first level of aggregation using all its nodes, caches and indexing. Hence, only a limited dataset (in fact only the aggregation buckets) is returned to the spark pml job.
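
To make the punch snippet below easier to follow, here is the shape of the single document returned by such an aggregation request (only the relevant part is shown, the bucket values are illustrative):

{
  "aggregations": {
    "by_channel": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "apache",
          "doc_count": 1500,
          "max_size": { "value": 4096 },
          "total_size": { "value": 1234567 }
        }
      ]
    }
  }
}

The punch node simply iterates over the buckets array and flattens each bucket into a row.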

{
  job: [
        {
            type: elastic_input
            component: input
            settings: {
                index: mytenant-events*
                nodes: [
                    localhost
                ]
                query: {
                    size: 0
                    query: {
                        bool: {
                            must: [
                                {
                                    range: {
                                        @timestamp : {
                                            gte : now-10000m
                                            lt :  now
                                        }
                                    }
                                }
                            ]
                        }
                    }
                    aggregations: {
                        by_channel: {
                            terms: {
                                field: vendor
                            }
                            aggregations: {
                                max_size: {
                                    max: {
                                        field: size
                                    }
                                }
                                total_size: {
                                    sum: {
                                        field: size
                                    }
                                }
                            }
                        }
                    }
                }
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: punch
            component: punch
            settings: {
                punchlet_code: 
                    '''
                    {
                        Tuple buckets;
                        convert(root:[source]).into(buckets);
                        buckets:/ = buckets:[aggregations][by_channel][buckets];
                        for (Tuple obj: buckets.asArray()) {
                            // this flattening step is necessary because column names containing the "." character are not well supported (bad practice) in SQL queries
                            obj = toFlatTuple().nestedSeparator("_").on(obj);
                            [key] = obj.getByKey("key");
                            [doc_count] = obj.getByKey("doc_count");
                            [max_size] = obj.getByKey("max_size_value");
                            [total_size] = obj.getByKey("total_size_value");
                        }
                    }
                    '''
                output_columns: [
                    {
                        type: string
                        field: key
                    }
                    {
                        type: string
                        field: doc_count
                    }
                    {
                        type: string
                        field: max_size
                    }
                    {
                        type: string
                        field: total_size
                    }
                ]
            }
            subscribe: [
                {
                    component: input
                    stream: data
                }
            ]
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: sql
            component: sql
            settings: {
                statement: SELECT key, doc_count, max_size, total_size, current_timestamp() AS timestamp FROM punch_data
            }
            subscribe: [
                {
                    component: punch
                    stream: data
                }
            ]
            publish: [
                { 
                    stream: data
                }
            ]
        }
        {
            type: elastic_batch_output
            component: output
            settings: {
                index: apache_aggregation_metrics
                cluster_name: es_search
                nodes: [ "localhost" ]
            }
            subscribe: [
                {
                    component: sql 
                    stream: data 
                }
            ]
        }
    ]
}
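
After the punch node has flattened each bucket and the sql node has added a timestamp, the documents indexed into apache_aggregation_metrics look like this (illustrative values, all declared as strings in output_columns):

{
  "key": "apache",
  "doc_count": "1500",
  "max_size": "4096.0",
  "total_size": "1234567.0",
  "timestamp": "2020-01-01T10:30:00.000Z"
}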

Here is the same aggregation performed without the need for a punch node:

{
  job: [
    {
      component: input
      publish: [
        {
          stream: data
        }
      ]
      settings: {
        aggregation: true
        index: mytenant-events*
        nodes: [
            localhost
        ]
        query: {
          aggregations: {
            by_channel: {
              aggregations: {
                max_size: {
                  max: {
                    field: size
                  }
                }
                total_size: {
                  sum: {
                    field: size
                  }
                }
              }
              terms: {
                field: vendor
              }
            }
          }
          query: {
            bool: {
              must: [
                {
                  range: {
                    @timestamp: {
                      gte: now-1h
                      lt: now
                    }
                  }
                }
              ]
            }
          }
          size: 0
        }
      }
      type: elastic_input
    }
    {
        type: sql
        component: sql
        settings: {
            statement: SELECT aggregation_result.doc_count, aggregation_result.key, aggregation_result.max_size.value AS max_size, aggregation_result.total_size.value AS total_size, doc_count_error_upper_bound, sum_other_doc_count FROM (SELECT explode(buckets) AS aggregation_result, doc_count_error_upper_bound, sum_other_doc_count FROM input_data)
        }
        subscribe: [
            {
                component: input
                stream: data
            }
        ]
        publish: [
            { 
                stream: data
            }
        ]
    }
    {
      component: show
      subscribe: [
        {
          component: sql
          stream: data
        }
      ]
      type: show
    }
  ]
}
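
The inner SELECT explodes the buckets array into one row per bucket, and the outer SELECT extracts the nested bucket fields. On the sample aggregation response shown earlier, the show node would display something similar to (illustrative values):

+---------+------+--------+----------+---------------------------+-------------------+
|doc_count|key   |max_size|total_size|doc_count_error_upper_bound|sum_other_doc_count|
+---------+------+--------+----------+---------------------------+-------------------+
|1500     |apache|4096.0  |1234567.0 |0                          |0                  |
+---------+------+--------+----------+---------------------------+-------------------+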

Warning

By default, Elasticsearch supports field names containing the dot (".") character. This is not the case in the SQL world, where it is even considered bad practice. It is therefore not recommended to use column names containing "." characters when executing Spark SQL queries.
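
If your source fields do contain dots, a simple workaround is to rename them at read time with an alias in output_columns, as done in the first example of this chapter:

output_columns: [
    {
        type: string
        field: web.header.referer
        alias: web_header_referer
    }
]

Alternatively, Spark SQL lets you quote such column names with backticks (e.g. `web.header.referer`), but the alias approach keeps the rest of the job simpler.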