clickhouse分布式表实践和原理

  • 分布式表

    一个逻辑上的表, 可以理解为数据库中的视图, 一般查询都查询分布式表. 分布式表引擎会将我们的查询请求路由本地表进行查询, 然后进行汇总最终返回给用户.

  • 本地表:

    实际存储数据的表

1 多切片单副本分布式表 

  • conf.xml中的开启远程连接
  • 配置/etc/metrika.xml文件
    <!-- Listen specified host. use :: (wildcard IPv6 address), if you want to accept connections both with IPv4 and IPv6 from everywhere. -->
    <!-- <listen_host>::</listen_host> -->
    <!-- Same for hosts with disabled ipv6: -->
    <!-- <listen_host>0.0.0.0</listen_host> -->
    <listen_host>::</listen_host>

vi /etc/metrika.xml   注意修改每台机器上的

<macros>
    <replica>ck1</replica>
</macros>
<networks>

<yandex>
<clickhouse_remote_servers>
    <cluster1>
        <shard>
             <internal_replication>true</internal_replication>
            <replica>
                <host>ck1</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <replica>
                <internal_replication>true</internal_replication>
                <host>ck2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <internal_replication>true</internal_replication>
            <replica>
                <host>ck3</host>
                <port>9000</port>
            </replica>
        </shard>
    </cluster1>
</clickhouse_remote_servers>


<zookeeper-servers>
  <node index="1">
    <host>ck1</host>
    <port>2181</port>
  </node>

  <node index="2">
    <host>ck2</host>
    <port>2181</port>
  </node>
  <node index="3">
    <host>ck3</host>
    <port>2181</port>
  </node>
</zookeeper-servers>

<macros>
    <replica>ck1</replica>
</macros>
<networks>
   <ip>::/0</ip>
</networks>

<clickhouse_compression>
<case>
  <min_part_size>10000000000</min_part_size>                                        
  <min_part_size_ratio>0.01</min_part_size_ratio>                                                                                                                                       
  <method>lz4</method>
</case>
</clickhouse_compression>
</yandex>

在集群中的每台机器上创建一样的普通的表

create table t1(id Int8 , name String , age UInt8 ,gender String) engine=MergeTree order by id ;

创建分布式表
create table d_t1(id Int8 , name String , age UInt8 ,gender String) ENGINE=Distributed(cluster1, doit1, t1, id);

2 多切片多副本分布式表

vi /etc/metrika.xml   注意修改每台机器上的

1) 每台机器配置参数如下 
<yandex>
<clickhouse_remote_servers>
    <mycluster>
        <shard>
            <weight>1</weight>
            <internal_replication>true</internal_replication>
            <replica>
                <host>ck1</host>
                <port>9000</port>
                <default_database>mycluster_shard_1</default_database>
            </replica>
            <replica>
                <host>ck2</host>
                <port>9000</port>
                <default_database>mycluster_shard_1</default_database>
            </replica>
        </shard>
		
        <shard>
            <weight>1</weight>
            <internal_replication>true</internal_replication>
            <replica>
                <host>ck2</host>
                <port>9000</port>
                <default_database>mycluster_shard_2</default_database>
            </replica>
            <replica>
                <host>ck3</host>
                <port>9000</port>
                <default_database>mycluster_shard_2</default_database>
            </replica>
        </shard>
        <shard>
            <weight>1</weight>
            <internal_replication>true</internal_replication>
            <replica>
                <host>ck3</host>
                <port>9000</port>
                <default_database>mycluster_shard_3</default_database>
            </replica>
            <replica>
                <host>ck1</host>
                <port>9000</port>
                <default_database>mycluster_shard_3</default_database>
            </replica>
        </shard>
    </mycluster>
</clickhouse_remote_servers>
<zookeeper-servers>
        <node index="1">
            <host>ck1</host>
            <port>2181</port>
        </node>
        <node index="2">
            <host>ck2</host>
            <port>2181</port>
        </node>
        <node index="3">
            <host>ck3</host>
            <port>2181</port>
        </node>
</zookeeper-servers>
</yandex>

 初始化3节点3分片2副本的集群

