Clickhouse Output

This output node enables you to insert data into a Clickhouse Database.

It inserts data by executing an SQL query dynamically constructed from the node input parameters.

Requirements

The destination database and table should already exist. Input tuple fields must match the column names and data types of the destination table.
You must also have a user account with the necessary credentials (read and write).

Settings

Essential Settings

| Name | Type | Default Value | Description |
|------|------|---------------|-------------|
| hosts | List\<String\> | - | Hosts of the database. |
| cluster_id | String | - | Clickhouse cluster name from platform properties. |
| username (Required) | String | - | Username to connect to the database. |
| password (Required) | String | - | Password to connect to the database. |
| database (Required) | String | - | Database name. |
| table (Required) | String | - | Table name. |
| column_names (Required) | List\<String\> | - | Column names with the strategy to use. See Column Names. |
| bulk_size (Required) | Integer | - | Bulk size limit before flushing data into the table. |

Error Handling

The clickhouse output reacts differently depending on the error. If the error is related to a schema or table incompatibility, the clickhouse output triggers a fatal exit. If the error indicates a connection problem towards one of the clickhouse target servers, the error is logged but processing continues.

If you set several server addresses, the clickhouse output node load-balances the traffic across all of them. Failed servers are automatically removed from the set of selected servers. Periodic reconnection attempts eventually bring a failed server back into the pool.
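For instance, load balancing only requires listing several entries under hosts; a sketch, assuming two hypothetical Clickhouse servers listening on the default HTTP port:

```yaml
hosts:
- clickhouse-1:8123   # hypothetical hostname
- clickhouse-2:8123   # traffic is load-balanced; a failed host is retried every reconnection_interval
```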

Advanced Settings

| Name | Type | Default Value | Description |
|------|------|---------------|-------------|
| pool_name | String | my_hiraki_pool | HikariCP pool name. |
| pool_size | Integer | 10 | HikariCP connection pool size. |
| query_timeout_sec | Integer | 30 | SQL query timeout, in seconds. |
| batch_expiration_timeout | String | 10s | Period of inactivity before flushing. |
| reconnection_interval | String | 10s | Period between attempts to reconnect to a failed clickhouse target server. |
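These advanced settings go in the node settings alongside the essential ones; a sketch with illustrative values, not recommendations:

```yaml
settings:
  pool_size: 20                  # HikariCP connection pool size
  query_timeout_sec: 60          # abort inserts running longer than this
  batch_expiration_timeout: 30s  # flush a partial bulk after 30s of inactivity
  reconnection_interval: 5s      # retry failed servers every 5s
```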

Column Names

You must provide the column names, optionally with a strategy, using the following format: \<name\>:\<strategy\>.

There are two strategies for processing column values:

  • cast: the default strategy. Handles raw values. Data is sent to clickhouse as a string, and clickhouse automatically casts the value to the column type. With this strategy, data must be correctly formatted.
  • function: handles SQL functions. Data is sent to clickhouse as-is, and clickhouse evaluates the provided SQL function. With this strategy, each value must be a valid SQL function, or an error may be raised.
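As a hypothetical illustration (column names borrowed from the flights example below; the now() expression is an assumption, not part of the demo):

```yaml
column_names:
- uniquecarrier_id           # no strategy given: defaults to cast
- flight_consumption:cast    # explicit cast: raw value, cast by clickhouse
- event_timestamp:function   # function: the field value must be a SQL expression such as now()
```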

Bulk Settings

The ClickhouseOutput uses a bulk strategy. The input data is written to the destination table if:

  • the bulk size is reached, i.e. enough tuples have been received;
  • the batch expiration timeout is reached: if no new tuples have been received during this period, the query is sent and the bulk starts over. This allows writing data even if the bulk is not complete.

If neither condition is met, the data is kept in memory and not written to the database. A good practice is to set a high bulk size to improve performance, and to set the expiration timeout according to the throughput you expect.
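For example, a punchline tolerating a little latency might favor a larger bulk with a longer expiration window; a sketch with illustrative values:

```yaml
settings:
  bulk_size: 1000                # flush once 1000 tuples are buffered...
  batch_expiration_timeout: 30s  # ...or after 30s without any new tuple
```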

Example

This example is provided as part of the standalone clickhouse getting started guide. We will use a database named default, hosted at localhost:8123. Username is default and password is empty.

First we need to create the table: here is an example to be used with the standalone flight demo channel. Refer to the clickhouse documentation for details.

create table flights (
  uniquecarrier_id UUID,
  event_timestamp DateTime,
  arrival_timestamp DateTime,
  departure_timestamp DateTime,
  flight_consumption UInt32,
  flight_ip IPv4)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(event_timestamp)
ORDER BY event_timestamp;

For example, on the standalone punch, start the client as follows, then copy and paste the create table statement:

$PUNCHPLATFORM_CLICKHOUSE_INSTALL_DIR/usr/bin/clickhouse client --port 9100

How it works:

Given the required parameters to connect to the database (user, password, database name, port, host), the clickhouse output node inserts data, dynamically casting each inserted value to the right type based on the destination table metadata.

Each input tuple field is matched to the destination table column with the same name.

Below is a valid configuration example you can use with punchlinectl for testing purposes. Check out the clickhouse standalone getting started guide.

version: '6.0'
runtime: storm
type: punchline
dag:
- type: syslog_input
  settings:
    listen:
      proto: tcp
      host: 0.0.0.0
      port: 9909
  publish:
  - stream: logs
    fields:
    - log
- component: punchlet
  type: punchlet_node
  settings:
    punchlet_code: '{kv().on([logs][log]).into([logs]);}'
  subscribe:
  - component: syslog_input
    stream: logs
  publish:
  - stream: logs
    fields:
    - event_timestamp
    - uniquecarrier_id
    - departure_timestamp
    - arrival_timestamp
    - flight_consumption
    - flight_ip
- component: output
  type: clickhouse_output
  settings:
    hosts:
    - localhost:8123
    username: default
    password: ''
    database: default
    table: flights
    bulk_size: 10
    column_names:
    - event_timestamp
    - uniquecarrier_id
    - departure_timestamp
    - arrival_timestamp
    - flight_consumption
    - flight_ip
  subscribe:
  - component: punchlet
    stream: logs
settings:
  topology.worker.childopts: -server -Xms1g -Xmx4g

The table content looks like:

┌─────────────────────uniquecarrier_id─┬─────event_timestamp─┬───arrival_timestamp─┬─departure_timestamp─┬─flight_consumption─┬─flight_ip──────┐
│ c68a3b35-3fbf-4e42-9b59-62ff73c4268b │ 2021-02-14 09:13:58 │ 2020-02-03 11:20:53 │ 2020-02-01 01:00:00 │              22569 │ 192.168.99.79  │
│ 4e6ad8e0-307c-4f38-a9c5-a0b0ec4f33aa │ 2021-02-14 09:13:58 │ 2020-02-04 09:32:47 │ 2020-02-01 02:00:00 │               2826 │ 192.168.99.86  │
│ 785873e3-3ac0-4527-8acb-78813cf865a4 │ 2021-02-14 09:13:58 │ 2020-02-02 07:46:39 │ 2020-02-01 03:00:00 │              22382 │ 192.168.99.207 │
│ bf6a5983-b911-4210-aad1-3d52533e1d70 │ 2021-02-14 09:13:58 │ 2020-02-04 21:54:59 │ 2020-02-01 04:00:00 │              10480 │ 192.168.99.129 │
│ b6550c4b-fb13-460d-b88b-7b967244a039 │ 2021-02-14 09:13:58 │ 2020-02-02 05:16:52 │ 2020-02-01 05:00:00 │              14923 │ 192.168.99.20  │
│ e4102d0d-372f-41c2-9a1f-67ef96de1718 │ 2021-02-14 09:13:58 │ 2020-02-03 21:32:07 │ 2020-02-01 06:00:00 │              29558 │ 192.168.99.227 │
│ df494d46-e020-4d0c-a804-c4c1476a48ba │ 2021-02-14 09:13:58 │ 2020-02-01 13:08:00 │ 2020-02-01 07:00:00 │               3284 │ 192.168.99.72  │
│ 9bcfab97-8e92-4d67-a0fb-ca45827c4cc1 │ 2021-02-14 09:13:58 │ 2020-02-04 04:00:15 │ 2020-02-01 08:00:00 │              15636 │ 192.168.99.3   │
│ a0cdf9ef-1b06-4ab4-9fda-50a464534ec0 │ 2021-02-14 09:13:58 │ 2020-02-02 06:05:24 │ 2020-02-01 09:00:00 │              27845 │ 192.168.99.235 │
│ 1b234b9b-98fb-4436-8ebc-6dd49c1220b0 │ 2021-02-14 09:13:58 │ 2020-02-01 15:00:03 │ 2020-02-01 10:00:00 │              13438 │ 192.168.99.61  │
└──────────────────────────────────────┴─────────────────────┴─────────────────────┴─────────────────────┴────────────────────┴────────────────┘