public class ClickHouseOutput extends AbstractJDBCOutput
Note that the table into which you want to insert data must already exist inside a specific database. This bolt requires a preconfigured ClickHouse database on the system running it.
Data must already be in a valid representation of the target data type (e.g. if the table contains a Date column, your tuple should contain a field whose name matches the column name and whose value is the string representation of that data type). Each stream should contain unique field names.
create table flights(arr_timestamp timestamp, dep_timestamp timestamp, uniquecarrier varchar) ENGINE=Log;
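As an illustration of the "valid data type representation" requirement, here is a minimal sketch that prepares the three fields of the flights table as strings. The class name FlightTupleExample and the method toFields are hypothetical and not part of the punch API; the timestamp pattern is an assumption derived from the "2017-04-23 06:30:00" values used in the configuration example on this page.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the punch API.
final class FlightTupleExample {

    // Assumed pattern, matching values such as "2017-04-23 06:30:00".
    static final DateTimeFormatter TS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Builds a field map whose keys are exactly the column names of the flights table.
    static Map<String, String> toFields(LocalDateTime arrival, LocalDateTime departure, String carrier) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("arr_timestamp", arrival.format(TS));
        fields.put("dep_timestamp", departure.format(TS));
        fields.put("uniquecarrier", carrier);
        return fields;
    }

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2017, 4, 23, 6, 30, 0);
        System.out.println(toFields(t, t, "test"));
    }
}
```

Each key matches a column name of the table and each value is already a string the database can interpret, which is what the bolt expects to find in the incoming tuple.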
How it works:
Given the required parameters to connect to the database (user, password, database_name, port, host), this bolt will try to insert data while dynamically casting it to the right type, based on the metadata of the target table.
The ClickHouse bolt tries to match each field name found in a tuple to a column name of the target table. It then tries to cast the value passed in each field dynamically so that it matches your table schema.
Below is a valid configuration example you can use with punchlinectl for testing purposes:
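To make the connection parameters concrete, the following sketch shows how they could map onto a JDBC URL and credentials. It assumes the standard ClickHouse JDBC URL form jdbc:clickhouse://host:port/database; whether this bolt builds its connection exactly this way is not stated here, and the class ClickHouseConnectionSketch is hypothetical.

```java
import java.util.Properties;

// Hypothetical sketch for illustration only; not the bolt's actual connection code.
final class ClickHouseConnectionSketch {

    // Assembles a JDBC URL from host, port and database name.
    static String jdbcUrl(String host, int port, String database) {
        return "jdbc:clickhouse://" + host + ":" + port + "/" + database;
    }

    // Wraps the credentials in the Properties form accepted by java.sql.DriverManager.
    static Properties credentials(String username, String password) {
        Properties props = new Properties();
        props.setProperty("user", username);
        props.setProperty("password", password);
        return props;
    }

    public static void main(String[] args) {
        // Values taken from the configuration example on this page.
        System.out.println(jdbcUrl("localhost", 8123, "default"));
        System.out.println(credentials("default", "").getProperty("user"));
    }
}
```

The sketch only builds strings and does not open a connection, so it runs without a ClickHouse driver on the classpath.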
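The name matching described above can be sketched as follows. This is an illustrative approximation under stated assumptions, not the bolt's implementation: the class ColumnMatchingSketch and its methods are hypothetical, and the type casting step is left to the database driver through plain "?" placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch for illustration only; not the bolt's actual logic.
final class ColumnMatchingSketch {

    // Builds a parameterized INSERT statement for the given table and columns.
    static String insertSql(String table, List<String> columns) {
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES (" + placeholders + ")";
    }

    // Returns the tuple values in column order, failing if a column has no matching field.
    static List<Object> orderedValues(List<String> columns, Map<String, ?> tupleFields) {
        List<Object> values = new ArrayList<>();
        for (String column : columns) {
            if (!tupleFields.containsKey(column)) {
                throw new IllegalArgumentException("Tuple has no field for column: " + column);
            }
            values.add(tupleFields.get(column));
        }
        return values;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("arr_timestamp", "dep_timestamp", "uniquecarrier");
        Map<String, String> tuple = new HashMap<>();
        tuple.put("uniquecarrier", "test");
        tuple.put("arr_timestamp", "2017-04-23 06:30:00");
        tuple.put("dep_timestamp", "2017-04-23 06:30:00");
        System.out.println(insertSql("flights", columns));
        System.out.println(orderedValues(columns, tuple));
    }
}
```

Because values are looked up by name, the order of fields inside a tuple does not matter, which is consistent with the three differently ordered messages in the configuration example on this page.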
dag:
[
  {
    component: generator
    type: generator_input
    settings:
    {
      messages:
      [
        {
          logs:
          {
            dep_timestamp: 2017-04-23 06:30:00
            uniquecarrier: test
            arr_timestamp: 2017-04-23 06:30:00
          }
        }
        {
          logs:
          {
            uniquecarrier: test
            arr_timestamp: 2017-04-23 06:30:00
            dep_timestamp: 2017-04-23 06:30:00
          }
        }
        {
          logs:
          {
            arr_timestamp: 2017-04-23 06:30:00
            dep_timestamp: 2017-04-23 06:30:00
            uniquecarrier: test
          }
        }
      ]
    }
  }
  {
    component: clickhouse
    type: clickhouse_output
    settings:
    {
      host: localhost
      port: 8123
      username: default
      password: ""
      database: default
      table: flights
      bulk_size: 3
      column_names:
      [
        arr_timestamp:cast
        dep_timestamp:cast
        uniquecarrier:cast
      ]
    }
    subscribe:
    [
      {
        component: generator,
        stream: logs,
        fields:
        [
          arr_timestamp,
          dep_timestamp,
          uniquecarrier
        ]
      }
    ]
  }
]
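Each column_names entry in the example above has the form name:cast. As a rough illustration, such an entry could be split into a column name and a mode as sketched below. The class ColumnSpecSketch is hypothetical, and treating the text after the last colon as a mode is an assumption based only on the shape of the entries shown; it is not a description of how the bolt parses this setting.

```java
// Hypothetical sketch for illustration only; not the bolt's actual parser.
final class ColumnSpecSketch {
    final String name; // column name, e.g. "arr_timestamp"
    final String mode; // text after the colon, e.g. "cast"

    ColumnSpecSketch(String name, String mode) {
        this.name = name;
        this.mode = mode;
    }

    // Splits "<column>:<mode>" at the last colon and rejects malformed entries.
    static ColumnSpecSketch parse(String entry) {
        int sep = entry.lastIndexOf(':');
        if (sep <= 0 || sep == entry.length() - 1) {
            throw new IllegalArgumentException("Expected <column>:<mode>, got: " + entry);
        }
        return new ColumnSpecSketch(entry.substring(0, sep), entry.substring(sep + 1));
    }

    public static void main(String[] args) {
        ColumnSpecSketch spec = parse("arr_timestamp:cast");
        System.out.println(spec.name + " -> " + spec.mode);
    }
}
```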
ackFail, bulkSize, COLUMN_NAME_WRAPPER_CHAR_SETTING, COLUMN_NAMES_SETTING, COLUMN_TYPE_CAST, COLUMN_TYPE_FUNCTION, columnNames, columnNameWrapper, columnsNamesSeparator, CONVERT_BOOLEAN_TO_INT_SETTING, convertBooleanToInt, database, expirationTimeout, hikariConfigMaps, hosts, jdbcHandleIterator, lastUpdateTimestamp, password, poolName, poolSize, queryQueueTime, reconnectionTimeout, SOURCE_TUPLE_FIELD_SETTING, sourceTupleField, table, username
Constructor and Description |
---|
ClickHouseOutput(org.thales.punch.libraries.storm.api.NodeSettings boltSettings) |
Modifier and Type | Method and Description |
---|---|
protected String | getDefaultColumnNameWrapperChar() |
protected Boolean | getDefaultConvertBooleanToInt() |
void | prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector) |
addTupleToQuery, cleanup, failAll, formatColumnForAutoCast, formatColumnForFunction, getComponentConfiguration, process
public ClickHouseOutput(org.thales.punch.libraries.storm.api.NodeSettings boltSettings) throws org.thales.punch.exceptions.ConfigurationException
Throws: org.thales.punch.exceptions.ConfigurationException
protected String getDefaultColumnNameWrapperChar()
getDefaultColumnNameWrapperChar in class AbstractJDBCOutput
protected Boolean getDefaultConvertBooleanToInt()
getDefaultConvertBooleanToInt in class AbstractJDBCOutput
public void prepare(Map stormConf, org.apache.storm.task.TopologyContext context, org.apache.storm.task.OutputCollector collector)
prepare in interface org.apache.storm.task.IBolt
prepare in class AbstractJDBCOutput
Copyright © 2023. All rights reserved.