HOWTO extract a massive amount of logs from an archiving system

Why do that

Another typical use case is a massive extraction to a file system or to another archiving system. As explained in the previous section, we could extract data with the PunchPlatform command line, but this approach is:

  • not resilient (what happens if the process stops before the end?)
  • not very performant, since the process runs on one machine only

A better way to do it is to launch a dedicated topology, supervised by Storm and possibly running on multiple machines.

Extraction

The main idea here is to build a list of tasks specifying the archive files to extract, and to pass this list to an ArchiveProcessor Bolt, which performs the actual extraction from the archiving system. The tasks are documents stored in Elasticsearch. This is very convenient because PunchPlatform offers components to interface with Elasticsearch, and Kibana or Grafana can be used to visualize the progress of the task processing.
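
Each task is a small Elasticsearch document describing one archived batch to extract. The exact schema is produced by the tasks-list generation command shown later; the sketch below is only an illustrative example built from the fields referenced in this HOWTO (the batch coordinates and the task status), with hypothetical values:

    {
      "batch" : {
        "source" : {
          "partition" : 0,
          "batch_id" : 1234
        }
      },
      "status" : {
        "value" : "TO_DO",
        "update_ts" : "2016-06-01T12:00:00+0100"
      }
    }

The only status value the following configurations rely on is DONE: tasks whose status.value is DONE are filtered out by the extraction topology, and status.update_ts is used as the task timestamp.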

Warning

A PunchBolt, not shown in the figure, may be necessary to process the task status before it is updated in Elasticsearch (by the Elasticsearch Bolt).

What to do

To massively extract archived data, the workflow is as follows:

  1. Determine precisely which data you want to extract from your archiving system: which topic and which time range. Typically, a topic matches a technology, so you may want to extract one or several topics.

  2. Choose or deploy an archiving system or a file system to which the data will be extracted.

  3. Choose or deploy an Elasticsearch cluster reachable by the Storm cluster hosting your future extraction topology. Of course the Storm cluster will also need to reach your archiving systems.

    Note

    You do not need a big Elasticsearch cluster to host your tasks. Typically, a simple 3-node cluster is enough if you need resilience, or a single-node cluster if you do not.

  4. On a PunchPlatform Admin station, adapt and launch the following command:

    punchplatform-objects-storage.sh batches-tasks-list --cluster ceph:myCephCluster --pool mytenant-data --topic myTopic --elasticsearch-cluster-name myESCluster --elasticsearch-index tasks-for-extraction --elasticsearch-transport myESEndpoint:9300 --taskslist-prefix extractor-
    

    This command generates a task list in Elasticsearch, each task matching one archived batch to extract. These tasks will be used by your extraction topology to know which data to extract and to produce reports about the extraction. It is then easy to use Kibana or Grafana dashboards to follow the extraction status.

    If you have multiple topics, adapt and launch the command once per topic to extract, as in the sketch below.
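
    For instance, a minimal shell sketch, assuming two hypothetical topics (myTopicA, myTopicB) and the same cluster settings as in the command above:

    # hypothetical topic names; adapt the flags to your platform
    for topic in myTopicA myTopicB ; do
      punchplatform-objects-storage.sh batches-tasks-list --cluster ceph:myCephCluster --pool mytenant-data --topic "$topic" --elasticsearch-cluster-name myESCluster --elasticsearch-index tasks-for-extraction --elasticsearch-transport myESEndpoint:9300 --taskslist-prefix extractor-
    done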

  5. In a new channel and a new topology, instantiate the first component: an Elasticsearch Spout. It is responsible for reading the tasks and sending information about the data to extract to the next component. Here is an example of the component configuration:

       {
        "type" : "elasticsearch_spout",
        "spout_settings" : {
          "es_client_type" : "client",
          "load_control" : "rate",
          "load_control.rate" : 10,
          "load_control.adaptative" : false,
          "es_cluster_nodes_and_ports" : "myESEndpoint:9300",
          "es_cluster_name" : "myESCluster",
          "index_name" : "tasks-for-extraction",
          "from_datetime" : "2016-01-01T00:00:00+0100",
          "to_datetime" : "2016-12-31T23:59:59+0100",
          "filtering_request" : "_type:task AND NOT status.value:DONE",
          "timeslice_size_ms" : 60000,
          "timestamp_field" : "status.update_ts",
          "es_client_type" : "client",
          "es_id_storm_output_field_name" : "task_id",
          "es_type_storm_output_field_name" : "task_type",
          "es_document_storm_output_field_name" : "task",
          "es_index_storm_output_field_name" : "es_index",
          "acknowledgement_reporting" : {
            "storm_stream_id" : "tasks_updates",
            "boolean_ack_storm_field_name" : "acked",
            "replay_failed_timeslices" : false
          },
          "fields_extraction" : {
              "partition" : {
                  "es_field" : "[batch][source][partition]"
              },
              "batch_id" : {
                  "es_field" : "[batch][source][batch_id]"
              }
          }
        },
        "storm_settings" : {
          "executors": 1,
          "component" : "tasks_spout",
          "publish" : [
            {
              "stream" : "tasks" ,
              "fields" : ["task","task_type", "task_id", "es_index", "partition", "batch_id"]
            },
            {
              "stream" : "tasks_updates" ,
              "fields" : ["task","task_type", "task_id", "es_index", "acked"]
            }
          ]
        }
    }
    

    The main parameters you absolutely need to understand are:

    • es_cluster_nodes_and_ports, es_cluster_name and index_name
    • from_datetime and to_datetime
    • timestamp_field

    You can find a complete description of this component here.
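
    To make the wiring with the next component concrete, here is a purely illustrative view of a tuple emitted on the "tasks" stream, assuming the example task document sketched earlier (all values are hypothetical):

    {
      "task"      : "<the whole task document read from Elasticsearch>",
      "task_type" : "task",
      "task_id"   : "<the Elasticsearch document id>",
      "es_index"  : "tasks-for-extraction",
      "partition" : 0,
      "batch_id"  : 1234
    }

    The partition and batch_id fields are flattened out of the task document by the fields_extraction section; the ArchiveProcessor Bolt reads them through its partition_id_storm_field and batch_id_storm_field settings.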

    Note

    Please note that if you want to extract multiple topics, you will need to instantiate multiple topologies. The topologies will be identical, except for the ArchiveProcessor component which specifies the topic name.

  6. Configure an ArchiveProcessor Bolt. It receives from the previous component information about the batches to extract and performs the actual extraction. In the sample below, we assume that the data was not enciphered and that we do not want to encipher it. We also assume we want to extract it to a file system.

    {
      "type" : "archive_processor_bolt",
      "bolt_settings" : {
        "object_storage_cluster" : "ceph:myCephCluster",
        "object_storage_pool" : "mytenant-data",
        "source_topic_name" : "myTopic",
        "partition_id_storm_field" : "partition",
        "batch_id_storm_field" : "batch_id",
        "actions" : {
          "republication" : {
            "compression_format" : "GZIP",
            "publication_actions" : [
              {
                "action_type" : "write_to_filesystem",
                "path" : "/path/to/extracted/data/",
                "file_prefix" : "extract",
                "required_ciphering_content" : false,
                "files_to_process" : [ "raw" ]
              }
            ]
          }
        },
        "output_report_storm_field" : "task",
        "report_emission" : {
          "report_stream" : "tasks",
          "report_field" : "task",
          "id_field" : "task_id",
          "task_input_storm_field" : "task"
        }
      },
      "storm_settings" : {
        "executors": 1,
        "component" : "archive_reprocessor",
        "publish" : [
          {
            "stream" : "tasks",
            "fields" : ["task", "task_id"]
          }
        ],
        "subscribe" : [
          {
            "component" : "tasks_spout",
            "stream" : "tasks",
            "grouping": "localOrShuffle"
          }
        ]
      }
    }
    

    You can find a full description of the ArchiveProcessor Bolt here.

  7. Configure a punchlet to receive the task status from the ArchiveProcessor component and format it for the Elasticsearch Bolt.

    {
      "type" : "punch_bolt",
      "bolt_settings" : {
        "punchlet_code" : "{[tasks][es_index]=\"tasks\"; [tasks][es_type]=\"task\"; [tasks][es_id]=[tasks][task_id];}"
      },
      "storm_settings" : {
        "executors": 1,
        "component" : "punch_bolt",
        "publish" : [
          {
            "stream" : "tasks",
            "fields" : [ "task", "es_index", "es_type", "es_id" ]
          }
        ],
        "subscribe" : [
          {
            "component" : "archive_reprocessor",
            "stream" : "tasks",
            "grouping": "localOrShuffle"
          }
        ]
      }
    }
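
    For readability, the punchlet embedded in punchlet_code above is equivalent to the following punch code, shown unescaped:

    {
        // route each task report to the target Elasticsearch index, type and document id
        [tasks][es_index] = "tasks";
        [tasks][es_type] = "task";
        [tasks][es_id] = [tasks][task_id];
    }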
    
  8. Finally, configure an Elasticsearch Bolt to update the task status.

    {
      "type" : "elasticsearch_bolt",
      "bolt_settings" : {
        "watchdog_timeout" : "1h",
        "batch_size" : 1000,
        "queue_size" : 1000,
        "batch_interval" : 1000,
        "data_field" : "task",
        "index_field" : "es_index",
        "type_field" : "es_type",
        "id_field" : "es_id",
        "request_timeout" : "20s",
        "cluster_id" : "myESCluster",
        "document_type" : "task",
        "index_failed" : true
      },
      "storm_settings" : {
        "executors": 1,
        "component" : "elasticsearch_bolt",
        "subscribe" : [
          {
            "component" : "punch_bolt",
            "stream" : "tasks",
            "grouping": "localOrShuffle"
          }
        ]
      }
    }
    
  9. Congratulations, you have configured a massive extraction topology. Once you are confident in your configuration, launch it:

    punchplatform-topology.sh
    

    We invite you to build a Kibana dashboard to follow the extraction status. You will be able to see how many tasks have been processed, the errors if something goes wrong, etc.

    To kick-start a Kibana monitoring dashboard, create the ‘task*’ index pattern in your Kibana settings, based on the ‘status.update_ts’ timestamp field, and import resources/kibana/bulk/tasks-monitoring.json (from the standalone distribution, or from the PunchPlatform ‘pp-log-resources’ git repository) into Kibana objects through the Kibana GUI.

Special case: ciphered archive

For more details on archiving, please refer to ../Archiving.html.

If your source is a ciphered archive, you will need to modify your topology to provide the path to the keystore holding your deciphering keys, and the key aliases for all the keys that may be needed to extract the data from your targeted time scope (remember that ciphering keys may have changed over time, as described in the keys operation principles, cf. Archiving system administration).

The needed change is to insert a ‘deciphering’ section inside the ‘republication’ section of the ArchiveProcessor Bolt:

[...]
    "republication" : {
        "deciphering" : {
            "keystore" : "/data/keystore.jks",
            "keys" : [
                {
                    "key_id" : "MyOperationalKey"
                }
            ]
        },
[...]