Azure Blob Storage Input¶
Introduction¶
The Azure Blob Storage Spout makes it possible to periodically pull data from an Azure Blob container. Large data sets stored as blobs in your subscription can thus be pulled in near real time. The data types currently supported are JSON and arrays of JSON.
A
: JSON
{
"age": 1,
"name": "jonathan",
"surname": "yue chun",
"gender": "M"
}
B
: Array of JSON
{
"records": [
{
"age": 1,
"name": "jonathan",
"surname": "yue chun",
"gender": "M"
},
{
"age": 2,
"name": "jane",
"surname": "geng",
"gender": "F"
}
]
}
Depending on the JSON format contained in your blob file, the spout behaves differently; you should use the appropriate codec.
First, if your blob myBlob matches format A, specify "codec": "json". On each new update of myBlob, the whole JSON document will be emitted through the topology as a single tuple. For instance:
{"age": 1,"name": "jonathan","surname": "yue chun","gender": "M"}
On the other hand, if the data matches format B, specify "codec": "json_array". On each new modification, each JSON object contained in the array will be emitted as a separate tuple:
{"age": 1,"name": "jonathan","surname": "yue chun","gender": "M"}
AND
{"age": 2,"name": "jane","surname": "geng","gender": "F"}
To summarise, this spout compares the last modified date of all files in a container virtual directory against read_file_since_last. Only the blobs matching this criterion will be streamed into the topology as tuples in JSON format.
This spout works iteratively, fetching blobs one by one. You should therefore set pull_interval and read_file_since_last according to the size of the blobs you are going to pull, and take your network bandwidth into account.
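For instance, the following sketch pairs a 10-second pull with a slightly larger lookback window (illustrative values only, not a sizing recommendation):
"pull_interval": "10s",
"read_file_since_last": "13s"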
Note
: All blobs are held in memory. Make sure your server has enough RAM to use this spout.
Info
There is always a lag between the moment a blob finishes uploading and the moment it becomes visible. Since this spout fetches files based on their timestamp, be sure not to set pull_interval and read_file_since_last to values that are too low.
To avoid duplicate polling, always set pull_interval lower than read_file_since_last, e.g. "pull_interval": "10s" and "read_file_since_last": "13s".
Note
This spout is not designed to be run with more than one Storm executor.
Note
As of now, only the json_array codec is supported. If the current codec does not match your needs, send us an email at dimitri@punchplatform.com. We will do our best to implement one for your use case.
Example(s)¶
Below is an example of fetching files from a container's virtual directory:
{
"tenant": "my_tenant",
"dag": [
{
"type": "azure_blob_storage_input",
"settings": {
"blobstorage_name": "testblob",
"blobstorage_key": "mytestkey",
"container_name": "test",
"virtual_directory_path_regex": "*",
"pull_interval": "10s",
"read_file_since_last": "12s",
"codec": "json_array",
"read_strategy": "latest",
"watch_file_starting_by": "a-zA-Z",
"chunk_size": 1048576
},
"component": "azureblob",
"publish": [
{
"stream": "input",
"fields": [
"value"
]
}
]
},
{
"type": "punchlet_node",
"settings": {
"punchlet_code": "{print(root);}",
"decoding_strategy" : "smart"
},
"component": punchlet_node",
"subscribe": [
{
"component": "azureblob",
"stream": "input"
}
]
}
],
"storm_settings": {
"topology.worker.childopts": "-server -Xms128m -Xmx128m",
"topology.enable.message.timeouts": true,
"topology.message.timeout.secs": 10,
"topology.max.spout.pending": 10000,
"topology.sleep.spout.wait.strategy.time.ms": 50
}
}
Parameters¶
blobstorage_name
: String
The name of your blob storage account (located in a resource group of your Azure subscription).
blobstorage_key
: String
The key provided with your Azure Blob Storage instance.
container_name
: String
The name of the container from which you want to pull data.
virtual_directory_blob_name_extension_regex
: String
Takes as value the extension of a blob type, e.g. json to retrieve only *.json blobs from the specified container.
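For instance, to restrict the spout to JSON blobs (a sketch; the value is the bare extension without the leading dot):
"virtual_directory_blob_name_extension_regex": "json"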
pull_interval
: String
The number of seconds/minutes/hours to wait before executing the next pull, i.e. the time to wait before the next call after the current process has finished. Follows the Elasticsearch time convention: Xs, Xm or Xh, where X is an integer and s stands for seconds, m for minutes and h for hours.
read_blob_since_last
: String
This parameter is used to search your container for blobs whose last modified date is more recent than (current time) - (time value). Follows the Elasticsearch time convention: Xs, Xm or Xh, where X is an integer and s stands for seconds, m for minutes and h for hours. Defaults to pull_interval + 2s if not specified in your topology configuration.
codec
: String
This parameter is intended for large blobs, for instance a JSON array blob of more than 50 MB. Instead of emitting the whole JSON into the topology, each JSON object in the array is emitted one at a time. Takes json_array as value. More options will become available over time.
read_strategy
: String
last_committed
: The spout will start pulling from the last committed blob as reference.
earliest
: The spout will pull all blobs and exit.
latest
: The spout will ignore the checkpoint blob (if present) and pull only the latest/modified blobs to emit into the topology (this is the default strategy if not specified in your topology configuration).
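For instance, here is a sketch combining the checkpoint-based strategy with a custom checkpoint blob name (my_checkpoint is a hypothetical name; last_committed_blob_name is described below):
"read_strategy": "last_committed",
"last_committed_blob_name": "my_checkpoint"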
chunk_size
: Integer
Size of each read operation in bytes. The buffer should be big enough if you plan to pull large blobs. Defaults to 1048576 (1 MiB) if not specified in your topology configuration.
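For instance, to read in 4 MiB chunks (4 × 1024 × 1024 = 4194304 bytes; an illustrative value, not a recommendation):
"chunk_size": 4194304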
last_committed_blob_name
: String
Specify a custom name for your checkpoint blob. Note that this parameter is also used when the topology starts over using the checkpoint-based (last_committed) read_strategy.
blob_name_prefix
: String
Pull only blobs whose names start with specific characters. For instance, "blob_name_prefix": "a-bAB" will retrieve only blobs whose names start with the letters a, b, A or B. Defaults to a-zA-Z0-9 if not specified.
virtual_directory_prefix_list
: List(of String)
Specify the (sub-)virtual directories to which the spout is restricted when fetching blobs...
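For instance, a sketch restricting the spout to two virtual directories (the directory names dir1 and dir2/sub are hypothetical):
"virtual_directory_prefix_list": ["dir1", "dir2/sub"]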