在ck1服务器上通过客户端执行
clickhouse-client -h ck001 -u huxl --password xxxadmin
create database mycluster_shard_1;
create database mycluster_shard_2;
create database mycluster_shard_3;
在ck2服务器上通过客户端执行
clickhouse-client -h ck002 -u huxl --password xxxadmin
create database mycluster_shard_1;
create database mycluster_shard_2;
create database mycluster_shard_3;
在ck3服务器上通过客户端执行
clickhouse-client -h ck003 -u huxl --password xxxadmin
create database mycluster_shard_1;
create database mycluster_shard_2;
create database mycluster_shard_3;

初始化存储数据副本的表

//在ck1上执行
create table mycluster_shard_1.tbl_rep( 
  c1 Int32, 
  mydate Date 
) engine = ReplicatedMergeTree('/clickhouse/tables/1/dtbl', '1', mydate, (mydate), 8192);

//在ck2上执行
create table mycluster_shard_1.tbl_rep ( 
  c1 Int32,
  mydate Date
) engine = ReplicatedMergeTree('/clickhouse/tables/1/dtbl', '2', mydate, (mydate), 8192);

//在ck2上执行
create table mycluster_shard_2.tbl_rep (
  c1 Int32,
  mydate Date
) engine = ReplicatedMergeTree('/clickhouse/tables/2/dtbl', '1', mydate, (mydate), 8192);

//在ck3上执行
create table mycluster_shard_2.tbl_rep (
  c1 Int32,
  mydate Date
) engine = ReplicatedMergeTree('/clickhouse/tables/2/dtbl', '2', mydate, (mydate), 8192);

//在ck3上执行
create table mycluster_shard_3.tbl_rep (
  c1 Int32,
  mydate Date
) engine = ReplicatedMergeTree('/clickhouse/tables/3/dtbl', '1', mydate, (mydate), 8192);

//在ck1上执行
create table mycluster_shard_3.tbl_rep (
  c1 Int32,
  mydate Date
) engine = ReplicatedMergeTree('/clickhouse/tables/3/dtbl', '2', mydate, (mydate), 8192);

创建分布式表

4)创建分布式表 
//在ck1上执行
use default;
create table dtbl (
  c1 Int32,
  mydate Date
) engine=Distributed('mycluster', '', tbl_rep, rand());

插入数据测试

insert into table dtbl values(2, '2019-04-19')
select * from dtbl;


发现数据存储在第三片上 , 所以数据在ck1  和ck3上的mycluster_shard_3数据库表中存储数据

3 总结

3.1 不写分布式表

  1. 分布式表接收到数据后会将数据拆分成多个parts, 并转发数据到其它服务器, 会引起服务器间网络流量增加、服务器merge的工作量增加, 导致写入速度变慢, 并且增加了Too many parts的可能性.
  2. 数据的一致性问题, 先在分布式表所在的机器进行落盘, 然后异步的发送到本地表所在机器进行存储,中间没有一致性的校验, 而且在分布式表所在机器时如果机器出现down机, 会存在数据丢失风险.
  3. 数据写入默认是异步的,短时间内可能造成不一致.
  4. 对zookeeper的压力比较大(待验证). 没经过正式测试, 只是看到了有人提出.

 

3.2 Replication & Sharding

ClickHouse依靠ReplicatedMergeTree引擎族与ZooKeeper实现了复制表机制, 成为其高可用的基础.

ClickHouse像ElasticSearch一样具有数据分片(shard)的概念, 这也是分布式存储的特点之一, 即通过并行读写提高效率. ClickHouse依靠Distributed引擎实现了分布式表机制, 在所有分片(本地表)上建立视图进行分布式查询.

在实际操作中,为了最大化性能与稳定性,分片和副本几乎总是一同使用。

clickhouse分布式表实践和原理

3.3 Replicated Table & ReplicatedMergeTree Engines

不同于HDFS的副本机制(基于集群实现), Clickhouse的副本机制是基于表实现的. 用户在创建每张表的时候, 可以决定该表是否高可用.

CREATE TABLE IF NOT EXISTS {local_table} ({columns}) 
ENGINE = ReplicatedMergeTree('/clickhouse/tables/#_tenant_id_#/#__appname__#/#_at_date_#/{shard}/hits', '{replica}')
partition by toString(_at_date_) sample by intHash64(toInt64(toDateTime(_at_timestamp_)))
order by (_at_date_, _at_timestamp_, intHash64(toInt64(toDateTime(_at_timestamp_))))

ClickHouse的副本机制之所以叫“复制表”,是因为它工作在表级别,而不是集群级别(如HDFS)。也就是说,用户在创建表时可以通过指定引擎选择该表是否高可用,每张表的分片与副本都是互相独立的。

