
Aggregations

Abstract

Performing continuous aggregations is a common and key function to compute consolidated indicators and metrics. It makes it possible to reduce data cardinality and to continuously feed elasticsearch indices with ready-to-be-visualised data that would otherwise require complex and resource-intensive queries.

Overview

The Punchline and plan features are a great fit to design and deploy such aggregation jobs on top of your elasticsearch data (or any other data store). Here is what a complete production system looks like.

(figure: a complete production system, combining stream and batch pipelines)

You typically have stream pipelines in charge of ingesting your data and (should you need it) performing real-time alerting. On the other hand you also have batch pipelines, illustrated here in yellow and red, that take care of the many useful batch processing tasks, from computing ML models to this chapter's topic: aggregating data.

Here is a more focused illustration of the aggregation pipeline:

(figure: focus on the aggregation pipeline)

There are two possible strategies to aggregate elasticsearch data:

  • By periodically fetching a complete (filtered or not) range of primary data. We will refer to this strategy as the batch strategy. You basically fetch a large number of elasticsearch documents, aggregate them in your pipeline, then write the result back to elasticsearch.
  • By periodically performing elasticsearch aggregation requests instead of plain search requests. The result of each aggregation request is actually a single document containing possibly many aggregation buckets. These buckets are in turn processed and consolidated into the aggregated index.

Let us see examples of both variants in action.

Tip

These examples work on data produced by the punch standalone example pipelines. Do not hesitate to execute them on your own.

Batch Strategy

Here is a complete example. The steps are:

  1. an elastic_input node reads a timeslice of data from the mytenant-events* index.
  2. an aggregation is performed using the spark sql node.
  3. the resulting data is written back to elasticsearch, into the apache_aggregation_metrics index.

Note

This aggregation, and in fact most aggregations, does not require any coding. It relies only on the power of the spark sql node.

{
  type: punchline
  version: "6.0"
  runtime: spark
  tenant: default
  dag: [
        {
            type: elastic_input
            component: input
            settings: {
                index: mytenant-events*
                nodes: [
                    localhost
                ]
                output_columns: [
                    {
                        type: string
                        field: target.uri.urn
                        alias: target_uri_urn
                    }
                    {
                        type: string
                        field: vendor
                    }
                    {
                        type: string
                        field: web.header.referer
                        alias: web_header_referer
                    }
                ]
                query: {
                    size: 0
                    query: {
                        bool: {
                            must: [
                                {
                                    range: {
                                        @timestamp : {
                                            gte : now-1180m
                                            lt :  now
                                        }
                                    }
                                }
                            ]
                        }
                    }
                }
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: sql
            component: sql
            settings: {
                statement: (SELECT COUNT(*) AS TOP_5_uri, a.target_uri_urn, current_timestamp() AS timestamp, a.vendor, a.web_header_referer FROM input_data AS a GROUP BY a.target_uri_urn, a.vendor, a.web_header_referer ORDER BY TOP_5_uri DESC LIMIT 5)
            }
            subscribe: [
                {
                    component: input
                    stream: data
                }
            ]
            publish: [
                { 
                    stream: data
                }
            ]
        }
        {
            type: elastic_output
            component: output
            settings: {
                index: {
                    type: constant
                    value: apache_aggregation_metrics
                }
                http_hosts: [ 
                    {
                        host: localhost
                        port: 9200
                    }
                ]
            }
            subscribe: [
                {
                    component: sql 
                    stream: data 
                }
            ]
        }
    ]
}
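
Once this punchline has run, each resulting row (here at most five, because of the LIMIT 5 clause) is indexed as a document into apache_aggregation_metrics. Here is an illustrative sketch of such a document; the values are of course made up:

{
    "TOP_5_uri": 42,
    "target_uri_urn": "/service/login",
    "timestamp": "2020-06-01T10:00:00.000Z",
    "vendor": "apache",
    "web_header_referer": "http://www.example.com/"
}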

Aggregation Strategy

Here is now how you can instead leverage elasticsearch aggregation queries in the first place. The steps are:

  1. an elastic_input node performs an aggregation request.
  2. the resulting document contains an array of aggregation buckets; it is processed using a punch snippet of code.
  3. a final consolidation (adding a timestamp) is performed using the spark sql node.
  4. the resulting data is written back to elasticsearch, into the apache_aggregation_metrics index.

This strategy is the one most often used, as it benefits from the power of elasticsearch to perform the first level of aggregation using all its nodes, caches and indexing. Hence, only a limited dataset (in fact, only the aggregation buckets) is returned to the spark punchline job.
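
For reference, here is a truncated, illustrative sketch of what elasticsearch returns for such an aggregation request. The by_channel terms buckets are the only part of interest: they are what the punch node below iterates over.

{
    "aggregations": {
        "by_channel": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
                {
                    "key": "apache",
                    "doc_count": 1000,
                    "max_size": { "value": 4096.0 },
                    "total_size": { "value": 1234567.0 }
                }
            ]
        }
    }
}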

{
  type: punchline
  version: "6.0"
  runtime: spark
  tenant: default
  dag: [
        {
            type: elastic_input
            component: input
            settings: {
                index: mytenant-events*
                nodes: [
                    localhost
                ]
                query: {
                    size: 0
                    query: {
                        bool: {
                            must: [
                                {
                                    range: {
                                        @timestamp : {
                                            gte : now-10000m
                                            lt :  now
                                        }
                                    }
                                }
                            ]
                        }
                    }
                    aggregations: {
                        by_channel: {
                            terms: {
                                field: vendor
                            }
                            aggregations: {
                                max_size: {
                                    max: {
                                        field: size
                                    }
                                }
                                total_size: {
                                    sum: {
                                        field: size
                                    }
                                }
                            }
                        }
                    }
                }
            }
            publish: [
                {
                    stream: data
                }
            ]
        }
        {
            type: punch
            component: punch
            settings: {
                punchlet_code: 
                    '''
                    {
                        Tuple buckets;
                        convert(root:[source]).into(buckets);
                        buckets:/ = buckets:[aggregations][by_channel][buckets];
                        for (Tuple obj: buckets.asArray()) {
                            // flatten each bucket: nested keys such as max_size.value become max_size_value.
                            // This step is necessary because column names containing the "." character are not
                            // well supported in SQL queries (bad practice).
                            obj = toFlatTuple().nestedSeparator("_").on(obj);
                            [key] = obj.getByKey("key");
                            [doc_count] = obj.getByKey("doc_count");
                            [max_size] = obj.getByKey("max_size_value");
                            [total_size] = obj.getByKey("total_size_value");
                        }
                    }
                    '''
                output_columns: [
                    {
                        type: string
                        field: key
                    }
                    {
                        type: string
                        field: doc_count
                    }
                    {
                        type: string
                        field: max_size
                    }
                    {
                        type: string
                        field: total_size
                    }
                ]
            }
            subscribe: [
                {
                    component: input
                    stream: data
                }
            ]
            publish:[
                {
                    stream: data
                }
            ]
        }
        {
            type: sql
            component: sql
            settings: {
                statement: SELECT key, doc_count, max_size, total_size, current_timestamp() AS timestamp FROM punch_data
            }
            subscribe: [
                {
                    component: punch
                    stream: data
                }
            ]
            publish: [
                { 
                    stream: data
                }
            ]
        }
        {
            type: elastic_output
            component: output
            settings: {
                index: apache_aggregation_metrics
                nodes: [ 
                    {
                        host: localhost
                        port: 9200
                    }
                ]
            }
            subscribe: [
                {
                    component: sql 
                    stream: data 
                }
            ]
        }
    ]
}
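
After the punchlet flattening (toFlatTuple with the "_" separator), each bucket of the elasticsearch response is turned into a flat row of the form below (illustrative values, all typed as strings as declared in output_columns):

{ "key": "apache", "doc_count": "1000", "max_size": "4096.0", "total_size": "1234567.0" }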

Finally, here is the same aggregation performed without the need for a punch node. Setting aggregation: true on the elastic_input node makes it publish the aggregation result directly as a dataframe, which the sql node can then unnest:

{
  type: punchline
  version: "6.0"
  runtime: spark
  tenant: default
  dag: [
    {
      component: input
      publish: [
        {
          stream: data
        }
      ]
      settings: {
        aggregation: true
        index: mytenant-events*
        nodes: [
            localhost
        ]
        query: {
          aggregations: {
            by_channel: {
              aggregations: {
                max_size: {
                  max: {
                    field: size
                  }
                }
                total_size: {
                  sum: {
                    field: size
                  }
                }
              }
              terms: {
                field: vendor
              }
            }
          }
          query: {
            bool: {
              must: [
                {
                  range: {
                    @timestamp: {
                      gte: now-1h
                      lt: now
                    }
                  }
                }
              ]
            }
          }
          size: 0
        }
      }
      type: elastic_input
    }
    {
        type: sql
        component: sql
        settings: {
            statement: SELECT aggregation_result.doc_count, aggregation_result.key, aggregation_result.max_size.value AS max_size, aggregation_result.total_size.value AS total_size, doc_count_error_upper_bound, sum_other_doc_count FROM (SELECT explode(buckets) AS aggregation_result, doc_count_error_upper_bound, sum_other_doc_count FROM input_data)
        }
        subscribe: [
            {
                component: input
                stream: data
            }
        ]
        publish: [
            { 
                stream: data
            }
        ]
    }
    {
      component: show
      subscribe: [
        {
          component: sql
          stream: data
        }
      ]
      type: show
    }
  ]
}
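
With aggregation: true, the elastic_input node publishes the by_channel aggregation result directly as a dataframe: a buckets column holding an array of structs, plus the doc_count_error_upper_bound and sum_other_doc_count fields. The explode() call in the sql node then unnests that array into one row per bucket, so the show node prints something like the following (illustrative values):

+---------+------+--------+----------+---------------------------+-------------------+
|doc_count|key   |max_size|total_size|doc_count_error_upper_bound|sum_other_doc_count|
+---------+------+--------+----------+---------------------------+-------------------+
|     1000|apache|  4096.0| 1234567.0|                          0|                  0|
+---------+------+--------+----------+---------------------------+-------------------+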

Warning

By default, Elasticsearch supports field names containing the dot (".") character. This is not the case in the SQL world, where it is even considered bad practice. It is therefore not recommended to use column names containing "." characters when executing Spark SQL queries.
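
The examples above avoid the issue altogether, either by renaming the fields through the output_columns alias setting (batch strategy) or by flattening the tuples in the punchlet (aggregation strategy). Should you nevertheless need to reference such a field, note that Spark SQL accepts identifiers quoted with backticks, as in this minimal, hypothetical statement:

SELECT `web.header.referer` AS web_header_referer FROM input_data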