Clickhouse Output

This output node enables you to insert data into a Clickhouse database.

It inserts data by executing SQL insert queries dynamically constructed from the node settings and the incoming tuples.

Requirements

The destination database and table must already exist. Input tuple fields must match the column names and data types of the destination table.
You must also have a user account with read and write permissions on the destination table.
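
For instance, assuming role-based access control is enabled on your Clickhouse server, a dedicated account could be created as follows (the user name, password and table are placeholders for this sketch):

-- illustrative only: create a dedicated account for the output node
create user punch_writer identified by 'changeme';
-- grant read and write access on the destination table
grant select, insert on default.flights to punch_writer;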

Settings

Essential Settings

Name | Type | Default value | Description
--- | --- | --- | ---
hosts | List<String> | - | Hosts of the database.
cluster_id | String | - | Clickhouse cluster name from the platform properties.
username (Required) | String | - | Username to connect to the database.
password (Required) | String | - | Password to connect to the database.
database (Required) | String | - | Database name.
table (Required) | String | - | Table name.
column_names | List<String> | - | Column names with the strategy to use. See Column Names.
bulk_size (Required) | Integer | - | Bulk size limit before flushing data into the table.

Error Handling

The clickhouse output node reacts differently depending on the error. If the error indicates a connection problem towards one of the clickhouse target servers, the error is logged, processing continues, and the connection is retried later.

If the error is related to a schema or table incompatibility on a record, then:

  • If a _ppf_errors stream is published by this node, the faulty event is output there (see the configuration sketch below). You can publish the following fields:

    • _ppf_error_message => will contain the Clickhouse exception, e.g.: "_ppf_error_message": "java.lang.RuntimeException: Failed to execute SQL || Failed to execute SQL || ClickHouse exception, code: 6, host: localhost, port: 8123; Code: 6, e.displayText() = DB::Exception: Cannot parse string '2021x-09-21T06:59:19.003545+0100' as DateTime64(3): syntax error at position 19 (parsed just '2021x-09-21T06:59:1')"
    • _ppf_error_document => will contain the SQL insert query that was rejected for this single record (even if you requested a batched insertion)
    • ANY other field that was in the subscribed stream (such as your event or log field)
  • Otherwise, the output node triggers a fatal exit.

If you set several server addresses, the clickhouse output node load-balances the traffic across all of them. Failed servers are automatically removed from the set of available servers, and periodic reconnection attempts eventually bring them back.
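
As a hedged illustration, a clickhouse output node configured with several hosts and an error stream could look like the fragment below (the host names and the extra log field are placeholders to adapt to your punchline):

- component: output
  type: clickhouse_output
  settings:
    # several hosts: traffic is load-balanced across all of them
    hosts:
    - clickhouse1:8123
    - clickhouse2:8123
    username: default
    password: ''
    database: default
    table: flights
    bulk_size: 1000
  subscribe:
  - component: punchlet
    stream: logs
  publish:
  # faulty records are emitted here instead of triggering a fatal exit
  - stream: _ppf_errors
    fields:
    - _ppf_error_message
    - _ppf_error_document
    - log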

Advanced Settings

Name | Type | Default value | Description
--- | --- | --- | ---
pool_name | String | my_hiraki_pool | HikariCP pool name.
pool_size | Integer | 10 | HikariCP connection pool size.
query_timeout_sec | Integer | 30 | SQL query timeout, in seconds.
batch_expiration_timeout | String | 10s | Period of inactivity before flushing.
reconnection_interval | String | 10s | Period between attempts to reconnect to a failed clickhouse target server.
source_tuple_field | String | <null> | If provided, column values are not read from individual fields of the storm tuple but from a single Map tuple field with this name. If you use this setting with a punchlet node as source, set "field_publishing_strategy": "tuple" in that punchlet node: JDBC nodes such as this one do not decode the "json" encoding of a tuple field, so the tuple must not be sent as json.
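
A hedged sketch of the source_tuple_field setting, assuming a punchlet node that writes all column values into a single map field (here called columns, a name and punchlet chosen purely for this example):

- component: punchlet
  type: punchlet_node
  settings:
    # illustrative punchlet: parse the log into a map stored in the 'columns' field
    punchlet_code: '{kv().on([logs][log]).into([logs][columns]);}'
    # publish tuple fields as-is instead of json-encoding them,
    # because the clickhouse output does not decode json-encoded tuple fields
    field_publishing_strategy: tuple
  subscribe:
  - component: syslog_input
    stream: logs
  publish:
  - stream: logs
    fields:
    - columns
- component: output
  type: clickhouse_output
  settings:
    hosts:
    - localhost:8123
    username: default
    password: ''
    database: default
    table: flights
    bulk_size: 10
    # read all column values from the single 'columns' map field
    source_tuple_field: columns
  subscribe:
  - component: punchlet
    stream: logs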

Column Names

You can provide the column names, and optionally a strategy, using the following format: <name>:<strategy>. If you do not provide column names, the column names are read from the first incoming tuple (with the 'cast' strategy). A configuration sketch is shown after the warning below.

There are two strategies for processing column values:

  • cast: the default strategy. Handles raw values. Data is sent to clickhouse as a string, and clickhouse automatically casts the value to the column type. With this strategy, data must be correctly formatted.
  • function: handles SQL functions. Data is sent to clickhouse as-is, and clickhouse evaluates the provided SQL function. With this strategy, each value must be a valid SQL function call, otherwise an error may be raised.

Warning

Be careful when the column_names setting is not provided: the first tuple is used to guess the columns for ALL tuples, so if your first tuple lacks some column keys, these columns will NEVER be output.
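
As a hedged example, column names with an explicit strategy could be declared as follows (the function strategy on event_timestamp is purely illustrative):

column_names:
- uniquecarrier_id               # no strategy given: 'cast' is used
- event_timestamp:function       # value must be a valid SQL function call, e.g. now()
- arrival_timestamp:cast
- departure_timestamp:cast
- flight_consumption:cast
- flight_ip:cast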

Bulk Settings

The ClickhouseOutput uses a bulk strategy. The input data is written to the destination table if:

  • the bulk size is reached: i.e. enough tuples have been received.
  • the batch expiration timeout is reached: if no new tuples have been received during this period, the query is sent and the bulk starts over. This allows writing data even if the bulk is not complete.

If none of these conditions is met, the data is kept in memory and not written to the database. A good practice is to set a high bulk size to improve performance, and to set the expiration timeout according to the throughput you expect. A settings sketch follows.
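
For instance, a hedged settings fragment for a high-throughput punchline (the values are illustrative and must be tuned to your traffic):

settings:
  # flush as soon as 10000 tuples have been accumulated...
  bulk_size: 10000
  # ...or after 30 seconds without any new tuple, whichever comes first
  batch_expiration_timeout: 30s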

Example

This example is provided as part of the standalone clickhouse getting started guide. We will use a database named default, hosted at localhost:8123. Username is default and password is empty.

First, we need to create the table: here is an example to be used with the standalone flight demo channel. Refer to the clickhouse documentation for details.

create table flights (
  uniquecarrier_id UUID,
  event_timestamp DateTime,
  arrival_timestamp DateTime,
  departure_timestamp DateTime,
  flight_consumption UInt32,
  flight_ip IPv4)
  ENGINE = MergeTree
  PARTITION BY toYYYYMMDD(event_timestamp)
  ORDER BY event_timestamp
  ;

For example, on the standalone punch, use the client as follows, then cut and paste the create table statement:

$PUNCHPLATFORM_CLICKHOUSE_INSTALL_DIR/usr/bin/clickhouse client --port 9100

How it works:

Given the required connection settings (username, password, database, hosts), the clickhouse output node inserts data while dynamically casting the inserted values to the right type based on the destination table metadata.

Each input tuple field is matched to a destination table column by name.
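
As a hedged illustration of the kind of statement the node constructs internally (the exact SQL is an implementation detail), a batched insert into the flights table could look like:

-- with the default 'cast' strategy, values are sent as strings and
-- clickhouse casts them to the column types read from the table metadata
insert into flights
  (event_timestamp, uniquecarrier_id, departure_timestamp, arrival_timestamp, flight_consumption, flight_ip)
values
  ('2021-02-14 09:13:58', 'c68a3b35-3fbf-4e42-9b59-62ff73c4268b', '2020-02-01 01:00:00', '2020-02-03 11:20:53', '22569', '192.168.99.79'),
  ('2021-02-14 09:13:58', '4e6ad8e0-307c-4f38-a9c5-a0b0ec4f33aa', '2020-02-01 02:00:00', '2020-02-04 09:32:47', '2826', '192.168.99.86');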

Below is a valid configuration example you can use with punchlinectl for testing purposes. Check out the clickhouse standalone getting started guide.

version: '6.0'
runtime: storm
type: punchline
dag:
- type: syslog_input
  settings:
    listen:
      proto: tcp
      host: 0.0.0.0
      port: 9909
  publish:
  - stream: logs
    fields:
    - log
- component: punchlet
  type: punchlet_node
  settings:
    punchlet_code: '{kv().on([logs][log]).into([logs]);}'
  subscribe:
  - component: syslog_input
    stream: logs
  publish:
  - stream: logs
    fields:
    - event_timestamp
    - uniquecarrier_id
    - departure_timestamp
    - arrival_timestamp
    - flight_consumption
    - flight_ip
- component: output
  type: clickhouse_output
  settings:
    hosts:
    - localhost:8123
    username: default
    password: ''
    database: default
    table: flights
    bulk_size: 10
    column_names:
    - event_timestamp
    - uniquecarrier_id
    - departure_timestamp
    - arrival_timestamp
    - flight_consumption
    - flight_ip
  subscribe:
  - component: punchlet
    stream: logs
settings:
  topology.worker.childopts: -server -Xms1g -Xmx4g

The table content looks like:

┌─────────────────────uniquecarrier_id─┬─────event_timestamp─┬───arrival_timestamp─┬─departure_timestamp─┬─flight_consumption─┬─flight_ip──────┐
│ c68a3b35-3fbf-4e42-9b59-62ff73c4268b │ 2021-02-14 09:13:58 │ 2020-02-03 11:20:53 │ 2020-02-01 01:00:00 │              22569 │ 192.168.99.79  │
│ 4e6ad8e0-307c-4f38-a9c5-a0b0ec4f33aa │ 2021-02-14 09:13:58 │ 2020-02-04 09:32:47 │ 2020-02-01 02:00:00 │               2826 │ 192.168.99.86  │
│ 785873e3-3ac0-4527-8acb-78813cf865a4 │ 2021-02-14 09:13:58 │ 2020-02-02 07:46:39 │ 2020-02-01 03:00:00 │              22382 │ 192.168.99.207 │
│ bf6a5983-b911-4210-aad1-3d52533e1d70 │ 2021-02-14 09:13:58 │ 2020-02-04 21:54:59 │ 2020-02-01 04:00:00 │              10480 │ 192.168.99.129 │
│ b6550c4b-fb13-460d-b88b-7b967244a039 │ 2021-02-14 09:13:58 │ 2020-02-02 05:16:52 │ 2020-02-01 05:00:00 │              14923 │ 192.168.99.20  │
│ e4102d0d-372f-41c2-9a1f-67ef96de1718 │ 2021-02-14 09:13:58 │ 2020-02-03 21:32:07 │ 2020-02-01 06:00:00 │              29558 │ 192.168.99.227 │
│ df494d46-e020-4d0c-a804-c4c1476a48ba │ 2021-02-14 09:13:58 │ 2020-02-01 13:08:00 │ 2020-02-01 07:00:00 │               3284 │ 192.168.99.72  │
│ 9bcfab97-8e92-4d67-a0fb-ca45827c4cc1 │ 2021-02-14 09:13:58 │ 2020-02-04 04:00:15 │ 2020-02-01 08:00:00 │              15636 │ 192.168.99.3   │
│ a0cdf9ef-1b06-4ab4-9fda-50a464534ec0 │ 2021-02-14 09:13:58 │ 2020-02-02 06:05:24 │ 2020-02-01 09:00:00 │              27845 │ 192.168.99.235 │
│ 1b234b9b-98fb-4436-8ebc-6dd49c1220b0 │ 2021-02-14 09:13:58 │ 2020-02-01 15:00:03 │ 2020-02-01 10:00:00 │              13438 │ 192.168.99.61  │
└──────────────────────────────────────┴─────────────────────┴─────────────────────┴─────────────────────┴────────────────────┴────────────────┘