NSG AzureBlobStorage Input

Introduction

Please refer to AzureBlobStorage Spout before reading this documentation.

This input node makes it easy to pull the NSG data generated by Azure NSG cloud services without duplicates.

Let's use an example to explain how this node differs from the classic AzureBlobStorage Spout. Consider a blob, myBlob, containing a JSON document in which data are appended to a list at regular intervals. The snapshots below illustrate this (m is an integer number of minutes):

Interval m + 1

{
  "records": [
    {
      "age": 1,
      "name": "jonathan",
      "surname": "yue chun",
      "gender": "M"
    }
  ]
}

Interval m + 2

{
  "records": [
    {
      "age": 1,
      "name": "jonathan",
      "surname": "yue chun",
      "gender": "M"
    },
    {
      "age": 2,
      "name": "jane",
      "surname": "geng",
      "gender": "F"
    }
  ]
}

Interval m + 3

{
  "records": [
    {
      "age": 1,
      "name": "jonathan",
      "surname": "yue chun",
      "gender": "M"
    },
    {
      "age": 2,
      "name": "jane",
      "surname": "geng",
      "gender": "F"
    },
    {
      "age": 3,
      "name": "jj",
      "surname": "doggy",
      "gender": "M"
    }
  ]
}

Notice that each minute, data are appended to the records array, so the array keeps growing over time. With the classic AzureBlobStorage Spout, each update of myBlob causes the whole array to be emitted as tuples into the topology, which produces duplicate data. The NSGAzureBlobStorage Spout ensures that only the newly appended data are emitted.
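The difference can be pictured with a small Python sketch. It is only a conceptual illustration, not the spout's implementation: the function name newly_appended and the idea of tracking a count of already emitted records are assumptions made for this example.

```python
import json


def newly_appended(blob_text, emitted_count):
    """Return (records not emitted yet, updated count of emitted records).

    Hypothetical helper: it remembers how many records were already emitted
    and skips them on the next read of the same blob.
    """
    records = json.loads(blob_text)["records"]
    return records[emitted_count:], len(records)


# Two successive snapshots of the same blob (shortened records).
snapshot_1 = '{"records": [{"age": 1, "name": "jonathan"}]}'
snapshot_2 = '{"records": [{"age": 1, "name": "jonathan"}, {"age": 2, "name": "jane"}]}'

emitted = 0
first, emitted = newly_appended(snapshot_1, emitted)
second, emitted = newly_appended(snapshot_2, emitted)
```

Here first holds only the jonathan record and second holds only the jane record, whereas re-emitting the full array on each update would send jonathan twice.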

Note

This spout is not designed to run with more than one Storm executor.

Example(s)

Below is an example for fetching blobs (~2 MB max) from a virtual directory container:

{
  "tenant": "my_tenant",
  "dag": [
    {   
      "type": "nsgazureblobstorage_spout",
      "settings": {
        "blobstorage_name": "your_blob_name",
        "blobstorage_key": "your_blob_key",
        "container_name": "your_container_name",
        "last_committed_blob_name": "myUseCase",
        "pull_interval": "10s",
        "read_blob_since_last": "12s",
        "codec": "json_array",
        "read_strategy": "last_committed",
        "chunk_size": 1048576,
        "blob_name_prefix": "a-zA-Z",
        "virtual_directory_blob_name_extension_regex": "*",
        "virtual_directory_prefix_list": ["virtualdir_1", "virtualdir_2"],
        "nsg_azure_array_size_bytes": 1000000,
        "nsg_azure_last_bytes_to_ignore": 2,
        "nsg_azure_virtual_directory_scan_since_last": "4h"   
      },
      "storm_settings": {
        "component": "azureblob",
        "publish": [
          {
            "stream": "input",
            "fields": [
              "value"
            ]
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "punchlet_node",
      "settings": {
        "punchlet_code": "{print(root);}",
        "decoding_strategy" : "smart"
      },
      "storm_settings": {
        "component": "punchlet_node",
        "subscribe": [
          {
            "component": "azureblob",
            "stream": "input"
          }
        ]
      }
    },
    {
      "type" : "elasticsearch_bolt",
      "settings" : {
        "cluster_id" : "es_search",
        "per_stream_settings" : [
          {
            "stream" : "input",
            "index" : { "type" : "daily" , "prefix" : "azureblob-" },
            "document_json_field" : "value",
            "additional_document_value_fields" : [
              { "type" : "date" , "document_field" : "@timestamp", "format" : "iso"}
            ]
          }
        ]
      },
      "storm_settings": {
        "component": "elasticsearch_bolt",
        "subscribe": [ { "component": "azureblob", "stream": "input" } ]
      }
    }
  ],
  "storm_settings": {
    "topology.worker.childopts": "-server -Xms2048m -Xmx2048m",
    "topology.enable.message.timeouts": true,
    "topology.message.timeout.secs": 10,
    "topology.max.spout.pending": 10000,
    "topology.sleep.spout.wait.strategy.time.ms": 50
  }
}

Parameters

  • blobstorage_name : String

The name of your blob storage (located in the Azure resource group of your subscription).

  • blobstorage_key : String

The key provided with your Azure Blob Storage instance.

  • container_name : String

The name of the blob container from which you want to pull data.

  • virtual_directory_blob_name_extension_regex : String

Takes the extension of a blob type as its value. For example, json retrieves only *.json blobs from the specified container.

  • pull_interval : String

The time to wait before executing the next pull, counted from the end of the current pull. Follows the Elasticsearch time convention: Xs, Xm, Xh, where X is an integer and s, m and h stand for seconds, minutes and hours.

  • read_blob_since_last : String

This parameter is used to search your container for blobs older than (current time) - (time_value). Follows the Elasticsearch time convention: Xs, Xm, Xh, where X is an integer and s, m and h stand for seconds, minutes and hours. Defaults to pull_interval - 2s if not specified in your topology configuration.
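To make the Xs/Xm/Xh convention and the default value concrete, here is a minimal Python sketch. The helper names parse_duration and default_read_blob_since_last are hypothetical and only mirror the description above; the platform's own parsing may accept or reject other inputs.

```python
import re
from datetime import timedelta

# Unit letters described above, mapped to timedelta keyword arguments.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours"}


def parse_duration(text):
    """Parse a value such as '10s', '5m' or '4h' into a timedelta."""
    match = re.fullmatch(r"(\d+)([smh])", text)
    if match is None:
        raise ValueError(f"invalid duration: {text!r}")
    value, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(value)})


def default_read_blob_since_last(pull_interval):
    """Documented default: pull_interval minus 2 seconds."""
    return parse_duration(pull_interval) - timedelta(seconds=2)
```

For example, with "pull_interval": "10s" and no read_blob_since_last, this sketch yields a window of 8 seconds.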

  • codec: String

This parameter is used for large blobs, for instance a JSON array blob of more than 50 MB. Instead of emitting the whole JSON document into the topology, the spout emits the JSON objects of the array one at a time. The only supported value is json_array; more options will be available over time.
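The idea of emitting one JSON object at a time can be sketched in Python with the standard json.JSONDecoder.raw_decode method. The function iter_json_array is hypothetical, and it works on an in-memory string for brevity; a real implementation for large blobs would read the blob in chunks.

```python
import json


def iter_json_array(text):
    """Yield the elements of the first JSON array in `text`, one at a time."""
    decoder = json.JSONDecoder()
    pos = text.index("[") + 1
    while True:
        # Skip whitespace and the commas that separate array elements.
        while pos < len(text) and text[pos] in " \t\r\n,":
            pos += 1
        if pos >= len(text) or text[pos] == "]":
            return
        # raw_decode returns the decoded value and the index where it ended.
        obj, pos = decoder.raw_decode(text, pos)
        yield obj
```

Each yielded object would correspond to one tuple emitted into the topology, instead of a single tuple carrying the whole array.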

  • read_strategy: String

last_committed: The spout starts pulling from the last committed blob, used as a reference.

earliest: The spout pulls all blobs and exits.

latest: The spout ignores the checkpoint blob (if present) and pulls only the latest/modified blobs to emit into the topology. This strategy is used by default if none is specified in your topology configuration.

  • chunk_size: Integer

Size of each read operation, in bytes. The buffer should be big enough if you plan to pull large blobs. Defaults to 1048576 if not specified in your topology configuration.

  • last_committed_blob_name: String

Specifies a custom name for your checkpoint blob. Note that this name is also used when the topology restarts with a checkpoint-based read_strategy.

  • blob_name_prefix: String

Pull only blobs whose name starts with one of the given characters. For instance, "blob_name_prefix": "a-bAB" retrieves only blobs whose name starts with the letter a, b, A or B. Defaults to a-zA-Z0-9 if not specified.
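One way to read this setting is as the body of a regular-expression character class applied to the first character of the blob name. The Python sketch below follows that reading; it is an assumption for illustration, and name_matches_prefix is a hypothetical helper, not part of the spout.

```python
import re


def name_matches_prefix(blob_name, blob_name_prefix="a-zA-Z0-9"):
    """Return True if the first character of blob_name is in the given set."""
    # Assumption: the setting is used as the body of a regex character class.
    return re.match(f"[{blob_name_prefix}]", blob_name) is not None
```

With "a-bAB", names such as alpha.json and Beta.json would be kept, while charlie.json would be skipped.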

  • virtual_directory_prefix_list: List(of String)

Restricts the spout to fetching blobs from the listed virtual (sub)directories.

  • nsg_azure_array_size_bytes: Integer

The approximate size, in bytes, of the data that is appended to the JSON array.

  • nsg_azure_last_bytes_to_ignore: Integer

Specify the number of bytes to ignore at the end of the blob. This parameter is used to remember the last line read in an array. For instance, if data is regularly appended as JSON objects to a JSON array such as {"records": [{..},{..}]}, set nsg_azure_last_bytes_to_ignore: 2 so that the two closing characters ]} are skipped.
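The role of the ignored trailing bytes can be pictured with the following Python sketch. It is an interpretation of the description above, not the spout's actual code: the function read_appended, the byte offset bookkeeping and the starting offset are all assumptions made for illustration.

```python
import json


def read_appended(blob, offset, last_bytes_to_ignore):
    """Return (records appended since `offset`, new offset).

    Hypothetical helper: the closing bytes (for example `]}`) are skipped so
    that the next read can resume right after the last complete object.
    """
    end = len(blob) - last_bytes_to_ignore
    # After the first read, the new slice starts with the separating comma.
    chunk = blob[offset:end].decode("utf-8").strip().lstrip(",")
    records = json.loads("[" + chunk + "]") if chunk else []
    return records, max(offset, end)


# Assumed starting offset: just after the opening of the array.
start = len(b'{"records":[')
v1 = b'{"records":[{"age":1}]}'
v2 = b'{"records":[{"age":1},{"age":2}]}'

first, offset = read_appended(v1, start, 2)
second, offset = read_appended(v2, offset, 2)
```

In this sketch the second read returns only the object appended after the first read, because the remembered offset points just before the closing ]} of the previous version.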

  • nsg_azure_virtual_directory_scan_since_last: String

This parameter takes a number followed by a unit: y for year, M for month, d for day, h for hour, and m or mn for minute. It is used to fetch blobs whose names begin with a prefix (taken from virtual_directory_prefix_list) followed by year/month/day/hour/minute, where the / character delimits the year, month, and so on. From the value you specify, a path is generated by concatenating your virtual directory prefix with the time components derived from nsg_azure_virtual_directory_scan_since_last. The number selects the last X virtual directories. For instance, with "virtual_directory_prefix_list": ["virtualdir1/virtualdir2"] and "nsg_azure_virtual_directory_scan_since_last": "5h", the spout fetches blobs located under:

   - `virtualdir1/virtualdir2/y=2019/m=01/d=16/h=05`
   - `virtualdir1/virtualdir2/y=2019/m=01/d=16/h=04`
   - `virtualdir1/virtualdir2/y=2019/m=01/d=16/h=03`
   - `virtualdir1/virtualdir2/y=2019/m=01/d=16/h=02`
   - `virtualdir1/virtualdir2/y=2019/m=01/d=16/h=01`

Note: if the minute unit is specified (m or mn), the current system year, month, day and hour are used by default. The same logic applies to month, day and hour.
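The path generation for the hour unit can be sketched in Python as follows. This is a minimal illustration under stated assumptions: hourly_paths is a hypothetical helper, only the h unit is handled, and the reference time is passed explicitly, whereas the spout relies on the current system time. How the spout itself behaves across day boundaries is not specified here.

```python
from datetime import datetime, timedelta


def hourly_paths(prefix, scan_since_last, now):
    """Build the virtual directory paths for the last X hours, newest first."""
    if not scan_since_last.endswith("h"):
        raise ValueError("this sketch only handles the hour unit")
    count = int(scan_since_last[:-1])
    paths = []
    for i in range(count):
        t = now - timedelta(hours=i)
        paths.append(f"{prefix}/y={t:%Y}/m={t:%m}/d={t:%d}/h={t:%H}")
    return paths


paths = hourly_paths("virtualdir1/virtualdir2", "5h", datetime(2019, 1, 16, 5, 30))
```

With a reference time of 2019-01-16 05:30, the sketch produces the five paths listed above, from h=05 down to h=01.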