MLlib Node

Overview

The nodes and stages described in this section use MLlib, the official Spark machine learning library.

Runtime Compatibility

  • PySpark:
  • Spark:

MLlib Node Layout

The mllib node can train a pipeline and apply it to data. The node essentially defines that pipeline. Within the pipeline you use so-called stages, which are intermediary processing steps. PML lets you use the standard Spark stages, and provides some additional ones as well.

Important

In contrast to other PML nodes, the mllib node uses predefined streams. You must name the input stream either input_fit or input_transform, or use both if you want to fit and transform data. Similarly, you must name the output streams output_transform or model to forward, respectively, the transformed data or the generated model.

Here is an example illustrating the overall mllib node layout. As you can see, it defines an internal pipeline. In this example that pipeline contains a single standard Spark LogisticRegression stage.

---
type: mllib
component: mllib
settings:
  pipeline:
  - type: org.apache.spark.ml.classification.LogisticRegression
    settings:
      maxIter: 10
      regParam: 0.01
subscribe:
- component: some_input_node
  stream: input_fit
- component: some_other_input_node
  stream: input_transform
publish:
- stream: output_transform
- stream: model

Stages

Each stage has two mandatory properties, type and settings, as illustrated here:

# the type is the class name of the stage. PML leverages all
# standard Spark stages, and provides additional punch-specific stages.
type: org.apache.spark.ml.classification.LogisticRegression
# each stage has some specific configuration properties. These
# appear in a settings dictionary. Check the Spark Javadoc
# for each parameter.
settings:
  maxIter: 10
  regParam: 0.01

Punch Stages

The punchplatform provides a few additional useful stages that you can insert into your pipeline.

Json Output Stage

It is often the case that a row comes in containing a json string. What you then need is to convert it into typed fields, stored in additional columns of your dataset. For example, say you have the following input dataset:

+------------------------------+
|json                          |
+------------------------------+
|{"value": 10, "text": "hello"}|
|{"value": 10, "text": "world"}|
+------------------------------+

Applying the following json output stage:

---
type: org.thales.punch.pml.plugins.json.JsonOutputStage
settings:
  inputCol: json
  outputFields:
  - type: string
    field: text
  - type: integer
    field: value

will produce:

+------------------------------+-----+-----+
|json                          |text |value|
+------------------------------+-----+-----+
|{"value": 10, "text": "hello"}|hello|10   |
|{"value": 10, "text": "world"}|world|10   |
+------------------------------+-----+-----+

Inner Fields

If you have inner fields, such as the user age in the following example:

+-----------------------------------------------------+
|json                                                 |
+-----------------------------------------------------+
|{"value": 10, "text": "hello", "user" : { "age": 12}}|
|{"value": 10, "text": "world", "user" : { "age": 12}}|
+-----------------------------------------------------+

You can refer to it using the user.age notation:

type: integer
field: user.age 

This will produce:

+-----------------------------------------------------+-----+-----+--------+
|json                                                 |text |value|user.age|
+-----------------------------------------------------+-----+-----+--------+
|{"value": 10, "text": "hello", "user" : { "age": 12}}|hello|10   |12      |
|{"value": 10, "text": "world", "user" : { "age": 12}}|world|10   |12      |
+-----------------------------------------------------+-----+-----+--------+

Renaming Columns

You can control the name of your columns using the column property, e.g.:

---
type: integer
field: user.age
column: age
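
For instance, in the context of the json output stage shown earlier, the renaming reads:

---
type: org.thales.punch.pml.plugins.json.JsonOutputStage
settings:
  inputCol: json
  outputFields:
  - type: integer
    field: user.age
    column: age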

Show Stage

The show stage lets you insert debug output into your pipeline, to check the input or output of individual stages. Show stages can be added transparently anywhere in your pipeline.

---
type: org.thales.punch.pml.plugins.mllib.ShowStage
settings:
  numCols: 10       # number of columns to display
  vertical: false   # if true, print rows vertically, one line per value
  truncate: true    # if true, truncate long values in the output
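
For example, a show stage can be slipped between two other stages to inspect the intermediate dataset. The surrounding stages below are taken from the examples above:

---
type: mllib
component: mllib
settings:
  pipeline:
  - type: org.thales.punch.pml.plugins.json.JsonOutputStage
    settings:
      inputCol: json
      outputFields:
      - type: integer
        field: value
  - type: org.thales.punch.pml.plugins.mllib.ShowStage
    settings:
      numCols: 10
  - type: org.apache.spark.ml.classification.LogisticRegression
    settings:
      maxIter: 10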

Punch Stage

The PunchStage is very similar to the punch node. It lets you run a punchlet on the dataset traversing the pipeline.

---
type: org.thales.punch.ml.plugins.punch.PunchStage
settings:
  punchlet_code: |-
    {
        [sum] = ...;
    }
  output_columns:
  - type: integer
    field: sum
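
As an illustration, here is a minimal punchlet that sums two input columns. The [a] and [b] fields are hypothetical placeholders for whatever columns your dataset actually carries:

---
type: org.thales.punch.ml.plugins.punch.PunchStage
settings:
  # hypothetical punchlet: assumes the traversing rows carry [a] and [b]
  punchlet_code: |-
    {
        [sum] = [a] + [b];
    }
  output_columns:
  - type: integer
    field: sum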

Advanced Settings

---
settings:
  # you can provide a punchlet file instead of inline code
  punchlet_code_file: "path_to_punchlet.punch"

Vectorizer Stage

Numerous machine-learning algorithms can be used to analyse textual variables such as logs. However, these algorithms only accept numerical input data. It is usually not possible to use text features directly without first transforming them into numerical features.

The vectorizer stage lets you create numerical features from character strings.

In general, there are two distinct categories of character string variables:

  • With a limited number of distinct values: this kind of variable can be handled without the vectorizer, using a simple dichotomization (apply a org.apache.spark.ml.feature.StringIndexer followed by a org.apache.spark.ml.feature.OneHotEncoder).
  • With an unlimited or unknown number of distinct values: this kind of variable cannot be dichotomized, so the vectorizer is the adapted tool.

The punchplatform provides several feature types. The vectorizer settings are listed below, and each feature type is then described in turn; a complete sketch combining several features closes this section.

Settings

  • inputCol: String

    Name of the column containing the input textual variable

  • outputCol: String

    Name of the vector output column

  • features: List<Feature>

    List of features to create

Change Feature

The change feature counts the number of sequences of characters corresponding to a regex.

---
type: change
settings:
  target:
  - a-z

Check Pattern Feature

The check_pattern feature checks whether a text matches a regex pattern.

---
type: check_pattern
settings:
  target: "^.*word.*$"

Count Feature

The count feature counts the number of characters corresponding to a regex.

---
type: count
settings:
  target:
  - a-z

Distribution Feature

The distribution feature computes the distribution of characters defined by a regex, sorts occurrence numbers and returns the n highest.

---
type: distribution
settings:
  target:
  - a-z
  n: 10

Max Stream Length Feature

The max_stream_length feature computes the lengths of the n longest runs of characters of a user-defined type.

---
type: max_stream_length
settings:
  target:
  - a-z
  n: 10
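
Putting it all together, a vectorizer stage combines the inputCol, outputCol and features settings. The sketch below illustrates the layout; the stage class name is an assumption, check your PML distribution for the exact type:

---
# hypothetical class name, adjust to your distribution
type: org.thales.punch.pml.plugins.vectorizer.VectorizerStage
settings:
  inputCol: log
  outputCol: features
  features:
  - type: count
    settings:
      target:
      - a-z
  - type: check_pattern
    settings:
      target: "^.*word.*$"
  - type: max_stream_length
    settings:
      target:
      - 0-9
      n: 3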

Json Input Stage

The json input stage builds a json document column from other dataset columns; it is described in detail in the "Dealing with Json Documents" chapter below. For example:

---
type: org.thales.punch.pml.plugins.json.JsonInputStage
settings:
  outputCol: data
  inputFields:
  - type: json
    json_field: "[source]"
    dataset_column: source
  - type: string
    json_field: "[init][host][ip]"
    dataset_column: init_host_ip

Spark Stage

Refer to the Spark documentation for the numerous available stages. We consider one useful example here to highlight how the parameters must be defined.

In the following example the KMeans stage is used. It is identified by its class name org.apache.spark.ml.clustering.KMeans and has a number of specific parameters of various types:

  • featuresCol (String)
  • k (integer)
  • maxIter (integer)
  • predictionCol (String)
  • seed (long)
  • tol (double)
---
type: org.apache.spark.ml.clustering.KMeans
settings:
  featuresCol: features
  k: 5
  maxIter: 100
  predictionCol: prediction
  seed: 1234
  tol: 0.5

It is worth noticing a few points:

  • There is an automatic type conversion of the json parameters. For example, in the KMeans stage above, the json seed value is cast to a long.
  • You do not have to set all parameters; most Spark stages provide adequate default values.
  • You must use the camelCase convention and the same parameter names as the ones documented in the Spark APIs.

Dealing with Json Documents

This chapter describes how the punchplatform nodes and stages handle json documents. The punchplatform provides json input and output stages that allow you to easily convert json documents into spark data and vice versa.

Json Converter

Converter Types

Json converter types are used for two reasons:

  • Converting json values into spark data is not trivial. In many cases, it is not possible to automatically cast a json value to a spark object, or the other way around.

  • Spark MLlib deals with typed data. It is thus required to find out and declare the type of the data before you can use it.

Example 1

Say you have a spark dataset with a string column containing json documents, and you want to apply a punchlet to those documents. The problem is that it is not possible to automatically determine whether those strings contain valid json documents. Hence, you must choose the correct converter in the configuration.

Example 2

Say you have a spark dataset with a string column containing json documents. You want to create a new column of type long by extracting integer values out of these documents.

Here the problem is that it is not possible to automatically determine that you actually want a long. Again, you must make your intent explicit by choosing the correct converter.
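
As a sketch, that intent is expressed with the long converter, using the json field settings described below (the field names reuse the value field from the earlier examples):

---
type: long
json_field: "[value]"
dataset_column: value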

Supported Converters

Type      Json type   Spark type
double    Double      Double
integer   Integer     Integer
json      Json        String
string    String      String
vector    Array       org.apache.spark.ml.linalg.Vector
long      Integer     Long

Json Fields

Json fields are used for two reasons:

  • To extract a value from a json document and cast it to a spark type.
  • To cast a spark value to a json type and put it into a json document.

Settings

  • type: String (mandatory)

    A converter type.

  • json_field: String (optional, defaults to the dataset_column value)

    The json field. It can be the name of any existing root key, or a path in the format [field_1][field_2]...

  • dataset_column: String (optional, defaults to the json_field value)

    The name of the dataset column created from this field.

Example

---
type: integer
json_field: "[A][B]"
dataset_column: AB

Json Input Stage

The org.thales.punch.pml.plugins.json.JsonInputStage stage creates json documents from other dataset columns.

Example

---
type: org.thales.punch.pml.plugins.json.JsonInputStage
settings:
  outputCol: json
  inputFields:
  - type: string
    dataset_column: C
  - type: json
    dataset_column: A
  - type: integer
    json_field: "[A][B]"
    dataset_column: AB

Settings

  • inputFields: List<JsonField>

    List of input fields

    • type: String

      The types handled by the json input stage are: string, boolean, double, json, vector, integer, long, wrapped_array_long, wrapped_array_double, wrapped_array_string

  • outputCol: String

    Name of the json output column

Json Output Stage

The org.thales.punch.pml.plugins.json.JsonOutputStage stage creates dataset columns from a column containing json documents.

Example

---
type: org.thales.punch.pml.plugins.json.JsonOutputStage
settings:
  inputCol: json
  outputFields:
  - type: string
    json_field: C
  - type: json
    json_field: A
  - type: integer
    json_field: "[A][B]"
    dataset_column: AB

Settings

  • inputCol: String

    Name of json input column

  • outputFields: List<JsonField>

    List of output fields

Dealing with Csv Documents

This chapter describes how punchplatform nodes and stages handle csv documents.

Csv Converter

You have to use converters in order to cast spark objects to csv values, for the same reasons as explained for json documents.

Supported Converters

Type      Csv type   Spark type
double    Number     Double
integer   Number     Integer
string    String     String
vector    Array      org.apache.spark.ml.linalg.Vector
long      Number     Long

Csv Field

Csv fields are used for two reasons:

  • To extract a value from a csv document and cast it to a spark type.
  • To cast a spark value to a csv type and put it into a csv row.

Settings

  • type: String

    A converter type

  • dataset_column: String

    The name of the dataset column created from this field

Example

---
type: string
dataset_column: C

Csv Input Stage

The org.thales.punch.pml.plugins.csv.CsvInputStage stage creates csv documents from other dataset columns.

Example

---
type: org.thales.punch.pml.plugins.csv.CsvInputStage
settings:
  inputFields:
  - type: string
    dataset_column: C
  - type: string
    dataset_column: A
  - type: integer
    dataset_column: AB
  outputCol: csv
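
For illustration, assuming a comma separator, the stage above would add a csv column such as:

+-----+-----+---+--------------+
|C    |A    |AB |csv           |
+-----+-----+---+--------------+
|hello|world|10 |hello,world,10|
+-----+-----+---+--------------+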

Settings

  • inputFields: List<CsvField>

    List of input fields

  • outputCol: String

    Name of csv output column

Csv Output Stage

The org.thales.punch.pml.plugins.csv.CsvOutputStage stage creates dataset columns from a column containing csv documents.

Example

---
type: org.thales.punch.pml.plugins.csv.CsvOutputStage
settings:
  inputCol: csv
  outputFields:
  - type: string
    dataset_column: C
  - type: string
    dataset_column: A
  - type: integer
    dataset_column: AB
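
For illustration, again assuming a comma separator, the stage above splits a csv column into typed columns:

+--------------+-----+-----+---+
|csv           |C    |A    |AB |
+--------------+-----+-----+---+
|hello,world,10|hello|world|10 |
+--------------+-----+-----+---+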

Settings

  • inputCol: String

    Name of csv input column

  • outputFields: List<CsvField>

    List of output fields