public class FileTransferBolt
extends org.thales.punch.libraries.storm.api.BaseProcessingNode
This node enables you to transfer files located on your filesystem to somewhere else: S3, Hadoop, or another filesystem (based on the sinks supported by the avro-parquet library).

This node expects to receive a stream containing the following fields:

- [absolute path of the file on the local filesystem]: use the node configuration to select the right field (`received_file_path`)
- `remote_file_last_modified_timestamp`: original creation or modification date of the file to be transferred

For each transferred file, this node emits the transferred file's metadata:

- `local_downloaded_file_path`: file location on the local filesystem used during the transfer
- `remote_file_location`: where the file was transferred to, e.g. `s3a://bucket/filename.csv`
- `file_creation_date`: equal to the `remote_file_last_modified_timestamp` received in the stream

NOTE: use the `hadoop_settings` key to configure the right client.
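For illustration, a metadata record emitted for one transferred file could look like the following sketch (the field names come from the description above; the values and the on-wire encoding are hypothetical and depend on the subscribing node):

```json
{
  "local_downloaded_file_path": "/tmp/toto/ais_dump.nm4",
  "remote_file_location": "s3a://punch/transferred/ais_dump.nm4",
  "file_creation_date": "2022-03-01T10:15:00.000Z"
}
```

The example punchline below chains the pieces together: an `sftp_input` node downloads files, a punchlet prints the received metadata, a `file_transfer_output` node pushes the files to S3-compatible storage, and a `kafka_output` node forwards the resulting metadata: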
```hjson
{
  version: "6.0"
  type: punchline
  runtime: storm
  dag: [
    {
      type: sftp_input
      component: input
      settings: {
        cleanup: false
        download_path: /tmp/toto
        download_ignore_suffix: ".complete"
        download_add_suffix: ".nm4"
        consume_mode: earliest
        sftp_settings: {
          sftp.ssh.host: server3
          sftp.ssh.auth.user: abcuser
          sftp.ssh.auth.pass: abcpassword
          sftp.ssh.file.name_regex: "*.complete"
          sftp.ssh.scan_directories: [
            SAISData
          ]
        }
        checkpoint_settings: {
          checkpoint.application_runtime_id: sftp_application_id_test
          checkpoint.es_index_prefix: jonathan_tenant-test-
          checkpoint.es_nodes: [
            {
              host: localhost
            }
          ]
        }
      }
      publish: [
        {
          stream: files
          fields: [
            meta
          ]
        }
      ]
      subscribe: []
    }
    {
      type: punchlet_node
      component: punch
      settings: {
        punchlet_code: "{ print(root); }"
      }
      subscribe: [
        {
          stream: files
          component: input
        }
      ]
      publish: [
        {
          stream: files
          fields: [
            meta
          ]
        }
      ]
    }
    {
      type: file_transfer_output
      component: output
      settings: {
        destination_folder: s3a://punch/transferred
        received_file_path: local_downloaded_file_path
        hadoop_settings: {
          fs.s3a.access.key: minioadmin
          fs.s3a.secret.key: minioadmin
          fs.s3a.endpoint: http://127.0.0.1:9000
        }
      }
      subscribe: [
        {
          stream: files
          component: punch
        }
      ]
      publish: [
        {
          stream: files
          fields: [
            meta
          ]
        }
      ]
    }
    {
      type: kafka_output
      component: kafka_out
      settings: {
        topic: enfbordeaux
        brokers: local
        encoding: json
        producer.acks: all
        producer.batch.size: 16384
        producer.linger.ms: 5
      }
      subscribe: [
        {
          stream: files
          component: output
        }
      ]
    }
  ]
}
```
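The destination is derived from the `destination_folder` URI (for example `s3a://` or `hdfs://`), so other Hadoop-supported sinks can be targeted by changing that URI and the matching `hadoop_settings` keys. Below is a minimal sketch of the output node for an HDFS destination; the namenode host, port, and paths are hypothetical, and the exact client keys your cluster requires may differ:

```hjson
{
  type: file_transfer_output
  component: output
  settings: {
    # hypothetical HDFS destination: adjust host, port and path to your cluster
    destination_folder: hdfs://namenode.example.org:8020/punch/transferred
    received_file_path: local_downloaded_file_path
    hadoop_settings: {
      # standard Hadoop client key pointing at the default filesystem
      fs.defaultFS: hdfs://namenode.example.org:8020
    }
  }
  subscribe: [
    {
      stream: files
      component: punch
    }
  ]
  publish: [
    {
      stream: files
      fields: [
        meta
      ]
    }
  ]
}
```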
| Constructor and Description |
|---|
| `FileTransferBolt(org.thales.punch.libraries.storm.api.NodeSettings config, String receivedFilePath, String destination, org.thales.punch.settings.api.ISettingsMap hadoopSettings)` |
| Modifier and Type | Method and Description |
|---|---|
| `void` | `prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector)` |
| `void` | `process(org.apache.storm.tuple.Tuple input)`: Files to be transferred are read line by line. |
`public void process(org.apache.storm.tuple.Tuple input)`

Files to be transferred are read line by line.

Specified by: `process` in class `org.thales.punch.libraries.storm.api.BaseProcessingNode`
`public void prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector)`

Specified by: `prepare` in interface `org.apache.storm.task.IBolt`

Overrides: `prepare` in class `org.thales.punch.libraries.storm.api.BaseProcessingNode`