public class OmniSciOutput extends AbstractJDBCOutput
REQUIREMENTS:
- The destination table must already exist in the target database.
- A preconfigured OmniSci database must be available on the system running this bolt.
- Data must already be a valid string representation of the destination data type (e.g. if the table contains a Date column, the tuple must contain a field whose name matches the column name and whose value is the string representation of that type).
- Each stream must contain unique field names.
Example destination table:
create table flights(arr_timestamp timestamp, dep_timestamp timestamp, uniquecarrier varchar(50));
How it works:
Given the parameters required to connect to the database (host, port, username, password, database), this bolt tries to insert data while dynamically casting each inserted value to the right type, based on the destination table's metadata.
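The casting step described above can be sketched as follows. This is a minimal illustration, not the bolt's actual implementation: the class `MetadataCaster` and its methods are hypothetical, and the column-type map is assumed to have been filled beforehand from the table metadata (for instance from the `COLUMN_NAME` and `DATA_TYPE` columns returned by the standard JDBC `DatabaseMetaData.getColumns` call).

```java
import java.sql.Timestamp;
import java.sql.Types;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: converts string field values to Java objects by SQL type. */
final class MetadataCaster {

    /** Casts one string value according to a java.sql.Types code. */
    static Object cast(String value, int sqlType) {
        if (value == null) {
            return null;
        }
        switch (sqlType) {
            case Types.TIMESTAMP:
                // Expects the JDBC escape format "yyyy-[m]m-[d]d hh:mm:ss[.f...]".
                return Timestamp.valueOf(value);
            case Types.INTEGER:
                return Integer.valueOf(value);
            case Types.BIGINT:
                return Long.valueOf(value);
            case Types.DOUBLE:
                return Double.valueOf(value);
            case Types.BOOLEAN:
                return Boolean.valueOf(value);
            default:
                // VARCHAR and any type not handled above stay as strings.
                return value;
        }
    }

    /** Casts every field of a row whose name appears in the column-type map. */
    static Map<String, Object> castRow(Map<String, String> row,
                                       Map<String, Integer> columnTypes) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> field : row.entrySet()) {
            Integer type = columnTypes.get(field.getKey());
            if (type != null) {
                out.put(field.getKey(), cast(field.getValue(), type));
            }
        }
        return out;
    }
}
```

With the `flights` table, `MetadataCaster.cast("2017-04-23 06:30:00", Types.TIMESTAMP)` yields a `java.sql.Timestamp`, while a `varchar` column value is passed through unchanged. A value that cannot be parsed raises an exception, which is why the requirements ask for valid string representations.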
The OmniSci bolt matches each field name found in a tuple to the name of the table column into which the data is inserted. It then tries to cast the value of each field dynamically to match your table schema.
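To illustrate the name matching, here is a small sketch that keeps only the tuple fields whose names match a table column and builds a parameterized INSERT statement from them. The class `InsertBuilder` and its methods are hypothetical names introduced for this example, and the SQL that the bolt really generates may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: matches tuple fields to table columns and builds an INSERT. */
final class InsertBuilder {

    /** Returns the table columns that have a same-named tuple field, in table order. */
    static List<String> matchColumns(List<String> tableColumns,
                                     Map<String, String> tupleFields) {
        List<String> matched = new ArrayList<>();
        for (String column : tableColumns) {
            if (tupleFields.containsKey(column)) {
                matched.add(column);
            }
        }
        return matched;
    }

    /** Builds "INSERT INTO table (a, b) VALUES (?, ?)" for the given columns. */
    static String insertSql(String table, List<String> columns) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("no tuple field matches a table column");
        }
        String names = String.join(", ", columns);
        String marks = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table + " (" + names + ") VALUES (" + marks + ")";
    }
}
```

Tuple fields with no matching column are ignored in this sketch. Whether the bolt ignores or rejects such fields is not stated here, so treat that choice as an assumption.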
The "jdbc" insert_type only allows single-row inserts. To insert multiple rows at once, use the "thrift_transport" insert_type.
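The difference between the two modes can be pictured with a generic row buffer: rows accumulate until a bulk size is reached and are then handed to a flush callback in one batch, and a bulk size of 1 degenerates into one insert per row. This is only a sketch under assumptions; `RowBuffer` is a hypothetical class, and the flush callback stands in for whatever call actually sends the rows to OmniSci.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical buffer: collects rows and flushes them in batches of bulkSize. */
final class RowBuffer<T> {
    private final int bulkSize;
    private final Consumer<List<T>> flusher;
    private final List<T> pending = new ArrayList<>();

    RowBuffer(int bulkSize, Consumer<List<T>> flusher) {
        if (bulkSize < 1) {
            throw new IllegalArgumentException("bulkSize must be >= 1");
        }
        this.bulkSize = bulkSize;
        this.flusher = flusher;
    }

    /** Adds a row and flushes automatically once bulkSize rows are pending. */
    void add(T row) {
        pending.add(row);
        if (pending.size() >= bulkSize) {
            flush();
        }
    }

    /** Sends all pending rows as one batch; does nothing if the buffer is empty. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        // Hand over a copy so the callback cannot see later modifications.
        flusher.accept(new ArrayList<>(pending));
        pending.clear();
    }

    /** Number of rows waiting for the next flush. */
    int pendingCount() {
        return pending.size();
    }
}
```

A caller would create the buffer with the configured bulk size, call `add` for each incoming row, and call `flush` once more on shutdown so that a partially filled batch is not lost.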
Below is a valid configuration example that you can run with punchlinectl for testing purposes:
dag:
[
  {
    component: generator
    type: generator_input
    settings:
    {
      messages:
      [
        {
          logs:
          {
            dep_timestamp: 2017-04-23 06:30:00
            uniquecarrier: test
            arr_timestamp: 2017-04-23 06:30:00
          }
        }
        {
          logs:
          {
            uniquecarrier: test
            arr_timestamp: 2017-04-23 06:30:00
            dep_timestamp: 2017-04-23 06:30:00
          }
        }
        {
          logs:
          {
            arr_timestamp: 2017-04-23 06:30:00
            dep_timestamp: 2017-04-23 06:30:00
            uniquecarrier: test
          }
        }
      ]
    }
  }
  {
    component: omnisci
    type: omnisci_output
    settings:
    {
      host: localhost
      port: 6274
      username: admin
      password: HyperInteractive
      database: omnisci
      table: flights
      insert_type: thrift_transport // can be jdbc (in that case, bulk_size should be set to 1)
      bulk_size: 3
      column_names:
      [
        arr_timestamp:cast
        dep_timestamp:cast
        uniquecarrier:cast
      ]
    }
    subscribe:
    [
      {
        component: generator
        stream: logs
        fields:
        [
          arr_timestamp
          dep_timestamp
          uniquecarrier
        ]
      }
    ]
  }
]
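The `column_names` entries in the example above each pair a column name with a suffix after a colon, such as `arr_timestamp:cast`. As a rough sketch of how such an entry could be split, the hypothetical `ColumnSpec` class below separates the name from the suffix. The `name:mode` reading is an assumption drawn from the example alone, and the set of accepted suffixes is not documented here.

```java
/** Hypothetical value class for one column_names entry such as "arr_timestamp:cast". */
final class ColumnSpec {
    final String name;
    final String mode;

    private ColumnSpec(String name, String mode) {
        this.name = name;
        this.mode = mode;
    }

    /** Splits "name:mode" on the last colon; rejects entries missing either part. */
    static ColumnSpec parse(String entry) {
        int sep = entry.lastIndexOf(':');
        if (sep <= 0 || sep == entry.length() - 1) {
            throw new IllegalArgumentException(
                "expected <column>:<mode>, got: " + entry);
        }
        return new ColumnSpec(entry.substring(0, sep).trim(),
                              entry.substring(sep + 1).trim());
    }
}
```

For instance, `ColumnSpec.parse("uniquecarrier:cast")` gives a spec whose `name` is `uniquecarrier` and whose `mode` is `cast`.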
ackFail, bulkSize, COLUMN_NAME_WRAPPER_CHAR_SETTING, COLUMN_NAMES_SETTING, COLUMN_TYPE_CAST, COLUMN_TYPE_FUNCTION, columnNames, columnNameWrapper, columnsNamesSeparator, CONVERT_BOOLEAN_TO_INT_SETTING, convertBooleanToInt, database, expirationTimeout, hikariConfigMaps, hosts, jdbcHandleIterator, lastUpdateTimestamp, password, poolName, poolSize, queryQueueTime, reconnectionTimeout, SOURCE_TUPLE_FIELD_SETTING, sourceTupleField, table, username
Constructor and Description |
---|
OmniSciOutput(org.thales.punch.libraries.storm.api.NodeSettings boltSettings) |
Modifier and Type | Method and Description |
---|---|
protected void | ackAll() |
void | cleanup() |
Map<String,Object> | getComponentConfiguration() |
void | prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector) |
void | process(org.apache.storm.tuple.Tuple input) |
addTupleToQuery, failAll, formatColumnForAutoCast, formatColumnForFunction, getDefaultColumnNameWrapperChar, getDefaultConvertBooleanToInt
public OmniSciOutput(org.thales.punch.libraries.storm.api.NodeSettings boltSettings) throws org.thales.punch.exceptions.ConfigurationException
Throws: org.thales.punch.exceptions.ConfigurationException
public void prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector)
prepare in interface org.apache.storm.task.IBolt
prepare in class AbstractJDBCOutput
public void process(org.apache.storm.tuple.Tuple input)
process in class AbstractJDBCOutput
public void cleanup()
cleanup in interface org.apache.storm.task.IBolt
cleanup in class AbstractJDBCOutput
public Map<String,Object> getComponentConfiguration()
getComponentConfiguration in interface org.apache.storm.topology.IComponent
getComponentConfiguration in class AbstractJDBCOutput
protected void ackAll()
Copyright © 2022. All rights reserved.