Spark Punchlines

Let us move on to the spark world. On the punch, spark applications are simply (again) punchlines. This may seem like magic, but the reason is quite simple: a spark application can be represented as a graph (more precisely a DAG, for directed acyclic graph). Punchlines are DAGs of input, processing and output nodes. If these nodes are spark-compatible nodes, there you have it: your punchline will run on spark.

A great characteristic of (spark) punchlines is to expose all the Spark SQL and machine learning APIs, nodes and stages. In turn you can use punchlines to design stream or batch applications, benefiting from Spark dataframes, SQL, machine learning and all the Spark APIs. Punchlines are particularly well equipped with ready-to-use nodes to read or write data to/from Elasticsearch, Kafka and the other COTS integrated in the punch.

Besides machine learning, you can also build lots of simpler yet useful applications: aggregating, extracting or filtering your data.
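
Schematically, a punchline is a single configuration file that declares these nodes and chains them through publish/subscribe declarations. The skeleton below is only illustrative (the node types are placeholders); a complete, runnable example follows in this chapter:

{
    type: punchline
    runtime: spark
    dag: [
        {
            type: some_input_node
            component: input
            publish: [ ... ]
        }
        {
            type: some_processing_node
            component: processing
            subscribe: [ ... ]
            publish: [ ... ]
        }
        {
            type: some_output_node
            component: output
            subscribe: [ ... ]
        }
    ]
}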

Note

Spark punchlines heavily rely on Spark paradigms. Although it is not necessary to be a Spark expert to use them, being familiar with Spark concepts helps, in particular to start using the machine learning libraries.

In this getting started guide, we will illustrate a very simple example.

Reading a CSV file

First go to the following folder:

cd $PUNCHPLATFORM_CONF_DIR/samples/punchlines/spark/
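
That folder contains, among other samples, the AAPL.csv dataset and the csv_files_to_stdout.hjson punchline used below; a quick listing will confirm they are there:

ls files/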

There you will find a spark punchline example that performs a very simple operation: it reads a CSV file and prints it to stdout. The data is stored in the files/AAPL.csv file:

Date,Open,High,Low,Close,Adj Close,Volume
2017-12-28,171.000000,171.850006,170.479996,171.080002,171.080002,16480200
2017-12-29,170.520004,170.589996,169.220001,169.229996,169.229996,25999900
2018-01-02,170.160004,172.300003,169.259995,172.259995,172.259995,25555900
2018-01-03,172.529999,174.550003,171.960007,172.229996,172.229996,29517900
2018-01-04,172.539993,173.470001,172.080002,173.029999,173.029999,22434600
2018-01-05,173.440002,175.369995,173.050003,175.000000,175.000000,23660000
2018-01-08,174.350006,175.610001,173.929993,174.350006,174.350006,20567800
2018-01-09,174.550003,175.059998,173.410004,174.330002,174.330002,21584000
2018-01-10,173.160004,174.300003,173.000000,174.289993,174.289993,23959900
2018-01-11,174.589996,175.490005,174.490005,175.279999,175.279999,18667700
2018-01-12,176.179993,177.360001,175.649994,177.089996,177.089996,25418100
2018-01-16,177.899994,179.389999,176.139999,176.190002,176.190002,29565900
2018-01-17,176.149994,179.250000,175.070007,179.100006,179.100006,34386800
2018-01-18,179.369995,180.100006,178.250000,179.259995,179.259995,31193400
2018-01-19,178.610001,179.580002,177.410004,178.460007,178.460007,32425100
2018-01-22,177.300003,177.779999,176.600006,177.000000,177.000000,27108600
2018-01-23,177.300003,179.440002,176.820007,177.039993,177.039993,32689100
2018-01-24,177.250000,177.300003,173.199997,174.220001,174.220001,51105100
2018-01-25,174.509995,174.949997,170.529999,171.110001,171.110001,41529000
2018-01-26,172.000000,172.000000,170.059998,171.509995,171.509995,37687500

The punchline is defined as a simple hjson file, files/csv_files_to_stdout.hjson. Here is its content:

{
    type: punchline
    tenant: default
    version: "6.0"
    channel: default
    runtime: spark
    dag: [
        {
            description:
            '''
            Read a CSV file and automatically generate the corresponding Dataset
            '''
            type: file_input
            component: input
            settings: {
                format: csv
                file_name: AAPL.csv
            }
            publish: [
                {
                    stream: default
                }
            ]
        }
        {
            type: show
            component: show
            subscribe: [
                {
                    stream: default
                    component: input
                }
            ]
            publish: [
                {
                    stream: default
                }
            ]
        }
    ]
    settings: {
      spark.files: ./AAPL.csv
    }
}

Info

HJSON (Human JSON) is a human-friendly variant of JSON that makes configuration files easier to read and understand. This format is JSON compatible: if you prefer, you can use plain JSON.
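
As an illustration, the input node of the punchline above could just as well be written in plain JSON:

{
    "description": "Read a CSV file and automatically generate the corresponding Dataset",
    "type": "file_input",
    "component": "input",
    "settings": {
        "format": "csv",
        "file_name": "AAPL.csv"
    },
    "publish": [
        { "stream": "default" }
    ]
}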

The punchline file contains two (so-called) nodes. A spark application is in fact a directed graph of nodes, linked to each other through a publish/subscribe relationship. Run it using the following command:

cd files/
punchlinectl start -p csv_files_to_stdout.hjson
OUTPUT of component:input stream:default

| _c0        | _c1        | _c2        | _c3        | _c4        | _c5        | _c6      |

| Date       | Open       | High       | Low        | Close      | Adj Close  | Volume   |
| 2017-12-28 | 171.000000 | 171.850006 | 170.479996 | 171.080002 | 171.080002 | 16480200 |
| 2017-12-29 | 170.520004 | 170.589996 | 169.220001 | 169.229996 | 169.229996 | 25999900 |
| 2018-01-02 | 170.160004 | 172.300003 | 169.259995 | 172.259995 | 172.259995 | 25555900 |
| 2018-01-03 | 172.529999 | 174.550003 | 171.960007 | 172.229996 | 172.229996 | 29517900 |
| 2018-01-04 | 172.539993 | 173.470001 | 172.080002 | 173.029999 | 173.029999 | 22434600 |
| 2018-01-05 | 173.440002 | 175.369995 | 173.050003 | 175.000000 | 175.000000 | 23660000 |
| 2018-01-08 | 174.350006 | 175.610001 | 173.929993 | 174.350006 | 174.350006 | 20567800 |
| 2018-01-09 | 174.550003 | 175.059998 | 173.410004 | 174.330002 | 174.330002 | 21584000 |
| 2018-01-10 | 173.160004 | 174.300003 | 173.000000 | 174.289993 | 174.289993 | 23959900 |

INPUT map for component:show stream:default

| _c0        | _c1        | _c2        | _c3        | _c4        | _c5        | _c6      |

| Date       | Open       | High       | Low        | Close      | Adj Close  | Volume   |
| 2017-12-28 | 171.000000 | 171.850006 | 170.479996 | 171.080002 | 171.080002 | 16480200 |
| 2017-12-29 | 170.520004 | 170.589996 | 169.220001 | 169.229996 | 169.229996 | 25999900 |
| 2018-01-02 | 170.160004 | 172.300003 | 169.259995 | 172.259995 | 172.259995 | 25555900 |
| 2018-01-03 | 172.529999 | 174.550003 | 171.960007 | 172.229996 | 172.229996 | 29517900 |
| 2018-01-04 | 172.539993 | 173.470001 | 172.080002 | 173.029999 | 173.029999 | 22434600 |
| 2018-01-05 | 173.440002 | 175.369995 | 173.050003 | 175.000000 | 175.000000 | 23660000 |
| 2018-01-08 | 174.350006 | 175.610001 | 173.929993 | 174.350006 | 174.350006 | 20567800 |
| 2018-01-09 | 174.550003 | 175.059998 | 173.410004 | 174.330002 | 174.330002 | 21584000 |
| 2018-01-10 | 173.160004 | 174.300003 | 173.000000 | 174.289993 | 174.289993 | 23959900 |

Show node result: input_default

| _c0        | _c1        | _c2        | _c3        | _c4        | _c5        | _c6      |

| Date       | Open       | High       | Low        | Close      | Adj Close  | Volume   |
| 2017-12-28 | 171.000000 | 171.850006 | 170.479996 | 171.080002 | 171.080002 | 16480200 |
| 2017-12-29 | 170.520004 | 170.589996 | 169.220001 | 169.229996 | 169.229996 | 25999900 |

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)

[
  {
    "name": "input_default",
    "title": "SHOW"
  }
]

There you have it. What this simple example shows is how strikingly simple, concise and clear it is to design an arbitrary spark pipeline using the punch. Better: the punch ships with a graphical pipeline editor. Check out http://localhost:5601/app/punchplatform.

Inferring Types

Let us improve our punchline to make it infer the types of the columns, rather than reading everything as strings. We do that by using the first line as a header and inferring the schema of the dataframe dynamically upon reading.

{
    type: punchline
    tenant: default
    version: "6.0"
    channel: default
    runtime: spark
    dag: [
        {
            description:
            '''
            Read a CSV file and automatically generate the corresponding Dataset
            '''
            type: file_input
            component: input
            settings: {
                format: csv
                file_name: AAPL.csv
                options: {
                    header: true
                    inferSchema: true
                }
            }
            publish: [
                {
                    stream: default
                }
            ]
        }
        {
            type: show
            component: show
            subscribe: [
                {
                    stream: default
                    component: input
                }
            ]
            publish: [
                {
                    stream: default
                }
            ]
        }
    ]
    settings: {
      spark.files: ./AAPL.csv
    }
}

The output is now:

....
....
Show node result: input_default

| Date          | Open       | High       | Low        | Close      | Adj Close  | Volume   |

| 2017-12-28... | 171.0      | 171.850006 | 170.479996 | 171.080002 | 171.080002 | 16480200 |
| 2017-12-29... | 170.520004 | 170.589996 | 169.220001 | 169.229996 | 169.229996 | 25999900 |
| 2018-01-02... | 170.160004 | 172.300003 | 169.259995 | 172.259995 | 172.259995 | 25555900 |

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)

[
  {
    "name": "input_default",
    "title": "SHOW"
  }
]

This is better: the column types are now automatically inferred.

SQL

Let us now try to calculate the average of the Volume column. For that we are going to add an SQL statement node as follows:

{
    type: punchline
    tenant: default
    version: "6.0"
    channel: default
    runtime: spark
    dag: [
        {
            description:
            '''
            Read a CSV file and automatically generate the corresponding Dataset
            '''
            type: file_input
            component: input
            settings: {
                format: csv
                file_name: AAPL.csv
                options: {
                    header: true
                    inferSchema: true
                }
            }
            publish: [
                {
                    stream: default
                }
            ]
        }
        {
            type: sql
            component: sql_statement
            settings: {
                statement: SELECT AVG(Volume) from input_default
            }
            subscribe: [
                {
                    component: input
                    stream: default
                }
            ]
            publish: [
                { 
                    stream: default
                }
            ]
        }
        {
            type: show
            component: show
            subscribe: [
                {
                    stream: default
                    component: sql_statement
                }
            ]
            publish: [
                {
                    stream: default
                }
            ]
        }
    ]
    settings: {
      spark.files: ./AAPL.csv
    }
}

The output is now:

...
...
Show node result: sql_statement_default

| avg(Volume) |

| 2.8576825E7 |

root
 |-- avg(Volume): double (nullable = true)

[
  {
    "name": "sql_statement_default",
    "title": "SHOW"
  }
]
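
Note that the dataset a node subscribes to is registered as an SQL table named after the publishing component and stream, here input_default. The statement can be any Spark SQL query. As a purely illustrative variation (not part of the shipped sample), the node below would compute the lowest and highest prices over the period:

        {
            type: sql
            component: sql_statement
            settings: {
                statement: SELECT MIN(Low) AS lowest, MAX(High) AS highest FROM input_default
            }
            subscribe: [
                {
                    component: input
                    stream: default
                }
            ]
            publish: [
                {
                    stream: default
                }
            ]
        }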

More Examples

The punch public GitHub comes with many more spark examples which can help you easily understand punchline concepts. These examples are available in this repository: https://github.com/punchplatform/samples/. Examples are sorted by type in subfolders, each containing the configuration files and a short README explaining what the punchline does and how to launch it.

To get these examples, simply run:

git clone https://github.com/punchplatform/samples.git

Conclusions

Spark punchlines are extremely powerful. With a few nodes, you have stream and batch SQL power at hand. The punch team now uses spark punchlines exclusively to enrich production platforms with various extraction and aggregation jobs. No more coding, which in turn makes it a lot easier to upgrade our customer platforms every year.

Last, punchlines support both python (pyspark) and java (spark) based machine learning. A dedicated getting started guide on this topic is planned. The punch standalone already contains various examples.