Pygregator Guide

In a PunchPlatform, events (logs, metrics, whatever) pass through multiple components and usually end up in an Elasticsearch cluster. There they can be visualized using dashboarding tools like Kibana.

Users often need statistics and indicators computed from these events, for example top-N indicators computed over some time frame (typically a day, a week or a month). Kibana makes that easy using the Elasticsearch aggregation APIs. Unfortunately, when you process billions of events, these aggregations consume a lot of resources and time. In practice it quickly becomes impossible to get a result in an acceptable time. Worse, executing these aggregations puts your Elasticsearch cluster at risk, as it has to load and process large quantities of data.

Note

For that reason, production PunchPlatform Elasticsearch clusters are configured with protections that reject overly complex queries.

A strategy to address this problem is to pre-compute intermediate aggregations. Events are read from Elasticsearch periodically, in the background, and intermediate aggregation results are inserted back into Elasticsearch. When users need to compute and visualize indicators, these are derived from the intermediate aggregations.

As a result, indicators are quickly available when users consult their Kibana dashboards. The PunchPlatform offers a lightweight yet powerful tool to automate these intermediate aggregations very easily: punchplatform-pygregator.

Features

Pygregator offers many useful features.

A flexible way to write aggregations

The tool reads aggregation queries from a standard directory ($PUNCHPLATFORM_CONF_DIR/resources/pygregator/models in a standalone installation). They are written using the standard Elasticsearch syntax. To build a new aggregation model, please:

  • read the Elasticsearch aggregations documentation
  • start from the standard models already available to build your own
  • write your aggregation in a Jinja file, named using the pattern <ModelName>Agg.json.j2
  • declare your file in the pygregator configuration file (explained just below)
  • relaunch pygregator and have fun

Aggregation models are templated using the Jinja2 syntax. Template variables are filled in from the pygregator configuration file. This file, located at PUNCHPLATFORM_CONF_DIR/resources/pygregator/config.yml, contains all the information needed to perform your aggregations: source and destination Elasticsearch cluster(s), models, log fields, and the list of aggregations to be executed by pygregator, including the values of the templated variables.

For example, an aggregation is declared in the configuration file as follows:

- name: "MyFirstBeautifulAggregation"
  model: "3TermsAgg"
  indices:
    input:
      - "events-mytenant-bluecoat_proxysg-*"
    output: "agg-mytenant-bluecoat_proxysg-"
    output_prefix_pattern: "%Y.%m.%d"
  fields:
    term_field1: "target.uri.category.raw"
    term_field2: "target.uri.url.raw"
    term_field3: "obs.host.ip"
  date_query:
    field: "lmc.input.ts"
    duration: "3d"

Date patterns are supported for input index names and must be enclosed between the characters '[' and ']'. For example you can specify:

events-mytenant-bluecoat_proxysg-[yyyy.MM.dd]

Supported date patterns are 'yyyy' (4-digit year), 'MM' (2-digit month), 'ww' (2-digit week), 'dd' (2-digit day), 'hh' (2-digit hour), 'mm' (2-digit minute) and 'ss' (2-digit second).

If you need to query multiple dated indices, prefer date patterns over a * wildcard: pygregator will then query only the indices that may contain the wanted data.
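
As an illustration of the mechanism (the helper below is our own sketch, not pygregator's API), expanding such a pattern over the queried time range could look like this:

from datetime import datetime, timedelta

# Map the supported Joda-style tokens to strftime directives
# (an assumption about what happens internally).
TOKENS = [("yyyy", "%Y"), ("MM", "%m"), ("ww", "%W"), ("dd", "%d"),
          ("hh", "%H"), ("mm", "%M"), ("ss", "%S")]

def expand_indices(pattern, start, end):
    """Expand 'events-...-[yyyy.MM.dd]' into one concrete index name per day.
    Stepping by day is enough for the common daily index pattern."""
    head, _, rest = pattern.partition("[")
    date_fmt, _, tail = rest.partition("]")
    for token, directive in TOKENS:
        date_fmt = date_fmt.replace(token, directive)
    indices, day = set(), start
    while day <= end:
        indices.add(head + day.strftime(date_fmt) + tail)
        day += timedelta(days=1)
    return sorted(indices)

print(expand_indices("events-mytenant-bluecoat_proxysg-[yyyy.MM.dd]",
                     datetime(2017, 6, 5), datetime(2017, 6, 8)))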

Be careful: date patterns are also supported for output index names, but the format is different: the patterns are %Y (year), %m (month) and %d (day). For example you can specify:

output_prefix_pattern: "%Y-%m-%d"

Play aggregations on time ranges

The previous example plays MyFirstBeautifulAggregation on the whole set of specified input indices, restricted to the last 3 days. Restricting an aggregation to a given time scope (an hour, a month, etc.) is a common need.

Doing this is very simple: add a date_query section to your aggregation configuration and specify the event timestamp field and a duration. Here is an example performing an aggregation over three days:

date_query:
  field: "lmc.input.ts"
  duration: "3d"

The available duration units are y (year), M (month), w (week), d (day), h (hour), m (minute) and s (second). Remember that aggregations are executed on past events only (excluding the current period). For example, if the current time is 16h37 and you set "2h" in date_query.duration, you will get aggregates for documents between 14h00 and 16h00.
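
A sketch of this alignment logic, for the hour unit only (our own illustration, not pygregator code):

from datetime import datetime, timedelta

def aligned_window(now, duration_hours):
    """'2h' at 16:37 -> [14:00, 16:00): the current (incomplete) hour
    is excluded, then the duration is counted backwards."""
    end = now.replace(minute=0, second=0, microsecond=0)  # floor to the unit
    start = end - timedelta(hours=duration_hours)
    return start, end

start, end = aligned_window(datetime(2017, 6, 8, 16, 37), 2)
print(start, end)  # 2017-06-08 14:00:00  2017-06-08 16:00:00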

Instead of specifying a duration, you may also specify an absolute time range by setting the from and to parameters:

date_query:
  field: "lmc.input.ts"
  from: "2017-06-01T00:00:00.000"
  to: "2017-06-30T23:59:59.000"

Warning

Be careful: from and to dates must be specified in UTC. If you are using Kibana to visualize your data, remember that it translates dates into your timezone, so the from and to dates may not match the displayed event timestamps.

You can also split the resulting time range into multiple time frames by specifying the frame parameter. For example, to split it into frames of 4 hours:

date_query:
  field: "lmc.input.ts"
  from: "2017-06-01T00:00:00.000"
  to: "2017-06-30T23:59:59.000"
  frame: "4h"

Or, combined with the duration parameter:

date_query:
  field: "lmc.input.ts"
  duration: "1M"
  frame: "4h"

The frame parameter supports the same syntax as the duration parameter.
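
For illustration, splitting a from/to range into fixed frames can be sketched as follows (a simplified helper of our own; one aggregation request is then issued per frame):

from datetime import datetime, timedelta

def split_frames(start, end, frame):
    """Yield consecutive (from, to) frames covering the whole range."""
    cur = start
    while cur < end:
        nxt = min(cur + frame, end)
        yield cur, nxt
        cur = nxt

frames = list(split_frames(datetime(2017, 6, 1), datetime(2017, 6, 2),
                           timedelta(hours=4)))
# 6 frames of 4 hours each over this one-day range.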

Filter results

Another way to reduce your aggregation scope is to use Elasticsearch filters.

The syntax in the pygregator configuration file is field: value. For example, you can specify a filter to aggregate only documents with action=OBSERVED. This is illustrated next:

- name: "MyFirstBeautifulAggregation"
  model: "3TermsAgg"
  indices:
    input:
      - "events-mytenant-bluecoat_proxysg-*"
    output: "agg-mytenant-bluecoat_proxysg-"
    output_prefix_pattern: "%Y.%m.%d"
  fields:
    term_field1: "target.uri.category.raw"
    term_field2: "target.uri.url.raw"
    term_field3: "obs.host.ip"
  date_query:
    field: "lmc.input.ts"
    duration: "3d"
  filters:
    - action: "OBSERVED"

As you can see, filters are specified as a list. Be careful: the list is interpreted as a logical AND, not a logical OR.

You can also use range filters instead of exact filters. To do so, specify a field and its criteria in a range_filters section. For example, to keep only documents whose app.return.code is between 400 and 499:

- name: "MyFirstBeautifulAggregation"
  model: "3TermsAgg"
  indices:
    input:
      - "events-mytenant-bluecoat_proxysg-*"
    output: "agg-mytenant-bluecoat_proxysg-"
    output_prefix_pattern: "%Y.%m.%d"
  fields:
    term_field1: "target.uri.category.raw"
    term_field2: "target.uri.url.raw"
    term_field3: "obs.host.ip"
  date_query:
    field: "lmc.input.ts"
    duration: "3d"
  filters:
    - action: "OBSERVED"
  range_filters:
    - app.return.code:
        gte: 400
        lt: 500
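
As an illustration only (an assumption about the query pygregator generates, not a documented contract), the filters, range_filters and date_query sections above roughly translate into an Elasticsearch bool query whose clauses are all AND-ed:

# Hypothetical translation of the configuration above into an
# Elasticsearch bool query; every clause in "must" is AND-ed.
query = {
    "query": {
        "bool": {
            "must": [
                {"term": {"action": "OBSERVED"}},
                {"range": {"app.return.code": {"gte": 400, "lt": 500}}},
                {"range": {"lmc.input.ts": {"gte": "now-3d/d", "lt": "now/d"}}},
            ]
        }
    }
}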

OR filters can be implemented in two ways. If the field is not an analyzed string and an exact match is wanted, a term_value_or entry can be added:

- name: "MyFirstBeautifulAggregation"
  ...
  term_value_or:
    - "target.uri.category": ["Email", "sharing"]

For other, more complex cases, you can use a query_string entry:

- name: "MyFirstBeautifulAggregation"
  ...
  query_string: "target.uri.category: (\"Suspicious\" OR \"Uncategorized\")"

Warning

Be careful when using this parameter: a query string is an arbitrary query (Lucene format) executed on an Elasticsearch cluster, so its impact has to be carefully studied before execution.

A field existence requirement can be added:

- name: "MyFirstBeautifulAggregation"
  ...
  exists: ["init.usr.name"]

One-shot and continuous execution

By default, an aggregation is executed only once (one-shot mode). This allows pygregator to be called by a cron, for example, either regularly or at a specific time.

You can also use pygregator as a daemon that manages regular executions internally. A period can be specified per aggregation: the aggregation is then executed at launch and once every period. For example, to replay the previous aggregation every 30 seconds, specify:

- name: "MyFirstBeautifulAggregation"
  model: "3TermsAgg"
  indices:
    input:
      - "events-mytenant-bluecoat_proxysg-*"
    output: "agg-mytenant-bluecoat_proxysg-"
    output_prefix_pattern: "%Y.%m.%d"
  fields:
    term_field1: "target.uri.category.raw"
    term_field2: "target.uri.url.raw"
    term_field3: "obs.host.ip"
  date_query:
    field: "lmc.input.ts"
    duration: "3d"
  period: "30s"

The available period units are y (year), M (month), w (week), d (day), h (hour), m (minute) and s (second), like the duration parameter.

You can add a safety delay to make sure you do not process time ranges for which not all the logs have arrived yet:

- name: "MyFirstBeautifulAggregation"
  ...
  safety_delay: "6h"

In continuous mode, the aggregation is executed right away, but only on data old enough to fall outside the safety delay zone.

In one-shot mode, a warning is logged if the aggregation timespan overlaps the safety timespan, because some events might not yet be available at aggregation time.
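
A sketch of the effect of the safety delay on the aggregation window (our own illustration):

from datetime import datetime, timedelta

def apply_safety_delay(window_from, window_to, now, safety_delay):
    """Clamp the aggregation window so it ends before now - safety_delay."""
    safe_limit = now - safety_delay
    return window_from, min(window_to, safe_limit)

now = datetime(2017, 6, 8, 16, 0)
f, t = apply_safety_delay(datetime(2017, 6, 5), datetime(2017, 6, 8, 16),
                          now, timedelta(hours=6))
# t is clamped to 10:00: logs newer than 6 hours are left for a later run.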

Multiple-levels aggregations

Aggregations can be executed at multiple levels: a fine-grained aggregation can be played every day to compute daily statistics on events, then a second-level aggregation can aggregate those results to compute statistics over the full year.

Compare this to Elasticsearch pipeline aggregations, which can compute such results in a single request. Pipeline aggregations are more effective when all events are already available, but on a system collecting events in streaming it is often preferable to execute small aggregations continuously (or almost) and aggregate the intermediate results afterwards.

You can define these multi-level aggregations in a single pygregator configuration file. To guarantee the execution order (necessary when an aggregation works on another aggregation's results), you can specify a priority:

- name: "MyFirstBeautifulAggregation"
  model: "3TermsAgg"
  indices:
    input:
      - "events-mytenant-bluecoat_proxysg-*"
    output: "agg-mytenant-bluecoat_proxysg-"
    output_prefix_pattern: "%Y.%m.%d"
  fields:
    term_field1: "target.uri.category.raw"
    term_field2: "target.uri.url.raw"
    term_field3: "obs.host.ip"
  priority: 20

Before executing aggregations, pygregator sorts them by priority and executes the lowest value first and the highest last (a low value means a high priority). So, to make sure an aggregation is executed first, set its priority to 0.

A priority is an integer greater than or equal to 0. By default (if not specified), an aggregation has a priority of 1000.
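
The scheduling rule boils down to a simple sort, sketched below (the default of 1000 is applied when no priority is specified):

# Aggregations without an explicit priority default to 1000.
aggregations = [
    {"name": "SecondLevelAgg", "priority": 20},
    {"name": "FirstLevelAgg", "priority": 0},
    {"name": "SomeOtherAgg"},  # implicit priority 1000, runs last
]
execution_order = sorted(aggregations, key=lambda a: a.get("priority", 1000))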

Note

If both a period and a priority are specified for a set of aggregations, the execution order is guaranteed by the priority values only for the first execution; subsequent executions are driven by the period values.

Because of Elasticsearch's internal behavior, aggregation results are not searchable in Elasticsearch for a few seconds or minutes (depending on its configuration). This can cause trouble: an aggregation working on a first aggregation's results may not see those results yet. In that case, use the waiting_time parameter: it sets the number of seconds pygregator waits before executing the aggregation. For example:

- name: "MyFirstBeautifulAggregation"
  model: "3TermsAgg"
  indices:
    input:
      - "events-mytenant-bluecoat_proxysg-*"
    output: "agg-mytenant-bluecoat_proxysg-"
    output_prefix_pattern: "%Y.%m.%d"
  fields:
    term_field1: "target.uri.category.raw"
    term_field2: "target.uri.url.raw"
    term_field3: "obs.host.ip"
  priority: 20
  waiting_time: 30

Of course, this parameter must be set on the second-level aggregation.

Warning

The waiting_time parameter is ignored if a period has been specified.

Architecture

Pygregator output format

By default, Pygregator stores multiple documents in Elasticsearch for a single aggregation. Each document represents an aggregated result for a particular set of variable values.

For example, playing our previous aggregation yields one document for each (category, URL, IP) value set:

{
  "meta": {
    "name": "MyFirstBeautifulAggregation",
    "model": "3TermsAgg",
    "input": "http://localhost:9200",
    "index": "events-mytenant-bluecoat_proxysg-*",
    "duration": 1944.0839290618896,
    "date": {
      "agg": "2017-06-08T16:15:46.000",
      "from": "2017-06-05T00:00:00.000",
      "to": "2017-06-08T00:00:00.000"
    }
  },
  "aggregation": {
    "target_uri_url_raw_val": "https://www.google.com",
    "target_uri_category_raw_val": "WebAds/Analytics",
    "obs_host_ip_val": "10.43.2.101",
    "session_output_bytes": 12564,
    "metric" : 29,
    "errors" : {
      "target_uri_category_raw_doc_count_error_upper_bound" : 12,
      "target_uri_category_raw_sum_other_doc_count" : 8425
    }
  }
}

The aggregation section contains the following values:

<variable>_val (String): variable value (multiple values if multiple variables). This value can match a bucket (in the case of a bucket aggregation) or a metric, if one has been specified (for example a sum or an average), in addition to the document count.
<variable>_range (String): variable value (multiple values if multiple variables) when the aggregation type is range.
metric (Number): count of matched documents.
errors (JSON object): aggregation errors (may occur with terms aggregations).
errors.<variable>_doc_count_error_upper_bound (Number): worst-case document count of a top bucket that is not returned in the aggregation result. This information is really important when non-zero: a missing bucket could have a higher document count than some of the returned buckets. To make sure this problem does not appear, set a high value for the "size" parameter. Only terms aggregations are concerned.
errors.<variable>_sum_other_doc_count (Number): sum of the document counts of all buckets that are not part of the response. Only terms aggregations are concerned.

Note

Metrics (except the document count) are represented as <variable>_var (like bucket values). This may be confusing; it was designed this way to ease exploitation (the Kibana visualization building interface). This format could evolve in the future (metrics could be moved to a dedicated metric section).

The meta section contains information about the aggregation itself:

name (String): aggregation name (as defined in the configuration file)
model (String): aggregation model (as defined in resources)
input (String): input Elasticsearch cluster endpoint
index (String): input Elasticsearch index name
duration (Number): aggregation request duration in ms (for performance analysis)
date (JSON object): date information (see the following entries)
date.agg (String): aggregation result insertion date
date.from (String): first date of the aggregation time scope (if a date_query section was specified in the aggregation configuration)
date.to (String): last date of the aggregation time scope (same condition)

If an aggregation request fails, a single record is inserted into Elasticsearch for the whole aggregation, containing the error message (as returned by Elasticsearch) in an error field.

Storing many documents per aggregation makes it possible to exploit the results in a flexible way, for example with Kibana, instead of working on the raw aggregation results returned by Elasticsearch. Of course, this choice has an impact on storage; this aspect is discussed in the System considerations section.

Note

The following option could be implemented in the near future: directly writing the compact Elasticsearch aggregation results.

Supported Elasticsearch aggregations

Because results are rewritten, pygregator only supports a subset of Elasticsearch aggregation types for the moment.

Aggregations are written using the Elasticsearch 2.4 syntax.

Supported metrics aggregations, with their Kibana equivalent:

  • doc_count (default): Count
  • Avg: Average
  • Cardinality: Unique count
  • Min: Min
  • Max: Max
  • Sum: Sum

Supported bucket aggregations, with their Kibana equivalent:

  • Terms: Terms
  • Range: Range
  • Date Range: Date Range
  • IP Range: IPv4 Range
  • Missing: (no Kibana equivalent)

You can also define pipeline aggregations with an additional mandatory parameter: "buckets_path". It has the same definition and structure as in Elasticsearch. Pygregator will retrieve, in the aggregation results, the value corresponding to that path.
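
As an illustration, a model could embed a pipeline aggregation as follows. The snippet uses the standard Elasticsearch pipeline syntax; the field names are hypothetical:

# Standard Elasticsearch pipeline aggregation syntax: the avg_bucket
# aggregation reads its input through "buckets_path".
aggs = {
    "bytes_per_day": {
        "date_histogram": {"field": "lmc.input.ts", "interval": "day"},
        "aggs": {"daily_bytes": {"sum": {"field": "session.out.bytes"}}},
    },
    "avg_daily_bytes": {
        "avg_bucket": {"buckets_path": "bytes_per_day>daily_bytes"}
    },
}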

Accuracy

Please pay attention to aggregation accuracy, especially when you process terms aggregations. Data is sharded, so by design Elasticsearch results are not perfectly accurate (see the Elasticsearch documentation). By default, it computes aggregations keeping only the top 10 buckets, ignoring lower results and potentially causing accuracy issues.

Accuracy can be increased using the size parameter (see the 3TermsAgg.json.j2 model). Be careful: it has an impact on performance (requests are slower with a high size) and on storage (more aggregation results).
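
For example, a terms aggregation in a model could be given an explicit size (hypothetical excerpt):

# Hypothetical excerpt of a terms aggregation with an explicit size:
# up to 1000 buckets are returned instead of the default 10.
aggs = {
    "target.uri.category.raw": {
        "terms": {"field": "target.uri.category.raw", "size": 1000}
    }
}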

High availability, idempotency and replay

Pygregator is not distributed, so it is not natively highly available: high availability has to be managed at a higher level. Two implementations are possible:

  • active-active: use systemd or supervisord, for example, to ensure pygregator is always running on multiple machines
  • active-passive: use a distributed crontab, for example, to ensure pygregator runs on a single machine of your infrastructure

Active-active mode has a low impact on your infrastructure: aggregation requests are executed several times (some CPU is wasted) but results are stored only once in your cluster (no impact on storage), because for each result an ID is built from the aggregation parameters and variable values.

This mechanism also makes it possible to replay aggregations, for example to update results when new data arrives in an already aggregated dataset. It is based on unique ID generation: in a few words, some aggregation parameters and results are concatenated, then hashed using the MD5 algorithm. If you generate many aggregation results in a single index, collisions could occur; in that case you should disable MD5 hashing (and only use the concatenation) by adding this line to your pygregator configuration file:

id_generation_method: "concatenation"
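
A minimal sketch of the idea (our own reconstruction, not pygregator's exact code):

import hashlib

def result_id(agg_name, time_from, time_to, bucket_values, use_md5=True):
    """Deterministic ID: replaying the same aggregation on the same data
    overwrites the same documents instead of duplicating them."""
    raw = "|".join([agg_name, time_from, time_to] + list(bucket_values))
    return hashlib.md5(raw.encode("utf-8")).hexdigest() if use_md5 else raw

doc_id = result_id("MyFirstBeautifulAggregation",
                   "2017-06-05T00:00:00.000", "2017-06-08T00:00:00.000",
                   ["WebAds/Analytics", "https://www.google.com", "10.43.2.101"])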

Note that by default pygregator persists its internal state in Elasticsearch (see the Monitoring and recovery section), so it does not replay aggregations on time ranges already successfully processed. To force a replay, use the force_replay option in the pygregator configuration file. For example:

- name: "MyFirstBeautifulAggregation"
  model: "3TermsAgg"
  indices:
    input:
      - "events-mytenant-bluecoat_proxysg-*"
    output: "agg-mytenant-bluecoat_proxysg-"
    output_prefix_pattern: "%Y.%m.%d"
  fields:
    term_field1: "target.uri.category.raw"
    term_field2: "target.uri.url.raw"
    term_field3: "obs.host.ip"
  date_query:
    field: "lmc.input.ts"
    duration: "3d"
  force_replay: true

Scalability

Pygregator is not distributed, so it is not highly scalable. This is not a big deal, because aggregation requests are executed by Elasticsearch (not by pygregator itself), and Elasticsearch of course is scalable.

In fact, you can scale pygregator by launching multiple instances on several machines, each with different aggregations to compute, but you cannot scale a single aggregation request. The only problem that can arise is a request returning a very large number of results: in that case, the machine hosting pygregator must have enough memory, proportional to the number of results, to process it (see the next section).

Monitoring and recovery

Pygregator stores its internal state in Elasticsearch. This state serves two use cases:

  1. monitoring pygregator from an external application
  2. restart-from-last-success in case of failure: when a failure occurs on an aggregation, pygregator stops executing it; at startup, it checks whether an aggregation state exists and restarts from the last success

One state is stored in an Elasticsearch index named .pygregator for each aggregation defined in the pygregator configuration file. A state is structured as follows:

{
  # Aggregation parameters, as defined in pygregator configuration file
  "aggregation_parameters" : {
    # Logical aggregation name
    "name": "MyFirstBeautifulAggregation",
    # Aggregation model
    "model": "3TermsAgg",
    # Input Elasticsearch endpoint
    "input_elasticsearch_endpoint": "http://localhost:9200",
    # Input Elasticsearch index
    "input_index": "events-mytenant-bluecoat_proxysg-*",
    # Date query (see "Play aggregations on time ranges" section)
    "date_query": {
      "duration": "1d",
      "frame": "3h"
    }
  },
  # Host running aggregation
  "host": "LMCSOC01P",
  # Last timerange successfully processed
  "last_timerange_processed": {
    "from": "2017-06-05T00:00:00.000",
    "to": "2017-06-08T00:00:00.000"
  },
  # Last aggregation status update date
  "last_update": "2017-06-08T16:15:46.000",
  # Last execution information
  "message" : "Aggregation succeeded"
}

System considerations and performances

Storage

The storage impact of aggregations strongly depends on:

  1. data variability: the number of aggregation results is directly proportional to the data variability. For example, if you perform a terms aggregation on a category field, you get as many results as there are distinct categories in your dataset. A low-cardinality field yields a small result set; a high-cardinality field (an IP address, for example) yields a large one.

    Note

    As explained previously, accuracy can be set arbitrarily, so even with a high-variability dataset you can avoid a large result set by choosing a low accuracy (it is a trade-off between storage and accuracy).

  2. time-scope grain: if you play aggregations many times on small time scopes, you will of course get a high number of results. So make sure your aggregations are useful before choosing the time-scope grain.

Please note that the input data volume has no impact on storage (it impacts RAM and CPU only).

Storage also depends on the Elasticsearch mapping set for the aggregation results. For example, be sure to disable field analysis if you do not use it.

RAM and CPU

What is important to remember is that Elasticsearch performs the aggregations, not pygregator, which is only responsible for higher-level services (like result rewriting). So almost all the RAM and CPU consumed by aggregation use cases is consumed by the Elasticsearch cluster.

However, pygregator also needs resources. In particular, it needs RAM to store aggregation results: it does not send results one by one to Elasticsearch but uses bulk requests (sending many results in a single REST call). It builds these big bulk requests in memory before sending them, so its memory consumption depends on the aggregation complexity.
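
A sketch of this pattern with the official Elasticsearch Python client (the index name and document layout follow the examples above; this is an illustration, not pygregator's actual code):

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch(["http://localhost:9200"])

# Hypothetical (doc_id, document) pairs built from the aggregation response.
results = [
    ("id-1", {"aggregation": {"metric": 29}}),
    ("id-2", {"aggregation": {"metric": 12}}),
]

# All result documents are accumulated in memory, then shipped in one
# bulk call: hence the RAM cost proportional to the number of results.
actions = [
    {"_index": "agg-mytenant-bluecoat_proxysg-2017.06.08",
     "_type": "agg", "_id": doc_id, "_source": doc}
    for doc_id, doc in results
]
bulk(es, actions)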

Note

The following option could be implemented in the near future: writing bulk requests to disk instead of memory before sending them to Elasticsearch.

Limits

Pygregator is single-threaded, so it cannot use the whole capacity of a multi-core machine. To do so, split the pygregator configuration file into multiple files and launch pygregator multiple times.

Another point is memory usage, which can be significant when there are many aggregation results: pygregator fetches the HTTP results returned by Elasticsearch, transforms them into dictionaries, computes the results and stores the future bulk requests in memory.

Note

Memory usage could be optimized in the future (streaming JSON decoding, ...).

Pygregator does not support multiple metrics in the same (bucket) aggregation; this will be implemented in the near future.

It also does not support field renaming: if a user configures an aggregation on the fields strawberry and banana, the aggregation values will be named strawberry_val and banana_val (arbitrary names are not possible).

Performance

Aggregation requests take more or less time to execute depending on their complexity. If you nest multiple aggregations on a high-variability dataset, getting a result will take a long time.

The request duration (in milliseconds) is available as meta information in each aggregation record written to Elasticsearch, under the JSON key meta.duration.