public class AzureBlobStorageSpout
extends org.thales.punch.libraries.storm.api.BaseInputNode
For this spout to work, you need an Azure account with administrator access to the blob storage from which
you want to fetch data.
Given a blob extension (json, csv, *, etc.) for a container, this spout pulls, at a fixed time interval, every blob that was inserted or modified in the last XX seconds, minutes, or hours.
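The selection rule described above can be illustrated with a minimal sketch. It is not the spout's actual implementation: the `BlobWindowFilter` class and its `BlobEntry` type are hypothetical, the extension is assumed to be matched as a plain file-name suffix with `*` accepting every blob, and the time window is assumed to be measured back from the current time.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Punch API.
final class BlobWindowFilter {

    /** Minimal stand-in for a listed blob: its name and last-modified time. */
    static final class BlobEntry {
        final String name;
        final Instant lastModified;

        BlobEntry(String name, Instant lastModified) {
            this.name = name;
            this.lastModified = lastModified;
        }
    }

    /**
     * Keeps the blobs whose name ends with the given extension ("*" accepts all)
     * and whose last-modified time is not older than now minus window.
     */
    static List<BlobEntry> select(List<BlobEntry> blobs, String extension,
                                  Duration window, Instant now) {
        Instant cutoff = now.minus(window);
        List<BlobEntry> selected = new ArrayList<>();
        for (BlobEntry blob : blobs) {
            // Assumption: the extension is compared as a simple suffix.
            boolean extensionMatches = "*".equals(extension)
                    || blob.name.endsWith("." + extension);
            boolean recentEnough = !blob.lastModified.isBefore(cutoff);
            if (extensionMatches && recentEnough) {
                selected.add(blob);
            }
        }
        return selected;
    }
}
```

For example, calling `BlobWindowFilter.select(blobs, "json", Duration.ofSeconds(19), Instant.now())` would keep only the `.json` blobs modified during the last 19 seconds.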
A last_committed blob is created at the root path of your container. It contains the date of the last successfully acknowledged tuple.
The blob is given a unique name of the form: Platform ID_Platform tenant_Platform Channel_Your topology name.
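As a rough illustration of the naming scheme above, the four parts can be joined with underscores. The `LastCommittedName` class and its parameter names are hypothetical, and the spout may add a prefix or suffix that this sketch does not show.

```java
// Hypothetical helper for illustration only; not part of the Punch API.
final class LastCommittedName {

    /** Joins the four identifying parts with underscores, in the documented order. */
    static String build(String platformId, String tenant, String channel, String topologyName) {
        return String.join("_", platformId, tenant, channel, topologyName);
    }
}
```

With this sketch, `LastCommittedName.build("platform", "mytenant", "mychannel", "mytopology")` yields `platform_mytenant_mychannel_mytopology`, so two topologies reading the same container keep separate commit markers.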
Parameter | Description | Value Type | Values | Required |
---|---|---|---|---|
blobstorage_name | The name of the blob storage. | String | "your_blob_storage_name" | true |
blobstorage_key | The key of your blob storage. | String | "your_blob_storage_key" | true |
container_name | The name of a container in your blob storage. | String | "a_container_name" | true |
virtual_directory_blob_name_extension_regex | Extension of the blobs that should be pulled. | String | "json" | true |
pull_interval | Interval between two pulls. Uses the Elasticsearch time convention, e.g. 10s for 10 seconds, 10m for 10 minutes and 10h for 10 hours. Defaults to 15 seconds if not specified. | String | "Xs" or "Xm" or "Xh", where X is an integer. | false |
read_blob_since_last | Uses the Elasticsearch time convention, e.g. 10s for 10 seconds, 10m for 10 minutes and 10h for 10 hours. Defaults to 15 + 2 seconds if not specified. Pulls blobs whose last-modified date is more recent than the current time minus this value. Upload time lag should be taken into account; the extra 2 seconds are intended to ensure that all blobs are pulled. | String | "Xs" or "Xm" or "Xh", where X is an integer. | false |
chunk_size | Size of each read operation, in bytes. The buffer should be large enough if you plan to pull large blobs. Defaults to 1048576 if not specified. | Integer | ex: 256000 | false |
codec | The codec to be used by the buffer. | String | "json_array" | true |
read_strategy | Defines how this spout scans an Azure blob storage container. last_committed: resume pulling from the last_committed blob; earliest: pull all blobs that exist in the container, then exit the topology when pulling is over; latest: pull only the latest blobs and ignore the last_committed blob. latest is the default read strategy if none is specified. | String | "last_committed" or "earliest" or "latest" | false |
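The time convention and the two defaults described in the table can be sketched as follows. This is only an illustration under stated assumptions: the `IntervalSettings` class and its method names are hypothetical, a missing setting is represented by `null`, and the default of read_blob_since_last is read as the default pull interval plus a 2 second margin.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the Punch API.
final class IntervalSettings {

    // Accepts "Xs", "Xm" or "Xh", where X is an integer.
    private static final Pattern INTERVAL = Pattern.compile("(\\d+)([smh])");

    static final Duration DEFAULT_PULL_INTERVAL = Duration.ofSeconds(15);
    static final Duration SAFETY_MARGIN = Duration.ofSeconds(2);

    /** Parses values such as "10s", "10m" or "10h" into a Duration. */
    static Duration parse(String value) {
        Matcher m = INTERVAL.matcher(value.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Expected Xs, Xm or Xh but got: " + value);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "s": return Duration.ofSeconds(amount);
            case "m": return Duration.ofMinutes(amount);
            default:  return Duration.ofHours(amount); // only "h" remains
        }
    }

    /** pull_interval: 15 seconds when the setting is absent (null here). */
    static Duration pullInterval(String configured) {
        return configured == null ? DEFAULT_PULL_INTERVAL : parse(configured);
    }

    /** read_blob_since_last: 15 + 2 seconds when the setting is absent (null here). */
    static Duration readBlobSinceLast(String configured) {
        return configured == null ? DEFAULT_PULL_INTERVAL.plus(SAFETY_MARGIN) : parse(configured);
    }
}
```

Keeping read_blob_since_last slightly larger than pull_interval, as the defaults do, leaves a small overlap between two consecutive pulls so that a blob uploaded close to a pull boundary is less likely to be missed.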
Below is a working example:
{
  "spouts": [
    {
      "type": "azureblobstorage_spout",
      "spout_settings": {
        "blobstorage_name": "name-of-your-blob-storage",
        "blobstorage_key": "your-blob-storage-key",
        "container_name": "name-of-a-container-in-your-blob",
        "virtual_directory_blob_name_extension_regex": "*",
        "pull_interval": "10s",
        "read_blob_since_last": "19s",
        "chunk_size": 1048576,
        "codec": "json_array",
        "read_strategy": "last_committed",
        "blob_name_prefix": "a-pA-P",
        "virtual_directory_prefix_list": [""]
      },
      "storm_settings": {
        "component": "azureblob",
        "publish": [
          {
            "stream": "input",
            "fields": [
              "value"
            ]
          }
        ]
      }
    }
  ],
  "bolts": [
    {
      "type": "punch_bolt",
      "bolt_settings": {
        "punchlet_code": "{print(root);}",
        "decoding_strategy": "smart"
      },
      "storm_settings": {
        "component": "punch_bolt",
        "subscribe": [
          {
            "component": "azureblob",
            "stream": "input"
          }
        ]
      }
    }
  ]
}
Constructor and Description |
---|
AzureBlobStorageSpout(org.thales.punch.libraries.storm.api.NodeSettings config, boolean enableNsg) Constructor. |
Modifier and Type | Method and Description |
---|---|
void | ack(Object msgId) |
void | fail(Object msgId) |
void | lastCommittedWriter() Used to write the last_committed blob on success. |
void | nextTuple() |
void | open(Map conf, org.apache.storm.task.TopologyContext context, org.apache.storm.spout.SpoutOutputCollector collector) |
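Methods inherited from class org.thales.punch.libraries.storm.api.BaseInputNode: close, deactivate, declareOutputFields, getPublishedStreams, regulate, sendLatencyRecord
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait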
public AzureBlobStorageSpout(org.thales.punch.libraries.storm.api.NodeSettings config, boolean enableNsg)
config - the spout config
enableNsg - true to make the spout act as an NSG spout
public void open(Map conf, org.apache.storm.task.TopologyContext context, org.apache.storm.spout.SpoutOutputCollector collector)
Specified by: open in interface org.apache.storm.spout.ISpout
Overrides: open in class org.thales.punch.libraries.storm.api.BaseInputNode
public void nextTuple()
public void lastCommittedWriter()
public void ack(Object msgId)
Specified by: ack in interface org.apache.storm.spout.ISpout
Overrides: ack in class org.thales.punch.libraries.storm.api.BaseInputNode
public void fail(Object msgId)
Specified by: fail in interface org.apache.storm.spout.ISpout
Overrides: fail in class org.thales.punch.libraries.storm.api.BaseInputNode
Copyright © 2022. All rights reserved.