Nodes and Stages Configuration

In this chapter, we explain how to design complete Spark pipelines from the library of nodes and stages provided by PunchPlatform.

Remember that nodes are your pipeline (i.e. graph) functions. Some are in charge of fetching the data, others of performing transformations, yet others of saving the results to a database or to files. The PunchPlatform ships with several nodes, described next. By combining nodes, a graph can cover many analytics functional use cases.

General

Subscription Notation

A node subscription is typed, and can be tagged and named. Denoting the subscription type ‘T’, there are three possible subscription cardinalities. In this documentation, they are noted:

  • Singleton<T>: A tag associated with a Singleton subscription must be used no more than once.
  • List<T>: A tag associated with a List subscription can be repeated.
  • Map<T>: A tag associated with a Map subscription can be repeated, and all its subscriptions must be named.
{

        "subscribe": [

                # An untagged Singleton<T> subscription
                {"component": "component_A", "field": "field_1"},

                # A tagged Singleton<T> subscription
                {"component": "component_A", "field": "field_2", "tag": "tag_1"},

                # Tagged List<T> subscriptions
                {"component": "component_A", "field": "field_3", "tag": "tag_2"},
                {"component": "component_A", "field": "field_4", "tag": "tag_2"},

                # Tagged Map<T> subscriptions
                {"component": "component_A", "field": "field_3", "tag": "tag_2", "name": "name_1"},
                {"component": "component_A", "field": "field_4", "tag": "tag_2", "name": "name_2"}

        ]

}

Publication Notation

Like subscriptions, a node publication is typed, and can be tagged and named. Denoting this type ‘T’, there are two possible publication cardinalities. In this documentation, they are noted:

  • Singleton<T>: A tag associated with a Singleton publication must be used no more than once.
  • Map<T>: A tag associated with a Map publication can be repeated, and all its publications must be named.
{

        "publish": [

                # An untagged Singleton<T> publication
                {"field": "field_1"},

                # A tagged Singleton<T> publication
                {"field": "field_2", "tag": "tag_1"},

                # Tagged Map<T> publications
                {"field": "field_3", "tag": "tag_2", "name": "name_1"},
                {"field": "field_4", "tag": "tag_2", "name": "name_2"}

        ]

}

Json Document

This chapter describes how PunchPlatform nodes and stages handle json documents.

Json Converter

Json converters are used for two reasons:

  • Because the conversion between a json value and a spark value is not trivial: in many cases, it is not possible to cast a json value to a spark object and vice versa.
  • To declare the type of the data before reading it, which is a spark Mllib requirement.

Example 1

Let’s say you have a spark dataset with a string column containing json documents, and you want to apply a punchlet on those documents. The problem is that it is not possible to automatically determine whether those strings contain json documents or plain text. So you have to make it explicit by choosing the correct converter in the configuration.
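
A minimal sketch of the corresponding field configuration, assuming the column is named “log” (the name is hypothetical). The “json” converter tells the node to treat the strings as json documents rather than plain text:

{
        "type": "json",
        "dataset_column": "log"
}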

Example 2

Let’s say you have a spark dataset with a string column containing json documents, and you want to create a new column of type long by extracting integer values from those documents. The problem is that it is not possible to automatically determine whether you want to cast the value to integer or to long. So you have to make it explicit by choosing the correct converter in the configuration.
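
A minimal sketch of the corresponding field configuration, assuming the value sits at path [A][B] and the new column is named “AB” (both names are hypothetical). The “long” converter maps the json integer to a spark Long:

{
        "type": "long",
        "json_field": "[A][B]",
        "dataset_column": "AB"
}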

Possible Converters

Type | Json type | Spark type
“double” | Double | Double
“integer” | Integer | Integer
“json” | Json | String
“string” | String | String
“vector” | Array<Double> | org.apache.spark.ml.linalg.Vector
“long” | Integer | Long

Json Field

Json fields are used for two reasons:

  • To extract a value from a json document and cast it to a spark type.
  • To cast a spark value to a json type and put it into a json document.

Settings

Parameter | Mandatory | Type | Default | Comment
“type” | Yes | String | None | A converter type
“json_field” | No | String | “dataset_column” value | The json field. It can be the name of any existing root key or a path in the format “[field_1][field_2]…”.
“dataset_column” | No | String | “json_field” value | The name of the dataset column created from this field

Example

{
        "type": "integer",
        "json_field": "[A][B]",
        "dataset_column": "AB"
}
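
Another hedged sketch: a field mapping a json array of doubles, assumed to be stored under the hypothetical root key “features”, to a column of type org.apache.spark.ml.linalg.Vector:

{
        "type": "vector",
        "json_field": "features",
        "dataset_column": "features"
}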

Json Input Stage

The stage “com.thales.services.cloudomc.punchplatform.analytics.plugins.json.JsonInputStage” creates json documents from other dataset columns.

Settings

Parameter | Mandatory | Type | Default | Comment
“inputFields” | No | List<JsonField> | None | List of input fields
“outputCol” | Yes | String | None | Name of json output column

Example

{
        "type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.jackson.JsonInputStage",
        "settings": {
                "outputCol": "json",
                "inputFields": [
                        { "type": "string", "dataset_column": "C" },
                        { "type": "json", "dataset_column": "A" },
                        { "type": "integer", "json_field": "[A][B]", "dataset_column": "AB" }
                ]
        }
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/json_input_stage.json --spark-master local[*] --deploy-mode client

Json Output Stage

The stage “com.thales.services.cloudomc.punchplatform.analytics.plugins.json.JsonOutputStage” creates dataset columns from a column containing json documents.

Settings

Parameter | Mandatory | Type | Default | Comment
“inputCol” | Yes | String | None | Name of json input column
“outputFields” | No | List<JsonField> | None | List of output fields

Example

{
        "type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.json.JsonOutputStage",
        "settings": {
                "inputCol": "json",
                "outputFields": [
                        { "type": "string", "json_field": "C" },
                        { "type": "json", "json_field": "A" },
                        { "type": "integer", "json_field": "[A][B]", "dataset_column": "AB" }
                ]
        }
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/json_output_stage.json --spark-master local[*] --deploy-mode client

Csv Document

This chapter describes how PunchPlatform nodes and stages handle csv documents.

Csv Converter

For the same reasons as with json documents, you have to use converters in order to cast spark objects to csv values and vice versa.

Possible Converters

Type | Csv type | Spark type
“double” | Number | Double
“integer” | Number | Integer
“string” | String | String
“vector” | Array | org.apache.spark.ml.linalg.Vector
“long” | Number | Long

Csv Field

A csv field can be used for two reasons:

  • To extract a value from a csv document and cast it to a spark type.
  • To cast a spark value to a csv type and put it into a csv row.

Settings

Parameter | Mandatory | Type | Default | Comment
“type” | Yes | String | None | A converter type
“dataset_column” | No | String | None | The name of the dataset column created from this field

Example

{
        "type": "string",
        "dataset_column": "C"
}

Csv Input Stage

The stage “com.thales.services.cloudomc.punchplatform.analytics.plugins.json.CsvInputStage” creates csv documents from other dataset columns.

Settings

Parameter | Mandatory | Type | Default | Comment
“inputFields” | No | List<CsvField> | None | List of input fields
“outputCol” | Yes | String | None | Name of csv output column

Example

{
        "type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.json.CsvInputStage",
        "settings": {
                "inputFields": [
                        { "type": "string", "dataset_column": "C" },
                        { "type": "string", "dataset_column": "A" },
                        { "type": "integer", "dataset_column": "AB" }
                ],
                "outputCol": "csv"
        }
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/csv_input_stage.json --spark-master local[*] --deploy-mode client

Csv Output Stage

The stage “com.thales.services.cloudomc.punchplatform.analytics.plugins.json.CsvOutputStage” creates dataset columns from a column containing csv documents.

Settings

Parameter | Mandatory | Type | Default | Comment
“inputCol” | Yes | String | None | Name of csv input column
“outputFields” | No | List<CsvField> | None | List of output fields

Example

{
        "type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.json.CsvOutputStage",
        "settings": {
                "inputCol": "csv",
                "outputFields": [
                        { "type": "string", "dataset_column": "C" },
                        { "type": "string", "dataset_column": "A" },
                        { "type": "integer", "dataset_column": "AB" }
                ]
        }
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/csv_output_stage.json --spark-master local[*] --deploy-mode client

Elastic

Elastic Input Node

The “elastic_input” node reads a single document from an elastic cluster and publishes it as a string. The read is not distributed: it is done from the spark driver of the application using the elastic http api. This means that each node subscribing to this one must expect to receive a string value, not a dataset.

Settings

Parameter | Mandatory | Type | Default | Comment
“index” | Yes | String | None | Index name
“type” | Yes | String | None | Type name
“id” | Yes | String | None | Document id
“nodes” | Yes | List<String> | None | List of elastic nodes addresses
“port” | No | Integer | 9200 | Http api port

Publication

Tag | Condition | Type | Default | Comment
None | None | Singleton<String> | None | The document

Example

{
        "type": "elastic_input",
        "component": "input",
        "settings": {
                "index": "example",
                "type": "doc",
                "id": "AWBQnjJj8QPf3E_8E-PX",
                "nodes": [ "localhost" ]
        },
        "publish": [
                { "field": "value" }
        ]
}

Warning

If the requested document does not exist, an exception is thrown. To avoid this, first execute the example from the Elastic Output Node section.

You can execute it with the command:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/elastic_input.json --spark-master local[*] --deploy-mode client

Elastic Output Node

The “elastic_output” node writes a single document, received through a subscription, to an elastic cluster. The write is not distributed: it is done from the spark driver of the application using the elastic http api.

Settings

Parameter | Mandatory | Type | Default | Comment
“index” | Yes | String | None | Index name
“type” | Yes | String | None | Type name
“id” | No | String | Generated by elastic | Document id
“nodes” | Yes | List<String> | None | List of elastic nodes addresses
“port” | No | Integer | 9200 | Http api port

Subscription

Tag | Mandatory | Type | Default | Comment
None | Yes | Singleton<String> | None | Document source

Example

{
        "type": "elastic_output",
        "component": "elastic_output",
        "settings": {
                "index": "example",
                "type": "doc",
                "id": "AWBQnjJj8QPf3E_8E-PX",
                "nodes": [ "localhost" ]
        },
        "subscribe": [
                { "component": "input", "field": "value" }
        ]
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/elastic_output.json --spark-master local[*] --deploy-mode client

Elastic Batch Input Node

The “elastic_batch_input” node reads elastic documents in a distributed way. It uses the elastic spark api.

Warning

If the input index does not exist, an exception is thrown by default. Set the “es.index.read.missing.as.empty” elastic parameter to “yes” to change this behavior and return an empty dataset instead.

Settings

Parameter | Mandatory | Type | Default | Comment
“index” | Yes | String | None | Index name
“type” | No | String | None | Type name
“cluster_name” | Yes | String | None | Name of the target cluster
“nodes” | Yes | List<String> | None | List of elastic nodes addresses
“port” | No | Integer | 9200 | Http api port
“elastic_settings” | No | Map<String,String> | None | Elastic parameters
“query” | No | Json | None | An elastic query
“column_id” | No | String | No id column | The name of the column containing the document id in the output dataset
“column_source” | No | String | No source column | The name of the column containing the document source in the output dataset
“output_fields” | No | List<JsonField> | None | List of output json fields

Publication

Tag | Condition | Type | Default | Comment
None | None | Singleton<Dataset> | None | The result dataset

Example

{
        "type": "elastic_batch_input",
        "component": "input",
        "settings": {
                "index": "example",
                "type": "doc",
                "cluster_name": "es_search",
                "nodes": [ "localhost" ],
                "elastic_settings": {
                        "es.index.read.missing.as.empty": "yes"
                },
                "column_id": "id",
                "column_source": "source",
                "output_fields": [
                        {
                                "type": "integer",
                                "json_field": "[A][B]",
                                "dataset_column": "AB"
                        }
                ]
        },
        "publish": [
                { "field": "data" }
        ]
}

Warning

If the input index does not exist, an exception is thrown. To avoid this, first execute the example from the Elastic Batch Output Node section.

You can execute it with the command:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/elastic_batch_input.json --spark-master local[*] --deploy-mode client

Elastic Batch Output Node

The “elastic_batch_output” node writes json documents to an elastic cluster in a distributed way. It uses the elastic spark api.

Settings

Parameter | Mandatory | Type | Default | Comment
“index” | Yes | String | None | Index name
“type” | No | String | None | Type name
“cluster_name” | Yes | String | None | Name of the target cluster
“nodes” | Yes | List<String> | None | List of elastic nodes addresses
“port” | No | Integer | 9200 | Http api port
“elastic_settings” | No | Map<String,String> | None | Elastic parameters
“column_id” | No | String | Generated by elastic | The name of the column containing the document id in the input dataset
“column_source” | No | String | None | The name of the column containing the document source in the input dataset
“input_fields” | No | List<JsonField> | None | List of input json fields added to the source document

Subscription

Tag | Mandatory | Type | Default | Comment
None | Yes | Singleton<Dataset> | None | A dataset

Example

{
        "type": "elastic_batch_output",
        "component": "output",
        "settings": {
                "index": "example",
                "type": "doc",
                "cluster_name": "es_search",
                "nodes": [ "localhost" ],
                "column_source": "source"
        },
        "subscribe": [
                { "component": "input", "field": "data" }
        ]
}

The example above uses this node to write a dataset of json documents to an elastic index.

You can execute it with the command:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/elastic_batch_ouput.json --spark-master local[*] --deploy-mode client
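
As a variant, the document id can be taken from a dataset column and extra json fields can be merged into the source document. A minimal hedged sketch of the settings block only (the “id” and “AB” column names are hypothetical):

{
        "index": "example",
        "type": "doc",
        "cluster_name": "es_search",
        "nodes": [ "localhost" ],
        "column_id": "id",
        "column_source": "source",
        "input_fields": [
                { "type": "integer", "json_field": "[A][B]", "dataset_column": "AB" }
        ]
}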

Spark Nodes

Nodes and stages described in this section use the official spark libraries:

  • Mllib: the spark machine-learning library.
  • SQL: the spark structured data processing module.

Pipeline Stage

A pipeline stage configuration contains a string field “type” holding the Java class name of the pipeline stage implementation (for example “org.apache.spark.ml.clustering.KMeans”) and, if needed, a json field “settings” holding the parameters of the stage. You can find the list of parameters for an official spark stage in the official spark scaladoc.

Settings

Parameter | Mandatory | Type | Default | Comment
“type” | Yes | String | None | Class name of the stage
“settings” | No | Json | None | Stage settings

Example

For example, the KMeans stage has the “type” “org.apache.spark.ml.clustering.KMeans” and accepts the following “settings” parameters:

  • featureCol (String)
  • k (integer)
  • maxIter (integer)
  • predictionCol (String)
  • seed (long)
  • tol (double)
{
        "type": "org.apache.spark.ml.clustering.KMeans",
        "params": {
                "featureCol": "features",
                "k": 5,
                "maxIter": 100,
                "predictionCol": "prediction",
                "seed": 1234,
                "tol": 0.5
        }
}

The PunchPlatform also provides some spark pipeline stages adapted to specific use cases you may encounter. Those stages are used like any other spark Mllib stage when building the pipeline json:

{
        "type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.json.JsonOutputStage",
        "settings": {
                "inputCol": "json",
                "outputFields": [
                        { "type": "string", "json_field": "C" },
                        { "type": "json", "json_field": "A" },
                        { "type": "integer", "json_field": "[A][B]", "dataset_column": "AB" }
                ]
        }
}

Note

Json parameter values are automatically adapted to the correct type. For example, in the KMeans stage described above, the “seed” json value is cast to long.

Note

Default values for stage parameters still apply: you do not have to set every parameter.

Note

Parameter naming follows the camelCase convention because parameters correspond to existing Java variables.

Mllib Node

The “mllib” node can train a pipeline and apply it to data. You can build this pipeline with any spark Mllib or PunchPlatform pipeline stage.

Settings

Parameter | Mandatory | Type | Default | Comment
“pipeline” | No | List<Stage> | Empty pipeline | Pipeline configuration

Subscription

Tag | Mandatory | Type | Default | Comment
“input_fit” | No | Singleton<Dataset> | Empty dataset | Dataset used to fit the pipeline
“input_transform” | No | Singleton<Dataset> | None | Dataset used as input of the fitted model

Publication

Tag | Condition | Type | Default | Comment
“output_transform” | “input_transform” provided | Dataset<Row> | None | The result of applying the fitted pipeline to the “input_transform” dataset
“model” | None | Singleton<Model> | None | The fitted pipeline

Example

{
        "type": "mllib",
        "component": "mllib",
        "settings": {
                "pipeline": [
                        {
                                "type": "org.apache.spark.ml.classification.LogisticRegression",
                                "settings": { "maxIter": 10, "regParam": 0.01 }
                        }
                ]
        },
        "subscribe": [
                { "component": "punch_batch_fit", "field": "data", "tag": "input_fit" },
                { "component": "punch_batch_transform", "field": "data", "tag": "input_transform" }
        ],
        "publish": [
                { "field": "data", "tag": "output_transform" }
        ]
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/mllib.json --spark-master local[*] --deploy-mode client
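
To also retrieve the fitted pipeline itself, add a publication tagged “model”. A minimal hedged sketch of the publish block only (the subscribing node is not shown and the “model” field name is an assumption):

"publish": [
        { "field": "data", "tag": "output_transform" },
        { "field": "model", "tag": "model" }
]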

Sql Node

The “sql” node creates new datasets from sql statements and the subscribed datasets. The execution steps are:

  1. Each subscribed dataset is registered as an available table
  2. Each statement is executed in declaration order, and the resulting new dataset is also registered as an available table

Settings

Parameter | Mandatory | Type | Default | Comment
“statements” | No | List<String> | None | Statements in the format “table_name = statement”

Subscription

Tag | Mandatory | Type | Default | Comment
None | No | Map<Dataset> | None | Initial datasets converted to SQL tables

Publication

Tag | Condition | Type | Default | Comment
None | The name corresponds to a created table | Map<Dataset> | None | A dataset from a table created by a statement

Example

{
        "type": "sql",
        "component": "sql",
        "settings": {
                "statements": [
                        "data = SELECT * FROM data_1 FULL OUTER JOIN data_2 ON data_1.id_1 = data_2.id_2"
                ]
        },
        "subscribe": [
                { "component": "punch_batch_1", "field": "data", "name": "data_1" },
                { "component": "punch_batch_2", "field": "data", "name": "data_2" }
        ],
        "publish": [
                { "field": "data", "name": "data" }
        ]
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/sql.json --spark-master local[*] --deploy-mode client
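
Because each created table becomes available to the following statements (step 2 above), statements can be chained. A hedged sketch of the “statements” setting only (table and column names are hypothetical):

"statements": [
        "joined = SELECT * FROM data_1 FULL OUTER JOIN data_2 ON data_1.id_1 = data_2.id_2",
        "data = SELECT COUNT(*) AS total FROM joined"
]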

Count Node

The “count” node simply counts the number of rows in a dataset and returns the result.

Subscription

Tag | Mandatory | Type | Default | Comment
None | Yes | Dataset | None | Input dataset

Publication

Tag | Condition | Type | Default | Comment
None | None | Long | None | The number of rows in the input dataset

Example

{
        "type": "count",
        "component": "count",
        "subscribe": [
                { "component": "input", "field": "data" }
        ],
        "publish": [
                { "field": "value" }
        ]
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/count.json --spark-master local[*] --deploy-mode client

Show Node

The “show” node simply prints the first rows of a spark dataset.

Settings

Parameter | Mandatory | Type | Default | Comment
“title” | No | String | “SHOW” | Text printed before the dataset
“truncate” | No | Boolean | true | If true, column contents are truncated in order to limit the print width
“select” | No | List<String> | All | List of column names to print

Subscription

Tag | Mandatory | Type | Default | Comment
None | Yes | Dataset | None | Input dataset

Example

{
        "type": "show",
        "component": "show_input",
        "settings": {
                "title": "INPUT",
                "select": ["features", "label"]
        },
        "subscribe": [
                {
                        "component": "input",
                        "field": "data"
                }
        ]
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/show.json --spark-master local[*] --deploy-mode client

Punch

Punch Batch Node

With the “punch_batch” node, you can apply punchlet code to each json document in a string column of a subscribed dataset.

You can also provide external resources, either by adding them to the “resources” setting or through named subscriptions tagged with “resource”. Those resources are accessible in the punchlet code through the Java function:

/**
 * Return a provided resource
 * @param resourceName name of the resource (subscription name or "resources" map key)
 * @param resourceType type of the resource
 * @return the resource
 */
public <T> T getResource(String resourceName, Class<T> resourceType)

Warning

You must use this node instead of the Punch Stage if you need to provide a resource coming from another node during punchlet execution.

Settings

Parameter | Mandatory | Type | Default | Comment
“punchlet_code” | No | String | “{}” | Punchlet code. Overrides “punchlet_code_file”.
“punchlet_code_file” | No | String | “{}” | Punchlet code file readable from the driver
“input_column” | Yes | String | None | Name of the string input column
“output_column” | Yes | String | None | Name of the string output column
“resources” | No | Json | None | Map of resources provided during punchlet execution
“root” | No | Json | {} | Initial root tuple

Subscription

Tag | Mandatory | Type | Default | Comment
None | Yes | Singleton<Dataset> | None | Input dataset
“resource” | No | Map<Object> | None | Map of resources provided during punchlet execution

Publication

Tag | Condition | Type | Default | Comment
None | None | Singleton<Dataset> | None | The result dataset

Example

{
        "type": "punch_batch",
        "component": "punch",
        "settings": {
                "punchlet_code": "{ root:[AB] = root:[A][B]; root:[D] = getResource(\"resource_1\", String.class); root:[E] = getResource(\"resource_2\", String.class); }",
                "input_column": "source",
                "output_column": "output",
                "resources": {
                        "resource_1": "banana"
                }
        },
        "subscribe": [
                {
                        "component": "resource",
                        "field": "value",
                        "tag": "resource",
                        "name": "resource_2"
                },
                { "component": "input", "field": "data" }
        ],
        "publish": [
                { "field": "data" }
        ]
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/punch_batch.json --spark-master local[*] --deploy-mode client

Punch Stage

The stage “com.thales.services.cloudomc.punchplatform.analytics.mllib.punch.PunchStage” is very similar to the Punch Batch node. It allows applying a punchlet to the json documents of a string column.

Settings

Parameter | Mandatory | Type | Default | Comment
“punchletCode” | No | String | “{}” | Punchlet code. Overrides “punchletCodeFile”.
“punchletCodeFile” | No | String | “{}” | Punchlet code file readable from the driver
“inputCol” | Yes | String | None | Name of the input column
“outputCol” | Yes | String | None | Name of the output column
“resources” | No | Json | None | Map of resources provided during punchlet execution
“root” | No | Json | {} | Initial root tuple

Example

{
        "type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.punch.PunchStage",
        "settings": {
                "punchletCode": "{ root:[AB] = root:[A][B]; root:[D] = getResource(\"resource_1\", String.class); }",
                "inputCol": "source",
                "outputCol": "output",
                "resources": {
                        "resource_1": "banana"
                }
        }
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/punch_stage.json --spark-master local[*] --deploy-mode client

Punch Node

The “punch” node concatenates its subscriptions into a Tuple, executes the punchlet on it and publishes the result as a json string. Like “punch_batch”, you can provide resources through settings or subscriptions. By default, subscription values are added to the tuple as-is, but you can also provide a converter type in the “types” setting if needed.

Settings

Parameter | Mandatory | Type | Default | Comment
“punchlet_code” | No | String | “{}” | Punchlet code. Overrides “punchlet_code_file”.
“punchlet_code_file” | No | String | “{}” | Punchlet code file readable from the driver
“resources” | No | Json | None | Map of resources provided during punchlet execution
“root” | No | Json | {} | Initial root tuple
“types” | No | List<Field> | None | List of fields
“output_field” | No | String | root | Field of the tuple to publish. It can be the name of any existing root key or a path in the format “[field_1][field_2]…”.

Field configuration is:

Parameter | Mandatory | Type | Default | Comment
“type” | Yes | String | None | A converter type
“json_field” | No | String | “subscription_name” value | The json field. It can be the name of any existing root key or a path in the format “[field_1][field_2]…”.
“subscription_name” | No | String | “json_field” value | The name of a subscription
{
        "type": "string",
        "json_fields": "[A][B]",
        "subscription_name": "AB"
}
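
The “output_field” setting can be used to publish only part of the resulting tuple instead of the whole root. A minimal hedged sketch of a settings block (the “[AB]” path is hypothetical and assumes the punchlet creates that key):

{
        "punchlet_code": "{ root:[AB] = root:[A][B]; }",
        "output_field": "[AB]"
}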

Subscription

Tag | Mandatory | Type | Default | Comment
None | No | Map<Object> | None | Map of fields added to the root Tuple
“resource” | No | Map<Object> | None | Map of resources provided during punchlet execution

Publication

Tag | Condition | Type | Default | Comment
None | None | Singleton<String> | None | The result json document

Example

{
        "type": "punch",
        "component": "punch",
        "settings": {
                "punchlet_code": "{ root:[resource_1] = getResource(\"resource_1\", String.class); }",
                "types": {
                        "A": "json"
                },
                "resources": {
                        "resource_1": "banana"
                }
        },
        "subscribe": [
                { "component": "object_A", "field": "value", "name": "A" },
                { "component": "object_C", "field": "value", "name": "C" }
        ],
        "publish": [
                { "field": "value" }
        ]
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/punch.json --spark-master local[*] --deploy-mode client

Vectorizer

Introduction

Numerous machine-learning algorithms can be used to analyse textual variables such as logs. However, these algorithms only accept numerical input data: it is usually not possible to use text features directly without first transforming them into numerical features.

The stage “com.thales.services.cloudomc.punchplatform.analytics.mllib.vectorizer.Vectorizer” creates numerical features from character strings. By using it, you can build a customized string-to-numeric transformation.

First of all, there are two distinct categories of character string variables:

  • With a limited number of values: this kind of variable does not require the Vectorizer; a simple dichotomization is enough (apply a “org.apache.spark.ml.feature.StringIndexer” followed by a “org.apache.spark.ml.feature.OneHotEncoder”, as sketched after this list).
  • With an unlimited or unknown number of distinct values: this kind of variable cannot be dichotomized, so the Vectorizer is the appropriate tool.
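
A minimal hedged sketch of such a dichotomization, written as two pipeline stages in the format described in the Pipeline Stage section (the “protocol” column names are hypothetical):

[
        {
                "type": "org.apache.spark.ml.feature.StringIndexer",
                "settings": { "inputCol": "protocol", "outputCol": "protocol_index" }
        },
        {
                "type": "org.apache.spark.ml.feature.OneHotEncoder",
                "settings": { "inputCol": "protocol_index", "outputCol": "protocol_vector" }
        }
]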

Settings

Parameter | Mandatory | Type | Default | Comment
“inputCol” | Yes | String | None | Name of the input column containing the input textual variable
“outputCol” | Yes | String | None | Name of the vector output column
“features” | No | List<Feature> | None | List of features to create

Features

Parameter | Mandatory | Type | Default | Comment
“type” | Yes | String | None | Type of the feature
“settings” | No | Json | None | Settings of the feature

Possible feature types are:

  • “change”
  • “check_pattern”
  • “count”
  • “distribution”
  • “max_stream_length”

Feature Change

The “change” feature counts the number of sequences of characters matching a regex.

Parameter | Mandatory | Type | Default | Comment
“target” | Yes | String | None | Regex defining the type of character to count
{
        "type": "change",
        "settings": {
                "target": "[a-z]"
        }
}

Feature Check Pattern

The “check_pattern” feature checks whether the text matches a regex pattern.

Parameter | Mandatory | Type | Default | Comment
“target” | Yes | String | None | Regex tested against the textual variable
{
        "type": "check_pattern",
        "settings": {
                "target": "^.*word.*$"
        }
}

Feature Count

The “count” feature counts the number of characters matching a regex.

Parameter | Mandatory | Type | Default | Comment
“target” | Yes | String | None | Regex defining the type of character to count
{
        "type": "count",
        "settings": {
                "target": "[a-z]"
        }
}

Feature Distribution

The “distribution” feature computes the distribution of the characters defined by a regex, sorts the occurrence counts and returns the n highest.

Parameter | Mandatory | Type | Default | Comment
“target” | Yes | String | None | Regex defining the type of character to count
“n” | Yes | Integer | None | Number of distribution values to return
{
        "type": "distribution",
        "settings": {
                "target": "[a-z]",
                "n": 10
        }
}

Feature Max Stream Length

The “max_stream_length” feature computes the lengths of the n longest streams of a user-defined type of character.

Parameter | Mandatory | Type | Default | Comment
“target” | Yes | String | None | Regex defining the type of character to count
“n” | Yes | Integer | None | Number of stream lengths to return
{
        "type": "max_stream_length",
        "settings": {
                "target": "[a-z]",
                "n": 10
        }
}
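
Putting it together, a hedged sketch of a complete Vectorizer stage combining several features (the “message” and “features” column names and the regexes are hypothetical):

{
        "type": "com.thales.services.cloudomc.punchplatform.analytics.mllib.vectorizer.Vectorizer",
        "settings": {
                "inputCol": "message",
                "outputCol": "features",
                "features": [
                        { "type": "count", "settings": { "target": "[0-9]" } },
                        { "type": "check_pattern", "settings": { "target": "^.*error.*$" } },
                        { "type": "distribution", "settings": { "target": "[a-z]", "n": 5 } }
                ]
        }
}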

R

Note

From official R website: R is ‘GNU S’, a freely available language and environment for statistical computing and graphics which provides a wide variety of statistical and graphical techniques: linear and nonlinear modelling, statistical tests, time series analysis, classification, clustering, etc.

Installation

$ sudo apt-get install r-base r-base-dev

RStage


Warning

Developers only. This stage is not yet supported.

Most of the algorithms used in machine learning are implemented in R. That is why we created a connector.

The stage “com.thales.services.cloudomc.punchplatform.analytics.plugins.r.RStage” can run an R script on a string column.

Warning

The R stage does not preserve the columns of the input dataset.

Settings

Parameter | Mandatory | Type | Default | Comment
“inputCol” | Yes | String | None | Name of the string input column
“outputCol” | Yes | String | None | Name of the string output column
“rCode” | No | String | “main <- identity” | R script code. Overrides “rCodeFile”.
“rCodeFile” | No | String | “main <- identity” | R script code file readable from the driver
“functionName” | No | String | “main” | The name of the main function in the script
“mode” | No | String | “json” | Mode defining how input data are read by the R script (see below)
“partitionBy” | No | List<String> | None | Shuffle the data by the given columns. Two rows sharing the same values for those columns are sent to the same R execution.
“partitionNum” | No | Integer | None | Number of partitions in the dataset after shuffling

Function

The provided main R function must accept one argument. Its expected type depends on the “mode”.

main <- function(input) {
        return(input)
}

Mode

String

With the mode “string”, the main R function is executed once for each string value of the input column and must return a string.

{
        "type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.r.RStage",
        "settings": {
                "rCode": "main <- function(input) {return(nchar(input));}",
                "inputCol": "input",
                "outputCol": "output",
                "mode": "string"
        }
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/r_stage_string.json --spark-master local[*] --deploy-mode client

Json

With the mode “json”, the processing steps for each string value of the input column are:

  1. The string is interpreted as json and parsed into an R list (equivalent to a Java map or a Python dictionary)
  2. The main R function is executed on this list and returns another list
  3. The result list is converted back to a json-formatted string
{
        "type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.r.RStage",
        "settings": {
                "rCode": "main <- function(input) {input$AB <- input$A$B; return(input);}",
                "inputCol": "input",
                "outputCol": "output",
                "mode": "json"
        }
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/r_stage_json.json --spark-master local[*] --deploy-mode client

Partition Csv

With the mode “partition_csv”, the processing steps for each partition of the dataset are:

  1. Each string value of the column is interpreted as a csv row and the partition is converted into an R dataframe
  2. The main R function is executed on this dataframe and returns another dataframe
  3. Each row of the result dataframe is converted back to a csv-formatted string
{
        "type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.r.RStage",
        "settings": {
                "rCode": "main <- function(input) {input$V3 <- paste(input$V1, input$V2, sep=\"_\"); return(input);}",
                "inputCol": "input_csv",
                "outputCol": "output_csv",
                "mode": "partition_csv"
        }
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/r_stage_partition_csv.json --spark-master local[*] --deploy-mode client
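
When related rows must be processed by the same R execution, the “partitionBy” and “partitionNum” settings can be combined with this mode. A minimal hedged sketch of the settings block (the “group” column name and the partition count are hypothetical):

{
        "rCode": "main <- function(input) {input$V3 <- paste(input$V1, input$V2, sep=\"_\"); return(input);}",
        "inputCol": "input_csv",
        "outputCol": "output_csv",
        "mode": "partition_csv",
        "partitionBy": ["group"],
        "partitionNum": 4
}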

Base

This section describes nodes that work without external dependencies.

Object Generator Node

The node “object_generator” is a simple object publisher.

Settings

Parameter | Mandatory | Type | Default | Comment
“value” | Yes | Object | None | Object to publish

Publication

Tag | Condition | Type | Default | Comment
None | None | Singleton<Object> | None | The object from the “value” setting

Example

{
        "type": "object_generator",
        "component": "object",
        "settings": {
                "value": "hello"
        },
        "publish": [
                {
                        "field": "value"
                }
        ]
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/object_generator.json --spark-master local[*] --deploy-mode client

Math

This section describes math nodes, mainly used in the documentation to present graph concepts.

Number Generator Node

The node “number_generator” is a simple number publisher.

Settings

Parameter | Mandatory | Type | Default | Comment
“value” | Yes | Double | None | Number to publish

Publication

Tag | Condition | Type | Default | Comment
None | None | Singleton<Double> | None | The number from the “value” setting

Example

{
        "type": "number_generator",
        "component": "a",
        "settings": {
                "value": 1.0
        },
        "publish": [
                {
                        "field": "value"
                }
        ]
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/number_generator.json --spark-master local[*] --deploy-mode client

Number Addition Node

The node “number_addition” makes an addition with subscribed values.

Subscription

Tag | Mandatory | Type | Default | Comment
None | No | List<Double> | None | Input values of the addition

Publication

Tag | Condition | Type | Default | Comment
None | None | Singleton<Double> | None | Result of the addition

Example

{
        "type": "number_addition",
        "component": "addition",
        "subscribe": [
                {
                        "component": "a",
                        "field": "value"
                },
                {
                        "component": "b",
                        "field": "value"
                }
        ],
        "publish": [
                {
                        "field": "value"
                }
        ]
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/number_addition.json --spark-master local[*] --deploy-mode client

Number Division Node

The node “number_division” makes a division with two subscribed values.

Subscription

Tag | Mandatory | Type | Default | Comment
“numerator” | Yes | Double | None | Numerator value
“denominator” | Yes | Double | None | Denominator value

Publication

Tag | Condition | Type | Default | Comment
None | None | Singleton<Double> | None | Result of the division

Example

{
        "type": "number_division",
        "component": "division",
        "subscribe": [
                {
                        "component": "a",
                        "field": "value",
                        "tag": "numerator"
                },
                {
                        "component": "b",
                        "field": "value",
                        "tag": "denominator"
                }
        ],
        "publish": [
                {
                        "field": "value"
                }
        ]
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/number_division.json --spark-master local[*] --deploy-mode client

Number Formula Node

The “number_formula” node executes mathematical formulae on subscribed values.

Subscription

Tag | Mandatory | Type | Default | Comment
None | No | Map<Double> | None | Formula variables; the subscription name is the variable name

Publication

Tag | Condition | Type | Default | Comment
None | None | Map<Double> | None | Formula results; the publication name is the name of a variable created by a formula

Example

{
        "type": "number_formula",
        "component": "formula",
        "settings": {
                "formulae": [
                        "c = a / b",
                        "d = c + 5"
                ]
        },
        "subscribe": [
                {
                        "component": "a",
                        "field": "value",
                        "name": "a"
                },
                {
                        "component": "b",
                        "field": "value",
                        "name": "b"
                }
        ],
        "publish": [
                {
                        "field": "value",
                        "name": "d"
                }
        ]
}

You can test this example with the commands:

$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/number_formula.json --spark-master local[*] --deploy-mode client