public abstract class AbstractFileOutput
extends org.thales.punch.libraries.storm.api.BaseProcessingNode
implements org.thales.punch.libraries.punchlang.api.IRuntimeContext
The file bolt writes the incoming tuples into archives. It batches the tuples in groups and flushes each batch at once to the configured archive cluster.
The file bolt settings let you decide where to flush the batches (filesystem or Ceph objects) and control the saved archive format (compression, encryption, separator, etc.).
Note that the cluster parameter can take multiple mount points; each of them must be delimited by a comma. For instance:
"cluster": "file:///tmp/archive-logs-1/storage,file:///tmp/archive-logs-2/storage"
Here is a complete example to flush the batch to a (local or mounted) filesystem:
"bolts" : [
    {
        "type" : "file_bolt",
        "bolt_settings" : {
            "cluster" : "file:///tmp/archive"
        },
        "storm_settings" : {
            "executors": 1,
            "subscribe" : [ { "stream" : "logs" } ]
        }
    }
]
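A richer configuration sketch, with illustrative values only (the parameter names are the ones documented below; the mount points, sizes and timeouts are assumptions, not recommendations):

```json
"bolts" : [
    {
        "type" : "file_bolt",
        "bolt_settings" : {
            "cluster" : "file:///tmp/archive-logs-1/storage,file:///tmp/archive-logs-2/storage",
            "compression_format" : "GZIP",
            "strategy" : "at_least_once",
            "batch_size" : 5000,
            "batch_expiration_timeout" : "10s"
        },
        "storm_settings" : {
            "executors": 1,
            "subscribe" : [ { "stream" : "logs" } ]
        }
    }
]
```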
property | mandatory | type | default | comment |
---|---|---|---|---|
"cluster" | yes | String | - | The url of the destination archive store. See below for details |
"compression_format" | no | String | "NONE" | One of "NONE", "SNAPPY" or "GZIP" |
"folders_hierarchy_pattern" | no | String | "%{topic}/%{partition}/%{date}/%{batches_set}" | If the object storage is implemented over a file system, batches of logs are written into multiple folders and sub-folders. This folder hierarchy is described by this parameter as a pattern. It can contain a static string like "my_archives/my_technology" and/or specific patterns: %{topic} for the topic, %{partition} for the partition number, %{date} for the earliest tuple date in the batch (YYYY.mm.dd format only), or %{batches_set}, which is derived from the leading digits of the batch number. This last pattern allows storing a fixed maximum number of batches in a folder. For example, if "batches_per_folder" equals 1000 and "folders_hierarchy_pattern" is "%{batches_set}", then batches 1098 and 1099 are stored in folder "1" and batches 2000 and 2001 in folder "2". |
"batches_per_folder" | no | Integer | 1000 | If the object storage is implemented over a file system, batches of logs are written into multiple folders and sub-folders. With the default folder hierarchy pattern, each sub-folder contains at most this number of batches. |
"strategy" | no | String | "exactly_once" | Define the batching strategy, accepted values are: "exactly_once", "at_least_once", "one_by_one" |
"create_root" | no | boolean | false | By default, the file bolt does not create the root directory. Setting this option to true creates the root directory if it does not exist. |
"tags" | no | String[] | - | Only works in "at_least_once" batching mode. This parameter modifies the batch grouping strategy to group events by their "tags" values. A subscribed Storm field must correspond to each tag. The bolt reads the values contained in each field and groups all the tuples with the same values into the same batch. All the tag values are appended to the final file name. |
"batch_size" | no | Integer | 1000 | Only for the "at_least_once" case: the maximum number of events stored in a batch. |
"batch_expiration_timeout" | no | String | 10s | Only for the "at_least_once" case: if no new event is added to a batch for more than this period, the batch is flushed. |
"batch_life_timeout" | no | String | 0 | Only for the "at_least_once" case: if the batch has been alive for more than this period, the batch is flushed. |
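To make the folder hierarchy settings concrete, here is a minimal sketch, not the actual bolt code, of how a "folders_hierarchy_pattern" could be resolved. The class and method names are assumptions for illustration; only the pattern tokens and the %{batches_set} arithmetic come from the documentation above:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Illustrative sketch only: resolve a pattern such as
// "%{topic}/%{partition}/%{date}/%{batches_set}" into a folder path.
class FolderPatternSketch {

    static String resolve(String pattern, String topic, int partition,
                          LocalDate earliestTupleDate, long batchNumber, int batchesPerFolder) {
        // %{batches_set} groups at most batchesPerFolder batches per folder:
        // with batchesPerFolder = 1000, batches 1098 and 1099 land in set "1",
        // batches 2000 and 2001 in set "2".
        long batchesSet = batchNumber / batchesPerFolder;
        return pattern
                .replace("%{topic}", topic)
                .replace("%{partition}", Integer.toString(partition))
                .replace("%{date}", earliestTupleDate.format(DateTimeFormatter.ofPattern("yyyy.MM.dd")))
                .replace("%{batches_set}", Long.toString(batchesSet));
    }

    public static void main(String[] args) {
        String folder = resolve("%{topic}/%{partition}/%{date}/%{batches_set}",
                "apache", 0, LocalDate.of(2023, 5, 17), 1098, 1000);
        System.out.println(folder); // apache/0/2023.05.17/1
    }
}
```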
The cluster parameter lets you choose among the supported archive endpoints: a (local or mounted) filesystem or a Ceph object store.
This bolt emits metadata containing key information about each batch (compressed size, encoding, etc.). By cascading an ElasticBolt, you can index this metadata in Elasticsearch.
The archive files or objects (if using Ceph object storage) contain CSV data, holding all, or only the configured subset of, the received fields.
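As a rough illustration of the CSV content, not the bolt's actual serialization code, the received fields of a tuple could be joined into one record like this. The class name, field values and separator are assumptions:

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch only: join the configured received fields into one
// CSV line, quoting any value that contains the separator or a quote.
class CsvLineSketch {

    static String toCsvLine(List<String> fieldValues, String separator) {
        return fieldValues.stream()
                .map(v -> v.contains(separator) || v.contains("\"")
                        ? "\"" + v.replace("\"", "\"\"") + "\""
                        : v)
                .collect(Collectors.joining(separator));
    }

    public static void main(String[] args) {
        // e.g. a tuple exposing the fields [timestamp, host, message]
        String line = toCsvLine(List.of("2023-05-17T10:00:00Z", "host-1", "GET /index.html"), ",");
        System.out.println(line); // 2023-05-17T10:00:00Z,host-1,GET /index.html
    }
}
```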
Check for additional documentation: https://doc.punchplatform.com/Reference_Guide/Data_Movement/Bolts/File_Bolt.html
Direct known subclasses: AtLeastOnceFileBolt, OneByOneFileBolt
Serialized Form
Modifier and Type | Field and Description |
---|---|
protected org.thales.punch.libraries.objectstorage.tuples.ArchiveFormatSettings | archiveFormatSettings The archive format settings. |
protected IFileNameSupplier | fileNameSupplier This supplier is in charge of providing the identifier of each batch. |
protected com.codahale.metrics.Timer | tupleRtt Time to process a tuple (metric). |
protected org.thales.punch.libraries.storm.api.StreamDeclaration | userStream The user stream where the batch metadata is published. |
Constructor and Description |
---|
AbstractFileOutput(org.thales.punch.libraries.storm.api.NodeSettings boltConfig, List<org.thales.punch.libraries.objectstorage.tuples.ArchiveDevice> archiveDevices, IFileNameSupplier fileNameSupplier, org.thales.punch.libraries.objectstorage.tuples.ArchiveFormatSettings archiveFormatSettings) Constructor. |
Modifier and Type | Method and Description |
---|---|
void | addRuntimeData(Object context, int key, Object value) |
protected BatchArchiver | createArchiver(org.thales.punch.libraries.objectstorage.indexing.BatchMetadata metadata) Create the correct BatchArchiver from metadata. |
protected void | flush(BatchArchiver batch) Flush this batch. |
List<String> | getBloomFilterFields() Getter for bloom filter fields. |
Object | getRuntimeData(Object context, int key) |
long | getTimestampFromTuple(org.apache.storm.tuple.ITuple tuple) Extracts a timestamp from the received tuple. |
protected org.thales.punch.libraries.objectstorage.indexing.BatchMetadata | initMetadata(long timestamp) Common metadata initialization. |
void | prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector outputCollector) Prepare this bolt. |
protected org.thales.punch.libraries.storm.api.StreamDeclaration userStream
protected final org.thales.punch.libraries.objectstorage.tuples.ArchiveFormatSettings archiveFormatSettings
protected final IFileNameSupplier fileNameSupplier
protected transient com.codahale.metrics.Timer tupleRtt
public AbstractFileOutput(org.thales.punch.libraries.storm.api.NodeSettings boltConfig, List<org.thales.punch.libraries.objectstorage.tuples.ArchiveDevice> archiveDevices, IFileNameSupplier fileNameSupplier, org.thales.punch.libraries.objectstorage.tuples.ArchiveFormatSettings archiveFormatSettings)
Constructor.
Parameters:
boltConfig - bolt settings
archiveDevices - URLs of the archive devices
fileNameSupplier - supplier for the file name (object storage or simple file name)
archiveFormatSettings - specific batch settings

public void prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector outputCollector)
Prepare this bolt.
Specified by:
prepare in interface org.apache.storm.task.IBolt
Overrides:
prepare in class org.thales.punch.libraries.storm.api.BaseProcessingNode

protected void flush(BatchArchiver batch)
Flush this batch.
Parameters:
batch - the batch to flush

protected org.thales.punch.libraries.objectstorage.indexing.BatchMetadata initMetadata(long timestamp)
Common metadata initialization.
Parameters:
timestamp - the beginning timestamp

public long getTimestampFromTuple(org.apache.storm.tuple.ITuple tuple)
Extracts a timestamp from the received tuple.
Parameters:
tuple - the input tuple

public void addRuntimeData(Object context, int key, Object value)
Specified by:
addRuntimeData in interface org.thales.punch.libraries.punchlang.api.IRuntimeContext

public Object getRuntimeData(Object context, int key)
Specified by:
getRuntimeData in interface org.thales.punch.libraries.punchlang.api.IRuntimeContext

protected BatchArchiver createArchiver(org.thales.punch.libraries.objectstorage.indexing.BatchMetadata metadata)
Create the correct BatchArchiver from metadata.
Parameters:
metadata - the batch metadata

Copyright © 2023. All rights reserved.