public class PostgresqlOutput extends AbstractJDBCOutput
REQUIREMENTS: The table into which you want to insert data must already exist in the target database. A preconfigured PostgreSQL database must be available on the system running this bolt. Data must already be in a valid representation for the column type (for example, if the table contains a Date column, your tuple must contain a field whose name matches the column name and whose value is the string representation of the date). Each stream must contain unique field names.
create table mytopology_table (text1 TEXT, text2 TEXT);
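The requirement that values arrive as valid string representations can be checked before a tuple reaches the bolt. The sketch below is only an illustration: the class and method names are hypothetical and are not part of this bolt's API. It assumes dates are sent in ISO-8601 form (yyyy-MM-dd), a form PostgreSQL accepts for DATE columns.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;

// Hypothetical helper, not part of PostgresqlOutput.
final class DateFieldCheck {

    /**
     * Returns true if the value is an ISO-8601 calendar date (yyyy-MM-dd).
     * Assumption: the target column is of type DATE and the tuple carries
     * the date as a string.
     */
    static boolean isIsoDate(String value) {
        try {
            LocalDate.parse(value); // uses DateTimeFormatter.ISO_LOCAL_DATE
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }
}
```

A value such as `2023-01-15` passes this check, while `15/01/2023` does not. Other formats may still be accepted by the server depending on its settings, so this check is deliberately conservative.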
How it works:
Given the parameters required to connect to the database (user, password, database name, port, host), this bolt tries to insert data while dynamically casting each inserted value to the right type, based on the metadata of the target table.
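As a rough illustration of how these connection parameters fit together, the sketch below assembles a PostgreSQL JDBC URL and the matching connection properties. The class and method names are hypothetical, and nothing here is taken from the bolt's source. The URL shape `jdbc:postgresql://host:port/database` and the `user` and `password` property names are those of the standard PostgreSQL JDBC driver.

```java
import java.util.Properties;

// Hypothetical helper, not part of PostgresqlOutput.
final class JdbcUrlSketch {

    /** Builds a PostgreSQL JDBC URL from host, port and database name. */
    static String buildUrl(String host, int port, String database) {
        return "jdbc:postgresql://" + host + ":" + port + "/" + database;
    }

    /** Builds the credentials in the form expected by DriverManager.getConnection(url, props). */
    static Properties credentials(String user, String password) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        return props;
    }
}
```

With the values from the configuration example on this page, `buildUrl("localhost", 5432, "mytopology")` yields `jdbc:postgresql://localhost:5432/mytopology`.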
The PostgreSQL bolt matches each field name found in a tuple to the column of the same name in the target table, then casts the value carried by that field to the column type declared in your table schema.
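The name matching and casting described above can be pictured with the following sketch. It is an assumption about the general technique, not the bolt's actual implementation: the class, the method, and the `columnTypes` input are hypothetical. A real implementation could obtain the column types from `java.sql.DatabaseMetaData#getColumns`. Identifier quoting is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of PostgresqlOutput.
final class AutoCastInsertSketch {

    /**
     * Builds a parameterized INSERT in which every placeholder is cast to the
     * type of its column. Only columns that have a same-named tuple field are
     * included; tuple fields without a matching column are ignored.
     *
     * @param columnTypes column name to PostgreSQL type name, in table order
     * @param tuple       field name to value, as received on the stream
     */
    static String buildInsert(String table,
                              Map<String, String> columnTypes,
                              Map<String, Object> tuple) {
        List<String> columns = new ArrayList<>();
        List<String> placeholders = new ArrayList<>();
        for (Map.Entry<String, String> column : columnTypes.entrySet()) {
            if (tuple.containsKey(column.getKey())) {
                columns.add(column.getKey());
                placeholders.add("CAST(? AS " + column.getValue() + ")");
            }
        }
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("no tuple field matches a column of " + table);
        }
        return "INSERT INTO " + table
                + " (" + String.join(", ", columns) + ")"
                + " VALUES (" + String.join(", ", placeholders) + ")";
    }
}
```

The values themselves would then be bound to the placeholders of a `java.sql.PreparedStatement`, leaving the conversion from string to column type to the database.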
Below is a valid configuration example that you can use with punchlinectl for testing purposes:
dag: [
  {
    component: generator
    type: generator_input
    settings:
    {
      messages:
      [
        {
          logs:
          {
            text1: this is a text 1
            text2: this is a text22
          }
        }
        {
          logs:
          {
            text1: this is a text 3
            text2: this is a text44
          }
        }
        {
          logs:
          {
            text1: this is a text 5
            text2: this is a text66
          }
        }
      ]
    }
  }
  {
    component: postgres
    type: postgresql_output,
    settings:
    {
      host: localhost,
      username: yuechun,
      port: 5432,
      password: yuechun,
      database: mytopology,
      table: mytopology_table,
      bulk_size: 1,
      pool_size: 10,
      pool_name: myPool,
      column_names:
      [
        text1
        text2
      ]
    },
    subscribe:
    [
      {
        component: generator,
        stream: logs,
        fields:
        [
          text1
          text2
        ]
      }
    ]
  }
]
Inherited fields: ackFail, bulkSize, COLUMN_NAME_WRAPPER_CHAR_SETTING, COLUMN_NAMES_SETTING, COLUMN_TYPE_CAST, COLUMN_TYPE_FUNCTION, columnNames, columnNameWrapper, columnsNamesSeparator, CONVERT_BOOLEAN_TO_INT_SETTING, convertBooleanToInt, database, expirationTimeout, hikariConfigMaps, hosts, jdbcHandleIterator, lastUpdateTimestamp, password, poolName, poolSize, queryQueueTime, reconnectionTimeout, SOURCE_TUPLE_FIELD_SETTING, sourceTupleField, table, username
Constructor and Description |
---|
PostgresqlOutput(org.thales.punch.libraries.storm.api.NodeSettings boltSettings) |
Modifier and Type | Method and Description |
---|---|
protected Object | formatColumnForAutoCast(Object value) |
void | prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector) |
Inherited methods: addTupleToQuery, cleanup, failAll, formatColumnForFunction, getComponentConfiguration, getDefaultColumnNameWrapperChar, getDefaultConvertBooleanToInt, process
public PostgresqlOutput(org.thales.punch.libraries.storm.api.NodeSettings boltSettings) throws org.thales.punch.exceptions.ConfigurationException
Throws: org.thales.punch.exceptions.ConfigurationException
public void prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector)
prepare in interface org.apache.storm.task.IBolt
prepare in class AbstractJDBCOutput
protected Object formatColumnForAutoCast(Object value)
formatColumnForAutoCast in class AbstractJDBCOutput
Copyright © 2023. All rights reserved.