
PML Python API

Punch Machine Learning (PML) has just been updated with an all-new backend flavour: Python. This page describes the new opportunities offered by this variant of the PML engine.

Introducing PySpark

If you are familiar with the standard Java/Scala PML, you may have noticed that it is based on Spark. This new version of PML keeps that tradition and is based on PySpark, the Python implementation of everything offered by Scala Spark.

What does it change for me?

PML PySpark uses the same configuration files (.pml) as the traditional PML, and all nodes available in Java are available here too: a configuration file designed for the Java PML should run as-is with the Python backend. A user who only writes PML files will notice almost no difference when using this new feature.

But what if you would like to go further and write your own specific nodes to cover more unique needs, say deep learning? This is what this engine was designed to answer by introducing Python, a user-friendly and powerful language that has established itself as one of the most widely used languages for machine learning and big data projects.

Executing a PML with PySpark

The punchplatform provides a script named punchplatform-pyspark that runs the PML. It behaves like the analytics script. For example, to run a job called "dataset_generator.pml" stored in the current directory, the command is:

punchplatform-pyspark --job dataset_generator.pml

The file path can be either absolute or relative.
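For example, the same job can be launched with an absolute path (the directory shown here is purely illustrative):

punchplatform-pyspark --job /home/user/jobs/dataset_generator.pml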

Adding a Python node

Information about adding a new node is available on this page.

PML Python core documentation

A detailed Sphinx documentation is under work to document the code in depth (like Javadoc); the current code documentation is available here. Note that it is still a work in progress and not very polished yet.

Node structure

Base node template

This is the basic template for creating a new node:

from core.holders.input_holder import InputHolder
from core.holders.output_holder import OutputHolder
from core.node import AbstractNode

class RandomSplit(AbstractNode):

    def __init__(self):
        super().__init__()
        # Use this section to initialize class parameters and declare settings

    def execute(self, input_data: InputHolder, output: OutputHolder):
        # The code inside this method is executed by the PML engine.
        # From here you are free to structure your code as you like:
        # raw code, methods, classes, external files, etc.
        return None

Additional information:

Settings present in the "settings" section of the PML file are accessible after declaring them in the class __init__ with the method declare_param(param_name) or one of its derivatives. Once declared, they are available through the class variable self.settings[param_name].

To publish your stream, use output.set(my_var): this method sends the data contained in my_var into the stream, to the next node.

The Spark context can be accessed through the self.context variable if needed.
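Putting these three points together, here is a minimal sketch. The EchoNode class and its "greeting" setting are purely illustrative, and declare_param is assumed to accept the same keyword arguments as the typed variants shown in the example below:

from core.holders.input_holder import InputHolder
from core.holders.output_holder import OutputHolder
from core.node import AbstractNode

class EchoNode(AbstractNode):

    def __init__(self):
        super().__init__()
        # Declare the (hypothetical) "greeting" setting so it can be
        # read from the "settings" section of the .pml file
        self.declare_param(name="greeting", required=False, default="hello")

    def execute(self, input_data: InputHolder, output: OutputHolder):
        df = input_data.get()  # data from the subscribed stream
        # Read the declared setting
        print(self.settings["greeting"])
        # self.context gives access to the Spark context if needed
        # (assumption: it exposes the usual SparkContext attributes)
        print(self.context)
        output.set(df)  # publish the data to the next node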

Node example

Here is a complete custom node example:

from core.holders.input_holder import InputHolder
from core.holders.output_holder import OutputHolder
from core.node import AbstractNode

class RandomSplit(AbstractNode):
    def __init__(self):
        super().__init__()
        # Note that name is required, other args are optional
        self.declare_float_param(name="random_seed", required=False, default=None)
        self.declare_float_list_param(name="weights", required=False, default=[0.5, 0.5])

        # optional, needed only if using the punch graphic editor 
        self.declare_dataframe_input()
        self.declare_dataframe_output("left")
        self.declare_dataframe_output("right")

    def execute(self, input_data: InputHolder, output: OutputHolder):
        df = input_data.get()  # get the dataset from the input stream
        ws = self.settings["weights"]  # get the weights from the .pml file

        # force types to float (required by randomSplit, since the .pml
        # settings may arrive as strings)
        f_w = [float(w) for w in ws]

        seed = self.settings["random_seed"]
        r = float(seed) if seed is not None else None

        split = df.randomSplit(weights=f_w, seed=r)  # split the dataset with a Spark method
        if len(output.streams) == len(split):
            for i, key in enumerate(output.streams):
                output.set(value=split[i], alias=key)
        else:
            raise ValueError("The number of output streams does not match the number of split datasets")
        return None

And here is the configuration file that uses it (along with two other nodes: show and dataset_generator):

{
  job: [
        {
            type: dataset_generator
            component: input
            publish: [
                {
                    stream: data
                }
            ],
            settings: {
                input_data: [
                    {
                        Message: Hello world 1
                    }
                    {
                        Message: Hello world 2
                    }
                    {
                        Message: Hello world 3
                    }
                    {
                        Message: Hello world 4
                    }
                    {
                        Message: Hello world 5
                    }
                    {
                        Message: Hello world 6
                    }
                    {
                        Message: Hello world 7
                    }
                    {
                        Message: Hello world 8
                    }
                    {
                        Message: Hello world 9
                    }
                    {
                        Message: Hello world 10
                    }
                ]
            }
        },
        { 
            type: random_split
            component: random_split
            settings: 
            { 
                random_seed: 42
                weights: ["0.6","0.4"]
            }
            publish: [
                {
                    stream: left
                }
                {
                    stream: right
                }
            ]
            subscribe: [
                {
                    component: input
                    stream: data
                    alias: uno
                }
            ]
        },

        {
            type: show
            component: show
            subscribe: [
                {
                    component: random_split
                    stream: left
                }
                {
                    component: random_split
                    stream: right
                }
            ]
        }
    ]
}
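Assuming the RandomSplit node has been added to your platform as described above, you can save this configuration as, say, random_split_example.pml (the file name is illustrative) and run it with the script introduced earlier:

punchplatform-pyspark --job random_split_example.pml

The show node then prints the "left" and "right" splits of the generated dataset.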