Jobs
A job consists of a directed acyclic graph (DAG) of steps called nodes. Each node can publish values and subscribe to values published by other nodes. Think of a node as a function with in and out parameters. All of that is expressed in the job configuration file. A node can publish any type of data: numbers, datasets, machine-learning models, etc.
A node is executed only once during the job lifetime, whenever another node requests one of its published values for the first time.
Tip
This is in contrast with Storm bolts and spouts, which are executed for every received tuple.
Job Execution Type¶
Spark natively offers several modes to execute your job:
- client
- cluster
- local
With our punchplatform-analytics.sh, these modes are made available to the user, plus an additional mode: foreground, which is our debug mode.
On a Spark production cluster, deployed and configured by our punchplatform-deployer.sh, client and cluster modes behave differently.
Below is a short summary of what can and cannot be done:
| Mode | File broadcasting? | Background? |
|---|---|---|
| foreground | True | False |
| client (local) | True | False |
| client (remote) | True | True |
| cluster | False | True |
Tip
In cluster mode, all files you wish to execute with your job must be present on each node of your Spark cluster, at the exact same location.
Here is a graphical representation of each of these modes.
Foreground¶
Client (local)¶
Client (remote)¶
Cluster¶
PML Nodes¶
Let us start by describing how you define a node. We will describe next how you define the complete graph. Nodes are defined using the hjson format. Here is an example:
```hjson
{
  job: [
    {
      description:
      '''
      Add a useful description to your node. This will allow PML editors
      and more generally everyone to have a clear idea of the PML logic.
      Here this example simply generates some data. To be more precise,
      it will generate a spark Dataset<Row>
      '''
      // PML provides many ready-to-use nodes. Here we use the batch_input
      // node that makes it easy to understand and test PML files.
      // Note also how you can add comments to PML files.
      type: batch_input
      // this is a mandatory field to uniquely name your node.
      component: input
      // the settings
      settings: {
        input_data: [
          {
            age: 21
            name: phil
          }
          {
            age: 23
            name: alice
          }
          {
            age: 53
            name: dimi
          }
        ]
        output_fields: [
          {
            type: integer
            json_field: age
          }
          {
            type: string
            json_field: name
          }
        ]
      }
      // each node publishes its value (here a Dataset) onto a stream.
      // This particular batch_input node publishes a Dataset.
      publish: [
        {
          // the value is published on a so-called stream.
          // This will become clear in the next example when
          // defining a complete PML.
          stream: default
        }
      ]
    }
  ]
}
```
As you have now understood, assembling several nodes gives you your complete graph, that is, your job.
A job is a simple array of nodes. The graph is implicitly defined by the subscribe-publish relationships.
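To make that wiring explicit, here is a bare skeleton of a two-node job (a sketch only: the batch_input settings are omitted for brevity, so it is not runnable as-is). The second node subscribes to the stream published by the first one by referencing its component name, which is what creates the edge of the graph. A complete, runnable job of the same shape appears in the Subscribed Values section below.

```hjson
{
  job: [
    {
      // first node: produces a Dataset
      type: batch_input
      component: input
      // settings omitted in this sketch, see the batch_input example above
      publish: [
        { stream: default }
      ]
    }
    {
      // second node: consumes the Dataset produced by the "input" component
      type: show
      component: show
      subscribe: [
        {
          // this (component, stream) pair is what defines
          // the edge input -> show in the job graph
          component: input
          stream: default
        }
      ]
    }
  ]
}
```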
Published Values¶
In each node, the publish array defines the value or list of values published by that node.
Each value is named, and can be disambiguated by an optional tag. Tags are explained below.
What you must first understand is what the published values are.
The rule is simple: most nodes publish a single Dataset.
That said, some nodes publish only a single value; think of publishing the count of some operation, for example. Here is an example of a count node:
```hjson
{
  type: count
  component: count
  subscribe: [
    # this must be a dataset
    {
      component: batch_input
      stream: default
    }
  ]
  publish: [
    # this is a singleton long value
    {
      stream: default
    }
  ]
}
```
Tip
As you can see, nothing explicitly tells you that the count node publishes a single value. Refer to the documentation of each node to find out what it publishes.
Here is a third example. The following node reads a CSV file and publishes its contents as a dataset. Each line of the file will be converted to a row. Hence the published value is of type Dataset<Row>.
```hjson
{
  description:
  '''
  Read a CSV file and automatically generate
  the corresponding Dataset
  '''
  type: file_csv_input
  component: input
  settings: {
    file_path: ./AAPL.csv
  }
  publish: [
    {
      stream: default
    }
  ]
}
```
Subscribed Values¶
The subscribe field defines the list of values a node subscribes to. This basically defines the node's input parameters.
Here is an example you are likely to use a lot: the show node. The following is a complete PML that will print to stdout the Dataset generated by the file_csv_input node:
```hjson
{
  job: [
    {
      description:
      '''
      Read a CSV file and automatically generate
      the corresponding Dataset
      '''
      type: file_csv_input
      component: input
      settings: {
        file_path: ./AAPL.csv
      }
      publish: [
        {
          stream: default
        }
      ]
    }
    {
      type: show
      component: show
      subscribe: [
        {
          stream: default
          component: input
        }
      ]
    }
  ]
}
```
If you execute this PML you will get the following output:
```
$ punchplatform-analytics.sh --job csv_files_to_stdout.hjson
SHOW:
+----------+----------+----------+----------+----------+----------+--------+
|       _c0|       _c1|       _c2|       _c3|       _c4|       _c5|     _c6|
+----------+----------+----------+----------+----------+----------+--------+
|      Date|      Open|      High|       Low|     Close| Adj Close|  Volume|
|2017-12-28|171.000000|171.850006|170.479996|171.080002|171.080002|16480200|
|2017-12-29|170.520004|170.589996|169.220001|169.229996|169.229996|25999900|
|2018-01-02|170.160004|172.300003|169.259995|172.259995|172.259995|25555900|
|2018-01-03|172.529999|174.550003|171.960007|172.229996|172.229996|29517900|
|2018-01-04|172.539993|173.470001|172.080002|173.029999|173.029999|22434600|
|2018-01-05|173.440002|175.369995|173.050003|175.000000|175.000000|23660000|
|2018-01-08|174.350006|175.610001|173.929993|174.350006|174.350006|20567800|
|2018-01-09|174.550003|175.059998|173.410004|174.330002|174.330002|21584000|
|2018-01-10|173.160004|174.300003|173.000000|174.289993|174.289993|23959900|
|2018-01-11|174.589996|175.490005|174.490005|175.279999|175.279999|18667700|
|2018-01-12|176.179993|177.360001|175.649994|177.089996|177.089996|25418100|
|2018-01-16|177.899994|179.389999|176.139999|176.190002|176.190002|29565900|
|2018-01-17|176.149994|179.250000|175.070007|179.100006|179.100006|34386800|
|2018-01-18|179.369995|180.100006|178.250000|179.259995|179.259995|31193400|
|2018-01-19|178.610001|179.580002|177.410004|178.460007|178.460007|32425100|
|2018-01-22|177.300003|177.779999|176.600006|177.000000|177.000000|27108600|
|2018-01-23|177.300003|179.440002|176.820007|177.039993|177.039993|32689100|
|2018-01-24|177.250000|177.300003|173.199997|174.220001|174.220001|51105100|
|2018-01-25|174.509995|174.949997|170.529999|171.110001|171.110001|41529000|
+----------+----------+----------+----------+----------+----------+--------+
only showing top 20 rows

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
```
If you are familiar with Spark or with any machine learning notebook, you will be at ease.
Disambiguating Parameters¶
As soon as you write realistic jobs, you will hit the issue of disambiguating and naming your in and out parameters. Examples will make this clear. Here is a basic PML performing the sum of two values.
```hjson
{
  "job": [
    {
      "type": "number_generator",
      "component": "my_first_number_generator",
      "settings": {
        "value": 1.0
      },
      "publish": [
        {
          "stream": "value"
        }
      ]
    },
    {
      "type": "number_generator",
      "component": "my_second_number_generator",
      "settings": {
        "value": 2.0
      },
      "publish": [
        {
          "stream": "value"
        }
      ]
    },
    {
      "type": "number_addition",
      "component": "additioner",
      "subscribe": [
        {
          "component": "my_first_number_generator",
          "stream": "value"
        },
        {
          "component": "my_second_number_generator",
          "stream": "value"
        }
      ],
      "publish": [
        {
          "stream": "value"
        }
      ]
    }
  ]
}
```
This works fine: there is no need to disambiguate anything because addition is commutative. The additioner does not care about the order of its input values; it simply has to sum them.
Things are different with a division: there you need to know which value is the numerator and which is the denominator. Here is how you disambiguate one from the other using tags:
```hjson
{
  "type": "number_division",
  "component": "divisioner",
  "subscribe": [
    {
      "component": "my_first_number_generator",
      "stream": "value",
      "tag": "numerator"
    },
    {
      "component": "my_second_number_generator",
      "stream": "value",
      "tag": "denominator"
    }
  ],
  "publish": [
    {
      "stream": "value"
    }
  ]
}
```
As you probably guessed, the tag field makes it possible to associate a given role with each parameter. These roles (i.e. the values of each tag, here numerator and denominator) are specifically defined and documented by each node type. You will find something like this in the number_division node documentation:
For subscriptions:
| Tag | Mandatory | Type | Default | Comment |
|---|---|---|---|---|
| numerator | Yes | Singleton | None | Numerator value |
| denominator | Yes | Singleton | None | Denominator value |
For publications:
| Tag | Condition | Type | Default | Comment |
|---|---|---|---|---|
| None | None | Singleton | None | The division result |
Some nodes require more than tags. Consider the following node, which executes arbitrary formulas.
```hjson
{
  "type": "number_formula",
  "component": "formula",
  "settings": {
    "formulae": [
      "c = a / b",
      "d = c + 5"
    ]
  },
  "subscribe": [
    {
      "component": "my_first_number_generator",
      "stream": "value",
      "name": "a"
    },
    {
      "component": "my_second_number_generator",
      "stream": "value",
      "name": "b"
    }
  ],
  "publish": [
    {
      "stream": "value",
      "name": "d"
    }
  ]
}
```
Here we use the name field to attribute a name to each input parameter. In turn, you can refer to these names in the node settings. This example also illustrates how the publish section can select which value will be published.
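To illustrate that last point, here is a sketch adapted from the example above (not taken from the number_formula reference documentation): assuming the publish section may name any value computed by the formulae, selecting c instead of d publishes the raw division result rather than the final sum.

```hjson
{
  "type": "number_formula",
  "component": "formula",
  "settings": {
    "formulae": [
      "c = a / b",
      "d = c + 5"
    ]
  },
  "subscribe": [
    {
      "component": "my_first_number_generator",
      "stream": "value",
      "name": "a"
    },
    {
      "component": "my_second_number_generator",
      "stream": "value",
      "name": "b"
    }
  ],
  "publish": [
    {
      // assuming the name may refer to any value computed by the formulae,
      // this selects the intermediate result c (the division, without the +5)
      "stream": "value",
      "name": "c"
    }
  ]
}
```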
Job Execution¶
To learn how to use the punchplatform-analytics.sh command to run jobs, please refer to the Execution Jobs section.