MLlib Node

The nodes and stages described in this section rely on MLlib, the official Spark machine learning library.

MLlib Node Layout

The mllib node trains a pipeline and applies it to data. The node essentially defines that pipeline. Within the pipeline you use so-called stages, which are intermediary processing steps. PML lets you use the standard Spark stages and provides some additional ones as well.

Important

In contrast to other PML nodes, the mllib node uses predefined stream names. You must name the input stream either input_fit or input_transform, or use both if you want to fit and then transform data. Similarly, you must name the output streams output_transform or model to forward, respectively, the transformed data or the generated model.

Here is an example illustrating the overall MLlib node. As you can see, it contains an internal pipeline. In this example that pipeline contains only a single standard Spark LogisticRegression stage.

{
    type: mllib
    component: mllib
    settings: {
        // The mllib pipeline is composed of a sequence of stages.
        // In this example only one logistic regression stage is used.
        pipeline: [
            {
                type: org.apache.spark.ml.classification.LogisticRegression
                settings: {
                    maxIter: 10
                    regParam: 0.01
                }
            }
        ]
    }
    subscribe: [
        {
            component: some_input_node
            stream: input_fit
        }
        {
            component: some_other_input_node
            stream: input_transform
        }
    ]
    publish: [
        {
            stream: output_transform
        }
        {
            stream: model
        }
    ]
}

Stages

Each stage has two mandatory properties, type and settings, as illustrated here:

{
    // the type is the class name of the stage. PML leverages all
    // standard spark stages, and provides additional punch specific stages.
    type: org.apache.spark.ml.classification.LogisticRegression

    // each stage has some specific configuration properties. These
    // appear in a settings dictionary. Check the Spark javadoc
    // for the meaning of each parameter.
    settings: {
        maxIter: 10
        regParam: 0.01
    }
}

Punch Stages

The punch provides a few additional useful stages that you can insert into your pipeline.

Json Output Stage

It is often the case that a row comes in containing a json string. What you then need is to convert it into typed fields, stored in additional columns of your dataset. For example, say you have the following input dataset:

+------------------------------+
|json                          |
+------------------------------+
|{"value": 10, "text": "hello"}|
|{"value": 10, "text": "world"}|
+------------------------------+

Applying the following json output stage:

{
    type: org.thales.punch.pml.plugins.json.JsonOutputStage
    settings: {
        inputCol: json
        outputFields: [
            {
                type: string
                field: text
            }
            {
                type: integer
                field: value
            }
        ]
    }
}

will produce:

+------------------------------+-----+-----+
|json                          |text |value|
+------------------------------+-----+-----+
|{"value": 10, "text": "hello"}|hello|10   |
|{"value": 10, "text": "world"}|world|10   |
+------------------------------+-----+-----+

Inner Fields

Should you have inner fields, like the user age in the following example:

+-----------------------------------------------------+
|json                                                 |
+-----------------------------------------------------+
|{"value": 10, "text": "hello", "user" : { "age": 12}}|
|{"value": 10, "text": "world", "user" : { "age": 12}}|
+-----------------------------------------------------+

You can refer to these using the user.age notation.

    {
        type: integer
        field: user.age
    }

Which will produce:

+-----------------------------------------------------+-----+-----+--------+
|json                                                 |text |value|user.age|
+-----------------------------------------------------+-----+-----+--------+
|{"value": 10, "text": "hello", "user" : { "age": 12}}|hello|10   |12      |
|{"value": 10, "text": "world", "user" : { "age": 12}}|world|10   |12      |
+-----------------------------------------------------+-----+-----+--------+

Renaming Columns

You can control the name of your columns using the column property, e.g.:

    {
        type: integer
        field: user.age
        column: age
    }

Show Stage

The show stage lets you insert debug information into your pipeline to check the input or output of individual stages. It can be transparently added to your pipeline.

{
    type: org.thales.punch.pml.plugins.mllib.ShowStage
    settings: {
        // Optional: the number of rows to print
        numCols: 10
        // Optional: if true, print one column per line
        vertical: false
        // Optional: truncate long string values
        truncate: true
    }
}

Punch Stage

The PunchStage is very similar to the Punch node. It lets you run a punchlet on the traversing dataset.

{
    type: org.thales.punch.ml.plugins.punch.PunchStage
    settings: {
        punchlet_code:
        '''
        {
            [sum] = ...;
        }
        '''
        output_columns: [
            {
                type: integer
                field: sum
            }
        ]
    }
}

Advanced Settings

    settings: {
        // you can provide a punchlet file instead of inline code
        punchlet_code_file: "path_to_punchlet.punch"
    }

Vectorizer Stage

Numerous machine-learning algorithms can be used to analyse textual variables such as logs. However, these algorithms only accept numerical input data. It is thus usually not possible to use text features directly without first transforming them into numerical features.

The vectorizer stage lets you create numerical features from character strings.

In general, there are two distinct categories of character string variables:

  • Variables with a limited number of distinct values: this kind of variable does not require the Vectorizer; a simple dichotomization is enough (apply an org.apache.spark.ml.feature.StringIndexer followed by an org.apache.spark.ml.feature.OneHotEncoder, as shown in the sketch after this list).
  • Variables with an unlimited or unknown number of distinct values: such variables cannot be dichotomized, so the Vectorizer is the adapted tool.
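
A minimal dichotomization sketch could consist of the two following stages inserted into the pipeline; the column names proto, proto_index and proto_vector are only illustrative:

{
    // index the string values into numeric category indices
    type: org.apache.spark.ml.feature.StringIndexer
    settings: {
        inputCol: proto
        outputCol: proto_index
    }
}
{
    // one-hot encode the indices into a sparse vector column
    type: org.apache.spark.ml.feature.OneHotEncoder
    settings: {
        inputCol: proto_index
        outputCol: proto_vector
    }
}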

The punchplatform provides several feature types, described in the following sections.

Settings

  • inputCol: String

    Name of input column containing the input textual variable

  • outputCol: String

    Name of vector output column

  • features: List<Feature>

    List of features to create

Change Stage

The change feature counts the number of sequences of characters corresponding to a regex.

{
    type: change
    settings: {
        // Regex defining the characters whose sequences are counted
        target: [a-z]
    }
}

Check Pattern Stage

The check_pattern feature checks whether the text matches a regex pattern.

{
    type: check_pattern
    settings: {
        // the regex the textual variable is tested against
        target: ^.*word.*$
    }
}

Count Stage

The count feature counts the number of characters corresponding to a regex.

{
    type: count
    settings: {
        // Regex defining the type of character to count
        target: [a-z]
    }
}

Distribution Stage

The distribution feature computes the distribution of characters defined by a regex, sorts the occurrence counts and returns the n highest.

{
    type: distribution
    settings: {
        // Regex defining the type of character to count
        target: [a-z]
        // Number of highest occurrence counts to return
        n: 10
    }
}

Max Stream Length

The max_stream_length feature computes the lengths of the n longest streams (consecutive runs) of characters matching a user-defined regex.

{
    type: max_stream_length
    settings: {
        // Regex defining the characters whose streams are measured
        target: [a-z]
        // Number of longest streams to return
        n: 10
    }
}
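
Putting it together, a vectorizer stage combining several of the above features could look like the following sketch. The stage class name is only a placeholder (use the vectorizer stage class shipped with your punch distribution), and the column names log and features are only illustrative:

{
    // placeholder: replace with the actual vectorizer stage class of your punch release
    type: <vectorizer_stage_class>
    settings: {
        inputCol: log
        outputCol: features
        features: [
            {
                // count the digits in the log line
                type: count
                settings: {
                    target: [0-9]
                }
            }
            {
                // check whether the log line contains the word error
                type: check_pattern
                settings: {
                    target: ^.*error.*$
                }
            }
            {
                // lengths of the 3 longest runs of lowercase letters
                type: max_stream_length
                settings: {
                    target: [a-z]
                    n: 3
                }
            }
        ]
    }
}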

Json Input Stage

The json input stage builds a json document column from other dataset columns. It is described in detail in the Dealing with Json Documents chapter below; here is an example:

{
    "type": "org.thales.punch.pml.plugins.json.JsonInputStage",
    "settings": {
        "outputCol": "data",
        "inputFields": [
            {
                "type": "json",
                "json_field": "[source]",
                "dataset_column": "source"
            },
            {
                "type": "string",
                "json_field": "[init][host][ip]",
                "dataset_column": "init_host_ip"
            }
        ]
    }
}

Spark Stage

Refer to the Spark documentation for the numerous stages. We will consider one useful example to highlight how the parameters must be defined.

In the following example the KMeans stage is used. It is identified by its class name org.apache.spark.ml.clustering.KMeans and has a number of specific parameters of various types:

  • featuresCol (String)
  • k (integer)
  • maxIter (integer)
  • predictionCol (String)
  • seed (long)
  • tol (double)
{
    type: org.apache.spark.ml.clustering.KMeans
    settings: {
        featuresCol: features
        k: 5
        maxIter: 100
        predictionCol: prediction
        seed: 1234
        tol: 0.5
    }
}

It is worth noticing a few points:

  • There is an automatic type conversion of the json parameters. For example, in the KMeans stage above, the json seed value is cast to a long.
  • You do not have to set all parameters; most Spark stages provide adequate default values (see the sketch after this list).
  • You must stick to the camelCase convention and use the same parameter names as the ones documented in the Spark APIs.
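
As an illustration of the second point, here is a minimal sketch that only sets k and relies on the Spark defaults for all other KMeans parameters:

{
    type: org.apache.spark.ml.clustering.KMeans
    settings: {
        // only k is set explicitly; maxIter, tol, seed, etc. keep their Spark default values
        k: 3
    }
}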

Dealing with Json Documents

This chapter describes how the punchplatform nodes and stages handle json documents. The punchplatform provides json input and output stages that allow you to easily convert json documents into spark data and vice versa.

Json Converter

Converter Types

Json converter types are used for two reasons:

  • Converting a json value into spark data is not trivial. In many cases, it is not possible to automatically cast a json value to a spark object and vice versa.

  • Spark MLlib deals with typed data. It is thus required to find out and declare the type of the data before you can use it.

Example 1

Say you have a spark dataset with a string column containing json documents and you want to apply a punchlet on those documents. The problem is that it is not possible to automatically determine whether those strings contain valid json documents. Hence you must specify that by choosing the correct converter in the configuration.

Example 2

Say you have a spark dataset with a string column containing json documents. You want to create a new column of type long by extracting integer values out of these documents.

Here the problem is that it is not possible to automatically determine that you actually want to cast them to a long. Again you have to specify what you want by choosing the correct converter.
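
For instance, the following json output stage sketch uses the long converter: the json value is an Integer, but the resulting dataset column is typed as a spark Long (the duration field and column names are only illustrative):

{
    "type": "org.thales.punch.pml.plugins.json.JsonOutputStage",
    "settings": {
        "inputCol": "json",
        "outputFields": [
            { "type": "long", "json_field": "[duration]", "dataset_column": "duration" }
        ]
    }
}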

Supported Converters

Type      Json type   Spark type
double    Double      Double
integer   Integer     Integer
json      Json        String
string    String      String
vector    Array       org.apache.spark.ml.linalg.Vector
long      Integer     Long

Json Fields

Json fields are used for two reasons:

  • To extract a value from a json document and cast it to a spark type.
  • To cast a spark value to a json type and put it into a json document.

Settings

Parameter        Mandatory   Type     Default                Comment
type             Yes         String   None                   A converter type
json_field       No          String   dataset_column value   The json field. It can be the name of any existing root key or a path in the format [field_1][field_2]...
dataset_column   No          String   json_field value       The name of the dataset column created from this field

Example

{
    "type": "integer",
    "json_field": "[A][B]",
    "dataset_column": "AB"
}

Json Input Stage

The org.thales.punch.pml.plugins.json.JsonInputStage stage creates json documents from other dataset columns.

Example

{
    "type": "org.thales.punch.pml.plugins.json.JsonInputStage",
    "settings": {
        "outputCol": "json",
        "inputFields": [
            { "type": "string", "dataset_column": "C" },
            { "type": "json", "dataset_column": "A" },
            { "type": "integer", "json_field": "[A][B]", "dataset_column": "AB" }
        ]
    }
}

Settings

  • inputFields: List<JsonField>

    List of input fields

    • type: String

      A converter type. All the types handled by the json input stage: string, boolean, double, json, vector, integer, long, wrapped_array_long, wrapped_array_double, wrapped_array_string

  • outputCol: String

    Name of json output column

Json Output Stage

The org.thales.punch.pml.plugins.json.JsonOutputStage stage creates dataset columns from a column containing json documents.

Example

{
    "type": "org.thales.punch.pml.plugins.json.JsonOutputStage",
    "settings": {
        "inputCol": "json",
        "outputFields": [
            { "type": "string", "json_field": "C" },
            { "type": "json", "json_field": "A" },
            { "type": "integer", "json_field": "[A][B]", "dataset_column": "AB" }
        ]
    }
}

Settings

  • inputCol: String

    Name of json input column

  • outputFields: List<JsonField>

    List of output fields

Dealing with Csv documents

This chapter describes how punchplatform nodes and stages handle csv documents.

Csv Converter

You have to use converters in order to cast spark objects to csv values, for the same reasons as explained for json documents.

Supported Converters

Type      Csv type   Spark type
double    Number     Double
integer   Number     Integer
string    String     String
vector    Array      org.apache.spark.ml.linalg.Vector
long      Number     Long

Csv Field

Csv fields are used for two reasons:

  • To extract a value from a csv document and cast it to a spark type.
  • To cast a spark value to a csv type and put it into a csv row.

Settings

  • type: String

    A converter type

  • dataset_column: String

    The name of dataset column created from this field

Example

{
    "type": "string",
    "dataset_column": "C"
}

Csv Input Stage

The org.thales.punch.pml.plugins.csv.CsvInputStage stage creates csv documents from other dataset columns.

Example

{
    "type": "org.thales.punch.pml.plugins.csv.CsvInputStage",
    "settings": {
        "inputFields": [
            { "type": "string", "dataset_column": "C" },
            { "type": "string", "dataset_column": "A" },
            { "type": "integer", "dataset_column": "AB" }
        ],
        "outputCol": "csv"
    }
}

Settings

  • inputFields: List<CsvField>

    List of input fields

  • outputCol: String

    Name of csv output column

Csv Output Stage

The org.thales.punch.pml.plugins.csv.CsvOutputStage stage creates dataset columns from a column containing csv documents.

Example

{
    "type": "org.thales.punch.pml.plugins.csv.CsvOutputStage",
    "settings": {
        "inputCol": "csv",
        "outputFields": [
            { "type": "string", "dataset_column": "C" },
            { "type": "string", "dataset_column": "A" },
            { "type": "integer", "dataset_column": "AB" }
        ]
    }
}

Settings

  • inputCol: String

    Name of csv input column

  • outputFields: List<CsvField>

    List of output fields