Use Cases
This chapter walks through some concrete examples of working with PML.
Elasticsearch Index Copy

For a job copying all the documents from one index to another, you need a graph with two nodes:

- elastic_batch_input_node: creating a dataset from elastic
- elastic_batch_output_node: writing a dataset to elastic
```
[
    # This node creates a spark dataset pointing to elastic data
    {
        "type": "elastic_batch_input",
        "component": "input",
        "settings": {
            "index": "mytenant-events-2017.11.30",
            "type": "doc",
            "cluster_name": "es_search",
            "nodes": [ "localhost" ],
            "column_id": "id",
            "column_source": "source"
        },
        "publish": [
            # The output spark dataset with two columns:
            # - "id" containing the document ids
            # - "source" containing the document sources
            { "field": "data" }
        ]
    },
    # This node writes a dataset into elastic
    {
        "type": "elastic_batch_output",
        "component": "output",
        "settings": {
            "index": "copy-mytenant-events-2017.11.30",
            "type": "doc",
            "cluster_name": "es_search",
            "nodes": [ "localhost" ],
            "column_id": "id",
            "column_source": "source"
        },
        "subscribe": [
            # The input dataset
            { "component": "input", "field": "data" }
        ]
    }
]
```
Index Count

This part describes how to schedule a job counting the documents in an Elasticsearch index and writing the result into another one.

You need a graph with four nodes:

- elastic_batch_input_node: creating a dataset from elastic
- count_node: counting the rows in a dataset
- punch_node: creating the result json
- elastic_output_node: writing that result back to Elasticsearch
```
[
    # This node creates a spark dataset pointing to elastic data
    {
        "type": "elastic_batch_input",
        "component": "input",
        "settings": {
            "index": "mytenant-events-2017.11.30",
            "type": "doc",
            "cluster_name": "es_search",
            "nodes": [ "localhost" ],
            "query": {
                "range": {
                    "ts": {
                        "gte": "2017-12-05T10:00",
                        "lte": "2017-12-05T11:00",
                        "time_zone": "+01:00"
                    }
                }
            }
        },
        "publish": [
            # The output spark dataset has no column.
            { "field": "data" }
        ]
    },
    # This node counts the number of rows in a spark dataset
    {
        "type": "count",
        "component": "count",
        "subscribe": [
            # The input dataset
            { "component": "input", "field": "data" }
        ],
        "publish": [
            # The number of rows in long format
            { "field": "value" }
        ]
    },
    # This node creates the result json document.
    # By default, without code, a punch node concatenates
    # its inputs into a json
    {
        "type": "punch",
        "component": "punch",
        "settings": {
            # The initial json
            "root": {
                "from": "2017-12-05T10:00",
                "to": "2017-12-05T11:00",
                "index": "mytenant-events-2017.11.30/log"
            }
        },
        "subscribe": [
            # The number of documents, added in the result json
            # in the field "count"
            { "component": "count", "field": "value", "name": "count" }
        ],
        "publish": [
            # The result json
            { "field": "value" }
        ]
    },
    # This node writes the result json in elastic
    {
        "type": "elastic_output",
        "component": "output",
        "settings": {
            "index": "count-mytenant-events-2017.11.30",
            "type": "count",
            "nodes": [ "localhost" ]
        },
        "subscribe": [
            # The result json
            { "component": "punch", "field": "value" }
        ]
    }
]
```
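For reference, the document indexed by the elastic_output node is simply the initial root json enriched with the count field. It should look roughly like the following (the count value is of course illustrative):

```json
{
  "from": "2017-12-05T10:00",
  "to": "2017-12-05T11:00",
  "index": "mytenant-events-2017.11.30/log",
  "count": 12345
}
```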
Suspect URL Detection

This part presents how to train a suspect URL detection model based on K-Means and how to apply it using the PunchPlatform ML features.
Statistical Method

Given a set of URLs, we would like to detect anomalous ones, i.e. those having some rare characteristics compared to the majority. What we need is a process that associates an anomaly score with each URL.

The overall process to detect suspect URLs is the following:

- Read a URL dataset from a given time range in Elasticsearch.
- For each URL, compute a numerical vector based on a list of features (length, number of '/' characters, letter distribution, ...).
- Apply a K-Means clustering on these vectors.
- For each vector, compute the distance between itself and the nearest K-Means cluster center.
- Write the results back into Elasticsearch.
Note
K-Means clustering is a method of vector quantization, originally from signal processing, that is popular for cluster analysis in data mining. It aims to partition n observations into k clusters in which each observation belongs to the cluster with the nearest mean, serving as a prototype of the cluster. This results in a partitioning of the data space into Voronoi cells.
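The anomaly score used throughout this example is therefore the squared distance between a URL's feature vector and its nearest cluster center. With notation introduced here only for clarity (phi(u) is the feature vector of URL u, and c_1, ..., c_k are the fitted cluster centers), it can be written as:

```latex
\operatorname{score}(u) \;=\; \min_{1 \le j \le k} \bigl\lVert \varphi(u) - c_{j} \bigr\rVert^{2}
```

This is what the post-processing punchlet at the end of this section computes, using the center of the cluster assigned by K-Means, which is by construction the nearest one.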
Input Data¶
In order to avoid
overfitting, we use two
distinct datasets. Each is read using a dedicated
elastic_batch_input_node
:
- One dataset is used to train the K-Means algorithm.
- One dataset is submitted to the the trained model.
These datasets contain different time buckets of documents from the same elastic index. I.e. we train on data between 10:00 and 11:00, and we transform on data between 11:00 and 12:00.
```
{
    "type": "elastic_batch_input",
    "component": "input_fit",
    "settings": {
        "index": "mytenant-events-2017.11.30",
        "cluster_name": "es_search",
        "nodes": [ "localhost" ],
        "column_id": "id",
        "column_source": "source",
        "query": {
            "range": {
                "ts": {
                    "gte": "2017-12-05T10:00",
                    "lte": "2017-12-05T11:00",
                    "time_zone": "+01:00"
                }
            }
        }
    },
    "publish": [
        { "field": "data" }
    ]
},
{
    "type": "elastic_batch_input",
    "component": "input_transform",
    "settings": {
        "index": "mytenant-events-2017.11.30",
        "cluster_name": "es_search",
        "nodes": [ "localhost" ],
        "column_id": "id",
        "column_source": "source",
        "query": {
            "range": {
                "ts": {
                    "gte": "2017-12-05T11:00",
                    "lte": "2017-12-05T12:00",
                    "time_zone": "+01:00"
                }
            }
        }
    },
    "publish": [
        { "field": "data" }
    ]
}
```
Pipeline

The spark pipeline to train is composed of four stages:

- A json_input_stage to extract the URL from the elastic document.
- A vectorizer_stage to create the feature vector.
- A KMeans stage, the model used.
- A json_output_stage to create a json with the model results.

This pipeline is set in an mllib_node subscribing to both elastic_batch_input_node components:
```
{
    "type": "mllib",
    "component": "pipeline",
    "settings": {
        "pipeline": [
            {
                "type": "org.thales.punch.ml.plugins.jackson.JsonOutputStage",
                "settings": {
                    "inputColumn": "source",
                    "outputFields": [
                        {
                            "type": "string",
                            "json_field": "[target][uri][urn]",
                            "dataset_column": "url"
                        }
                    ]
                }
            },
            {
                "type": "org.thales.punch.ml.plugins.vectorizer.VectorizerStage",
                "settings": {
                    "inputCol": "url",
                    "outputCol": "features",
                    "features": [
                        {
                            "type": "count",
                            "settings": { "regex": "/" }
                        },
                        {
                            "type": "distribution",
                            "settings": { "regex": "[.]", "n": 10 }
                        },
                        {
                            "type": "max_stream_length",
                            "settings": { "regex": "[^A-Za-z0-9]", "n": 1 }
                        },
                        {
                            "type": "check_pattern",
                            "settings": { "regex": "^[0-9].*$" }
                        },
                        {
                            "type": "check_pattern",
                            "settings": { "regex": "^.*[^A-Za-z0-9].*$" }
                        }
                    ]
                }
            },
            {
                "type": "org.apache.spark.ml.clustering.KMeans",
                "settings": { "k": 10 }
            },
            {
                "type": "org.thales.punch.ml.plugins.jackson.JsonInputStage",
                "settings": {
                    "inputFields": [
                        {
                            "type": "json",
                            "dataset_column": "source"
                        },
                        {
                            "type": "vector",
                            "json_field": "[detection][features]",
                            "dataset_column": "features"
                        },
                        {
                            "type": "integer",
                            "json_field": "[detection][prediction]",
                            "dataset_column": "prediction"
                        }
                    ],
                    "outputColumn": "output"
                }
            }
        ]
    },
    "subscribe": [
        { "component": "input_fit", "field": "data", "tag": "input_fit" },
        { "component": "input_transform", "field": "data", "tag": "input_transform" }
    ],
    "publish": [
        { "field": "data", "tag": "output_transform" },
        { "field": "model", "tag": "model" }
    ]
}
```
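To make the post-processing step easier to follow, here is a rough sketch of the json carried by the output column after the JsonInputStage. All field values are illustrative, the length of the features array depends on the configured features, and we assume here that the original document source is merged at the root of the output json:

```json
{
  "target": {
    "uri": {
      "urn": "/app/images/../admin/login.php?user=42"
    }
  },
  "detection": {
    "features": [4.0, 0.2, 0.1, 0.0, 0.0, 3.0, 1.0, 1.0],
    "prediction": 4
  }
}
```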
Post Processing

The above pipeline only associates a group number with each URL, whereas we need the distance from the feature vector to the group centers. Those group centers are available in the fitted pipeline published by the mllib_node in the model field. We can therefore use a punch_batch_node to convert the group number into a distance-based anomaly score:
```
{
    "type": "punch_batch",
    "component": "post_processing",
    "settings": {
        "punchlet_code": "{{punch_code}}",
        "input_column": "output",
        "output_column": "post_processing"
    },
    "subscribe": [
        { "component": "pipeline", "field": "data" },
        { "component": "pipeline", "field": "model", "tag": "resource", "name": "model" }
    ],
    "publish": [
        { "field": "data" }
    ]
}
```
Where punch_code is:
```
{
    // Get the fitted pipeline model, subscribed above under the resource name "model"
    PipelineModel pipelineModel = getResource("model", PipelineModel.class);

    // Extract the KMeans model (third stage of the pipeline)
    KMeansModel kMeans = (KMeansModel) pipelineModel.stages()[2];

    // Get the url group number
    int cluster = root:[detection][prediction];

    // Get the associated KMeans center
    org.apache.spark.ml.linalg.Vector center = kMeans.clusterCenters()[cluster];

    // Convert the json array of features to a Vector
    List<Tuple> listTuple = root:[detection][features].asArray();
    double[] arrayDouble = new double[listTuple.size()];
    for (int i = 0; i < listTuple.size(); i++) {
        arrayDouble[i] = listTuple.get(i).asDouble();
    }
    org.apache.spark.ml.linalg.Vector vector = new DenseVector(arrayDouble);

    // Compute the squared euclidean distance between the vector and its cluster center
    Double distance = Vectors.sqdist(center, vector);

    // Add the distance to the output json
    root:[output][detection][distance] = distance;
}
```
Output Data

Finally, an elastic_batch_output_node is used to write the results to Elasticsearch. This node writes the post_processing column as the document source.
```
{
    "type": "elastic_batch_output",
    "component": "output",
    "settings": {
        "resource": "result-mytenant-events-2017.11.30/log",
        "cluster_name": "es_search",
        "nodes": [ "localhost" ],
        "column_id": "id",
        "column_source": "post_processing"
    },
    "subscribe": [
        { "component": "post_processing", "field": "data" }
    ]
}
```