
AzureBlobStorage Spout

AzureBlobStorageSpout Introduction

The Azure Blob Storage Spout makes it possible to pull data from an Azure Blob container periodically. Large data sets stored as blobs in your subscription can therefore be pulled in a near real-time fashion. The data types currently supported are JSON and arrays of JSON.

A: JSON

{
  "age": 1,
  "name": "jonathan",
  "surname": "yue chun",
  "gender": "M"
}

B: Array of JSON

{
  "records": [
    {
      "age": 1,
      "name": "jonathan",
      "surname": "yue chun",
      "gender": "M"
    },
    {
      "age": 2,
      "name": "jane",
      "surname": "geng",
      "gender": "F"
    }
  ]
}

The spout behaves differently depending on the JSON format contained in your blob file, so you should use the appropriate codec. First, if your blob myBlob matches format A above, specify "codec": "json": on each new update of myBlob, the whole JSON document will be emitted through the topology as a single tuple. For instance:

1
{"age": 1,"name": "jonathan","surname": "yue chun","gender": "M"}

On the other hand, if the data matches format B, specify "codec": "json_array": each JSON object contained in the array will be propagated individually on each new modification:

1
{"age": 1,"name": "jonathan","surname": "yue chun","gender": "M"}

AND

1
{"age": 2,"name": "jane","surname": "geng","gender": "F"}
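The two codec behaviours can be sketched as follows. This is an illustrative model only, not the spout's actual code; the "records" field name comes from the format B example above.

```python
import json

def emit_tuples(payload, codec):
    # Illustrative sketch of the two codec behaviours.
    data = json.loads(payload)
    if codec == "json":
        # Format A: the whole JSON document is emitted as one tuple.
        yield data
    elif codec == "json_array":
        # Format B: each object of the "records" array is emitted individually.
        for record in data["records"]:
            yield record

payload = '{"records": [{"age": 1, "name": "jonathan"}, {"age": 2, "name": "jane"}]}'
tuples = list(emit_tuples(payload, "json_array"))
# Two tuples are produced, one per record.
```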

To summarise, this spout compares the last modified date of all files in a container virtual directory against a specific read_file_since_last window. Only the blobs matching this criterion will be streamed into the topology as tuples in JSON format.
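The selection rule can be sketched as a simple timestamp check. This is a hedged illustration of the described behaviour, not the spout's implementation; the function name is hypothetical.

```python
from datetime import datetime, timedelta, timezone

def should_stream(last_modified, window_seconds, now=None):
    # A blob is selected when its last-modified timestamp falls inside the
    # read window, i.e. it changed within the last `window_seconds`.
    now = now or datetime.now(timezone.utc)
    return last_modified >= now - timedelta(seconds=window_seconds)

now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
recent = now - timedelta(seconds=5)   # modified 5 s ago: inside a 12 s window
stale = now - timedelta(seconds=60)   # modified 60 s ago: outside the window
```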

This spout works iteratively, fetching each blob one by one. You should therefore set pull_interval and read_file_since_last according to the size of the blobs you are going to pull. You should also take your internet connection bandwidth into account.

Note: All the blobs are held in memory. Make sure your server has enough RAM to use this spout.

Info

There will always be a time lag before a blob finishes uploading and becomes visible. Since this spout fetches files based on their timestamps, be careful not to set pull_interval and read_file_since_last to values that are too low. To avoid blobs being missed between polls, always set pull_interval to less than read_file_since_last, e.g. "pull_interval": "10s" and "read_file_since_last": "13s".
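The relationship between the two settings can be checked with a little arithmetic. A minimal sketch (function name hypothetical): when the pull interval exceeds the read window, there is a stretch of time no poll ever covers.

```python
def window_gap(pull_interval_s, read_since_last_s):
    # Positive result: a gap in seconds between two consecutive read windows,
    # during which modified blobs would be missed.
    # Zero or negative: consecutive windows overlap, so every instant is covered.
    return pull_interval_s - read_since_last_s

window_gap(10, 13)  # -3: each poll re-covers the last 3 seconds of the previous one
window_gap(15, 10)  # 5: a 5-second blind spot between polls
```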

Note

This spout is not designed to be run with more than one storm executor.

Note

As of now, only the json_array codec is supported. If the current codec does not match your needs, send us an email at dimitri@punchplatform.com; we will do our best to implement one for your use case.

Example(s)

Below is an example of fetching files from a container virtual directory:

{
  "tenant": "my_tenant",
  "spouts": [
    {   
      "type": "azureblobstorage_spout",
      "spout_settings": {
        "blobstorage_name": "testblob",
        "blobstorage_key": "mytestkey",
        "container_name": "test",
        "virtual_directory_path_regex": "*",
        "pull_interval": "10s",
        "read_file_since_last": "12s",
        "codec": "json_array",
        "read_strategy": "latest",
        "watch_file_starting_by": "a-zA-Z",
        "chunk_size": 1048576
      },
      "storm_settings": {
        "component": "azureblob",
        "publish": [
          {
            "stream": "input",
            "fields": [
              "value"
            ]
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "punch_bolt",
      "bolt_settings": {
        "punchlet_code": "{print(root);}",
        "decoding_strategy" : "smart"
      },
      "storm_settings": {
        "component": "punch_bolt",
        "subscribe": [
          {
            "component": "azureblob",
            "stream": "input"
          }
        ]
      }
    }
  ],
  "storm_settings": {
    "topology.worker.childopts": "-server -Xms128m -Xmx128m",
    "topology.enable.message.timeouts": true,
    "topology.message.timeout.secs": 10,
    "topology.max.spout.pending": 10000,
    "topology.sleep.spout.wait.strategy.time.ms": 50
  }
}

Parameters

  • blobstorage_name : String

The name of your blob storage account (located in an Azure resource group of your subscription).

  • blobstorage_key : String

The key provided with your Azure Blob Storage instance.

  • container_name : String

The name of the container from which you want to pull data.

  • virtual_directory_blob_name_extension_regex : String

Takes as value the extension of a blob type, e.g. json to retrieve only *.json blobs from the specified container.

  • pull_interval : String

The number of seconds/minutes/hours to wait before executing the next pull, i.e. the time to wait before the next call once the current process has finished. Follows the Elasticsearch time convention: Xs, Xm or Xh, where X is an integer and s stands for seconds, m for minutes and h for hours.
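The Xs/Xm/Xh convention can be parsed with a short helper. This is an illustrative sketch, not part of the platform; the function name is hypothetical.

```python
import re

_UNITS = {"s": 1, "m": 60, "h": 3600}

def parse_interval(value):
    # Converts the Xs / Xm / Xh convention into seconds, e.g. "10s" -> 10.
    match = re.fullmatch(r"(\d+)([smh])", value)
    if not match:
        raise ValueError(f"invalid interval: {value!r}")
    return int(match.group(1)) * _UNITS[match.group(2)]
```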

  • read_blob_since_last : String

This parameter is used to search your container for blobs whose last modified date is newer than (current time) - (time value). Follows the Elasticsearch time convention: Xs, Xm or Xh, where X is an integer and s stands for seconds, m for minutes and h for hours. Defaults to pull_interval - 2s if not specified in your topology configuration.

  • codec: String

This parameter targets large blobs, for instance a JSON array blob of more than 50 MB. Instead of emitting the whole JSON document into the topology, each JSON object in the array is emitted one at a time. Takes json_array as value. More options will become available over time.

  • read_strategy: String

last_committed: The spout will start pulling using the last committed blob as a reference.

earliest: The spout will pull all blobs and exit.

latest: The spout will ignore the checkpoint blob (if present) and pull only the latest/modified blobs to emit into the topology. This strategy is used by default if not specified in your topology configuration.
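The three strategies can be modelled as different selections over the blob listing. A hedged sketch under simplifying assumptions (the function name is hypothetical, and `blobs` is just a list of names sorted by last-modified date, oldest first):

```python
def select_blobs(blobs, read_strategy, last_committed=None):
    # `blobs`: blob names sorted by last-modified date, oldest first.
    if read_strategy == "earliest":
        return blobs                      # pull everything, then exit
    if read_strategy == "last_committed":
        # resume right after the checkpointed blob, if it is still listed
        start = blobs.index(last_committed) + 1 if last_committed in blobs else 0
        return blobs[start:]
    # "latest" (the default): only the most recently modified blob
    return blobs[-1:]
```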

  • chunk_size: Integer

Size of each read operation, in bytes. The buffer size should be large enough if you plan to pull large blobs. Defaults to 1048576 if not specified in your topology configuration.
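Chunked reading of this kind can be sketched with a plain byte stream. This is an illustration of the principle, not the spout's Azure client code; the function name is hypothetical.

```python
import io

def read_in_chunks(stream, chunk_size=1048576):
    # Read a blob stream chunk_size bytes at a time, so large blobs never
    # have to fit into a single read buffer.
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        yield chunk

data = io.BytesIO(b"x" * 2500)
chunks = list(read_in_chunks(data, chunk_size=1024))
# Three chunks: 1024 + 1024 + 452 bytes.
```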

  • last_committed_blob_name: String

Specifies a custom name for your checkpoint blob. Note that this parameter is also used when the topology starts over with the last_committed read strategy.

  • blob_name_prefix: String

Pulls only blobs whose names start with specific characters. For instance, "blob_name_prefix": "a-bAB" retrieves only blobs whose names start with the letter a, b, A or B. Defaults to a-zA-Z0-9 if not specified.
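The value behaves like a regex character class applied to the first character of the blob name. A minimal sketch of that matching (the function name is hypothetical):

```python
import re

def matches_prefix(blob_name, prefix_chars="a-zA-Z0-9"):
    # The configured value is used as a character class: a blob is kept
    # when its name starts with one of the configured characters.
    return re.match(f"^[{prefix_chars}]", blob_name) is not None

matches_prefix("alpha", "a-bAB")  # True: starts with 'a'
matches_prefix("zulu", "a-bAB")   # False: 'z' is outside the class a-b, A, B
```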

  • virtual_directory_prefix_list: List(of String)

Specifies which (sub-)virtual directories the spout is restricted to when fetching blobs...