Spark Punchlines
In the previous chapter, we set up a stream punchline. Stream punchlines run on one of the streaming runtimes of the Punch: Storm or Shiva. For batch punchlines, the Punch leverages Spark.
In Punch, Spark applications are also punchlines. This may seem like magic, but the reason is quite simple: a Spark application can be represented as a graph (more precisely a dag, for directed acyclic graph). Punchlines are dags of input, processing and output nodes. If these nodes are Spark nodes, then you have a punchline running on Spark.
Spark punchlines make it possible to use Spark dataframes, SQL, Machine Learning and all Spark APIs in your batch applications. You can also use Spark punchlines for many simpler yet useful tasks: aggregating, extracting and filtering your data. The Punch provides Spark nodes to read or write data to or from Elasticsearch, Kafka and the other COTS integrated in the Punch.
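To make the dag idea concrete, here is a minimal skeleton of a Spark punchline, using the same file_input and show node types as the example detailed below. It is only an illustration of the structure; the complete, working punchline is shown in the next section.

type: punchline
version: '6.0'
runtime: spark
dag:
- type: file_input    # input node: reads a file and publishes its content on a stream
  component: input
  publish:
  - stream: data
- type: show          # output node: subscribes to that stream and prints it to stdout
  component: show
  subscribe:
  - component: input
    stream: data
  publish: []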
Note
Spark punchlines heavily rely on Spark paradigms. Although it is not necessary to be a Spark expert to use them, it helps to be familiar with Spark concepts, in particular when using the Machine Learning libraries.
In this Getting Started guide, we will run a simple example of a Spark punchline.
Reading a CSV file
This chapter demonstrates how punchlines and Spark/Pyspark work together. Go to the sample folder:
cd $PUNCHPLATFORM_CONF_DIR/samples/punchlines/spark/read_file
tree .
├── AAPL.csv
├── csv_to_stdout_pyspark.yaml
└── csv_to_stdout_spark.yaml
There you will find a Spark punchline example that performs a simple operation:
- read a csv file (AAPL.csv)
- show it to stdout
Here are the first lines of AAPL.csv:
Date,Open,High,Low,Close,Adj Close,Volume
2017-12-28,171.000000,171.850006,170.479996,171.080002,171.080002,16480200
2017-12-29,170.520004,170.589996,169.220001,169.229996,169.229996,25999900
2018-01-02,170.160004,172.300003,169.259995,172.259995,172.259995,25555900
2018-01-03,172.529999,174.550003,171.960007,172.229996,172.229996,29517900
...
Two equivalent punchlines are provided: csv_to_stdout_spark.yaml (java) and csv_to_stdout_pyspark.yaml (python). Let us have a look at the java spark punchline:
---
type: punchline
version: '6.0'
runtime: spark
dag:
- type: file_input
  settings:
    path: "./AAPL.csv"
    format: csv
    options:
      #inferSchema: true
      header: true
  component: input
  publish:
  - stream: data
- type: show
  component: show
  subscribe:
  - component: input
    stream: data
  publish: []
Have a look at the pyspark file: only the runtime attribute differs.
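Concretely, the top of the pyspark file is expected to look like this (assuming pyspark is the runtime keyword; the dag section is identical to the java variant):

---
type: punchline
version: '6.0'
runtime: pyspark   # the only attribute that differs from the java variant
dag:
...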
Each punchline contains two nodes linked through a publish-subscribe relationship. Run them using the following commands:
punchlinectl --tenant mytenant start --punchline csv_to_stdout_spark.yaml
punchlinectl --tenant mytenant start --punchline csv_to_stdout_pyspark.yaml
The output looks like this:
# output
| Date | Open | High | Low | Close | Adj Close | Volume |
| 2017-12-28 | 171.000000 | 171.850006 | 170.479996 | 171.080002 | 171.080002 | 16480200 |
| 2017-12-29 | 170.520004 | 170.589996 | 169.220001 | 169.229996 | 169.229996 | 25999900 |
| 2018-01-02 | 170.160004 | 172.300003 | 169.259995 | 172.259995 | 172.259995 | 25555900 |
root
|-- Date: string (nullable = true)
|-- Open: string (nullable = true)
|-- High: string (nullable = true)
|-- Low: string (nullable = true)
|-- Close: string (nullable = true)
|-- Adj Close: string (nullable = true)
|-- Volume: string (nullable = true)
[
{
"name": "input_data",
"title": "SHOW"
}
]
Inferring Types
Let us improve our punchline so that it infers the column types rather than producing only strings, by using the first line as a header and inferring the schema of the dataframe dynamically upon reading.
Edit csv_to_stdout_spark.yaml and uncomment the inferSchema option:
...
    options:
      inferSchema: true
      header: true
...
Run your punchline again and look at the result:
Show node result: input_data
| Date | Open | High | Low | Close | Adj Close | Volume |
| 2017-12-28... | 171.0 | 171.850006 | 170.479996 | 171.080002 | 171.080002 | 16480200 |
| 2017-12-29... | 170.520004 | 170.589996 | 169.220001 | 169.229996 | 169.229996 | 25999900 |
| 2018-01-02... | 170.160004 | 172.300003 | 169.259995 | 172.259995 | 172.259995 | 25555900 |
root
|-- Date: timestamp (nullable = true)
|-- Open: double (nullable = true)
|-- High: double (nullable = true)
|-- Low: double (nullable = true)
|-- Close: double (nullable = true)
|-- Adj Close: double (nullable = true)
|-- Volume: integer (nullable = true)
[
{
"name": "input_data",
"title": "SHOW"
}
]
We now have types automatically inferred.
SQL
Let us now try to compute the average of the Volume column. For that, we add an sql node as follows:
...
- type: sql
  component: sql
  settings:
    statement: SELECT AVG(Volume) AS volume from input_data
  publish:
  - stream: data
  subscribe:
  - component: input
    stream: data
...
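For reference, here is a sketch of what the complete csv_to_stdout_spark_sql.yaml can look like once the sql node is wired between the file_input and show nodes. Note that the show node now subscribes to the sql component, which is why its result below is named sql_data; the actual file shipped with the standalone may differ slightly.

---
type: punchline
version: '6.0'
runtime: spark
dag:
- type: file_input
  component: input
  settings:
    path: "./AAPL.csv"
    format: csv
    options:
      inferSchema: true
      header: true
  publish:
  - stream: data
- type: sql
  component: sql
  settings:
    # the subscribed dataframe is exposed as input_data (component 'input', stream 'data')
    statement: SELECT AVG(Volume) AS volume from input_data
  subscribe:
  - component: input
    stream: data
  publish:
  - stream: data
- type: show
  component: show
  subscribe:
  - component: sql
    stream: data
  publish: []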
Execute:
punchlinectl -t mytenant start -p csv_to_stdout_spark_sql.yaml
The output is:
Show node result: sql_data
| volume |
| 2.8576825E7 |
root
|-- volume: double (nullable = true)
[
{
"name": "sql_data",
"title": "SHOW"
}
]
Conclusion
Spark punchlines are extremely powerful. With a few nodes, you have batch SQL power at hand. The Punch team uses Spark punchlines to enrich production platforms with various extraction and aggregation jobs. No more Spark job coding, only pipeline configuration.
Punchlines support both Python (Pyspark) and Java (Spark) based Machine Learning jobs. A dedicated Getting Started on this topic is planned. The Punch standalone already contains various examples.