
HOWTO apply a small change to data between two Elasticsearch clusters

Why do that

With Punchplatform, your data is usually stored in an Elasticsearch cluster.

Elasticsearch provides a way to reindex data when a mapping must be updated, for instance to change a field from long to date, or from a non-analyzed string to an analyzed string.

Sometimes you need to update the data itself, for instance to change timestamps or to enrich documents. Punchplatform provides a way to do that.

Update timestamp - use case

Use case description:

Add a year to a specific timestamp field on a subset of data.

STEP 0 - Identify data

Identify the subset of data with Kibana:

  • timestamp range
  • type of logs
  • index names

Then read the selected data with an Elasticsearch spout and print the result.

The following example shows an extraction of the logs of all tenants from 2018/01/12 9h to 2018/01/13 18h, based on the reporter timestamp (rep.ts). You can start the following topology in the foreground:

{
  "tenant" : "mytenant",
  "channel" : "replay_es",
  "name" : "single",
  "meta" : {
    "tenant" : "mytenant",
    "channel" : "replay_es",
    "vendor" : "replay_es"
  },
  "spouts" : [
     {
          "type" : "elasticsearch_input",
          "settings" : {
            "cluster_id" : "es_search",
            "index" : "events-*",
            "query": "?q=_type:log",
            "es.index.read.missing.as.empty": true
          },
          "storm_settings" : {
            "component" : "extractor_spout"
          }
      }
  ],
  "bolts" : [
    {
      "type" : "punchlet_node",
      "settings" : {    
        "punchlet_code": "{print(root);}"
      },
      "storm_settings" : {
        "executors": 1,
        "component" : "punchlet_node",
        "publish" : [ 
          { 
            "stream" : "logs", 
            "fields" : [
              "log",
              "es_type",
              "log_id",
              "es_index"
            ]  
          }
        ],
        "subscribe" : [ 
          { 
            "component" : "extractor_spout", 
            "stream" : "default",
            "grouping": "localOrShuffle"
          }
        ] 
      }
    }
  ],          

  "exception_catcher_bolt" : {
    "punchlet" : "standard/common/exception_handler.punch",
    "executors" : 1
  }, 

  "storm_settings" : {
      "metrics_consumers": [ "org.apache.storm.metric.LoggingMetricsConsumer" ],
      "topology.builtin.metrics.bucket.size.secs": 30,
      "supervisor.monitor.frequency.secs" : 60,
      "topology.max.spout.pending" : 2000,
      "topology.enable.message.timeouts": true,
      "topology.message.timeout.secs" : 30,
      "topology.worker.childopts" : "-Xms256m -Xmx256m", 
      "topology.receiver.buffer.size": 32,
      "topology.executor.receive.buffer.size": 16384,
      "topology.executor.send.buffer.size": 16384,
      "topology.transfer.buffer.size": 32,
      "topology.worker.shared.thread.pool.size": 4,
      "topology.disruptor.wait.strategy": "com.lmax.disruptor.BlockingWaitStrategy",
      "topology.spout.wait.strategy": "org.apache.storm.spout.SleepSpoutWaitStrategy",
      "topology.sleep.spout.wait.strategy.time.ms": 50,
      "topology.workers" : 1
  }
}

If you run into errors or issues, first check for common errors and reread the configuration (especially the index name and timestamp fields). Then contact support-n3.punchplatform@thalesgroup.com for help.

STEP 1 - Design your process

Use your favorite tool to design your small processing step:

  • punchplatform online debugger
  • punchplatform-log-injector.sh --punchlets
  • punchplatform-puncher.sh

The goal is to focus on the data only. Forget about Elasticsearch, Storm and so on: at this stage, only the punch code and the data matter.

For example, I tried adding a year to a date with the online punch debugger, using:

{
  java.time.ZonedDateTime date = java.time.ZonedDateTime.parse("2018-01-11T15:05:00+01:00", java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME);
  [initialDate] = date.toString();

  java.time.Period year = java.time.Period.ofYears(1);
  java.time.ZonedDateTime datePlusYear = date.plus(year);
  [datePlusYear] = datePlusYear.toString();

  print(root);
}
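
The same experiment can be reproduced outside the debugger with plain Java, since the snippet above relies only on java.time. The following is a minimal sketch, not Punchplatform code: the class name AddYearDemo and its helper method are hypothetical. Note that ZonedDateTime.toString() omits the seconds when they are zero, which is why the printed values are shorter than the input string.

```java
import java.time.Period;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical standalone class mirroring the debugger snippet above.
final class AddYearDemo {

    // Parse an ISO-8601 offset timestamp and shift it by one calendar year.
    static ZonedDateTime plusOneYear(String isoTimestamp) {
        ZonedDateTime date = ZonedDateTime.parse(isoTimestamp, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
        return date.plus(Period.ofYears(1));
    }

    public static void main(String[] args) {
        String input = "2018-01-11T15:05:00+01:00";
        ZonedDateTime initial = ZonedDateTime.parse(input, DateTimeFormatter.ISO_OFFSET_DATE_TIME);

        // toString() drops the ":00" seconds part.
        System.out.println("initialDate  = " + initial);            // 2018-01-11T15:05+01:00
        System.out.println("datePlusYear = " + plusOneYear(input)); // 2019-01-11T15:05+01:00
    }
}
```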

Then write your processing as a punchlet. For my example, it looks like:

{
  // get date from logs ie [logs][log][rep][ts]
  java.time.ZonedDateTime date = java.time.ZonedDateTime.parse([logs][log][rep][ts].toString(), java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME);
  //[initialDate] = date.toString();

  // define 1 year, no matter leap year or not
  java.time.Period year = java.time.Period.ofYears(1);

  // add 1 year
  java.time.ZonedDateTime datePlusYear = date.plus(year);

  // update [logs][log][rep][ts] with the updated date
  [logs][log][rep][ts] = datePlusYear.toString();

}
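
Two details of this punchlet are worth checking before replaying real data: how java.time handles 29 February, and which string is written back into the field. The sketch below is plain Java, assuming the field holds an ISO-8601 timestamp with an offset; the class YearShift and its method are hypothetical names, not part of Punchplatform. It formats the result with ISO_OFFSET_DATE_TIME instead of toString(), so the seconds are kept even when they are zero. Whether that matters depends on the date format declared in your Elasticsearch mapping.

```java
import java.time.Period;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper illustrating the date arithmetic used by the punchlet.
final class YearShift {

    private static final DateTimeFormatter ISO = DateTimeFormatter.ISO_OFFSET_DATE_TIME;

    // Shift an ISO-8601 offset timestamp by one calendar year and
    // return it in the same format, seconds included.
    static String plusOneYear(String isoTimestamp) {
        ZonedDateTime shifted = ZonedDateTime.parse(isoTimestamp, ISO).plus(Period.ofYears(1));
        return ISO.format(shifted);
    }

    public static void main(String[] args) {
        // Regular date: only the year changes.
        System.out.println(plusOneYear("2018-01-12T09:00:00+01:00")); // 2019-01-12T09:00:00+01:00
        // Leap day: java.time falls back to the last valid day of February.
        System.out.println(plusOneYear("2020-02-29T10:00:00+01:00")); // 2021-02-28T10:00:00+01:00
    }
}
```

If logs dated 29 February must land on 1 March instead, the punchlet needs an explicit rule for that case.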

STEP 2 - Start your processing in production

Assemble the reading component, the processing component and the writing component, as in the following example:

{
    "tenant" : "mytenant",
    "channel" : "replay_es",
    "name" : "single",
    "meta" : {
      "tenant" : "mytenant",
      "channel" : "replay_es",
      "vendor" : "replay_es"
    },
    "spouts" : [
      {
            "type" : "elasticsearch_input",
            "settings" : {
              "es_client_type" : "client",
              "es_cluster_name" : "es_search",
              "es_cluster_nodes_and_ports" : "localhost:9300",
              "index_name" : "events-mytenant-*",
              "from_datetime" : "2018-01-12T09:00:00+0100",
              "to_datetime" : "2018-01-13T18:00:00+0100",
              "timestamp_field" : "rep.ts",
              "timeslice_size_ms" : 60000,
              "filtering_request" : "_type:",
              "es_id_storm_output_field_name" : "log_id",
              "es_type_storm_output_field_name" : "es_type",
              "es_document_storm_output_field_name" : "log",
              "es_index_storm_output_field_name" : "es_index"
            },
            "storm_settings" : {
              "executors": 1,
              "component" : "extractor_spout",
              "publish" : [
                {
                  "stream" : "logs" ,  

                  "fields" : ["log","es_type", "log_id", "es_index"]
                }
              ]
            }
        }
    ],  

    "bolts" : [
      {
        "type" : "punchlet_node",
        "settings" : {    
          "punchlet_json_resources" : [],
          "punchlet" : [
            "replay_es/removeTs.punch",
            "replay_es/plusOneYear.punch"
          ]
        },
        "storm_settings" : {
          "executors": 1,
          "component" : "punchlet_node",
          "publish" : [ 
            { 
              "stream" : "logs", 
              "fields" : [
                "log",
                "es_type",
                "log_id",
                "es_index"
              ]  
            }
          ],
          "subscribe" : [ 
            { 
              "component" : "extractor_spout", 
              "stream" : "logs",
              "grouping": "localOrShuffle"
            }
          ] 
        }
      },
      {
        "type": "elasticsearch_bolt",
        "settings": {
          "cluster_id": "es_search",
          "per_stream_settings" : [
            {
              "stream" : "logs",          
              "index" : { "type" : "daily" , "prefix" : "events-%{tenant}-" },          
              "document_json_field" : "log",          
              "document_id_field" : "log_id"
            }
          ]
        },
        "storm_settings": {
          "component": "elasticsearch",
          "subscribe": [ { "component": "punchlet_node", "stream": "logs", "grouping" : "localOrShuffle" } ]
        }
      }
    ],  

    "metrics" : {
        "reporters" : [
            {
              "type" : "elasticsearch",
              "cluster_name" : "es_search",
              "transport" : "localhost:9300",
              "max_results_size" : 10000,
              "native_es_settings" : {
                "transport.netty.workerCount" : "1"
              },
              "reporting_interval" : 1
            }
        ]
    },              

    "exception_catcher_bolt" : {
      "punchlet" : "standard/common/exception_handler.punch",
      "executors" : 1
    }, 

    "storm_settings" : {
        "metrics_consumers": [ "org.apache.storm.metric.LoggingMetricsConsumer" ],
        "topology.builtin.metrics.bucket.size.secs": 30,
        "supervisor.monitor.frequency.secs" : 60,
        "topology.max.spout.pending" : 2000,
        "topology.enable.message.timeouts": true,
        "topology.message.timeout.secs" : 30,
        "topology.worker.childopts" : "-Xms256m -Xmx256m", 
        "topology.receiver.buffer.size": 32,
        "topology.executor.receive.buffer.size": 16384,
        "topology.executor.send.buffer.size": 16384,
        "topology.transfer.buffer.size": 32,
        "topology.worker.shared.thread.pool.size": 4,
        "topology.disruptor.wait.strategy": "com.lmax.disruptor.BlockingWaitStrategy",
        "topology.spout.wait.strategy": "org.apache.storm.spout.SleepSpoutWaitStrategy",
        "topology.sleep.spout.wait.strategy.time.ms": 50,
        "topology.workers" : 1
    }
}
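
To get a feel for the size of the replay, it can help to work out how many time slices the reading component has to go through. The sketch below assumes that the spout scans the from_datetime to to_datetime range in consecutive windows of timeslice_size_ms; this behaviour is inferred from the setting names and is not confirmed here. The class TimesliceEstimate is a hypothetical name.

```java
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: estimates how many slices cover a replay range.
final class TimesliceEstimate {

    // Matches timestamps such as 2018-01-12T09:00:00+0100 (offset without colon).
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ");

    // Number of windows of sliceMs milliseconds needed to cover [from, to), rounded up.
    static long sliceCount(String from, String to, long sliceMs) {
        OffsetDateTime start = OffsetDateTime.parse(from, FORMAT);
        OffsetDateTime end = OffsetDateTime.parse(to, FORMAT);
        long rangeMs = Duration.between(start, end).toMillis();
        return (rangeMs + sliceMs - 1) / sliceMs;
    }

    public static void main(String[] args) {
        // Values taken from the topology above: 33 hours in 60-second slices.
        long slices = sliceCount("2018-01-12T09:00:00+0100", "2018-01-13T18:00:00+0100", 60000L);
        System.out.println(slices + " slices"); // 1980 slices
    }
}
```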

The topology has to run in a cluster, so we need a channel_structure like this:

{
    "topologies" : [
        {
            "topology" : "single_topology.json",
            "execution_mode" : "cluster",
            "cluster" : "main",
            "reload_action" : "kill_then_start"
        }
    ],
    "autotest_latency_control" : {     
        "path_controls" : {         
            "to_elasticsearch" : {
                "input" : {
                    "pp_platform_id" : "punchplatform-primary",
                    "tenant_name" : "mytenant",
                    "channel_name" : "replay_es",
                    "topology_name" : "single",
                    "storm_container_id" : "main",
                    "component_name" : "extractor_spout"
                },
                "output" : {
                    "pp_platform_id" : "punchplatform-primary",
                    "tenant_name" : "mytenant",
                    "channel_name" : "replay_es",                    
                    "topology_name" : "single",
                    "storm_container_id" : "main",
                    "component_name" : "elasticsearch"
                },
                "warn_threshold_s" : 2,
                "error_threshold_s" : 4
            }
        }
    },
    "metrics" : {
        "metrics_source" : {
            "configuration" : {
                "type" : "elasticsearch",
                "cluster_name" : "es_search",
                "transport" : "localhost:9300",
                "native_es_settings" : {
                    "transport.netty.workerCount" : "1"
                }
            }
        }
    },
    "kafka_topics" : {}
}

To start, simply run:

channelctl start --channel replay_es

STEP 3 - Administrate and Monitor

Punchplatform provides several ways to monitor this kind of processing.

To monitor the job at a high level:

  • Channels tab in punchplatform admin to see latency control health
  • Jobs tab in punchplatform admin to see the health and the high-level progress
  • punchplatform-jobs.sh command

To monitor the job at a low level:

  • Punchplatform provides metrics and dashboarding components (Elasticsearch and Kibana)
  • Create a dashboard to understand the processing in your job.

For example:

Before starting job:

image

During the job:

image

At the end of the job:

image

image