
Jobs

A job consists of a directed acyclic graph (DAG) of steps called nodes. Each node can publish some values and subscribe to the values published by other nodes. Think of a node as a function with in and out parameters. All that is expressed in the job configuration file. A node can publish any type of data: numbers, datasets, machine-learning models, ...

A node is executed only once during the job lifetime, whenever another node requests one of its published values for the first time.

Tip

This is in contrast with Storm bolts and spouts, which are executed for every received tuple.

Job Execution Type

Spark natively offers several modes to execute your job:

  • client
  • cluster
  • local

With our punchplatform-analytics.sh, these modes are made available to the user, plus an additional one: foreground, which is our debug mode.

On a Spark production cluster, deployed and configured by our punchplatform-deployer.sh, client and cluster modes behave differently.

Below is a short summary of what can and cannot be done:

| Mode            | File broadcasting? | Background? |
|-----------------|--------------------|-------------|
| foreground      | True               | False       |
| client (local)  | True               | False       |
| client (remote) | True               | True        |
| cluster         | False              | True        |

Tip

In cluster mode, all the files you wish to use with your job must be present on each node of your Spark cluster, at the exact same location.
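
For example, with the file_csv_input node described below, a job meant for cluster mode should reference a path that exists on every Spark node. The /data/AAPL.csv path here is purely illustrative:

{
    type: file_csv_input
    component: input
    settings: {
        // in cluster mode this file is not broadcast:
        // it must already exist at this exact location
        // on every node of the Spark cluster
        file_path: /data/AAPL.csv
    }
    publish: [
        {
            stream: default
        }
    ]
}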

Here is a graphical representation of these various modes.

Foreground

[image: foreground mode]

Client (local)

[image: client (local) mode]

Client (remote)

[image: client (remote) mode]

Cluster

[image: cluster mode]

PML Nodes

Let us start by describing how you define a node. We will then describe how you define the complete graph. Nodes are defined using the hjson format. Here is an example:

{
  job: [
           {
               description:
               '''
               Add a useful description to your node. This will allow PML editors
               and more generally everyone to have a clear idea of the PML logic.
               Here this example simply generates some data. To be more precise,
               it will generate a spark Dataset<Row>
               '''
               // PML provides many ready-to-use nodes. Here we use the batch_input
               // node, which makes it easy to understand and test PML files.
               // Note also how you can add comments to PML files.
               type: batch_input

               // this is a mandatory field to uniquely name your node.
               component: input

               // the settings 
               settings: {
                   input_data: [
                       {
                           age: 21
                           name: phil
                       }
                       {
                           age: 23
                           name: alice
                       }
                       {
                           age: 53
                           name: dimi
                       }
                   ]
                   output_fields: [
                       {
                           type: integer
                           json_field: age
                       }
                       {
                           type: string
                           json_field: name
                       }
                   ]
               }
               // each node publishes its value (here a Dataset) onto a stream.
               // This particular batch_input node publishes a Dataset.
               publish: [
                   {
                       // the value is published on a so-called stream.
                       // This will become clear in the next example when
                       // defining a complete PML.
                       stream: default
                   }
               ]
           }
       ]
}

As you have now understood, assembling several nodes gives you your complete graph, that is, your job.

A job is a simple array of nodes. The graph is implicitly defined by the subscribe-publish relationship.
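
As a minimal sketch, a two-node job looks like the snippet below: the first node publishes a value, the second subscribes to it by referencing the publisher's component and stream, and that single link is the whole graph (the node types and settings are elided here):

{
  job: [
    {
      type: ...
      component: producer
      settings: {
          ...
      }
      // the producer publishes its value on a stream
      publish: [
        {
          stream: default
        }
      ]
    }
    {
      type: ...
      component: consumer
      // the consumer subscribes to the value published by "producer":
      // this subscribe-publish link defines the edge of the graph
      subscribe: [
        {
          component: producer
          stream: default
        }
      ]
    }
  ]
}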

Published Values

In each node, the publish array defines the value or list of values published by the node. Each value is named, and can be disambiguated by an optional tag. Tags are explained below.

What you must first understand is what the published values are. The rule is simple: most nodes publish a single Dataset. PML's purpose is to leverage as much as possible the power of Datasets, Spark SQL and MLlib. Working with Datasets provides you with both performance and expressive power.

That said, some other nodes publish only a single value. Think of publishing a count for some operation, for example. Here is an example of a count node:

{
    type: count
    component: count
    subscribe: [
        # this must be a dataset
        { 
            component: batch_input
            stream: default 
        }
    ],
    publish: [
        # this is a singleton long value
        { 
            stream: default
        }
    ]
}

Tip

As you can see, nothing explicitly tells you that the count node publishes a single value. Refer to the documentation of each node to find out what it publishes.

Here is a third example. The following node reads a CSV file and publishes its contents as a dataset. Each line of the file will be converted to a row. Hence the published value is of type Dataset<Row>.

{
    description:
    '''
    Read a CSV file and automatically generate the corresponding Dataset
    '''
    type: file_csv_input
    component: input
    settings: {
        file_path: ./AAPL.csv
    }
    publish: [
        {
            stream: default
        }
    ]
}

Subscribed Values

The subscribe field defines the list of values a node subscribes to. This basically defines the node's input parameters.

Here is an example you are likely to use a lot: the show node. The following is a complete PML that will print out to stdout the Dataset generated by the file_csv_input node:

{
  job: [
               {
                  description:
                  '''
                  Read a CSV file and automatically generate the corresponding Dataset
                  '''
                   type: file_csv_input
                   component: input
                   settings: {
                       file_path : ./AAPL.csv
                   }
                   publish: [
                       {
                           stream: default
                       }
                   ]
               }
               {
                   type: show
                   component: show
                   subscribe: [
                       {
                           stream: default
                           component: input
                       }
                   ]
               }
           ]
}

If you execute this PML you will get the following output:

$ punchplatform-analytics.sh --job csv_files_to_stdout.hjson
SHOW:
+----------+----------+----------+----------+----------+----------+--------+
|       _c0|       _c1|       _c2|       _c3|       _c4|       _c5|     _c6|
+----------+----------+----------+----------+----------+----------+--------+
|      Date|      Open|      High|       Low|     Close| Adj Close|  Volume|
|2017-12-28|171.000000|171.850006|170.479996|171.080002|171.080002|16480200|
|2017-12-29|170.520004|170.589996|169.220001|169.229996|169.229996|25999900|
|2018-01-02|170.160004|172.300003|169.259995|172.259995|172.259995|25555900|
|2018-01-03|172.529999|174.550003|171.960007|172.229996|172.229996|29517900|
|2018-01-04|172.539993|173.470001|172.080002|173.029999|173.029999|22434600|
|2018-01-05|173.440002|175.369995|173.050003|175.000000|175.000000|23660000|
|2018-01-08|174.350006|175.610001|173.929993|174.350006|174.350006|20567800|
|2018-01-09|174.550003|175.059998|173.410004|174.330002|174.330002|21584000|
|2018-01-10|173.160004|174.300003|173.000000|174.289993|174.289993|23959900|
|2018-01-11|174.589996|175.490005|174.490005|175.279999|175.279999|18667700|
|2018-01-12|176.179993|177.360001|175.649994|177.089996|177.089996|25418100|
|2018-01-16|177.899994|179.389999|176.139999|176.190002|176.190002|29565900|
|2018-01-17|176.149994|179.250000|175.070007|179.100006|179.100006|34386800|
|2018-01-18|179.369995|180.100006|178.250000|179.259995|179.259995|31193400|
|2018-01-19|178.610001|179.580002|177.410004|178.460007|178.460007|32425100|
|2018-01-22|177.300003|177.779999|176.600006|177.000000|177.000000|27108600|
|2018-01-23|177.300003|179.440002|176.820007|177.039993|177.039993|32689100|
|2018-01-24|177.250000|177.300003|173.199997|174.220001|174.220001|51105100|
|2018-01-25|174.509995|174.949997|170.529999|171.110001|171.110001|41529000|
+----------+----------+----------+----------+----------+----------+--------+
only showing top 20 rows

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)

If you are familiar with Spark or with any machine learning notebook, you will be at ease.

Disambiguating Parameters

As soon as you write realistic jobs, you will hit the issue of disambiguating and naming your in and out parameters. Examples will make this clear. Here is a basic PML performing the sum of two values.

{
  job: [
           {
               "type": "number_generator",
               "component": "my_first_number_generator",
               "settings": {
                   "value": 1.0
               },
               "publish": [ { "stream": "value" } ]
           },
           {
               "type": "number_generator",
               "component": "my_second_number_generator",
               "settings": {
                   "value": 2.0
               },
               "publish": [ { "stream": "value" } ]
           },
           {
               "type": "number_addition",
               "component": "additioner",
               "subscribe": [
                   {
                       "component": "my_first_number_generator",
                       "stream": "value"
                   },
                   {
                       "component": "my_second_number_generator",
                       "stream": "value"
                   }
               ],
               "publish": [ { "stream": "value" } ]
           }
       ]
}

This works fine: there is no need to disambiguate anything because addition is commutative. The additioner does not care about the order of its input values; it simply sums them.

Things are different with a division: there you need to know which value is the numerator and which is the denominator. Here is how you disambiguate one from the other using tags:

{
    "type": "number_division",
    "component": "divisioner",
    "subscribe": [
        {
            "component": "my_first_number_generator",
            "stream": "value",
            "tag": "numerator"
        },
        {
            "component": "my_second_number_generator",
            "stream": "value",
            "tag": "denominator"
        }
    ],
    "publish": [ { "stream": "value" } ]
}

As you probably guessed, the tag field makes it possible to associate a given role with each parameter. These roles (i.e. the possible values of each tag) are specifically defined and documented by each node type. You will find something like this in the number_division node documentation:

For subscriptions:

| Tag         | Mandatory | Type      | Default | Comment           |
|-------------|-----------|-----------|---------|-------------------|
| numerator   | Yes       | Singleton | None    | Numerator value   |
| denominator | Yes       | Singleton | None    | Denominator value |

For publications:

| Tag  | Condition | Type      | Default | Comment             |
|------|-----------|-----------|---------|---------------------|
| None | None      | Singleton | None    | The division result |

Some nodes require more than tags. Consider the following node, which executes arbitrary formulas.

{
    "type": "number_formula",
    "component": "formula",
    "settings": {
        "formulae": [
            "c = a / b",
            "d = c + 5"
        ]
    },
    "subscribe": [
        {
            "component": "my_first_number_generator",
            "stream": "value",
            "name": "a"
        },
        {
            "component": "my_second_number_generator",
            "stream": "value",
            "name": "b"
        }
    ],
    "publish": [
        {
            "stream": "value",
            "name": "d"
        }
    ]
}

Here we use the name field to give each input parameter a name that you can, in turn, refer to in the node settings. This example also illustrates how the publish section can select which value will be published.
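
The published value d behaves like any other published singleton: a downstream node can subscribe to it by component and stream. As a sketch, assuming the number_addition node from the earlier example, you could feed the formula result into a further addition:

{
    "type": "number_addition",
    "component": "final_sum",
    "subscribe": [
        {
            // the value named d, published by the formula node above
            "component": "formula",
            "stream": "value"
        },
        {
            "component": "my_first_number_generator",
            "stream": "value"
        }
    ],
    "publish": [ { "stream": "value" } ]
}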

Job Execution

To learn how to use the punchplatform-analytics.sh command to run jobs, please refer to the Execution Jobs section.