Nodes and Stages Configuration¶
In this chapter, we explain how to design complete Spark pipelines from the library of nodes and stages provided by PunchPlatform.
Remember that nodes are your pipeline (i.e. graph) functions. Some will be in charge of fetching the data, others in charge of performing transformations, yet others will be in charge of saving the results to some database or files. The PunchPlatform ships with several nodes, described next. By combining nodes, a graph can cover many analytics functional use cases.
General¶
Subscription Notation¶
A node subscription is typed, and can be tagged and named. Noting the subscription type ‘T’, there are three possible subscription cardinalities. In this documentation, they are noted:
- Singleton<T>: A tag associated with a Singleton subscription must be used no more than once.
- List<T>: A tag associated with a List subscription can be repeated.
- Map<T>: A tag associated with a Map subscription can be repeated, and all its subscriptions must be named.
{
"subscribe": [
# An untagged Singleton<T> subscription
{"component": "component_A", "field": "field_1"},
# A tagged Singleton<T> subscription
{"component": "component_A", "field": "field_2", "tag": "tag_1"},
# Tagged List<T> subscriptions
{"component": "component_A", "field": "field_3", "tag": "tag_2"},
{"component": "component_A", "field": "field_4", "tag": "tag_2"},
# Tagged Map<T> subscriptions
{"component": "component_A", "field": "field_3", "tag": "tag_2", "name": "name_1"},
{"component": "component_A", "field": "field_4", "tag": "tag_2", "name": "name_2"}
]
}
Publication Notation¶
As with subscriptions, a node publication is typed, and can be tagged and named. Noting the publication type ‘T’, there are two possible publication cardinalities. In this documentation, they are noted:
- Singleton<T>: A tag associated with a Singleton publication must be used no more than once.
- Map<T>: A tag associated with a Map publication can be repeated, and all its publications must be named.
{
"publish": [
# An untagged Singleton<T> publication
{"field": "field_1"},
# A tagged Singleton<T> publication
{"field": "field_2", "tag": "tag_1"},
# Tagged Map<T> publications
{"field": "field_3", "tag": "tag_2", "name": "name_1"},
{"field": "field_4", "tag": "tag_2", "name": "name_2"}
]
}
Json Document¶
This chapter describes how punchplatform nodes and stages handle json documents.
Json Converter¶
Json converters are used for two reasons:
- Because conversion between json values and spark values is not trivial: in many cases, a json value cannot be directly cast to a spark object, or the other way around.
- To declare the type of the data before reading it, which is a spark Mllib requirement.
Example 1¶
Let’s say you have a spark dataset with a string column containing json documents, and you want to apply a punchlet on those documents. It is not possible to automatically determine whether those strings contain json documents or plain text, so you have to specify it by choosing the correct converter in the configuration.
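For instance, here is a minimal sketch of the corresponding field declaration (the column name "logs" is illustrative; the field format is detailed in the Json Field section below). Choosing the "json" converter tells the node that the column holds json documents rather than plain strings:
{
    # "json" rather than "string": the column content is parsed as a json document
    "type": "json",
    "dataset_column": "logs"
}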
Example 2¶
Let’s say you have a spark dataset with a string column containing json documents, and you want to create a new column of type long by extracting integer values from those documents. It is not possible to automatically determine whether you want to cast them to integer or to long, so you have to specify it by choosing the correct converter in the configuration.
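For instance, here is a minimal sketch of the field declaration (field and column names are illustrative), where the "long" converter is chosen instead of "integer":
{
    # "long" rather than "integer": the extracted json integer is stored as a spark Long
    "type": "long",
    "json_field": "[A][B]",
    "dataset_column": "AB"
}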
Possible Converters¶
Type | Json type | Spark type |
---|---|---|
“double” | Double | Double |
“integer” | Integer | Integer |
“json” | Json | String |
“string” | String | String |
“vector” | Array<Double> | org.apache.spark.ml.linalg.Vector |
“long” | Integer | Long |
Json Field¶
Json fields are used for two reasons:
- To extract a value from a json document and cast it to a spark type.
- To cast a spark value to a json type and put it into a json document.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“type” | Yes | String | None | A converter type |
“json_field” | No | String | “dataset_column” value | Json field. It can be the name of any existing root key or a path in the format “[field_1][field_2]…”. |
“dataset_column” | No | String | “json_field” value | The name of the dataset column created from this field |
Example¶
{
"type": "integer",
"json_field": "[A][B]",
"dataset_column": "AB"
}
Json Input Stage¶
The stage “com.thales.services.cloudomc.punchplatform.analytics.plugins.json.JsonInputStage” creates json documents from other dataset columns.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“inputFields” | No | List<JsonField> | None | List of input fields |
“outputCol” | Yes | String | None | Name of json output column |
Example¶
{
"type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.jackson.JsonInputStage",
"settings": {
"outputCol": "json",
"inputFields": [
{ "type": "string", "dataset_column": "C" },
{ "type": "json", "dataset_column": "A" },
{ "type": "integer", "json_field": "[A][B]", "dataset_column": "AB" }
]
}
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/json_input_stage.json --spark-master local[*] --deploy-mode client
Json Output Stage¶
The stage “com.thales.services.cloudomc.punchplatform.analytics.plugins.json.JsonOutputStage” creates dataset columns from a column containing json documents.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“inputCol” | Yes | String | None | Name of json input column |
“outputFields” | No | List<JsonField> | None | List of output fields |
Example¶
{
"type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.json.JsonOutputStage",
"settings": {
"inputCol": "json",
"outputFields": [
{ "type": "string", "json_field": "C" },
{ "type": "json", "json_field": "A" },
{ "type": "integer", "json_field": "[A][B]", "dataset_column": "AB" }
]
}
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/json_output_stage.json --spark-master local[*] --deploy-mode client
Csv Document¶
This chapter describes how punchplatform nodes and stages handle csv documents.
Csv Converter¶
For the same reasons as with json documents, you have to use converters to cast spark objects to csv values and back.
Possible Converters¶
Type | Csv type | Spark type |
---|---|---|
“double” | Number | Double |
“integer” | Number | Integer |
“string” | String | String |
“vector” | Array | org.apache.spark.ml.linalg.Vector |
“long” | Number | Long |
Csv Field¶
A csv field is used for two reasons:
- To extract a value from a csv document and cast it to a spark type.
- To cast a spark value to a csv type and put it into a csv row.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“type” | Yes | String | None | A converter type |
“dataset_column” | No | String | None | The name of the dataset column created from this field |
Example¶
{
"type": "string",
"dataset_column": "C"
}
Csv Input Stage¶
The stage “com.thales.services.cloudomc.punchplatform.analytics.plugins.json.CsvInputStage” creates csv documents from other dataset columns.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“inputFields” | No | List<CsvField> | None | List of input fields |
“outputCol” | Yes | String | None | Name of csv output column |
Example¶
{
"type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.json.CsvInputStage",
"settings": {
"inputFields": [
{ "type": "string", "dataset_column": "C" },
{ "type": "string", "dataset_column": "A" },
{ "type": "integer", "dataset_column": "AB" }
],
"outputCol": "csv"
}
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/csv_input_stage.json --spark-master local[*] --deploy-mode client
Csv Output Stage¶
The stage “com.thales.services.cloudomc.punchplatform.analytics.plugins.json.CsvOutputStage” creates dataset columns from a column containing csv documents.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“inputCol” | Yes | String | None | Name of csv input column |
“outputFields” | No | List<CsvField> | None | List of output fields |
Example¶
{
"type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.json.CsvOutputStage",
"settings": {
"inputCol": "csv",
"outputFields": [
{ "type": "string", "dataset_column": "C" },
{ "type": "string", "dataset_column": "A" },
{ "type": "integer", "dataset_column": "AB" }
]
}
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/csv_output_stage.json --spark-master local[*] --deploy-mode client
Elastic¶
Elastic Input Node¶
The “elastic_input” node reads a single document from an elastic cluster and publishes it as a string. This read is not distributed: it is performed from the application spark driver through the elastic http api. This means each node subscribing to this one must expect to receive a string value, not a dataset.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“index” | Yes | String | None | Index Name |
“type” | Yes | String | None | Type Name |
“id” | Yes | String | None | Document id |
“nodes” | Yes | List<String> | None | List of elastic nodes addresses |
“port” | No | Integer | 9200 | Http api port |
Publication¶
Tag | Condition | Type | Default | Comment |
---|---|---|---|---|
None | None | Singleton<String> | None | The document |
Example¶
{
"type": "elastic_input",
"component": "input",
"settings": {
"index": "example",
"type": "doc",
"id": "AWBQnjJj8QPf3E_8E-PX",
"nodes": [ "localhost" ]
},
"publish": [
{ "field": "value" }
]
}
Warning
If the requested document does not exist, an exception is thrown. To avoid it, first execute the example from the Elastic Output Node section.
You can execute it with the command:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/elastic_input.json --spark-master local[*] --deploy-mode client
Elastic Output Node¶
The “elastic_output” node writes a single document, received through a subscription, to an elastic cluster. This write is not distributed: it is performed from the application spark driver through the elastic http api.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“index” | Yes | String | None | Index Name |
“type” | Yes | String | None | Type Name |
“id” | No | String | Generated by elastic | Document id |
“nodes” | Yes | List<String> | None | List of elastic nodes addresses |
“port” | No | Integer | 9200 | Http api port |
Subscription¶
Tag | Mandatory | Type | Default | Comment |
---|---|---|---|---|
None | Yes | Singleton<String> | None | Document source |
Example¶
{
"type": "elastic_output",
"component": "elastic_output",
"settings": {
"index": "example",
"type": "doc",
"id": "AWBQnjJj8QPf3E_8E-PX",
"nodes": [ "localhost" ]
},
"subscribe": [
{ "component": "input", "field": "value" }
]
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/elastic_output.json --spark-master local[*] --deploy-mode client
Elastic Batch Input Node¶
The “elastic_batch_input” node reads elastic documents in a distributed way, using the elastic spark api.
Warning
If the input index does not exist, an exception is thrown by default. Set the “es.index.read.missing.as.empty” elastic parameter to “yes” to change this behavior and return an empty dataset instead.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“index” | Yes | String | None | Index name |
“type” | No | String | None | Type name |
“cluster_name” | Yes | String | None | Name of the target cluster |
“nodes” | Yes | List<String> | None | List of elastic nodes addresses |
“port” | No | Integer | 9200 | Http api port |
“elastic_settings” | No | Map<String,String> | None | Elastic parameters |
“query” | No | Json | None | An elastic query |
“column_id” | No | String | No id column | The name of the column containing the document id in the output dataset |
“column_source” | No | String | No source column | The name of the column containing the document source in the output dataset |
“output_fields” | No | List<JsonField> | None | List of output json fields |
Publication¶
Tag | Condition | Type | Default | Comment |
---|---|---|---|---|
None | None | Singleton<Dataset> | None | The result dataset |
Example¶
{
"type": "elastic_batch_input",
"component": "input",
"settings": {
"index": "example",
"type": "doc",
"cluster_name": "es_search",
"nodes": [ "localhost" ],
"elastic_settings": {
"es.index.read.missing.as.empty": "yes"
},
"column_id": "id",
"column_source": "source",
"output_fields": [
{
"type": "integer",
"json_field": "[A][B]",
"dataset_column": "AB"
}
]
},
"publish": [
{ "field": "data" }
]
}
Warning
If the input index does not exist, an exception is thrown. To avoid it, first execute the example from the Elastic Batch Output Node section.
You can execute it with the command:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/elastic_batch_input.json --spark-master local[*] --deploy-mode client
Elastic Batch Output Node¶
The “elastic_batch_output” node writes json documents to an elastic cluster in a distributed way, using the elastic spark api.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“index” | Yes | String | None | Index name |
“type” | No | String | None | Type name |
“cluster_name” | Yes | String | None | Name of the target cluster |
“nodes” | Yes | List<String> | None | List of elastic nodes addresses |
“port” | No | Integer | 9200 | Http api port |
“elastic_settings” | No | Map<String,String> | None | Elastic parameters |
“column_id” | No | String | Generated by elastic | The name of the column containing the document id in the input dataset |
“column_source” | No | String | None | The name of the column containing the document source in the input dataset |
“input_fields” | No | List<JsonField> | None | List of input json fields added to the source document |
Subscription¶
Tag | Mandatory | Type | Default | Comment |
---|---|---|---|---|
None | Yes | Singleton<Dataset> | None | A dataset |
Example¶
{
"type": "elastic_batch_output",
"component": "output",
"settings": {
"index": "example",
"type": "doc",
"cluster_name": "es_search",
"nodes": [ "localhost" ],
"column_source": "source"
},
"subscribe": [
{ "component": "input", "field": "data" }
]
}
The above configuration uses this node to write a dataset of json documents to an elastic index.
You can execute it with the command:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/elastic_batch_ouput.json --spark-master local[*] --deploy-mode client
Spark Nodes¶
Nodes and stages described in this section rely on the official spark Mllib library.
Pipeline Stage¶
A pipeline stage configuration contains a string field “type” holding the Java class name of the pipeline stage implementation, for example “org.apache.spark.ml.clustering.KMeans”, and, if needed, a json field “settings” holding the parameters of the stage. The list of parameters of an official spark stage can be found in the official spark scaladoc.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“type” | Yes | String | None | Class name of stage |
“settings” | No | Json | None | Stage settings |
Example¶
For example, the KMeans stage has the “type” “org.apache.spark.ml.clustering.KMeans” and the “params”:
- featureCol (String)
- k (integer)
- maxIter (integer)
- predictionCol (String)
- seed (long)
- tol (double)
{
"type": "org.apache.spark.ml.clustering.KMeans",
"params": {
"featureCol": "features",
"k": 5,
"maxIter": 100,
"predictionCol": "prediction",
"seed": 1234,
"tol": 0.5
}
}
PunchPlatform also provides some spark pipeline stages adapted to specific use cases you may meet. Those stages can be used like any other spark Mllib stage when building the pipeline json:
{
"type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.json.JsonOutputStage",
"settings": {
"inputCol": "json",
"outputFields": [
{ "type": "string", "json_field": "C" },
{ "type": "json", "json_field": "A" },
{ "type": "integer", "json_field": "[A][B]", "dataset_column": "AB" }
]
}
}
Note
Json parameter values are automatically adapted to the correct type. For example, in the KMeans stage described above, the “seed” json value is cast to long.
Note
Default values still apply for stage parameters, so you do not have to set every parameter.
Note
Parameter names follow the camelCase convention because they correspond to existing Java variables.
Mllib Node¶
The “mllib” node can fit a pipeline and apply it to data. The pipeline can be built from any spark Mllib or PunchPlatform pipeline stage.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“pipeline” | No | List<Stage> | Empty pipeline | Pipeline configuration |
Subscription¶
Tag | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“input_fit” | No | Singleton<Dataset> | Empty dataset | Dataset used to fit the pipeline |
“input_transform” | No | Singleton<Dataset> | None | Dataset used as input of the fitted model |
Publication¶
Tag | Condition | Type | Default | Comment |
---|---|---|---|---|
“output_transform” | “input_transform” provided | Dataset<Row> | None | The dataset resulting from applying the fitted pipeline to the “input_transform” dataset |
“model” | None | Singleton<Model> | None | The fitted pipeline |
Example¶
{
"type": "mllib",
"component": "mllib",
"settings": {
"pipeline": [
{
"type": "org.apache.spark.ml.classification.LogisticRegression",
"settings": { "maxIter": 10, "regParam": 0.01 }
}
]
},
"subscribe": [
{ "component": "punch_batch_fit", "field": "data", "tag": "input_fit" },
{ "component": "punch_batch_transform", "field": "data", "tag": "input_transform" }
],
"publish": [
{ "field": "data", "tag": "output_transform" }
]
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/mllib.json --spark-master local[*] --deploy-mode client
Sql Node¶
The “sql” node creates new datasets from sql queries and the subscribed datasets. The execution steps are:
- Subscribed datasets are registered as available tables
- Each statement is executed in declaration order, and each resulting dataset is also registered as an available table
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“statements” | No | List<String> | None | Statements in the format “table_name = statement” |
Subscription¶
Tag | Mandatory | Type | Default | Comment |
---|---|---|---|---|
None | No | Map<Dataset> | None | Initial datasets converted to SQL tables |
Publication¶
Tag | Condition | Type | Default | Comment |
---|---|---|---|---|
None | The name corresponds to a created table | Map<Dataset> | None | A dataset from a table created by a statement |
Example¶
{
"type": "sql",
"component": "sql",
"settings": {
"statements": [
"data = SELECT * FROM data_1 FULL OUTER JOIN data_2 ON data_1.id_1 = data_2.id_2"
]
},
"subscribe": [
{ "component": "punch_batch_1", "field": "data", "name": "data_1" },
{ "component": "punch_batch_2", "field": "data", "name": "data_2" }
],
"publish": [
{ "field": "data", "name": "data" }
]
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/sql.json --spark-master local[*] --deploy-mode client
Count Node¶
The “count” node simply counts the number of rows in a dataset and returns the result.
Subscription¶
Tag | Mandatory | Type | Default | Comment |
---|---|---|---|---|
None | Yes | Dataset | None | Input dataset |
Publication¶
Tag | Condition | Type | Default | Comment |
---|---|---|---|---|
None | None | Long | None | The number of rows in input dataset |
Example¶
{
"type": "count",
"component": "count",
"subscribe": [
{ "component": "input", "field": "data" }
],
"publish": [
{ "field": "value" }
]
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/count.json --spark-master local[*] --deploy-mode client
Show Node¶
The “show” node simply prints the first rows of a spark dataset.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“title” | No | String | “SHOW” | Text printed before the dataset |
“truncate” | No | Boolean | true | If true, column contents are truncated to limit the print width |
“select” | No | List<String> | All | List of column names to print |
Subscription¶
Tag | Mandatory | Type | Default | Comment |
---|---|---|---|---|
None | Yes | Dataset | None | Input dataset |
Example¶
{
"type": "show",
"component": "show_input",
"settings": {
"title": "INPUT",
"select": ["features", "label"]
},
"subscribe": [
{
"component": "input",
"field": "data"
}
]
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/show.json --spark-master local[*] --deploy-mode client
Punch¶
Punch Batch Node¶
With the node “punch_batch”, you can apply a punchlet to each json document in a string column of a subscribed dataset.
You can also provide external resources by adding them to the “resources” setting, or through named subscriptions tagged with “resource”. Those resources are accessible in the punchlet code through the Java function:
/**
* Return a provided resource
* @param resourceName name of the resource (subscription name or "resources" map key)
* @param resourceType type of the resource
* @return the resource
*/
public <T> T getResource(String resourceName, Class<T> resourceType)
Warning
You must use this node instead of the Punch Stage if you need to provide a resource from another node during punchlet execution.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“punchlet_code” | No | String | “{}” | Punchlet code. Overrides “punchlet_code_file”. |
“punchlet_code_file” | No | String | “{}” | Punchlet code file, readable from the driver |
“input_column” | Yes | String | None | Name of string input column |
“output_column” | Yes | String | None | Name of string output column |
“resources” | No | Json | None | Map of resources provided during punchlet execution |
“root” | No | Json | {} | Initial root tuple |
Subscription¶
Tag | Mandatory | Type | Default | Comment |
---|---|---|---|---|
None | Yes | Singleton<Dataset> | None | Input dataset |
“resource” | No | Map<Object> | None | Map of resources provided during punchlet execution |
Publication¶
Tag | Condition | Type | Default | Comment |
---|---|---|---|---|
None | None | Singleton<Dataset> | None | The result dataset |
Example¶
{
"type": "punch_batch",
"component": "punch",
"settings": {
"punchlet_code": "{ root:[AB] = root:[A][B]; root:[D] = getResource(\"resource_1\", String.class); root:[E] = getResource(\"resource_2\", String.class); }",
"input_column": "source",
"output_column": "output",
"resources": {
"resource_1": "banana"
}
},
"subscribe": [
{
"component": "resource",
"field": "value",
"tag": "resource",
"name": "resource_2"
},
{ "component": "input", "field": "data" }
],
"publish": [
{ "field": "data" }
]
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/punch_batch.json --spark-master local[*] --deploy-mode client
Punch Stage¶
The stage “com.thales.services.cloudomc.punchplatform.analytics.mllib.punch.PunchStage” is very similar to the Punch Batch node: it applies a punchlet to the json documents contained in a string column.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“punchletCode” | No | String | “{}” | Punchlet code. Overrides “punchletCodeFile”. |
“punchletCodeFile” | No | String | “{}” | Punchlet code file, readable from the driver |
“inputCol” | Yes | String | None | Name of input column |
“outputCol” | Yes | String | None | Name of output column |
“resources” | No | Json | None | Map of resources provided during punchlet execution |
“root” | No | Json | {} | Initial root tuple |
Example¶
{
"type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.punch.PunchStage",
"settings": {
"punchletCode": "{ root:[AB] = root:[A][B]; root:[D] = getResource(\"resource_1\", String.class); }",
"inputCol": "source",
"outputCol": "output",
"resources": {
"resource_1": "banana"
}
}
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/punch_stage.json --spark-master local[*] --deploy-mode client
Punch Node¶
The node “punch” gathers its subscriptions into a Tuple, executes the punchlet on it, and publishes the result as a json string. As with “punch_batch”, you can provide resources through settings or subscriptions. By default, subscription values are added to the tuple as-is, but you can also provide a converter type in the “types” setting if needed.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“punchlet_code” | No | String | “{}” | Punchlet code. Overrides “punchlet_code_file”. |
“punchlet_code_file” | No | String | “{}” | Punchlet code file, readable from the driver |
“resources” | No | Json | None | Map of resources provided during punchlet execution |
“root” | No | Json | {} | Initial root tuple |
“types” | No | List<Field> | None | List of fields |
“output_field” | No | String | root | Field of the tuple to publish. It can be the name of any existing root key or a path in the format “[field_1][field_2]…”. |
Field configuration is:
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“type” | Yes | String | None | A converter type |
“json_field” | No | String | “subscription_name” value | Json field. It can be the name of any existing root key or a path in the format “[field_1][field_2]…”. |
“subscription_name” | No | String | “json_field” value | The name of a subscription |
{
"type": "string",
"json_fields": "[A][B]",
"subscription_name": "AB"
}
Subscription¶
Tag | Mandatory | Type | Default | Comment |
---|---|---|---|---|
None | No | Map<Object> | None | Map of fields added to root Tuple |
“resource” | No | Map<Object> | None | Map of resources provided during punchlet execution |
Publication¶
Tag | Condition | Type | Default | Comment |
---|---|---|---|---|
None | None | Singleton<String> | None | The result json document |
Example¶
{
"type": "punch",
"component": "punch",
"settings": {
"punchlet_code": "{ root:[resource_1] = getResource(\"resource_1\", String.class); }",
"types": {
"A": "json"
},
"resources": {
"resource_1": "banana"
}
},
"subscribe": [
{ "component": "object_A", "field": "value", "name": "A" },
{ "component": "object_C", "field": "value", "name": "C" }
],
"publish": [
{ "field": "value" }
]
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/punch.json --spark-master local[*] --deploy-mode client
Vectorizer¶
Introduction¶
Numerous machine-learning algorithms can be used to analyse textual variables such as logs. However, these algorithms only accept numerical input data, so text features usually cannot be used directly without first transforming them into numerical features.
The stage “com.thales.services.cloudomc.punchplatform.analytics.mllib.vectorizer.Vectorizer” creates numerical features from character strings. It lets you build a customized string-to-numeric transformation.
First of all, there are two distinct categories of character string variables:
- With a limited number of distinct values: this kind of variable does not need the Vectorizer; a simple dichotomization is enough (apply an “org.apache.spark.ml.feature.StringIndexer” followed by an “org.apache.spark.ml.feature.OneHotEncoder”), as sketched below.
- With an unlimited or unknown number of distinct values: such a variable cannot be dichotomized, so the Vectorizer is the right tool.
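A minimal sketch of such a dichotomization, written in the pipeline stage format described in the Pipeline Stage section (the column names are illustrative, and the parameters assume the usual inputCol/outputCol settings of these spark stages):
[
    {
        # Map each distinct string value to a numeric index
        "type": "org.apache.spark.ml.feature.StringIndexer",
        "settings": { "inputCol": "category", "outputCol": "category_index" }
    },
    {
        # One-hot encode the index into a binary vector
        "type": "org.apache.spark.ml.feature.OneHotEncoder",
        "settings": { "inputCol": "category_index", "outputCol": "category_vector" }
    }
]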
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“inputCol” | Yes | String | None | Name of input column containing the input textual variable |
“outputCol” | Yes | String | None | Name of vector output column |
“features” | No | List<Feature> | None | List of features to create |
Features¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“type” | Yes | String | None | Type of the feature |
“settings” | No | Json | None | Settings of the Feature |
Possible feature types are:
- “change”
- “check_pattern”
- “count”
- “distribution”
- “max_stream_length”
Feature Change¶
The “change” feature counts the number of character sequences matching a regex.
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“target” | Yes | String | None | Regex defining the type of character to count |
{
"type": "change",
"settings": {
"target": "[a-z]"
}
}
Feature Check Pattern¶
The “check_pattern” feature checks whether a text matches a regex pattern.
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“target” | Yes | String | None | Regex to test textual variable |
{
"type": "check_pattern",
"settings": {
"target": "^.*word.*$"
}
}
Feature Count¶
The “count” feature counts the number of characters matching a regex.
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“target” | Yes | String | None | Regex defining the type of character to count |
{
"type": "count",
"settings": {
"target": "[a-z]"
}
}
Feature Distribution¶
The “distribution” feature computes the distribution of the characters matching a regex, sorts the occurrence counts and returns the n highest.
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“target” | Yes | String | None | Regex defining the type of character to count |
“n” | Yes | Integer | None | Number of distribution values to return |
{
"type": "distribution",
"settings": {
"target": "[a-z]",
"n": 10
}
}
Feature Max Stream Length¶
The “max_stream_length” feature computes the lengths of the n longest streams of characters matching a user-defined regex.
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“target” | Yes | String | None | Regex defining the type of character to count |
“n” | Yes | Integer | None | Number of stream lengths to return |
{
"type": "max_stream_length",
"settings": {
"target": "[a-z]",
"n": 10
}
}
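Putting it all together, here is a minimal sketch of a complete Vectorizer stage configuration combining the feature examples above (the class name is the one given in the introduction; the column names are illustrative):
{
    "type": "com.thales.services.cloudomc.punchplatform.analytics.mllib.vectorizer.Vectorizer",
    "settings": {
        "inputCol": "log",
        "outputCol": "features",
        "features": [
            # Count the lowercase characters
            { "type": "count", "settings": { "target": "[a-z]" } },
            # Check whether the text contains "word"
            { "type": "check_pattern", "settings": { "target": "^.*word.*$" } },
            # Return the 10 highest character occurrence counts
            { "type": "distribution", "settings": { "target": "[a-z]", "n": 10 } }
        ]
    }
}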
R¶
Note
From official R website: R is ‘GNU S’, a freely available language and environment for statistical computing and graphics which provides a wide variety of statistical and graphical techniques: linear and nonlinear modelling, statistical tests, time series analysis, classification, clustering, etc.
Installation¶
$ sudo apt-get install r-base r-base-dev
RStage¶
Warning
Developers only. This stage is not yet supported.
Most of the algorithms used in machine learning are implemented in R, which is why we created a connector.
The stage “com.thales.services.cloudomc.punchplatform.analytics.plugins.r.RStage” runs an R script on a string column.
Warning
The R stage does not preserve the columns of the input dataset.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“inputCol” | Yes | String | None | Name of string input column |
“outputCol” | Yes | String | None | Name of string output column |
“rCode” | No | String | “main <- identity” | R script code. Overrides “rCodeFile”. |
“rCodeFile” | No | String | “main <- identity” | R script code file, readable from the driver |
“functionName” | No | String | “main” | The name of the main function in the script |
“mode” | No | String | “json” | Mode defining how input data are read by the R script (see below) |
“partitionBy” | No | List<String> | None | Shuffle the data by the given columns: two rows sharing the same values for those columns are sent to the same R execution. |
“partitionNum” | No | Integer | None | Number of partitions in the dataset after shuffling |
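For instance, a hedged settings fragment (the "user" column and the script path are illustrative) that groups rows by user before handing each group to the R script in "partition_csv" mode:
{
    # Illustrative path: any R script file readable from the driver
    "rCodeFile": "./resources/r/main.R",
    "inputCol": "input_csv",
    "outputCol": "output_csv",
    "mode": "partition_csv",
    # Rows sharing the same "user" value are processed by the same R execution
    "partitionBy": ["user"],
    "partitionNum": 10
}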
Function¶
The provided main R function must accept one argument. Its expected type depends on the “mode”.
main <- function(input) {
    return(input)
}
Mode¶
String¶
With the mode “string”, for each string value from the input column the main R function is executed once and must return a string.
{
"type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.r.RStage",
"settings": {
"rCode": "main <- function(input) {return(nchar(input));}",
"inputCol": "input",
"outputCol": "output",
"mode": "string"
}
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/r_stage_string.json --spark-master local[*] --deploy-mode client
Json¶
With the mode “json”, for each string value from the input column the processing steps are:
- The string is interpreted as json and parsed into an R list (the equivalent of a Java map or a python dictionary)
- The main R function is executed on this list and returns another list
- The resulting list is converted back to a json string
{
"type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.r.RStage",
"settings": {
"rCode": "main <- function(input) {input$AB <- input$A$B; return(input);}",
"inputCol": "input",
"outputCol": "output",
"mode": "json"
}
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/r_stage_json.json --spark-master local[*] --deploy-mode client
Partition Csv¶
With the mode “partition_csv”, for each partition of the dataset the processing steps are:
- Each string value of the column is interpreted as a csv row, and the partition is converted to an R dataframe.
- The main R function is executed on this dataframe and returns another dataframe
- Each row of the resulting dataframe is converted back to a csv string
{
"type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.r.RStage",
"settings": {
"rCode": "main <- function(input) {input$V3 <- paste(input$V1, input$V2, sep=\"_\"); return(input);}",
"inputCol": "input_csv",
"outputCol": "output_csv",
"mode": "partition_csv"
}
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/r_stage_partition_csv.json --spark-master local[*] --deploy-mode client
Base¶
This section describes nodes that work without any external dependency.
Object Generator Node¶
The node “object_generator” is a simple object publisher.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“value” | Yes | Object | None | Object to publish |
Publication¶
Tag | Condition | Type | Default | Comment |
---|---|---|---|---|
None | None | Singleton<Object> | None | The object from settings “value” |
Example¶
{
"type": "object_generator",
"component": "object",
"settings": {
"value": "hello"
},
"publish": [
{
"field": "value"
}
]
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/object_generator.json --spark-master local[*] --deploy-mode client
Print Node¶
The node “print” is a simple object printer.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“title” | No | String | “PRINT” | Text printed before value |
Subscription¶
Tag | Mandatory | Type | Default | Comment |
---|---|---|---|---|
None | Yes | Object | None | Value to print |
Example¶
{
"type": "print",
"component": "print",
"settings": {
"title": "VALUE"
},
"subscribe": [
{
"component": "object",
"field": "value"
}
]
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/print.json --spark-master local[*] --deploy-mode client
Math¶
This section describes math nodes, mainly used in the documentation to illustrate graph concepts.
Number Generator Node¶
The node “number_generator” is a simple number publisher.
Settings¶
Parameter | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“value” | Yes | Double | None | Number to publish |
Publication¶
Tag | Condition | Type | Default | Comment |
---|---|---|---|---|
None | None | Singleton<Double> | None | The number from settings “value” |
Example¶
{
"type": "number_generator",
"component": "a",
"settings": {
"value": 1.0
},
"publish": [
{
"field": "value"
}
]
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/number_generator.json --spark-master local[*] --deploy-mode client
Number Addition Node¶
The node “number_addition” adds its subscribed values.
Subscription¶
Tag | Mandatory | Type | Default | Comment |
---|---|---|---|---|
None | No | List<Double> | None | Input values of addition |
Publication¶
Tag | Condition | Type | Default | Comment |
---|---|---|---|---|
None | None | Singleton<Double> | None | Result of addition |
Example¶
{
"type": "number_addition",
"component": "addition",
"subscribe": [
{
"component": "a",
"field": "value"
},
{
"component": "b",
"field": "value"
}
],
"publish": [
{
"field": "value"
}
]
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/number_addition.json --spark-master local[*] --deploy-mode client
Number Division Node¶
The node “number_division” divides two subscribed values.
Subscription¶
Tag | Mandatory | Type | Default | Comment |
---|---|---|---|---|
“numerator” | Yes | Double | None | Numerator value |
“denominator” | Yes | Double | None | Denominator value |
Publication¶
Tag | Condition | Type | Default | Comment |
---|---|---|---|---|
None | None | Singleton<Double> | None | Result of division |
Example¶
{
"type": "number_division",
"component": "division",
"subscribe": [
{
"component": "a",
"field": "value",
"tag": "numerator"
},
{
"component": "b",
"field": "value",
"tag": "denominator"
}
],
"publish": [
{
"field": "value"
}
]
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/number_division.json --spark-master local[*] --deploy-mode client
Number Formula Node¶
The node “number_formula” executes mathematical formulae on its subscribed values.
Subscription¶
Tag | Mandatory | Type | Default | Comment |
---|---|---|---|---|
None | No | Map<Double> | None | Formula variables; the subscription name is the variable name |
Publication¶
Tag | Condition | Type | Default | Comment |
---|---|---|---|---|
None | None | Map<Double> | None | Formula results; the publication name is the name of the variable created by the formula |
Example¶
{
"type": "number_formula",
"component": "formula",
"settings": {
"formulae": [
"c = a / b",
"d = c + 5"
]
},
"subscribe": [
{
"component": "a",
"field": "value",
"name": "a"
},
{
"component": "b",
"field": "value",
"name": "b"
}
],
"publish": [
{
"field": "value",
"name": "d"
}
]
}
You can test this example with the commands:
$ cd $PUNCHPLATFORM_CONF_DIR
$ punchplatform-analytics.sh --job resources/analytics/job/example/number_formula.json --spark-master local[*] --deploy-mode client