Clickhouse Output¶
This output node enables you to insert data into a Clickhouse Database.
It inserts data by executing an SQL query dynamically constructed from the node input parameters.
Requirements¶
The destination database and table must already exist. Input tuple fields must match the column names and data types
of the destination table.
You must also have a user account with the necessary privileges (read and write).
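A quick way to check the requirement on column names and types is to describe the destination table from the clickhouse client. A minimal sketch, assuming the flights table used later in this chapter:

```sql
-- Lists the column names and types of the destination table;
-- input tuple fields must match them.
DESCRIBE TABLE flights;
```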
Settings¶
Essential Settings¶
Name | Type | Default Value | Description |
---|---|---|---|
hosts | List<String> | - | Hosts of the database. |
cluster_id | String | - | Clickhouse cluster name from platform properties. |
username (Required) | String | - | Username to connect to the database. |
password (Required) | String | - | Password to connect to the database. |
database (Required) | String | - | Database name. |
table (Required) | String | - | Table name. |
column_names | List<String> | - | Column names with the strategy to use. See the Column Names section below. |
bulk_size (Required) | Integer | - | Bulk size limit before flushing data into the table. |
Error Handling¶
The clickhouse output node reacts differently depending on the error. If the error indicates a connection problem towards one of the Clickhouse target servers, the error is logged, but processing continues and the connection is retried later.
If the error is related to a schema or table incompatibility on a record, then:

- If the _ppf_errors stream is published, the faulty event is output there (see the sketch below). You can publish the following fields:
    - _ppf_error_message: contains the Clickhouse exception, e.g. "_ppf_error_message": "java.lang.RuntimeException: Failed to execute SQL || Failed to execute SQL || ClickHouse exception, code: 6, host: localhost, port: 8123; Code: 6, e.displayText() = DB::Exception: Cannot parse string '2021x-09-21T06:59:19.003545+0100' as DateTime64(3): syntax error at position 19 (parsed just '2021x-09-21T06:59:1')"
    - _ppf_error_document: contains the SQL insert query that was rejected for this single record (even if you requested a batched insertion)
    - any other field that was in the subscribed stream (such as your event or log field)
- Otherwise, the output node triggers a fatal exit.
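A minimal sketch of the corresponding publish section on the clickhouse output node; the field names follow the conventions above, but the exact stream declaration depends on your punchline:

```yaml
publish:
  - stream: _ppf_errors
    fields:
      - _ppf_error_message    # the Clickhouse exception text
      - _ppf_error_document   # the rejected single-record SQL insert
      - log                   # any field from the subscribed stream
```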
If you set several server addresses, the clickhouse output node will load-balance the traffic across all of them. Failed servers are automatically removed from the set of usable servers, and a periodic reconnection takes place to eventually recover a failed server.
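For illustration, a sketch of a multi-server setup (the host names are placeholders):

```yaml
# traffic is load-balanced across these hosts; a failed one is retried
# every reconnection_interval until it recovers
hosts:
  - clickhouse-1.example.org:8123
  - clickhouse-2.example.org:8123
reconnection_interval: 10s
```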
Advanced Settings¶
Name | Type | Default Value | Description |
---|---|---|---|
pool_name | String | my_hiraki_pool | HikariCP pool name. |
pool_size | Integer | 10 | HikariCP connection pool size. |
query_timeout_sec | Integer | 30 | SQL query timeout, in seconds. |
batch_expiration_timeout | String | 10s | Period of inactivity before flushing. |
reconnection_interval | String | 10s | Period between attempts to reconnect to a failed Clickhouse target server. |
source_tuple_field | String | <null> | If provided, column values are not read from fields of the storm tuple, but from a single Map tuple field. If you use the source_tuple_field setting with a punchlet node as source, you must set "field_publishing_strategy": "tuple" in that node to avoid sending the tuple as JSON, because JDBC nodes like this one do not decode the "json" encoding of a tuple field (see the sketch below). |
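A sketch of the source_tuple_field pattern described above; the doc field name is a placeholder, and the elided parts (punchlet code, subscriptions, connection settings) depend on your punchline:

```yaml
# punchlet node: publish the tuple field as a real tuple, not as its json encoding
- type: punchlet_node
  settings:
    field_publishing_strategy: tuple
  publish:
    - stream: logs
      fields:
        - doc
# clickhouse output node: all column values are read from the single Map field
- type: clickhouse_output
  settings:
    source_tuple_field: doc
```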
Column Names¶
You can provide the column names, and optionally a strategy, using the following format: `<name>:<strategy>`. If you do
not provide column names, the column names will be read from the first incoming tuple (with the 'cast' strategy).
There are two strategies for column value processing (both are illustrated below):

- `cast`: the default strategy. Handles raw values. Data is sent to Clickhouse as a string, and Clickhouse automatically casts the value to the correct type. With this strategy, data must be correctly formatted.
- `function`: handles SQL functions. Data is sent to Clickhouse as such, and Clickhouse will try to apply the provided SQL function. With this strategy, the data should be a SQL function, or an error may be raised.
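For illustration, a hypothetical column_names setting mixing both strategies; here the insertion_time field is assumed to carry a SQL function such as now() as its value:

```yaml
column_names:
  - uniquecarrier_id          # no strategy given: defaults to cast
  - event_timestamp:cast      # raw value, cast by Clickhouse
  - insertion_time:function   # the field value is a SQL function, sent as such
```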
Warning
Be careful if you combine the source_tuple_field setting with an unset column_names setting: the first tuple will be used to guess the columns for ALL tuples, so if your first tuple lacks some column keys, these columns will NEVER be output.
Bulk Settings¶
The ClickhouseOutput uses a bulk strategy. The input data is written to the destination table when:

- the bulk size is reached, i.e. enough tuples have been received;
- the batch expiration timeout is reached: if no new tuples have been received during this period, the query is sent and the bulk starts over. This allows writing data even if the bulk is not complete.

If neither condition is reached, the data is kept in memory and not written to the database. A good practice is to set a high bulk size to improve performance, and to set the expiration timeout according to the throughput you expect, as in the sketch below.
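As an illustration, a hypothetical tuning for a sustained flow: large bulks for insert performance, with a short expiration so that a quiet period still flushes quickly (the values are placeholders to adapt to your throughput):

```yaml
bulk_size: 5000                 # flush once 5000 tuples have been received...
batch_expiration_timeout: 30s   # ...or after 30 seconds without any new tuple
```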
Example¶
This example is provided as part of the standalone clickhouse getting started guide. We will use a database named default, hosted at localhost:8123. The username is default and the password is empty.
First we need to create the table. Here is an example to be used with the standalone flight demo channel; refer to the clickhouse documentation for details.
```sql
create table flights (
    uniquecarrier_id UUID,
    event_timestamp DateTime,
    arrival_timestamp DateTime,
    departure_timestamp DateTime,
    flight_consumption UInt32,
    flight_ip IPv4
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(event_timestamp)
ORDER BY event_timestamp;
```
For example, on the standalone punch, use the client as follows, then cut and paste the create table statement:

```sh
$PUNCHPLATFORM_CLICKHOUSE_INSTALL_DIR/usr/bin/clickhouse client --port 9100
```
How it works:
Given the required connection parameters (username, password, database, table, hosts), the clickhouse output node will try to insert data while dynamically casting the inserted values to the right type, based on the destination table metadata.
Each input tuple field is matched to a destination table column by name.
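For instance, with the flights table above and the default 'cast' strategy, you can expect the node to build inserts of roughly this shape (a sketch for illustration, not the exact generated SQL):

```sql
-- values are sent as strings; Clickhouse casts them to the column types
INSERT INTO flights (event_timestamp, uniquecarrier_id, flight_consumption, flight_ip)
VALUES ('2020-02-01 01:00:00', 'c68a3b35-3fbf-4e42-9b59-62ff73c4268b', '22569', '192.168.99.79');
```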
Below is a valid configuration example you can use with punchlinectl for testing purposes. Check out the clickhouse standalone getting started guide.
```yaml
version: '6.0'
runtime: storm
type: punchline
dag:
  - type: syslog_input
    settings:
      listen:
        proto: tcp
        host: 0.0.0.0
        port: 9909
    publish:
      - stream: logs
        fields:
          - log
  - component: punchlet
    type: punchlet_node
    settings:
      punchlet_code: '{kv().on([logs][log]).into([logs]);}'
    subscribe:
      - component: syslog_input
        stream: logs
    publish:
      - stream: logs
        fields:
          - event_timestamp
          - uniquecarrier_id
          - departure_timestamp
          - arrival_timestamp
          - flight_consumption
          - flight_ip
  - component: output
    type: clickhouse_output
    settings:
      hosts:
        - localhost:8123
      username: default
      password: ''
      database: default
      table: flights
      bulk_size: 10
      column_names:
        - event_timestamp
        - uniquecarrier_id
        - departure_timestamp
        - arrival_timestamp
        - flight_consumption
        - flight_ip
    subscribe:
      - component: punchlet
        stream: logs
settings:
  topology.worker.childopts: -server -Xms1g -Xmx4g
```
The table content looks like:
┌─────────────────────uniquecarrier_id─┬─────event_timestamp─┬───arrival_timestamp─┬─departure_timestamp─┬─flight_consumption─┬─flight_ip──────┐
│ c68a3b35-3fbf-4e42-9b59-62ff73c4268b │ 2021-02-14 09:13:58 │ 2020-02-03 11:20:53 │ 2020-02-01 01:00:00 │ 22569 │ 192.168.99.79 │
│ 4e6ad8e0-307c-4f38-a9c5-a0b0ec4f33aa │ 2021-02-14 09:13:58 │ 2020-02-04 09:32:47 │ 2020-02-01 02:00:00 │ 2826 │ 192.168.99.86 │
│ 785873e3-3ac0-4527-8acb-78813cf865a4 │ 2021-02-14 09:13:58 │ 2020-02-02 07:46:39 │ 2020-02-01 03:00:00 │ 22382 │ 192.168.99.207 │
│ bf6a5983-b911-4210-aad1-3d52533e1d70 │ 2021-02-14 09:13:58 │ 2020-02-04 21:54:59 │ 2020-02-01 04:00:00 │ 10480 │ 192.168.99.129 │
│ b6550c4b-fb13-460d-b88b-7b967244a039 │ 2021-02-14 09:13:58 │ 2020-02-02 05:16:52 │ 2020-02-01 05:00:00 │ 14923 │ 192.168.99.20 │
│ e4102d0d-372f-41c2-9a1f-67ef96de1718 │ 2021-02-14 09:13:58 │ 2020-02-03 21:32:07 │ 2020-02-01 06:00:00 │ 29558 │ 192.168.99.227 │
│ df494d46-e020-4d0c-a804-c4c1476a48ba │ 2021-02-14 09:13:58 │ 2020-02-01 13:08:00 │ 2020-02-01 07:00:00 │ 3284 │ 192.168.99.72 │
│ 9bcfab97-8e92-4d67-a0fb-ca45827c4cc1 │ 2021-02-14 09:13:58 │ 2020-02-04 04:00:15 │ 2020-02-01 08:00:00 │ 15636 │ 192.168.99.3 │
│ a0cdf9ef-1b06-4ab4-9fda-50a464534ec0 │ 2021-02-14 09:13:58 │ 2020-02-02 06:05:24 │ 2020-02-01 09:00:00 │ 27845 │ 192.168.99.235 │
│ 1b234b9b-98fb-4436-8ebc-6dd49c1220b0 │ 2021-02-14 09:13:58 │ 2020-02-01 15:00:03 │ 2020-02-01 10:00:00 │ 13438 │ 192.168.99.61 │
└──────────────────────────────────────┴─────────────────────┴─────────────────────┴─────────────────────┴────────────────────┴────────────────┘
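This listing can be reproduced from the clickhouse client with a simple select (the LIMIT just keeps the output short):

```sql
SELECT * FROM flights LIMIT 10;
```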