Skip to content

Spark Punchlines and Plan

Abstract

This track covers the fundamentals of Spark punchlines.

These punchlines can be used as applications, but more often are used as part of a "plan" which is the application layer, repeating the batch punchline.

Punchline structure overview

You can similarly define Spark/Pyspark punchlines which have a similar configuration.

At first, it may seem that it is the same as Storm-like punchline, but for batches.

But read some more. This is a scalability and distribution story of processing/joining big datasets

Stream or Batch

A punchline is an application that processes data. A streaming punchline runs forever and is typically used to continuously collect, transform and/or saves the data from various sources up to some sink(s).

Batch punchlines also process data but run on a finite dataset. Batch punchlines starts, do what they do, then ends. There are many use cases for batch punchlines: aggregations, extractions, machine learning etc..

The beauty of the punch is that you have a single format to deal with both kinds of punchlines.

It is yet more true with Spark punchline, because some spark nodes are streaming-oriented (Kafka stream input/output ) and produce successive micro-batches, so that a typical "batch" punchline can in fact run for ever.

The simplest batch exemple

Let's see a simple punchline reading a csv file.

Key points

  • SQL power node is tremendous because it adds

    • scalable joining capability to aggregations
    • easy filtering by expressions
    • partitioning/grouping/windowing capabilities (grouping start and stop of sessions for example)
  • there is a punch node (if you need to move fields around)

  • the 'show' node is very useful when doing some testing or design. It shows both sample of data and the structure (column names, fields)

But very used simple aggregation is just ES Aggregation => sql explode => ES records;

The 'explode' function of SparkQL allows to split a dictionary of results (like ES aggregation produces) in a table of records. Then we can juste write those records to a new Elasticsearch indices.

E.g. Compute the number of logs of last hour per each type of device.

Example

Browse through the simple Plan syntax overview

Key points

  • A plan defines a schedule for executing a periodic spark or pyspark punchline
  • The plan can compute one or several variables at each execution, based on the current schedule date.
  • The variables are used to cusomize a "template" of punchline for each timeslice computing.
  • The plan stores in elasticsearch a "cursor" of last successful execution. This allows to retry failed timeslices automatically.

Because the plan is an standard punch application, it can run in a channel. You can find a working aggregation application in the aggregation channel of the mytenant tenant. It perfoms periodic aggregations of the maximum and average log size, computed separately for each device "vendor" (apache, cisco, symantec...).

Here are the channel structure:

stop_by_tenant: true
version: "6.0"
start_by_tenant: true

applications:
- args:
  - start
  - --plan
  - plan.yaml
  - --template
  - punchline.yaml
  - --runtime
  - spark
  - --spark-cluster
  - common
  - --deploy-mode
  - client
  - --last-committed  # persistence
  cluster: common
  shiva_runner_tags:
  - common
  name: plan-aggregation
  runtime: shiva
  command: planctl
The plan:
version: '6.0'
name: aggregation
model:
  metric_index: mytenant-metrics
  plan_logs_index: platform-plan-logs*
  input_index: mytenant-events-*
  output_index: mytenant-aggregations
  dates:
    day:
      offset: -PT1m
      format: yyyy.MM.dd
    from:
      offset: -PT1m
      format: yyyy-MM-dd'T'HH:mmZ
    to:
      format: yyyy-MM-dd'T'HH:mmZ
settings:
  cron: '*/1 * * * *'
  persistence:
  - type: elasticsearch
    index_name: platform-plan-cursor
metrics:
  reporters:
  - type: kafka
and the templated punchline:
runtime: spark
metrics:
  reporters:
  - type: kafka
dag:
- type: elastic_input
  settings:
    query:
      query:
        bool:
          must:
          - range:
              '@timestamp':
                lt: '{{ to }}'
                gte: '{{ from }}'
      aggregations:
        by_channel:
          terms:
            field: vendor
          aggregations:
            total_size:
              sum:
                field: size
            max_size:
              max:
                field: size
    index: '{{ input_index }}'
    aggregation: true
    timestamp:
      field_value: '{{ to }}'
      field_name: timestamp
  component: input
  publish:
  - stream: data
- settings:
    statement: SELECT timestamp, aggregation_result.doc_count, aggregation_result.key,
      aggregation_result.max_size.value AS max_size, aggregation_result.total_size.value
      AS total_size, doc_count_error_upper_bound, sum_other_doc_count FROM (SELECT
      explode(buckets) AS aggregation_result, doc_count_error_upper_bound, sum_other_doc_count,
      timestamp FROM input_data)
  component: sql
  subscribe:
  - component: input
    stream: data
  publish:
  - stream: data
  type: sql
- settings:
    index:
      type: constant
      value: '{{ output_index }}-{{ day }}'
  component: output
  subscribe:
  - component: sql
    stream: data
  type: elastic_output
type: punchline
version: '6.0'
tenant: mytenant