Spark Punchlines and Plans
Abstract
This track covers the fundamentals of Spark punchlines.
These punchlines can be run as standalone applications, but they are more often used as part of a "plan": the application layer that repeatedly executes the batch punchline on a schedule.
Punchline structure overview
You can similarly define Spark/PySpark punchlines, which share the same kind of configuration.
At first, they may seem identical to Storm-like punchlines, only applied to batches. But read on: this is a scalability and distribution story about processing and joining big datasets.
Stream or Batch
A punchline is an application that processes data. A streaming punchline runs forever and is typically used to continuously collect, transform and/or save data from various sources into one or more sinks.
Batch punchlines also process data, but on a finite dataset: they start, do their job, then end. There are many use cases for batch punchlines: aggregations, extractions, machine learning, etc.
The beauty of the punch is that you have a single format to deal with both kinds of punchlines.
This is even more true with Spark punchlines, because some Spark nodes are streaming-oriented (Kafka stream input/output) and produce successive micro-batches, so that a typical "batch" punchline can in fact run forever.
The simplest batch example
Let's see a simple punchline reading a csv file.
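Here is a minimal sketch of such a punchline. The file_input node and its settings (path, format) are assumptions to adapt to the node reference of your release; the sql and show nodes are the ones discussed in the key points below, and the dataset published by the input component on the data stream is referred to as input_data, as in the aggregation example further down.

runtime: spark
type: punchline
version: '6.0'
tenant: mytenant
dag:
  # Hypothetical CSV reader node: its name and settings are assumptions,
  # check the Spark node reference of your release.
  - type: file_input
    component: input
    settings:
      path: ./data.csv
      format: csv
    publish:
      - stream: data
  # The sql node filters, joins or aggregates the dataset.
  - type: sql
    component: sql
    settings:
      statement: SELECT * FROM input_data LIMIT 10
    subscribe:
      - component: input
        stream: data
    publish:
      - stream: data
  # The show node prints a data sample and its schema, handy while designing.
  - type: show
    component: show
    subscribe:
      - component: sql
        stream: data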
Key points
- The sql node is tremendously powerful because it adds:
    - scalable joining capabilities on top of aggregations
    - easy filtering by expressions
    - partitioning/grouping/windowing capabilities (grouping start and stop of sessions, for example)
- There is a punch node if you need to move fields around (see the sketch after this list).
- The 'show' node is very useful when testing or designing: it prints both a sample of the data and its structure (column names, fields).
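As a sketch of that punch node, here is a dag fragment that derives a new field from an existing one. The punchlet_code setting and the inline punch code are assumptions; check the punch node reference of your release for the exact setting names.

# Hypothetical punch node fragment: punchlet_code and the inline code are assumptions.
- type: punch
  component: punch
  settings:
    punchlet_code: '{ [size_kb] = [size] / 1024; }'
  subscribe:
    - component: input
      stream: data
  publish:
    - stream: data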
A very common and simple aggregation pattern is just: ES aggregation => SQL explode => ES records.
The 'explode' function of Spark SQL splits a dictionary of results (such as the buckets produced by an Elasticsearch aggregation) into a table of records. We can then simply write those records to a new Elasticsearch index.
For example: compute the number of logs of the last hour for each type of device.
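As an illustration, here is the kind of sql node used for that pattern. It is a trimmed-down version of the statement used in the aggregation example below, and assumes the elastic_input node publishes the raw aggregation response (with its buckets column) as the input_data dataset.

- type: sql
  component: sql
  settings:
    # explode() turns the array of aggregation buckets into one row per bucket.
    statement: >-
      SELECT aggregation_result.key, aggregation_result.doc_count
      FROM (SELECT explode(buckets) AS aggregation_result FROM input_data)
  subscribe:
    - component: input
      stream: data
  publish:
    - stream: data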
Example
Browse through the simple Plan syntax overview
Key points
- A plan defines a schedule for executing a periodic Spark or PySpark punchline.
- At each execution, the plan computes one or several variables based on the current schedule date (see the sketch after this list).
- These variables are used to customize a punchline "template" for each time slice to compute.
- The plan stores a "cursor" of the last successful execution in Elasticsearch. This makes it possible to automatically retry failed time slices.
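To make the templating concrete, here is a sketch of how the dates section of the plan shown below could resolve for one execution. It assumes the offsets are ISO-8601 durations applied to the scheduled execution date; check the plan reference for the exact semantics.

# Hypothetical rendering for an execution scheduled at 2024-05-01T10:00Z,
# using the dates section of the plan.yaml shown further down.
dates:
  from:
    offset: -PT1m
    format: yyyy-MM-dd'T'HH:mmZ   # {{ from }} -> 2024-05-01T09:59+0000
  to:
    format: yyyy-MM-dd'T'HH:mmZ   # {{ to }}   -> 2024-05-01T10:00+0000
  day:
    offset: -PT1m
    format: yyyy.MM.dd            # {{ day }}  -> 2024.05.01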
Because the plan is a standard punch application, it can run in a channel.
You can find a working aggregation application in the aggregation channel of the mytenant tenant. It performs periodic aggregations of the maximum and average log size, computed separately for each device "vendor" (apache, cisco, symantec...).
Here is the channel structure:
stop_by_tenant: true
version: "6.0"
start_by_tenant: true
applications:
  - args:
      - start
      - --plan
      - plan.yaml
      - --template
      - punchline.yaml
      - --runtime
      - spark
      - --spark-cluster
      - common
      - --deploy-mode
      - client
      - --last-committed # persistence
    cluster: common
    shiva_runner_tags:
      - common
    name: plan-aggregation
    runtime: shiva
    command: planctl
The plan.yaml referenced by the application arguments defines the schedule, the template variables (indexes and dates) and the cursor persistence:

version: '6.0'
name: aggregation
model:
  metric_index: mytenant-metrics
  plan_logs_index: platform-plan-logs*
  input_index: mytenant-events-*
  output_index: mytenant-aggregations
  dates:
    day:
      offset: -PT1m
      format: yyyy.MM.dd
    from:
      offset: -PT1m
      format: yyyy-MM-dd'T'HH:mmZ
    to:
      format: yyyy-MM-dd'T'HH:mmZ
settings:
  cron: '*/1 * * * *'
  persistence:
    - type: elasticsearch
      index_name: platform-plan-cursor
metrics:
  reporters:
    - type: kafka
Finally, here is the punchline.yaml template; the '{{ }}' placeholders are replaced by the plan variables at each execution:

runtime: spark
metrics:
  reporters:
    - type: kafka
dag:
  - type: elastic_input
    settings:
      query:
        query:
          bool:
            must:
              - range:
                  '@timestamp':
                    lt: '{{ to }}'
                    gte: '{{ from }}'
        aggregations:
          by_channel:
            terms:
              field: vendor
            aggregations:
              total_size:
                sum:
                  field: size
              max_size:
                max:
                  field: size
      index: '{{ input_index }}'
      aggregation: true
      timestamp:
        field_value: '{{ to }}'
        field_name: timestamp
    component: input
    publish:
      - stream: data
  - settings:
      statement: SELECT timestamp, aggregation_result.doc_count, aggregation_result.key,
        aggregation_result.max_size.value AS max_size, aggregation_result.total_size.value
        AS total_size, doc_count_error_upper_bound, sum_other_doc_count FROM (SELECT
        explode(buckets) AS aggregation_result, doc_count_error_upper_bound, sum_other_doc_count,
        timestamp FROM input_data)
    component: sql
    subscribe:
      - component: input
        stream: data
    publish:
      - stream: data
    type: sql
  - settings:
      index:
        type: constant
        value: '{{ output_index }}-{{ day }}'
    component: output
    subscribe:
      - component: sql
        stream: data
    type: elastic_output
type: punchline
version: '6.0'
tenant: mytenant