Job Use Cases¶
In this chapter, we present job and plan use cases.
Index Copy¶

For a job that copies, at execution time, all the documents from one index to another, you need a graph with two nodes:
- Elastic Batch Input Node creating a dataset from Elasticsearch
- Elastic Batch Output Node writing a dataset to Elasticsearch
[
# This node creates a spark dataset pointing to elastic data
{
"type": "elastic_batch_input",
"component": "input",
"settings": {
"resource": "mytenant-events-2017.11.30/log",
"cluster_name": "es_search",
"nodes": [ "localhost" ],
"column_id": "id",
"column_source": "source"
},
"publish": [
# The output spark dataset with two columns:
# - "id" containing the document ids
# - "source" containing the document sources
{ "field": "data" }
]
},
# This node writes a dataset to elastic
{
"type": "elastic_batch_output",
"component": "output",
"settings": {
"resource": "copy-mytenant-events-2017.11.30/log",
"cluster_name": "es_search",
"nodes": [ "localhost" ],
"column_id": "id",
"column_source": "source"
},
"subscribe": [
# The input dataset
{ "component": "input", "field": "data" }
]
}
]
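Conceptually, this two-node graph behaves like the following standalone Spark job written with the elasticsearch-spark connector. This is only a rough sketch of the read/write flow, not the actual node implementation: the host and index names simply reuse the settings above, and unlike the graph it does not preserve the original document ids (the column_id setting).
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IndexCopySketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("index-copy")
                .config("es.nodes", "localhost")
                .getOrCreate();

        // Read every document of the source index into a dataset
        Dataset<Row> data = spark.read()
                .format("org.elasticsearch.spark.sql")
                .load("mytenant-events-2017.11.30/log");

        // Write the same documents to the destination index
        data.write()
                .format("org.elasticsearch.spark.sql")
                .mode("append")
                .save("copy-mytenant-events-2017.11.30/log");
    }
}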
Index Count¶

This part describes how to schedule a job that counts the documents of an Elasticsearch index and writes the result to another one.
For such a job, you need a graph with four nodes:
- Elastic Batch Input Node creating a dataset from Elasticsearch
- Count Node counting the rows of a dataset
- Punch Node creating the result JSON document
- Elastic Output Node writing a JSON document to Elasticsearch
[
# This node creates a spark dataset pointing to elastic data
{
"type": "elastic_batch_input",
"component": "input",
"settings": {
"resource": "mytenant-events-2017.11.30/log",
"cluster_name": "es_search",
"nodes": [ "localhost" ],
"query": {
"range": {
"ts": {
"gte": "2017-12-05T10:00",
"lte": "2017-12-05T11:00",
"time_zone": "+01:00"
}
}
}
},
"publish": [
# The output spark dataset has no columns: we only need to count its rows.
{ "field": "data" }
]
},
# This node counts the number of rows in a spark dataset
{
"type": "count",
"component": "count",
"subscribe": [
# The input dataset
{ "component": "input", "field": "data" }
],
"publish": [
# The number of rows in long format
{ "field": "value" }
]
},
# This node creates the result json document
# By default, without punchlet code, a Punch node
# concatenates its inputs into a json
{
"type": "punch",
"component": "punch",
"settings": {
# The initial json
"root": {
"from": "2017-12-05T10:00",
"to": "2017-12-05T11:00",
"index": "mytenant-events-2017.11.30/log"
}
},
"subscribe": [
# The number of documents, added to the result json
# in the field "count"
{ "component": "count", "field": "value", "name": "count" }
],
"publish": [
# The result json
{ "field": "value" }
]
},
# This node writes the result json to elastic
{
"type": "elastic_output",
"component": "output",
"settings": {
"index": "count-mytenant-events-2017.11.30",
"type": "count",
"nodes": [ "localhost" ]
},
"subscribe": [
# The result json
{ "component": "punch", "field": "value" }
]
}
]
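With these settings, the document indexed into "count-mytenant-events-2017.11.30" should look like the following (the value of "count" is of course illustrative):
{
  "from": "2017-12-05T10:00",
  "to": "2017-12-05T11:00",
  "index": "mytenant-events-2017.11.30/log",
  "count": 12345
}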
Suspect URL Detection¶

This part presents how to train a suspect URL detection model based on K-Means and how to apply it using the PunchPlatform ML features.
Statistical Method¶
Given a set of URLs, we would like to detect anomalous ones, i.e. URLs that have some rare characteristics compared to the other URLs. We therefore need a process that associates an anomaly score to each URL.

The statistical method used to detect suspect URLs is:
- Get a dataset of URLs from an Elasticsearch index.
- For each URL, compute a numerical vector based on a list of features (length, number of “/”, letter distribution, …).
- Apply a K-Means on these vectors.
- For each vector, compute the distance between it and the nearest K-Means cluster center: this distance is the anomaly score (see the formula below).
- Write the results to an Elasticsearch index.
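In other words, if c1, ..., ck are the cluster centers computed by K-Means and v(url) is the feature vector of a URL, the anomaly score is the squared distance to the nearest center: score(url) = min_j || v(url) - cj ||². The rarer the combination of features, the farther the vector lies from every center, and the higher the score.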
Note
K-Means clustering is a method of vector quantization, originally from signal processing, that is popular for cluster analysis in data mining. K-Means clustering aims to partition n observations into k clusters in which each observation belongs to the cluster with the nearest mean, serving as a prototype of the cluster. This results in a partitioning of the data space into Voronoi cells.
Input Data¶
In order to avoid overfitting, we use two datasets, retrieved by two Elastic Batch Input Nodes:
- An “input_fit” dataset used to train the K-Means algorithm.
- An “input_transform” dataset on which we apply the trained model.
These datasets contain two different time buckets of documents from the same Elasticsearch index:
{
"type": "elastic_batch_input",
"component": "input_fit",
"settings": {
"resource": "mytenant-events-2017.11.30/log",
"cluster_name": "es_search",
"nodes": [ "localhost" ],
"column_id": "id",
"column_source": "source",
"query": {
"range": {
"ts": {
"gte": "2017-12-05T10:00",
"lte": "2017-12-05T11:00",
"time_zone": "+01:00"
}
}
}
},
"publish": [
{ "field": "data" }
]
},
{
"type": "elastic_batch_input",
"component": "input_transform",
"settings": {
"resource": "mytenant-events-2017.11.30/log",
"cluster_name": "es_search",
"nodes": [ "localhost" ],
"column_id": "id",
"column_source": "source",
"query": {
"range": {
"ts": {
"gte": "2017-12-05T11:00",
"lte": "2017-12-05T12:00",
"time_zone": "+01:00"
}
}
}
},
"publish": [
{ "field": "data" }
]
}
Pipeline¶

The spark pipeline to train is composed of four stages:
- A Json Input Stage to extract the URL from the Elasticsearch document.
- A Vectorizer Stage to create the feature vectors.
- A KMeans Stage, the model used.
- A Json Output Stage to create a JSON document with the model results.
This pipeline is set in an Mllib Node subscribing to both Elastic Batch Input Nodes:
{
"type": "mllib",
"component": "pipeline",
"settings": {
"pipeline": [
{
"type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.jackson.JsonOutputStage",
"settings": {
"inputColumn": "source",
"outputFields": [
{
"type": "string",
"json_field": "[target][uri][urn]",
"dataset_column": "url"
}
]
}
},
{
"type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.vectorizer.VectorizerStage",
"settings": {
"inputCol": "url",
"outputCol": "features",
"features": [
{ "type": "count", "settings": { "regex": "/" } },
{ "type": "distribution", "settings": { "regex": "[.]", "n": 10 } },
{ "type": "max_stream_length", "settings": { "regex": "[^A-Za-z0-9]", "n": 1 } },
{ "type": "check_pattern", "settings": { "regex": "^[0-9].*$" } },
{ "type": "check_pattern", "settings": { "regex": "^.*[^A-Za-z0-9].*$" } }
]
}
},
{
"type": "org.apache.spark.ml.clustering.KMeans",
"settings": { "k": 10 }
},
{
"type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.jackson.JsonInputStage",
"settings": {
"inputFields": [
{
"type": "json",
"dataset_column": "source"
},
{
"type": "vector",
"json_field": "[detection][features]",
"dataset_column": "features"
},
{
"type": "integer",
"json_field": "[detection][prediction]",
"dataset_column": "prediction"
}
],
"outputColumn": "output"
}
}
]
},
"subscribe": [
{ "component": "input_fit", "field": "data", "tag": "input_fit" },
{ "component": "input_transform", "field": "data", "tag": "input_transform" }
],
"publish": [
{ "field": "data", "tag": "output_transform" },
{ "field": "model", "tag": "model" }
]
}
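To make the role of the two subscriptions explicit: the dataset tagged "input_fit" is only used to fit the pipeline, while the dataset tagged "input_transform" is scored with the fitted model. In plain Spark ML this corresponds to the sketch below, which keeps only the standard KMeans stage and assumes the "features" column has already been built by the Vectorizer Stage:
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class PipelineSketch {
    // inputFit trains the model, inputTransform is scored with it
    public static Dataset<Row> fitAndTransform(Dataset<Row> inputFit,
                                               Dataset<Row> inputTransform) {
        KMeans kmeans = new KMeans().setK(10).setFeaturesCol("features");
        Pipeline pipeline = new Pipeline()
                .setStages(new PipelineStage[] { kmeans });
        PipelineModel model = pipeline.fit(inputFit);
        return model.transform(inputTransform);
    }
}
The fitted PipelineModel is what the node publishes in its "model" field, and the transformed dataset is what it publishes in its "data" field.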
Post Processing¶
The above pipeline only associates a cluster number to each URL, but what we need is the distance from the feature vector to the cluster centers. Those centers are available in the fitted pipeline published by the Mllib Node in the “model” field.
So we can use a Punch Batch Node to convert the cluster number into a distance-based anomaly score:
{
"type": "punch_batch",
"component": "post_processing",
"settings": {
"punchlet_code": "{{punch_code}}",
"input_column": "output",
"output_column": "post_processing"
},
"subscribe": [
{ "component": "pipeline", "field": "data" },
{ "component": "pipeline", "field": "model", "tag": "resource", "name": "model" }
],
"publish": [
{ "field": "data" }
]
}
Where “{{punch_code}}” is:
{
// Getting the fitted model
PipelineModel pipelineModel = getResource(\"model\", PipelineModel.class);
// Extraction of KMeans model
KMeansModel kMeans = (KMeansModel) pipelineModel.stages()[2];
// Getting url group number
int cluster = root:[detection][prediction];
// Getting associated KMeans center
org.apache.spark.ml.linalg.Vector center = kMeans.clusterCenters()[cluster];
// Conversion of json array to Vector
List<Tuple> listTuple = root:[detection][features].asArray();
double[] arrayDouble = new double[listTuple.size()];
for(int i = 0; i < listTuple.size(); i++) {
arrayDouble[i] = listTuple.get(i).asDouble();
}
org.apache.spark.ml.linalg.Vector vector = new DenseVector(arrayDouble);
// Computation of distance
Double distance = Vectors.sqdist(center, vector);
// Add distance to output json
root:[output][detection][distance] = distance;
}
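For reference, Vectors.sqdist computes a squared euclidean distance: with a center (2.0, 1.0) and a feature vector (3.0, 3.0), the resulting score is (3 - 2)² + (3 - 1)² = 5. The larger this value, the more atypical the URL is with respect to its cluster.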
Output Data¶
Finally, an Elastic Batch Output Node is used to write the results to Elasticsearch. This node writes the column “id” as the document id and the column “post_processing” as the document source.
{
"type": "elastic_batch_output",
"component": "output",
"settings": {
"resource": "result-mytenant-events-2017.11.30/log",
"cluster_name": "es_search",
"nodes": [ "localhost" ],
"column_id": "id",
"column_source": "post_processing"
},
"subscribe": [
{ "component": "post_processing", "field": "data" }
]
}