MLlib Node¶
Overview¶
The nodes and stages described in this section use MLlib, the official Spark machine learning library.
Runtime Compatibility¶
- PySpark : ❌
- Spark : ✅
MLlib Node Layout¶
The mllib node can train a pipeline and apply it to data. The node basically defines the pipeline. In that pipeline you can use so-called stages, which are intermediary steps. PML lets you use the standard Spark stages, and provides you with some additional ones as well.
Important
In contrast to other PML nodes, the mllib node uses predefined streams. You must name the input stream either input_fit or input_transform, or both if you want to fit and transform some data. Similarly, you must name the output streams output_transform or model to respectively forward the transformed data or the generated model.
Here is an example illustrating the overall MLlib node. As you can see, there is an internal pipeline inside. In this example that pipeline contains only a single standard Spark LogisticRegression stage.
---
type: mllib
component: mllib
settings:
  pipeline:
    - type: org.apache.spark.ml.classification.LogisticRegression
      settings:
        maxIter: 10
        regParam: 0.01
subscribe:
  - component: some_input_node
    stream: input_fit
  - component: some_other_input_node
    stream: input_transform
publish:
  - stream: output_transform
  - stream: model
Stages¶
Each stage has two mandatory properties, type and settings, as illustrated here:
# the type is the class name of the stage. PML leverages all
# standard Spark stages, and provides additional punch specific stages.
type: org.apache.spark.ml.classification.LogisticRegression
# each stage has some specific configuration properties. These
# appear in a settings dictionary. Check the Spark Javadoc
# for each parameter.
settings:
  maxIter: 10
  regParam: 0.01
Punch Stages¶
Punch provides a few additional useful stages that you can insert into your pipeline.
Json Output Stage¶
It is often the case that a row comes in containing a json string. What you then need is to convert it into typed fields, stored in additional columns of your dataset. For example, say you have the input dataset:
+------------------------------+
|json |
+------------------------------+
|{"value": 10, "text": "hello"}|
|{"value": 10, "text": "world"}|
+------------------------------+
Applying the following json output stage:
---
type: org.thales.punch.pml.plugins.json.JsonOutputStage
settings:
  inputCol: json
  outputFields:
    - type: string
      field: text
    - type: integer
      field: value
will produce:
+------------------------------+-----+-----+
|json |text |value|
+------------------------------+-----+-----+
|{"value": 10, "text": "hello"}|hello|10 |
|{"value": 10, "text": "world"}|world|10 |
+------------------------------+-----+-----+
Inner Fields¶
If you have inner fields like the user age in the following example:
+-----------------------------------------------------+
|json |
+-----------------------------------------------------+
|{"value": 10, "text": "hello", "user" : { "age": 12}}|
|{"value": 10, "text": "world", "user" : { "age": 12}}|
+-----------------------------------------------------+
you can extract them using the user.age dot notation:
type: integer
field: user.age
Which will produce:
+-----------------------------------------------------+-----+-----+--------+
|json |text |value|user.age|
+-----------------------------------------------------+-----+-----+--------+
|{"value": 10, "text": "hello", "user" : { "age": 12}}|hello|10 |12 |
|{"value": 10, "text": "world", "user" : { "age": 12}}|world|10 |12 |
+-----------------------------------------------------+-----+-----+--------+
Renaming columns¶
You can control the name of your columns using the column property. For example:
---
type: integer
field: user.age
column: age
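Applied to the inner-field example above, the extracted value would then appear under a column named age instead of user.age, along the lines of:
+-----------------------------------------------------+-----+-----+---+
|json                                                 |text |value|age|
+-----------------------------------------------------+-----+-----+---+
|{"value": 10, "text": "hello", "user" : { "age": 12}}|hello|10   |12 |
|{"value": 10, "text": "world", "user" : { "age": 12}}|world|10   |12 |
+-----------------------------------------------------+-----+-----+---+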
Show Stage¶
The show stage lets you insert debug information into your pipeline, to check the input or output of individual stages. Show stages can be transparently added anywhere in your pipeline.
---
type: org.thales.punch.pml.plugins.mllib.ShowStage
settings:
  numCols: 10
  vertical: false
  truncate: true
Punch Stage¶
The PunchStage is very similar to the Punch node. It lets you run a punchlet on the dataset traversing the pipeline.
---
type: org.thales.punch.ml.plugins.punch.PunchStage
settings:
  punchlet_code: |-
    {
      [sum] = ...;
    }
  output_columns:
    - type: integer
      field: sum
Advanced Settings¶
---
settings:
  # you can provide a punchlet file instead of inline code
  punchlet_code_file: "path_to_punchlet.punch"
Vectorizer Stage¶
Numerous machine-learning algorithms can be used to analyse textual variables such as logs. However, these algorithms can only use numerical input data. It is often not possible to use text features directly without transforming them into numerical features first.
The vectorizer stage lets you create numerical features from character strings.
In general, there are two distinct categories of character string variables:
- With a limited number of values: this kind of variable can be handled without the Vectorizer, using a simple dichotomization (apply an org.apache.spark.ml.feature.StringIndexer and an org.apache.spark.ml.feature.OneHotEncoder).
- With an unlimited or unknown number of distinct values: this kind of variable cannot be dichotomized, so the Vectorizer is appropriate.
The punchplatform provides several feature types, described in the sections below.
Settings¶
- inputCol (String): Name of the input column containing the input textual variable.
- outputCol (String): Name of the vector output column.
- features (List<Feature>): List of features to create.
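To make this concrete, here is a minimal sketch of a vectorizer stage combining two of the feature types described in the sections below. The stage class name is not given in this section and is left as a placeholder, and the input column name is an assumption:
---
type: <vectorizer stage class name>   # placeholder: use the actual vectorizer stage class
settings:
  inputCol: log_text                  # assumed name of the textual input column
  outputCol: features
  features:
    - type: count
      settings:
        target:
          - a-z
    - type: max_stream_length
      settings:
        target:
          - a-z
        n: 10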
Change Stage¶
The change feature counts the number of sequences of characters corresponding to a regex.
---
type: change
settings:
  target:
    - a-z
Check Pattern Stage¶
The check_pattern feature checks if a text matches a regex pattern.
---
type: check_pattern
settings:
  target: "^.*word.*$"
Count Stage¶
The count feature counts the number of characters corresponding to a regex.
---
type: count
settings:
  target:
    - a-z
Distribution Stage¶
The distribution feature computes the distribution of characters defined by a regex, sorts the occurrence numbers and returns the n highest.
---
type: distribution
settings:
  target:
    - a-z
  n: 10
Max Stream Length¶
The max_stream_length feature computes the length of the n longest streams of a user-defined type of characters.
---
type: max_stream_length
settings:
  target:
    - a-z
  n: 10
Json Input Stage¶
---
type: org.thales.punch.pml.plugins.json.JsonInputStage
settings:
  # build a json column named 'data' from the 'source' and 'init_host_ip' dataset columns
  outputCol: data
  inputFields:
    - type: json
      json_field: "[source]"
      dataset_column: source
    - type: string
      json_field: "[init][host][ip]"
      dataset_column: init_host_ip
Spark Stage¶
Refer to the Spark documentation for the numerous available stages. We will consider one useful example to highlight how the parameters must be defined.
In the following example the KMeans stage is used. It is identified by its class name org.apache.spark.ml.clustering.KMeans and has a number of specific parameters of various types:
- featureCol (String)
- k (integer)
- maxIter (integer)
- predictionCol (String)
- seed (long)
- tol (double)
---
type: org.apache.spark.ml.clustering.KMeans
settings:
  featureCol: features
  k: 5
  maxIter: 100
  predictionCol: prediction
  seed: 1234
  tol: 0.5
It is worth noticing a few points:
- There is an automatic type conversion of the json parameters. For example, in the KMeans stage above, the json seed value is cast to long.
- You do not have to set all parameters; most Spark stages provide adequate default values.
- You must follow the camelCase convention and use the same names as the ones documented in the Spark APIs.
Dealing with Json Documents¶
This chapter describes how the punchplatform nodes and stages handle json documents. The punchplatform provides json input and output stages that allow you to easily convert json documents into spark data and vice versa.
Json converter¶
Converter Types¶
Json converter types are used for two reasons:
- Converting a json value into spark data is not trivial. In many cases, it is not possible to automatically cast a json value to a spark object and vice versa.
- Spark MLlib deals with typed data. It is thus required to find out and declare the type of the data before you can use it.
Example 1
Say you have a spark dataset with a string column containing json documents, and you want to apply a punchlet on those documents. The problem is that it is not possible to automatically determine whether or not those strings contain valid json documents. Hence, you must choose the correct converter in the configuration.
Example 2
Say you have a spark dataset with a string column containing json documents. You want to create a new column of type long by extracting integer values out of these documents.
Here the problem is that it is not possible to automatically determine that you actually want to cast it to a long. Again, you have to specify what you want by choosing the correct converter.
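As a sketch of Example 2, and assuming the integer sits under a root key named value in the json documents (the output column name value_as_long is arbitrary), the corresponding json field declaration would simply use the long converter described below:
---
type: long
json_field: "[value]"
dataset_column: value_as_long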
Supported Converters¶
Type | Json type | Spark type |
---|---|---|
double | Double | Double |
integer | Integer | Integer |
json | Json | String |
string | String | String |
vector | Array | org.apache.spark.ml.linalg.Vector |
long | Integer | Long |
Json Fields¶
Json fields are used for two reasons:
- To extract a value from a json document and cast it to a spark type.
- To cast a spark value to a json type and put it into a json document.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
type | Yes | String | None | A converter type |
json_field | No | String | dataset_column value | Json field. It can be the name of any existing root key or a path in the format [field_1][field_2]… |
dataset_column | No | String | json_field value | The name of the dataset column created from this field |
Example
---
type: integer
json_field: "[A][B]"
dataset_column: AB
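For instance, assuming an input json document such as {"A": {"B": 42}}, this declaration would produce an integer dataset column named AB containing the value 42.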
Json Input Stage¶
The org.thales.punch.pml.plugins.json.JsonInputStage stage creates json documents from other dataset columns.
Example
---
type: org.thales.punch.pml.plugins.json.JsonInputStage
settings:
  outputCol: json
  inputFields:
    - type: string
      dataset_column: C
    - type: json
      dataset_column: A
    - type: integer
      json_field: "[A][B]"
      dataset_column: AB
Settings¶
- inputFields (List<JsonField>): List of input fields.
- type: The types handled by the json input stage are string, boolean, double, json, vector, integer, long, wrapped_array_long, wrapped_array_double, wrapped_array_string.
- outputCol (String): Name of the json output column.
Json Output Stage¶
The org.thales.punch.pml.plugins.json.JsonOutputStage stage creates dataset columns from a column containing json documents.
Example
---
type: org.thales.punch.pml.plugins.json.JsonOutputStage
settings:
  inputCol: json
  outputFields:
    - type: string
      json_field: C
    - type: json
      json_field: A
    - type: integer
      json_field: "[A][B]"
      dataset_column: AB
Settings¶
- inputCol (String): Name of the json input column.
- outputFields (List<JsonField>): List of output fields.
Dealing with Csv documents¶
This chapter describes how punchplatform nodes and stages handle csv documents.
Csv Converter¶
You have to use converters in order to cast spark objects to csv values, for the same reasons as explained for json documents.
Supported Converters¶
Type | Csv type | Spark type |
---|---|---|
double | Number | Double |
integer | Number | Integer |
string | String | String |
vector | Array | org.apache.spark.ml.linalg.Vector |
long | Number | Long |
Csv Field¶
You have two reasons to use csv fields:
- To extract a value from a csv document and cast it to a spark type.
- To cast a spark value to a csv type and put it into a csv row.
Settings¶
- type (String): A converter type.
- dataset_column (String): The name of the dataset column created from this field.
Example
---
type: string
dataset_column: C
Csv Input Stage¶
The org.thales.punch.pml.plugins.csv.CsvInputStage stage creates csv documents from other dataset columns.
Example
---
type: org.thales.punch.pml.plugins.csv.CsvInputStage
settings:
  inputFields:
    - type: string
      dataset_column: C
    - type: string
      dataset_column: A
    - type: integer
      dataset_column: AB
  outputCol: csv
Settings¶
- inputFields (List<CsvField>): List of input fields.
- outputCol (String): Name of the csv output column.
Csv Output Stage¶
The org.thales.punch.pml.plugins.csv.CsvOutputStage stage creates dataset columns from a column containing csv documents.
Example
---
type: org.thales.punch.pml.plugins.csv.CsvOutputStage
settings:
  inputCol: csv
  outputFields:
    - type: string
      dataset_column: C
    - type: string
      dataset_column: A
    - type: integer
      dataset_column: AB
Settings¶
- inputCol (String): Name of the csv input column.
- outputFields (List<CsvField>): List of output fields.