目前支持复制表的引擎是ReplicatedMergeTree引擎族,它与平时最常用的MergeTree引擎族是正交的,如下图所示。

clickhouse分布式表实践和原理

CREATE TABLE IF NOT EXISTS test.events_local ON CLUSTER '{cluster}' (
  ts_date Date,
  ts_date_time DateTime,
  user_id Int64,
  event_type String,
  site_id Int64,
  groupon_id Int64,
  category_id Int64,
  merchandise_id Int64,
  search_text String
  -- A lot more columns...
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/test/events_local','{replica}')
PARTITION BY ts_date
ORDER BY (ts_date,toStartOfHour(ts_date_time),site_id,event_type)
SETTINGS index_granularity = 8192;

其中,ON CLUSTER语法表示分布式DDL,即执行一次就可在集群所有实例上创建同样的本地表。集群标识符{cluster}、分片标识符{shard}和副本标识符{replica}来自之前提到过的复制表宏配置,即config.xml中<macros>一节的内容,配合ON CLUSTER语法一同使用,可以避免建表时在每个实例上反复修改这些值。

ReplicatedMergeTree引擎族接收两个参数:

  • ZK中该表相关数据的存储路径,ClickHouse官方建议规范化,如上面的格式/clickhouse/tables/{shard}/[database_name]/[table_name]
  • 副本名称,一般用{replica}即可。

观察一下上述ZK路径下的znode结构与内容。

[zk: localhost:2181(CONNECTED) 0] ls /clickhouse/tables/01/test/events_local
[metadata, temp, mutations, log, leader_election, columns, blocks, nonincrement_block_numbers, replicas, quorum, block_numbers]

[zk: localhost:2181(CONNECTED) 1] get /clickhouse/tables/04/test/events_local/columns
columns format version: 1
9 columns:
`ts_date` Date
`ts_date_time` DateTime
`user_id` Int64
`event_type` String
`site_id` Int64
`groupon_id` Int64
`category_id` Int64
`merchandise_id` Int64
`search_text` String
# ...................

[zk: localhost:2181(CONNECTED) 2] get /clickhouse/tables/07/test/events_local/metadata
metadata format version: 1
date column: 
sampling expression: 
index granularity: 8192
mode: 0
sign column: 
primary key: ts_date, toStartOfHour(ts_date_time), site_id, event_type
data format version: 1
partition key: ts_date
granularity bytes: 10485760
# ...................

ReplicatedMergeTree引擎族在ZK中存储大量数据,包括且不限于表结构信息、元数据、操作日志、副本状态、数据块校验值、数据part merge过程中的选主信息等等。可见,ZK在复制表机制下扮演了元数据存储、日志框架、分布式协调服务三重角色,任务很重,所以需要额外保证ZK集群的可用性以及资源(尤其是硬盘资源)。

下图大致示出复制表执行插入操作时的流程(internal_replication配置项为true)。即先写入一个副本,再通过config.xml中配置的interserver HTTP port端口(默认是9009)将数据复制到其他实例上去,同时更新ZK集群上记录的信息。

clickhouse分布式表实践和原理

3.4 Distributed Table & Distributed Engine

ClickHouse分布式表的本质并不是一张表,而是一些本地物理表(分片)的分布式视图,本身并不存储数据。

支持分布式表的引擎是Distributed,建表DDL语句示例如下,_all只是分布式表名比较通用的后缀而已。

CREATE TABLE IF NOT EXISTS test.events_all ON CLUSTER sht_ck_cluster_1
AS test.events_local
ENGINE = Distributed(sht_ck_cluster_1,test,events_local,rand());

Distributed引擎需要以下几个参数:

  • 集群标识符
    注意不是复制表宏中的标识符,而是<remote_servers>中指定的那个。
  • 本地表所在的数据库名称
  • 本地表名称
  • (可选的)分片键(sharding key)
    该键与config.xml中配置的分片权重(weight)一同决定写入分布式表时的路由,即数据最终落到哪个物理表上。它可以是表中一列的原始数据(如site_id),也可以是函数调用的结果,如上面的SQL语句采用了随机值rand()。注意该键要尽量保证数据均匀分布,另外一个常用的操作是采用区分度较高的列的哈希值,如intHash64(user_id)

在分布式表上执行查询的流程简图如下所示。发出查询后,各个实例之间会交换自己持有的分片的表数据,最终汇总到同一个实例上返回给用户。

clickhouse分布式表实践和原理

而在写入时,我们有两种选择:一是写分布式表,二是写underlying的本地表。孰优孰劣呢?

直接写分布式表的优点自然是可以让ClickHouse控制数据到分片的路由,缺点就多一些:

  • 数据是先写到一个分布式表的实例中并缓存起来,再逐渐分发到各个分片上去,实际是双写了数据(写入放大),浪费资源;
  • 数据写入默认是异步的,短时间内可能造成不一致;
  • 目标表中会产生较多的小parts,使merge(即compaction)过程压力增大。

相对而言,直接写本地表是同步操作,更快,parts的大小也比较合适,但是就要求应用层额外实现sharding和路由逻辑,如轮询或者随机等。

clickhouse分布式表实践和原理

应用层路由并不是什么难事,所以如果条件允许,在生产环境中总是推荐写本地表、读分布式表。举个例子,在笔者最近引入的Flink-ClickHouse Sink连接器中,就采用了随机路由,部分代码如下。

    private Request buildRequest(ClickhouseRequestBlank requestBlank) {
        String resultCSV = String.join(" , ", requestBlank.getValues());
        String query = String.format("INSERT INTO %s VALUES %s", requestBlank.getTargetTable(), resultCSV);
        String host = sinkSettings.getClickhouseClusterSettings().getRandomHostUrl();

        BoundRequestBuilder builder = asyncHttpClient
                .preparePost(host)
                .setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=utf-8")
                .setBody(query);

        if (sinkSettings.getClickhouseClusterSettings().isAuthorizationRequired()) {
            builder.setHeader(HttpHeaders.Names.AUTHORIZATION, "Basic " + sinkSettings.getClickhouseClusterSettings().getCredentials());
        }

        return builder.build();
    }

    public String getRandomHostUrl() {
        currentHostId = ThreadLocalRandom.current().nextInt(hostsWithPorts.size());
        return hostsWithPorts.get(currentHostId);
    }

 

3.5 数据同步

  1. 写入到一个节点
  2. 通过interserver HTTP port端口同步到其他实例上
  3. 更新zookeeper集群记录的信息

ck的replicatedMergeTree引擎方案有太多的信息存储在zk上, 当数据量增大, ck节点数增多, 会导致服务非常不稳定, 目前我们的ck集群规模还小, 这个问题还不严重, 但依旧会出现很多和zk有关的问题(详见遇到的问题).

实际上 ClickHouse 把 ZK 当成了三种服务的结合, 而不仅把它当作一个 Coordinate service(协调服务), 可能这也是大家使用 ZK 的常用用法。ClickHouse 还会把它当作 Log Service(日志服务),很多行为日志等数字的信息也会存在 ZK 上;还会作为表的 catalog service(元数据存储),像表的一些 schema 信息也会在 ZK 上做校验,这就会导致 ZK 上接入的数量与数据总量会成线性关系。

目前针对这个问题, clickhouse社区提出了一个mini checksum方案, 但是这并没有彻底解决 znode 与数据量成线性关系的问题. 目前看到比较好的方案是字节的:

我们就基于 MergeTree 存储引擎开发了一套自己的高可用方案。我们的想法很简单,就是把更多 ZK 上的信息卸载下来,ZK 只作为 coordinate Service。只让它做三件简单的事情:行为日志的 Sequence Number 分配、Block ID 的分配和数据的元信息,这样就能保证数据和行为在全局内是唯一的。

关于节点,它维护自身的数据信息和行为日志信息,Log 和数据的信息在一个 shard 内部的副本之间,通过 Gossip 协议进行交互。我们保留了原生的 multi-master 写入特性,这样多个副本都是可以写的,好处就是能够简化数据导入。图 6 是一个简单的框架图。

以这个图为例,如果往 Replica 1 上写,它会从 ZK 上获得一个 ID,就是 Log ID,然后把这些行为和 Log Push 到集群内部 shard 内部活着的副本上去,然后当其他副本收到这些信息之后,它会主动去 Pull 数据,实现数据的最终一致性。我们现在所有集群加起来 znode 数不超过三百万,服务的高可用基本上得到了保障,压力也不会随着数据增加而增加。

clickhouse分布式表实践和原理

上一篇:自定义实现一个loghub(或kafka)的动态分片消费者负载均衡?


下一篇:ElasticSearch 调优