
Use Cases

This chapter gives some concrete examples of how to work with PML.

Elasticsearch Index Copy


For a job copying all the documents from one index to another, you need a graph with two nodes:

  1. elastic_batch_input_node creating a dataset from Elasticsearch
  2. elastic_batch_output_node writing that dataset back to Elasticsearch
[
    # This node creates a spark dataset pointing to the Elasticsearch data
    {
        "type": "elastic_batch_input",
        "component": "input",
        "settings": {
            "index": "mytenant-events-2017.11.30",
            "type": "doc",
            "cluster_name": "es_search",
            "nodes": [ "localhost" ],
            "column_id": "id",
            "column_source": "source"
        },
        "publish": [
            # The output spark dataset with two columns:
            #   - "id" containing the document ids
            #   - "source" containing the document sources
            { "field": "data" }
        ]
    },
    # This node writes a dataset to Elasticsearch
    {
        "type": "elastic_batch_output",
        "component": "output",
        "settings": {
            "index": "copy-mytenant-events-2017.11.30",
            "type": "doc",
            "cluster_name": "es_search",
            "nodes": [ "localhost" ],
            "column_id": "id",
            "column_source": "source"
        },
        "subscribe": [
            # The input dataset
            { "component": "input", "field": "data" }
        ]
    }
]
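
The dataset published by the input node in its data field has two columns, named after the column_id and column_source settings. Displayed as a Spark dataset it schematically looks like this (the row content is a placeholder, not real output):

+------------------+--------------------------------------+
|                id|                                source|
+------------------+--------------------------------------+
|   <document id>  |  <document source as a json string>  |
+------------------+--------------------------------------+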

Index Count


This part describes how to schedule a job counting the documents in an Elasticsearch index and writing the result to another one.

You need a graph with four nodes:

  1. elastic_batch_input_node creating a dataset from Elasticsearch
  2. count_node to count rows in a dataset
  3. punch_node to create the result json
  4. elastic_output_node to write that result back to Elasticsearch
[
    # This node creates a spark dataset pointing to the Elasticsearch data
    {
        "type": "elastic_batch_input",
        "component": "input",
        "settings": {
            "index": "mytenant-events-2017.11.30",
            "type": "doc",
            "cluster_name": "es_search",
            "nodes": [ "localhost" ],
            "query":    {
                "range": {
                    "ts": {
                        "gte": "2017-12-05T10:00",
                        "lte": "2017-12-05T11:00",
                        "time_zone": "+01:00"
                    }
                }
            }
        },
        "publish": [
            # The output spark dataset has no column.
            { "field": "data" }
        ]
    },
    # This node counts the number of rows in a spark dataset
    {
        "type": "count",
        "component": "count",
        "subscribe": [
            # The input dataset
            { "component": "input", "field": "data" }
        ],
        "publish": [
            # The number of rows in long format
            { "field": "value" }
        ]
    },
    # This node creates the result json document
    # By default, without code, a Punch node concatenates
    # inputs into a json
    {
        "type": "punch",
        "component": "punch",
        "settings": {
            # The initial json
            "root": {
                "from": "2017-12-05T10:00",
                "to": "2017-12-05T11:00",
                "index": "mytenant-events-2017.11.30/log"
            }
        },
        "subscribe": [
            # The number of documents added in the result json
            # in the field "count"
            { "component": "count", "field": "value", "name": "count" }
        ],
        "publish": [
            # The result json
            { "field": "value" }
        ]
    },
    # This node writes the result json to Elasticsearch
    {
        "type": "elastic_output",
        "component": "output",
        "settings": {
            "index": "count-mytenant-events-2017.11.30",
            "type": "count",
            "nodes": [ "localhost" ]
        },
        "subscribe": [
            # The result json
            { "component": "punch", "field": "value" }
        ]
    }
]
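
With these settings, the document written to the count-mytenant-events-2017.11.30 index is the punch node's root json enriched with the subscribed count field, i.e. something like the following (the count value is of course illustrative):

{
    "from": "2017-12-05T10:00",
    "to": "2017-12-05T11:00",
    "index": "mytenant-events-2017.11.30/log",
    "count": 12345
}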

Suspect URL Detection


This part presents how to train a suspect URL detection model based on K-Means, and how to apply it with the PunchPlatform ML features.

Statistical Method

Given a set of URLs, we would like to detect abnormal ones, i.e. URLs having some rare characteristics compared to the majority. What we need is a process that associates an anomaly score with each URL.


The overall process to detect suspect URLs is the following:

  • Read a URL dataset from a given time range in Elasticsearch.
  • For each URL, compute a numerical vector based on a list of features (length, number of slashes, letter distribution, ...).
  • Apply a K-Means clustering to these vectors.
  • For each vector, compute the distance between itself and the nearest K-Means cluster center.
  • Write the results back into Elasticsearch.

Note

K-Means clustering is a method of vector quantization, originally from signal processing, that is popular for cluster analysis in data mining. k-means clustering aims to partition n observations into k clusters in which each observation belongs to the cluster with the nearest mean, serving as a prototype of the cluster. This results in a partitioning of the data space into Voronoi cells.
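
For readers unfamiliar with the Spark ML API wrapped by the KMeans stage used below, here is a minimal standalone sketch of fitting a K-Means model and retrieving its cluster centers. The vectors dataset and its "features" column are assumptions standing in for what the vectorizer stage produces:

import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class KMeansSketch {
    // `vectors` is assumed to hold one row per url, with a "features" vector column.
    static KMeansModel cluster(Dataset<Row> vectors) {
        KMeansModel model = new KMeans()
                .setK(10)                       // same k as in the pipeline settings below
                .setFeaturesCol("features")
                .fit(vectors);
        // transform() appends a "prediction" column holding the assigned cluster number
        Dataset<Row> clustered = model.transform(vectors);
        // one center per cluster, later used to compute the anomaly distance
        Vector[] centers = model.clusterCenters();
        return model;
    }
}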

Input Data

In order to avoid overfitting, we use two distinct datasets. Each is read using a dedicated elastic_batch_input_node:

  • One dataset is used to train the K-Means algorithm.
  • One dataset is submitted to the trained model.

These datasets contain different time buckets of documents from the same Elasticsearch index, i.e. we train on data between 10:00 and 11:00, and we transform the data between 11:00 and 12:00.

{
    "type": "elastic_batch_input",
    "component": "input_fit",
    "settings": {
        "index": "mytenant-events-2017.11.30",
        "cluster_name": "es_search",
        "nodes": [ "localhost" ],
        "column_id": "id",
        "column_source": "source",
        "query":    {
            "range": {
                "ts": {
                    "gte": "2017-12-05T10:00",
                    "lte": "2017-12-05T11:00",
                    "time_zone": "+01:00"
                }
            }
        }
    },
    "publish": [
        { "field": "data" }
    ]
},
{
    "type": "elastic_batch_input",
    "component": "input_transform",
    "settings": {
        "index": "mytenant-events-2017.11.30",
        "cluster_name": "es_search",
        "nodes": [ "localhost" ],
        "column_id": "id",
        "column_source": "source",
        "query":    {
            "range": {
                "ts": {
                    "gte": "2017-12-05T11:00",
                    "lte": "2017-12-05T12:00",
                    "time_zone": "+01:00"
                }
            }
        }
    },
    "publish": [
        { "field": "data" }
    ]
}

Pipeline


The Spark pipeline to train is composed of four stages:

  • A json_input_stage to extract the URL from the Elasticsearch document.
  • A vectorizer_stage to create the feature vectors.
  • A KMeans stage, the model being trained.
  • A json_output_stage to create a json with the model results.

This pipeline is set in an mllib_node subscribing to both elastic_batch_input_node components:

{
    "type": "mllib",
    "component": "pipeline",
    "settings": {
        "pipeline": [
            {
                "type": "org.thales.punch.ml.plugins.jackson.JsonOutputStage",
                "settings": {
                    "inputColumn": "source",
                    "outputFields": [
                        {
                            "type": "string",
                            "json_field": "[target][uri][urn]",
                            "dataset_column": "url"
                        }
                    ]
                }
            },
            {
                "type": "org.thales.punch.ml.plugins.vectorizer.VectorizerStage",
                "settings": {
                    "inputCol": "url",
                    "outputCol": "features",
                    "features": [
                        { "type": "count", "settings": { "regex": "/" } },
                        { "type": "distribution", "settings": { "regex": "[.]", "n": 10 } },
                        { "type": "max_stream_length", "settings": { "regex": "[^A-Za-z0-9]", "n": 1 } },
                        { "type": "check_pattern", "settings": { "regex": "^[0-9].*$" } },
                        { "type": "check_pattern", "settings": { "regex": "^.*[^A-Za-z0-9].*$" } }
                    ]
                }
            },
            {
                "type": "org.apache.spark.ml.clustering.KMeans",
                "settings": { "k": 10 }
            },
            {
                "type": "org.thales.punch.ml.plugins.jackson.JsonInputStage",
                "settings": {
                    "inputFields": [
                        {
                            "type": "json",
                            "dataset_column": "source"
                        },
                        {
                            "type": "vector",
                            "json_field": "[detection][features]",
                            "dataset_column": "features"
                        },
                        {
                            "type": "integer",
                            "json_field": "[detection][prediction]",
                            "dataset_column": "prediction"
                        }
                    ],
                    "outputColumn": "output"
                }
            }
        ]
    },
    "subscribe": [
        { "component": "input_fit", "field": "data", "tag": "input_fit" },
        { "component": "input_transform", "field": "data", "tag": "input_transform" }
    ],
    "publish": [
        { "field": "data", "tag": "output_transform" },
        { "field": "model", "tag": "model" }
    ]
}
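
If the JsonInputStage settings above are read correctly, the output column of the transformed dataset holds, for each document, its original source json enriched with the model results, schematically (field values illustrative):

{
    ... original document source fields ...,
    "detection": {
        "features": [ 3.0, 0.1, ... ],
        "prediction": 4
    }
}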

Post Processing

The above pipeline only associates a cluster number with each URL, but we need the distance from each feature vector to its cluster center. Those cluster centers are available in the fitted pipeline published by the mllib_node in the model field.

So we can use a punch_batch_node to convert the cluster number into a distance-based anomaly score:

{
    "type": "punch_batch",
    "component": "post_processing",
    "settings": {
        "punchlet_code": "{{punch_code}}",
        "input_column": "output",
        "output_column": "post_processing"
    },
    "subscribe": [
        { "component": "pipeline", "field": "data" },
        { "component": "pipeline", "field": "model", "tag": "resource", "name": "model" }
    ],
    "publish": [
        { "field": "data" }
    ]
}

Where the punchlet code (referenced as "{{punch_code}}" above) is:

{

    // Getting the fitted model (the "model" resource subscribed above)
    PipelineModel pipelineModel = getResource("model", PipelineModel.class);

    // Extraction of KMeans model
    KMeansModel kMeans = (KMeansModel) pipelineModel.stages()[2];

    // Getting url group number 
    int cluster = root:[detection][prediction]; 

    // Getting associated KMeans center
    org.apache.spark.ml.linalg.Vector center = kMeans.clusterCenters()[cluster]; 

    // Conversion of json array to Vector
    List<Tuple> listTuple = root:[detection][features].asArray(); 
    double[] arrayDouble = new double[listTuple.size()]; 
    for(int i = 0; i < listTuple.size(); i++) { 
        arrayDouble[i] = listTuple.get(i).asDouble(); 
    } 
    org.apache.spark.ml.linalg.Vector vector = new DenseVector(arrayDouble); 

    // Computation of distance
    Double distance = Vectors.sqdist(center, vector); 

    // Add distance to output json
    root:[output][detection][distance] = distance;

}
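
The anomaly score is thus the squared Euclidean distance returned by Vectors.sqdist. As a standalone illustration with made-up vectors:

import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;

public class SqdistExample {
    public static void main(String[] args) {
        // a cluster center and a feature vector (illustrative values only)
        Vector center = Vectors.dense(0.0, 1.0);
        Vector features = Vectors.dense(2.0, 3.0);
        // squared Euclidean distance: (2-0)^2 + (3-1)^2 = 8.0
        double score = Vectors.sqdist(center, features);
        System.out.println(score); // prints 8.0
    }
}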

Output Data

Finally, an elastic_batch_output_node is used to write the results to Elasticsearch. This node writes the post_processing column as the document source.

{
    "type": "elastic_batch_output",
    "component": "output",
    "settings": {
        "resource": "result-mytenant-events-2017.11.30/log",
        "cluster_name": "es_search",
        "nodes": [ "localhost" ],
        "column_id": "id",
        "column_source": "post_processing"
    },
    "subscribe": [
        { "component": "post_processing", "field": "data" }
    ]
}