Job Use Cases

In this chapter, we present job and plan use cases.

Index Copy

[Image: ../../../_images/analytics_copy.png]

For a job that copies, at execution time, all the documents from one index to another, you need a graph with two nodes:

  1. Elastic Batch Input Node creating a dataset from Elasticsearch
  2. Elastic Batch Output Node writing a dataset to Elasticsearch
[
        # This node creates a spark dataset pointing to Elasticsearch data
        {
                "type": "elastic_batch_input",
                "component": "input",
                "settings": {
                        "resource": "mytenant-events-2017.11.30/log",
                        "cluster_name": "es_search",
                        "nodes": [ "localhost" ],
                        "column_id": "id",
                        "column_source": "source"
                },
                "publish": [
                        # The output spark dataset with two columns:
                        #   - "id" containing the document ids
                        #   - "source" containing the document sources
                        { "field": "data" }
                ]
        },
        # This writes a dataset into elastic
        {
                "type": "elastic_batch_output",
                "component": "output",
                "settings": {
                        "resource": "copy-mytenant-events-2017.11.30/log",
                        "cluster_name": "es_search",
                        "nodes": [ "localhost" ],
                        "column_id": "id",
                        "column_source": "source"
                },
                "subscribe": [
                        # The input dataset
                        { "component": "input", "field": "data" }
                ]
        }
]
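
For reference, the dataset published by the input node contains one row per Elasticsearch document, with the two configured columns. An illustrative row (all values below are made up) would be:

{
        # "id" holds the Elasticsearch document id
        "id": "AV_k92Uoj4jLpLOx0Y1n",
        # "source" holds the document source as a json string
        "source": "{ \"vendor\": \"apache\", \"message\": \"GET /index\" }"
}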

Index Count

[Image: ../../../_images/analytics_count.png]

This part describes how to schedule a job that counts the documents in an Elasticsearch index and writes the result to another one.

For a job that counts, at execution time, the documents of an index, you need a graph with four nodes:

  1. Elastic Batch Input Node creating a dataset from Elasticsearch
  2. Count Node counting the rows of a dataset
  3. Punch Node creating the result json
  4. Elastic Output Node writing a json to Elasticsearch
[
        # This node creates a spark dataset pointing to Elasticsearch data
        {
                "type": "elastic_batch_input",
                "component": "input",
                "settings": {
                        "resource": "mytenant-events-2017.11.30/log",
                        "cluster_name": "es_search",
                        "nodes": [ "localhost" ],
                        "query":        {
                                "range": {
                                        "ts": {
                                                "gte": "2017-12-05T10:00",
                                                "lte": "2017-12-05T11:00",
                                                "time_zone": "+01:00"
                                        }
                                }
                        }
                },
                "publish": [
                        # The output spark dataset has no column.
                        { "field": "data" }
                ]
        },
        # This node counts the number of rows in a spark dataset
        {
                "type": "count",
                "component": "count",
                "subscribe": [
                        # The input dataset
                        { "component": "input", "field": "data" }
                ],
                "publish": [
                        # The number of rows in long format
                        { "field": "value" }
                ]
        },
        # This node creates the result json document
        # By default, without code, a Punch node concatenates
        # inputs into a json
        {
                "type": "punch",
                "component": "punch",
                "settings": {
                        # The initial json
                        "root": {
                                "from": "2017-12-05T10:00",
                                "to": "2017-12-05T11:00",
                                "index": "mytenant-events-2017.11.30/log"
                        }
                },
                "subscribe": [
                        # The number of documents added in the result json
                        # in the field "count"
                        { "component": "count", "field": "value", "name": "count" }
                ],
                "publish": [
                        # The result json
                        { "field": "value" }
                ]
        },
        # This node writes the result json in elastic
        {
                "type": "elastic_output",
                "component": "output",
                "settings": {
                        "index": "count-mytenant-events-2017.11.30",
                        "type": "count",
                        "nodes": [ "localhost" ]
                },
                "subscribe": [
                        # The result json
                        { "component": "punch", "field": "value" }
                ]
        }
]
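
Once this job has run, the document written to the "count-mytenant-events-2017.11.30" index should look like the following (the count value is of course illustrative): the Punch node starts from the "root" json and adds the subscribed count under the "count" field.

{
        "from": "2017-12-05T10:00",
        "to": "2017-12-05T11:00",
        "index": "mytenant-events-2017.11.30/log",
        "count": 12345
}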

Suspect URL Detection

[Image: ../../../_images/analytics_suspect_url.png]

This part presents how to train a suspect URL detection model based on K-Means and how to apply it using the PunchPlatform ML features.

Statistical Method

Given a set of URLs, we would like to detect abnormal ones, i.e. URLs that have some rare characteristics compared to the other URLs. We therefore need a process that associates an anomaly score to each URL.

[Image: ../../../_images/analytics_suspect_url_graph.png]

The statistical method used to detect suspect URLs is:

  • Get a dataset of URLs from an Elasticsearch index.
  • For each URL, compute a numerical vector based on a list of features (length, number of “/”, letter distribution, …).
  • Apply a K-Means on these vectors.
  • For each vector, compute the distance between it and the nearest K-Means cluster center (a minimal sketch of this computation is given after the note below).
  • Write the results to an Elasticsearch index.

Note

K-Means clustering is a method of vector quantization, originally from signal processing, that is popular for cluster analysis in data mining. k-means clustering aims to partition n observations into k clusters in which each observation belongs to the cluster with the nearest mean, serving as a prototype of the cluster. This results in a partitioning of the data space into Voronoi cells.
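
To make that last scoring step concrete, here is a minimal standalone Java sketch of the anomaly score: the squared distance between a feature vector and its nearest K-Means center, computed with the Spark ML Vectors.sqdist helper. The vectors and centers below are made up for illustration; in the real job the centers come from the fitted model, as shown in the Post Processing section.

import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;

public class AnomalyScoreSketch {

        // Anomaly score: squared distance to the nearest K-Means cluster center.
        // The larger the score, the more suspect the URL.
        static double score(Vector features, Vector[] centers) {
                double best = Double.MAX_VALUE;
                for (Vector center : centers) {
                        best = Math.min(best, Vectors.sqdist(features, center));
                }
                return best;
        }

        public static void main(String[] args) {
                // Hypothetical cluster centers and feature vector, for illustration only
                Vector[] centers = { Vectors.dense(1.0, 0.0), Vectors.dense(10.0, 10.0) };
                Vector url = Vectors.dense(1.5, 0.5);
                System.out.println(score(url, centers)); // prints 0.5
        }
}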

Input Data

In order to avoid overfitting, we use two datasets retrieved by two Elastic Batch Input Nodes:

  • An “input_fit” dataset used to train the K-Means algorithm.
  • An “input_transform” dataset on which we apply the trained model.

Those datasets contain different time buckets of documents from the same Elasticsearch index:

{
        "type": "elastic_batch_input",
        "component": "input_fit",
        "settings": {
                "resource": "mytenant-events-2017.11.30/log",
                "cluster_name": "es_search",
                "nodes": [ "localhost" ],
                "column_id": "id",
                "column_source": "source",
                "query":        {
                        "range": {
                                "ts": {
                                        "gte": "2017-12-05T10:00",
                                        "lte": "2017-12-05T11:00",
                                        "time_zone": "+01:00"
                                }
                        }
                }
        },
        "publish": [
                { "field": "data" }
        ]
},
{
        "type": "elastic_batch_input",
        "component": "input_transform",
        "settings": {
                "resource": "mytenant-events-2017.11.30/log",
                "cluster_name": "es_search",
                "nodes": [ "localhost" ],
                "column_id": "id",
                "column_source": "source",
                "query":        {
                        "range": {
                                "ts": {
                                        "gte": "2017-12-05T11:00",
                                        "lte": "2017-12-05T12:00",
                                        "time_zone": "+01:00"
                                }
                        }
                }
        },
        "publish": [
                { "field": "data" }
        ]
}

Pipeline

[Image: ../../../_images/analytics_suspect_url_pipeline.png]

The Spark pipeline to train is composed of the following stages:

  1. a JsonOutputStage extracting the URL from the document source
  2. a VectorizerStage computing the feature vector of each URL
  3. a KMeans stage clustering the feature vectors
  4. a JsonInputStage writing the features and the predicted cluster back into a result json

This pipeline is set in an MLlib Node subscribing to both Elastic Batch Input Nodes:

{
        "type": "mllib",
        "component": "pipeline",
        "settings": {
                "pipeline": [
                        {
                                "type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.jackson.JsonOutputStage",
                                "settings": {
                                        "inputColumn": "source",
                                        "outputFields": [
                                                {
                                                        "type": "string",
                                                        "json_field": "[target][uri][urn]",
                                                        "dataset_column": "url"
                                                }
                                        ]
                                }
                        },
                        {
                                "type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.vectorizer.VectorizerStage",
                                "settings": {
                                        "inputCol": "url",
                                        "outputCol": "features",
                                        "features": [
                                                { "type": "count", "settings": { "regex": "/" } },
                                                { "type": "distribution", "settings": { "regex": "[.]", "n": 10 } },
                                                { "type": "max_stream_length", "settings": { "regex": "[^A-Za-z0-9]", "n": 1 } },
                                                { "type": "check_pattern", "settings": { "regex": "^[0-9].*$" } },
                                                { "type": "check_pattern", "settings": { "regex": "^.*[^A-Za-z0-9].*$" } }
                                        ]
                                }
                        },
                        {
                                "type": "org.apache.spark.ml.clustering.KMeans",
                                "settings": { "k": 10 }
                        },
                        {
                                "type": "com.thales.services.cloudomc.punchplatform.analytics.plugins.jackson.JsonInputStage",
                                "settings": {
                                        "inputFields": [
                                                {
                                                        "type": "json",
                                                        "dataset_column": "source"
                                                },
                                                {
                                                        "type": "vector",
                                                        "json_field": "[detection][features]",
                                                        "dataset_column": "features"
                                                },
                                                {
                                                        "type": "integer",
                                                        "json_field": "[detection][prediction]",
                                                        "dataset_column": "prediction"
                                                }
                                        ],
                                        "outputColumn": "output"
                                }
                        }
                ]
        },
        "subscribe": [
                { "component": "input_fit", "field": "data", "tag": "input_fit" },
                { "component": "input_transform", "field": "data", "tag": "input_transform" }
        ],
        "publish": [
                { "field": "data", "tag": "output_transform" },
                { "field": "model", "tag": "model" }
        ]
}
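
The JsonOutputStage, VectorizerStage and JsonInputStage are PunchPlatform plugins; the clustering stage itself is the standard Spark ML KMeans. As a point of reference only, here is a small self-contained Java sketch of that stage alone, fitted on a made-up "features" column (k is lowered to 2 to match the toy data; the real job uses the VectorizerStage output and "k": 10):

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class KMeansStageSketch {
        public static void main(String[] args) {
                SparkSession spark = SparkSession.builder()
                        .master("local[*]").appName("kmeans-stage-sketch").getOrCreate();

                // Made-up feature vectors standing in for the VectorizerStage output
                List<Row> rows = Arrays.asList(
                        RowFactory.create(Vectors.dense(1.0, 0.0)),
                        RowFactory.create(Vectors.dense(1.1, 0.1)),
                        RowFactory.create(Vectors.dense(9.0, 9.0)));
                StructType schema = new StructType(new StructField[] {
                        new StructField("features", new VectorUDT(), false, Metadata.empty()) });
                Dataset<Row> inputFit = spark.createDataFrame(rows, schema);

                // Same role as the "org.apache.spark.ml.clustering.KMeans" stage of the pipeline
                KMeans kmeans = new KMeans().setK(2)
                        .setFeaturesCol("features").setPredictionCol("prediction");
                KMeansModel model = kmeans.fit(inputFit);

                // Adds a "prediction" column holding the cluster number of each vector
                model.transform(inputFit).show(false);
                spark.stop();
        }
}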

Post Processing

The above pipeline only associates a cluster number to each URL, but we need the distance from the feature vector to the cluster centers. Those centers are available in the fitted pipeline published by the MLlib Node in the “model” field.

We can therefore use a Punch Batch Node to convert the cluster number into a distance-based anomaly score:

{
        "type": "punch_batch",
        "component": "post_processing",
        "settings": {
                "punchlet_code": "{{punch_code}}",
                "input_column": "output",
                "output_column": "post_processing"
        },
        "subscribe": [
                { "component": "pipeline", "field": "data" },
                { "component": "pipeline", "field": "model", "tag": "resource", "name": "model" }
        ],
        "publish": [
                { "field": "data" }
        ]
}

Where “{{punch_code}}” is:

{

        // Getting the fitted model
        PipelineModel pipelineModel = getResource("model", PipelineModel.class);

        // Extraction of KMeans model
        KMeansModel kMeans = (KMeansModel) pipelineModel.stages()[2];

        // Getting url group number
        int cluster = root:[detection][prediction];

        // Getting associated KMeans center
        org.apache.spark.ml.linalg.Vector center = kMeans.clusterCenters()[cluster];

        // Conversion of json array to Vector
        List<Tuple> listTuple = root:[detection][features].asArray();
        double[] arrayDouble = new double[listTuple.size()];
        for(int i = 0; i < listTuple.size(); i++) {
                arrayDouble[i] = listTuple.get(i).asDouble();
        }
        org.apache.spark.ml.linalg.Vector vector = new DenseVector(arrayDouble);

        // Computation of distance
        Double distance = Vectors.sqdist(center, vector);

        // Add distance to output json
        root:[output][detection][distance] = distance;

}

Output Data

Finally, an Elastic Batch Output Node is used to write the result to Elasticsearch. This node writes the column “id” as the document id and the column “post_processing” as the document source.

{
        "type": "elastic_batch_output",
        "component": "output",
        "settings": {
                "resource": "result-mytenant-events-2017.11.30/log",
                "cluster_name": "es_search",
                "nodes": [ "localhost" ],
                "column_id": "id",
                "column_source": "post_processing"
        },
        "subscribe": [
                { "component": "post_processing", "field": "data" }
        ]
}