
HOWTO run a small data change between two Elasticsearch indices

Why do that

With Punchplatform, your data usually resides in an Elasticsearch cluster.

Elasticsearch provides a way to reindex data when a mapping must be updated. For instance: Long to Date, or a not-analyzed String to an analyzed String.
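A plain reindex simply copies documents from a source index to a destination index without touching their content. A minimal sketch, assuming an Elasticsearch version that exposes the _reindex API (2.3 and later) and illustrative index names:

POST _reindex
{
  "source" : { "index" : "events-mytenant-2018.01.12" },
  "dest"   : { "index" : "events-mytenant-2018.01.12-v2" }
}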

Sometimes, however, you need to update the data itself, which is why Punchplatform provides a way to do so. For instance: change timestamps or enrich data.

Update timestamp - use case

Use case description:

Add a year to a specific timestamp field on a subset of data.
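Concretely, a document carrying a reporter timestamp such as the following (an illustrative value taken from the time range used below):

{ "rep" : { "ts" : "2018-01-12T10:00:00+01:00" } }

should, after the replay, contain:

{ "rep" : { "ts" : "2019-01-12T10:00:00+01:00" } }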

STEP 0 - Identify data

Identify the subset of data with Kibana:

  • timestamp range
  • type of logs
  • indices name

Then, read the selected data with an Elasticsearch Spout and print the result.
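Before reading anything, a quick count query helps to confirm the size of the subset. This is a sketch against the Elasticsearch _count API, using the index pattern, timestamp field and time range of this example; the exact body syntax may vary slightly with your Elasticsearch version:

GET events-mytenant-*/_count
{
  "query" : {
    "range" : {
      "rep.ts" : {
        "gte" : "2018-01-12T09:00:00+01:00",
        "lte" : "2018-01-13T18:00:00+01:00"
      }
    }
  }
}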

The following example shows an extraction of all tenants' logs from 2018/01/12 09:00 to 2018/01/13 18:00, based on the reporter timestamp (rep.ts). You can start the following topology in foreground:

{
  "tenant" : "mytenant",
  "channel" : "replay_es",
  "name" : "single",
  "meta" : {
    "tenant" : "mytenant",
    "channel" : "replay_es",
    "vendor" : "replay_es"
  },
  "spouts" : [
     {
          "type" : "elasticsearch_spout",
          "spout_settings" : {
            "cluster_id" : "es_search",
            "index" : "events-*",
            "query": "?q=_type:log",
            "es.index.read.missing.as.empty": true
          },
          "storm_settings" : {
            "component" : "extractor_spout"
          }
      }
  ],
  "bolts" : [
    {
      "type" : "punch_bolt",
      "bolt_settings" : {    
        "punchlet_code": "{print(root);}"
      },
      "storm_settings" : {
        "executors": 1,
        "component" : "punch_bolt",
        "publish" : [ 
          { 
            "stream" : "logs", 
            "fields" : [
              "log",
              "es_type",
              "log_id",
              "es_index"
            ]  
          }
        ],
        "subscribe" : [ 
          { 
            "component" : "extractor_spout", 
            "stream" : "default",
            "grouping": "localOrShuffle"
          }
        ] 
      }
    }
  ],          

  "exception_catcher_bolt" : {
    "punchlet" : "standard/common/exception_handler.punch",
    "executors" : 1
  }, 

  "storm_settings" : {
      "metrics_consumers": [ "org.apache.storm.metric.LoggingMetricsConsumer" ],
      "topology.builtin.metrics.bucket.size.secs": 30,
      "supervisor.monitor.frequency.secs" : 60,
      "topology.max.spout.pending" : 2000,
      "topology.enable.message.timeouts": true,
      "topology.message.timeout.secs" : 30,
      "topology.worker.childopts" : "-Xms256m -Xmx256m", 
      "topology.receiver.buffer.size": 32,
      "topology.executor.receive.buffer.size": 16384,
      "topology.executor.send.buffer.size": 16384,
      "topology.transfer.buffer.size": 32,
      "topology.worker.shared.thread.pool.size": 4,
      "topology.disruptor.wait.strategy": "com.lmax.disruptor.BlockingWaitStrategy",
      "topology.spout.wait.strategy": "org.apache.storm.spout.SleepSpoutWaitStrategy",
      "topology.sleep.spout.wait.strategy.time.ms": 50,
      "topology.workers" : 1
  }
}

If you encounter errors or issues, check for the usual errors and re-read the configuration (especially the index name and timestamp fields). Then, contact support-n3.punchplatform@thalesgroup.com for help.

STEP 1 - Design your process

Use your favorite tool to design your small processing:

  • punchplatform online debugger
  • punchplatform-log-injector.sh --punchlets
  • punchplatform-puncher.sh

The goal is to focus on the data only. Forget Elasticsearch, Storm, etc.: focus only on Punch and the data.

For example, I tried to add a year to a date with the online punch debugger, using:

{
  java.time.ZonedDateTime date = java.time.ZonedDateTime.parse("2018-01-11T15:05:00+01:00", java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME);
  [initialDate] = date.toString();

  java.time.Period year = java.time.Period.ofYears(1);
  java.time.ZonedDateTime datePlusYear = date.plus(year);
  [datePlusYear] = datePlusYear.toString();

  print(root);
}
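Assuming an empty input document, the debugger should print something close to the following (note that ZonedDateTime.toString() drops the seconds when they are zero):

{
  "initialDate" : "2018-01-11T15:05+01:00",
  "datePlusYear" : "2019-01-11T15:05+01:00"
}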

Then, write your processing in a punchlet. For my example, it looks like this:

{
  // get date from logs ie [logs][log][rep][ts]
  java.time.ZonedDateTime date = java.time.ZonedDateTime.parse([logs][log][rep][ts].toString(), java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME);
  //[initialDate] = date.toString();

  // define 1 year, no matter leap year or not
  java.time.Period year = java.time.Period.ofYears(1);

  // add 1 year
  java.time.ZonedDateTime datePlusYear = date.plus(year);

  // update [logs][log][rep][ts] with the updated date
  [logs][log][rep][ts] = datePlusYear.toString();

}

STEP 2 - Start your processing in production

Assemble the reading component, the processing component and, last, the writing component, as in the following example:

{
    "tenant" : "mytenant",
    "channel" : "replay_es",
    "name" : "single",
    "meta" : {
      "tenant" : "mytenant",
      "channel" : "replay_es",
      "vendor" : "replay_es"
    },
    "spouts" : [
      {
            "type" : "elasticsearch_spout",
            "spout_settings" : {
              "es_client_type" : "client",
              "es_cluster_name" : "es_search",
              "es_cluster_nodes_and_ports" : "localhost:9300",
              "index_name" : "events-mytenant-*",
              "from_datetime" : "2018-01-12T09:00:00+0100",
              "to_datetime" : "2018-01-13T18:00:00+0100",
              "timestamp_field" : "rep.ts",
              "timeslice_size_ms" : 60000,
              "filtering_request" : "_type:",
              "es_id_storm_output_field_name" : "log_id",
              "es_type_storm_output_field_name" : "es_type",
              "es_document_storm_output_field_name" : "log",
              "es_index_storm_output_field_name" : "es_index"
            },
            "storm_settings" : {
              "executors": 1,
              "component" : "extractor_spout",
              "publish" : [
                {
                  "stream" : "logs" ,  

                  "fields" : ["log","es_type", "log_id", "es_index"]
                }
              ]
            }
        }
    ],  

    "bolts" : [
      {
        "type" : "punch_bolt",
        "bolt_settings" : {    
          "punchlet_json_resources" : [],
          "punchlet" : [
            "replay_es/removeTs.punch",
            "replay_es/plusOneYear.punch"
          ]
        },
        "storm_settings" : {
          "executors": 1,
          "component" : "punch_bolt",
          "publish" : [ 
            { 
              "stream" : "logs", 
              "fields" : [
                "log",
                "es_type",
                "log_id",
                "es_index"
              ]  
            }
          ],
          "subscribe" : [ 
            { 
              "component" : "extractor_spout", 
              "stream" : "logs",
              "grouping": "localOrShuffle"
            }
          ] 
        }
      },
      {
        "type": "elasticsearch_bolt",
        "bolt_settings": {
          "cluster_id": "es_search",
          "per_stream_settings" : [
            {
              "stream" : "logs",          
              "index" : { "type" : "daily" , "prefix" : "events-%{tenant}-" },          
              "document_json_field" : "log",          
              "document_id_field" : "log_id",          
            }
          ]
        },
        "storm_settings": {
          "component": "elasticsearch",
          "subscribe": [ { "component": "punch_bolt", "stream": "logs", "grouping" : "localOrShuffle" } ]
        }
      }
    ],  

    "metrics" : {
        "reporters" : [
            {
              "type" : "elasticsearch",
              "cluster_name" : "es_search",
              "transport" :
              "localhost:9300", 
              "max_results_size" : 10000,
              "native_es_settings" : {
                "transport.netty.workerCount" : "1"
              },
              "reporting_interval" : 1
            }
        ]
    },              

    "exception_catcher_bolt" : {
      "punchlet" : "standard/common/exception_handler.punch",
      "executors" : 1
    }, 

    "storm_settings" : {
        "metrics_consumers": [ "org.apache.storm.metric.LoggingMetricsConsumer" ],
        "topology.builtin.metrics.bucket.size.secs": 30,
        "supervisor.monitor.frequency.secs" : 60,
        "topology.max.spout.pending" : 2000,
        "topology.enable.message.timeouts": true,
        "topology.message.timeout.secs" : 30,
        "topology.worker.childopts" : "-Xms256m -Xmx256m", 
        "topology.receiver.buffer.size": 32,
        "topology.executor.receive.buffer.size": 16384,
        "topology.executor.send.buffer.size": 16384,
        "topology.transfer.buffer.size": 32,
        "topology.worker.shared.thread.pool.size": 4,
        "topology.disruptor.wait.strategy": "com.lmax.disruptor.BlockingWaitStrategy",
        "topology.spout.wait.strategy": "org.apache.storm.spout.SleepSpoutWaitStrategy",
        "topology.sleep.spout.wait.strategy.time.ms": 50,
        "topology.workers" : 1
    }
}

We have to start it in cluster mode, so we need a channel_structure like this:

{
    "topologies" : [
        {
            "topology" : "single_topology.json",
            "execution_mode" : "cluster",
            "cluster" : "main",
            "reload_action" : "kill_then_start"
        }
    ],
    "autotest_latency_control" : {     
        "path_controls" : {         
            "to_elasticsearch" : {
                "input" : {
                    "pp_platform_id" : "punchplatform-primary",
                    "tenant_name" : "mytenant",
                    "channel_name" : "replay_es",
                    "topology_name" : "single",
                    "storm_container_id" : "main",
                    "component_name" : "extractor_spout"
                },
                "output" : {
                    "pp_platform_id" : "punchplatform-primary",
                    "tenant_name" : "mytenant",
                    "channel_name" : "replay_es",                    
                    "topology_name" : "single",
                    "storm_container_id" : "main",
                    "component_name" : "elasticsearch"
                },
                "warn_threshold_s" : 2,
                "error_threshold_s" : 4
            }
        }
    },
    "metrics" : {
        "metrics_source" : {
            "configuration" : {
                "type" : "elasticsearch",
                "cluster_name" : "es_search",
                "transport" : "localhost:9300",
                "native_es_settings" : {
                    "transport.netty.workerCount" : "1"
                }
            }
        }
    },
    "kafka_topics" : {}
}

To start, simply run:

$ punchplatform-channel.sh --start mytenant/replay_es

STEP 3 - Administrate and Monitor

Punchplatform provides several ways to monitor this kind of processing.

To monitor the job at a high level:

  • the Channels tab in the Punchplatform admin, to see the latency control health
  • the Jobs tab in the Punchplatform admin, to see the health and a high-level progress
  • the punchplatform-jobs.sh command

To monitor the job at a low level:

  • Punchplatform provides metrics and dashboarding components (Elasticsearch & Kibana).
  • Create a dashboard to understand the processing of your job (for a quick check without a dashboard, see the count query sketched below).
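A rough way to follow progress is to count the documents whose reporter timestamp already falls in the shifted (+1 year) range. This is a sketch using the same illustrative index pattern and field as above; the body syntax may vary slightly with your Elasticsearch version:

GET events-mytenant-*/_count
{
  "query" : {
    "range" : {
      "rep.ts" : {
        "gte" : "2019-01-12T09:00:00+01:00",
        "lte" : "2019-01-13T18:00:00+01:00"
      }
    }
  }
}

Comparing this count with the count of the source subset (STEP 0) gives an idea of how far the replay has gone.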

For example:

Before starting the job: (dashboard screenshot)

During the job: (dashboard screenshot)

At the end of the job: (dashboard screenshots)