Clickhouse Output

This output enables you to insert data into a ClickHouse database.

It inserts data by executing an SQL query that is dynamically constructed from the parameters you provide.

Requirements

  • The destination database and table must already exist.
  • Input tuple fields must match the column names and data types of the destination table.
  • You must have a user account with the necessary privileges (read and write).
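
For example, on a ClickHouse server with SQL-driven access control enabled (an assumption about your deployment; the default user on a fresh install already has full access), the read and write privileges can be granted like this:

GRANT SELECT, INSERT ON default.flights TO punch_user;  -- punch_user is a hypothetical account, adapt it to your setup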

Settings

Main Settings

| Name | Type | Default Value | Description |
| --- | --- | --- | --- |
| host (Required) | String | - | Host of the database. |
| port (Required) | Integer | - | Port of the database. |
| username (Required) | String | - | Username to connect to the database. |
| password (Required) | String | - | Password to connect to the database. |
| database (Required) | String | - | Database name. |
| table (Required) | String | - | Table name. |
| column_names (Required) | List<String> | - | Column names with the strategy to use. See Column Names. |
| bulk_size (Required) | Integer | - | Bulk size limit before flushing data into the table. |
| expiration_timeout | String | 10s | Period of inactivity before flushing. |

Advanced Settings

| Name | Type | Default Value | Description |
| --- | --- | --- | --- |
| pool_name | String | my_hiraki_pool | HikariCP pool name. |
| pool_size | Integer | 10 | HikariCP pool size. |
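
If you need to tune the connection pool, these two settings go in the node's settings block alongside the main settings; the values below are only an illustrative sketch:

pool_name: flights_pool   # any pool name, here a hypothetical one
pool_size: 20             # more connections for higher write concurrency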

Column Names

Column names must be formatted as <name>:<strategy>

There are two strategies for processing column values, illustrated in the sketch after this list:

  • cast: the main strategy, for raw values. Data is sent to ClickHouse as a string, and ClickHouse automatically casts it to the column type. With this strategy, data must be correctly formatted.
  • function: for SQL functions. Data is sent to ClickHouse as-is, and ClickHouse evaluates the provided SQL function. With this strategy, the value must be a valid SQL function call, or an error may be raised.
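
As a minimal sketch, assuming the destination table also had a hypothetical insert_time column of type DateTime (not part of the example further down), a column_names list mixing both strategies could look like this:

column_names:
[
  arr_timestamp:cast     # raw value such as 2017-04-23 06:30:00, cast by ClickHouse
  uniquecarrier:cast     # raw string value
  insert_time:function   # the tuple field must carry an SQL function, e.g. now()
]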

Flush reasons

The ClickhouseOutput uses a bulk strategy. Data is written to the table if:

  • Bulk size is reached: if enough tuples have been received, the query is sent.
  • Expiration timeout is reached: if no tuples have been received during this period, the query is sent and the bulk starts over. This allows data to be written even when the bulk is not complete.

If neither of these conditions is met, data is kept in memory and not written to the database. A good practice is to set a high bulk size to improve performance, and to set the expiration timeout according to the throughput you expect.
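
For instance, a high-throughput pipeline might use values along these lines (illustrative figures, not recommendations from the ClickHouse documentation):

bulk_size: 10000            # flush every 10,000 tuples to favour large INSERT batches
expiration_timeout: 30s     # but never hold data for more than 30s of inactivity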

Example

We'll use a database named default, hosted at localhost:8123. The username is default and the password is empty.

First, we need to create the table:

CREATE TABLE flights (arr_timestamp TIMESTAMP, dep_timestamp TIMESTAMP, uniquecarrier VARCHAR) ENGINE = Log;

How it works:

Given the required parameters to connect to the database (host, port, username, password, database), this node tries to insert data while dynamically casting it to the right type based on the destination table metadata.

The ClickHouse output tries to match each tuple field name to a destination table column name, and dynamically casts the data carried by each field to match your table schema.

Below is a valid configuration example you can use with punchlinectl for testing purposes:

dag:
[
  {
    component: generator
    type: generator_input
    settings:
    {
      messages:
      [
        {
          logs:
          {
            dep_timestamp: 2017-04-23 06:30:00
            uniquecarrier: test
            arr_timestamp: 2017-04-23 06:30:00
          }
        }
        {
          logs:
          {
            uniquecarrier: test
            arr_timestamp: 2017-04-23 06:30:00
            dep_timestamp: 2017-04-23 06:30:00
          }
        }
        {
          logs:
          {
            arr_timestamp: 2017-04-23 06:30:00
            dep_timestamp: 2017-04-23 06:30:00
            uniquecarrier: test
          }
        }
      ]
    }
  }
  {
    component: clickhouse
    type: clickhouse_output
    settings:
    {
      host: localhost
      port: 8123
      username: default
      password: ""
      database: default
      table: flights
      bulk_size: 3
      column_names:
      [
        arr_timestamp:cast
        dep_timestamp:cast
        uniquecarrier:cast
      ]
    }
    subscribe:
    [
      {
        component: generator,
        stream: logs,
        fields:
        [
          arr_timestamp,
          dep_timestamp,
          uniquecarrier
        ]
      }
    ]
  }
]

The corresponding query would be:

INSERT INTO flights (arr_timestamp,dep_timestamp,uniquecarrier) VALUES ('2017-04-23 06:30:00','2017-04-23 06:30:00','test'),('2017 ...

The table content would now be:

| arr_timestamp       | dep_timestamp       | uniquecarrier |
| ---                 | ---                 | ---           |
| 2017-04-23 06:30:00 | 2017-04-23 06:30:00 | test          |
| 2017-04-23 06:30:00 | 2017-04-23 06:30:00 | test          |
| 2017-04-23 06:30:00 | 2017-04-23 06:30:00 | test          |
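
You can check the result directly with the ClickHouse client or the HTTP interface on port 8123, for example:

SELECT * FROM flights;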