七 ClickHouse分布式

1 在集群的每个节点上安装ck服务
2 <listen_host>::<listen_host>
3 配置zookeeper 正常启动 

集群是副本和分片的基础,它将ClickHouse的服务拓扑由单节点延 伸到多个节点,但它并不像Hadoop生态的某些系统那样,要求所有节点组成一个单一的大集群。ClickHouse的集群配置非常灵活,用户既可以将所有节点组成一个单一集群,也可以按照业务的诉求,把节点划分为多个小的集群。在每个小的集群区域之间,它们的节点、分区和副本数量可以各不相同

 七 ClickHouse分布式

另一种是从功能作用层面区分,使用副本的主要目的是防止数据丢失,增加数据存储的冗余;而使用分片的主要目的是实现数据的水平切分,

经讲过MergerTree的命名规则。如果在*MergeTree的前面增加Replicated的前缀,则能够组合 成一个新的变种引擎,即Replicated-MergeTree复制表!

 七 ClickHouse分布式

 

 

 七 ClickHouse分布式

 

 

 只有使用了ReplicatedMergeTree复制表系列引擎,才能应用副本的能力(后面会介绍另一种副本的实现方式)。或者用一种更为直接的方式理解,即使用ReplicatedMergeTree的数据表就是副本。 ReplicatedMergeTree是MergeTree的派生引擎,它在MergeTree的 基础上加入了分布式协同的能力,

 七 ClickHouse分布式

 

 

 

在MergeTree中,一个数据分区由开始创建到全部完成,会历经两类存储区域。

(1)内存:数据首先会被写入内存缓冲区。

(2)本地磁盘:数据接着会被写入tmp临时目录分区,待全部完成后再将临时目录重命名为正式分区。

ReplicatedMergeTree在上述基础之上增加了ZooKeeper的部分,它会进一步在ZooKeeper内创建一系列的监听节点,并以此实现多个实例之间的通信。在整个通信过程中,ZooKeeper并不会涉及表数据的传输。

 

  • 依赖ZooKeeper:在执行INSERT和ALTER查询的时候,ReplicatedMergeTree需要借助ZooKeeper的分布式协同能力,以实现多个副本之间的同步。但是在查询副本的时候,并不需要使用 ZooKeeper。关于这方面的更多信息,会在稍后详细介绍。

  • 表级别的副本:副本是在表级别定义的,所以每张表的副本配置都可以按照它的实际需求进行个性化定义,包括副本的数量,以及副本在集群内的分布位置等。

  • 多主架构(Multi Master):可以在任意一个副本上执行INSERT和ALTER查询,它们的效果是相同的。这些操作会借助ZooKeeper的协同能力被分发至每个副本以本地形式执行。

  • Block数据块:在执行INSERT命令写入数据时,会依据 max_insert_block_size的大小(默认1048576行)将数据切分成若干个Block数据块。所以Block数据块是数据写入的基本单元,并且具有 写入的原子性和唯一性。

  • 原子性:在数据写入时,一个Block块内的数据要么全部写入成功,要么全部失败。

  • 唯一性:在写入一个Block数据块的时候,会按照当前Block数据块的数据顺序、数据行和数据大小等指标,计算Hash信息摘要并记录在案。在此之后,如果某个待写入的Block数据块与先前已被写入的 Block数据块拥有相同的Hash摘要(Block数据块内数据顺序、数据大小和数据行均相同),则该Block数据块会被忽略。这项设计可以预防由异常原因引起的Block数据块重复写入的问题。

7.0 分片概念

通过引入数据副本,虽然能够有效降低数据的丢失风险(多份存储),并提升查询的性能(分摊查询、读写分离),但是仍然有一个问题没有解决,那就是数据表的容量问题。到目前为止,每个副本自

身,仍然保存了数据表的全量数据。所以在业务量十分庞大的场景中,依靠副本并不能解决单表的性能瓶颈。想要从根本上解决这类问题,需要借助另外一种手段,即进一步将数据水平切分,也就是我们将要介绍的数据分片。ClickHouse中的每个服务节点都可称为一个shard(分片)。从理论上来讲,假设有N(N>=1)张数据表A,分布在N个ClickHouse服务节点,而这些数据表彼此之间没有重复数据,那么就可以说数据表A拥有N个分片。然而在工程实践中,如果只有这些分片表,那么整个 Sharding(分片)方案基本是不可用的。对于一个完整的方案来说,还需要考虑数据在写入时,如何被均匀地写至各个shard,以及数据在查询时,如何路由到每个shard,并组合成结果集。所以,ClickHouse

的数据分片需要结合Distributed表引擎一同使用

七 ClickHouse分布式

 

 

 

 

 

 

Distributed表引擎自身不存储任何数据,它能够作为分布式表的一层透明代理,在集群内部自动开展数据的写入、分发、查询、路由等工作

7.1 配置zookeeper

需要在每台CK的节点上配置ZK的位置

ClickHouse使用一组zookeeper标签定义相关配置,默认情况下,在全局配置config.xml中定义即可。但是各个副本所使用的Zookeeper 配置通常是相同的,为了便于在多个节点之间复制配置文件,更常见的做法是将这一部分配置抽离出来,独立使用一个文件保存。

首先,在服务器的/etc/clickhouse-server/config.d目录下创建一个名为metrika.xml的配置文件:

<?xml version="1.0"?>
<yandex>
 <zookeeper-servers> 
 <node index="1"> 
 <host>doit01</host>
 <port>2181</port>
 </node>
  <node index="2"> 
 <host>doit02</host>
 <port>2181</port>
 </node>
  <node index="3"> 
 <host>doit03</host>
 <port>2181</port>
 </node>
 </zookeeper-servers>
</yandex>

  

接着,在全局配置config.xml中使用<include_from>标签导入刚才定义的配置

9 <include_from>/etc/clickhouse-server/config.d/metrika.xml</include_from>

引用ZK的地址

 402  <zookeeper incl="zookeeper-servers" optional="false" />

incl与metrika.xml配置文件内的节点名称要彼此对应。至此,整个配置过程就完成了。

ClickHouse在它的系统表中,颇为贴心地提供了一张名为zookeeper的代理表。通过这张表,可以使用SQL查询的方式读取远端ZooKeeper内的数据。有一点需要注意,在用于查询的SQL语句中,必须指定path条件,

将配置文件同步到其他集群节点!!

scp -r config.d/  linux02:$PWD
scp -r config.d/  linux03:$PWD
​
scp config.xml   linux02:$PWD 
scp config.xml   linux03:$PWD  

7.2 创建副本表

在创建副本表以前, 首先要启动集群中的zookeeper

首先,由于增加了数据的冗余存储,所以降低了数据丢失的风险;其次,由于副本采用了多主

架构,所以每个副本实例都可以作为数据读、写的入口,这无疑分摊了节点的负载。

在使用单使用副本功能的时候 , 我们对CK集群不需要任何的配置就可以实现数据的多副本存储!只需要在建表的时候指定engine和ZK的位置即可 ;

ENGINE = ReplicatedMergeTree('zk_path', 'replica_name') 
​
-- /clickhouse/tables/{shard}/table_name
​
-- /clickhouse/tables/ 是约定俗成的路径固定前缀,表示存放数据表的根路径。 

  

·{shard}表示分片编号,通常用数值替代,例如01、02、03。一张数据表可以有多个分片,而每个分片都拥有自己的副本。

·table_name表示数据表的名称,为了方便维护,通常与物理表的名字相同(虽然ClickHouse并不强制要求路径中的表名称和物理表名相同);而replica_name的作用是定义在ZooKeeper中创建的副本名称,该名称是区分不同副本实例的唯一标识。一种约定成俗的命名方式是使用所在服务器的域名称。

对于zk_path而言,同一张数据表的同一个分片的不同副本,应该定义相同的路径;而对于replica_name而言,同一张数据表的同一个分片的不同副本,应该定义不同的名称

1) 一个分片 , 多个副本表

-- lixnu01 机器
create table tb_demo1 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/01/tb_demo1', 'linux01') 
order by id ;
-- lixnu02 机器
create table tb_demo1 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/01/tb_demo1', 'linux02') 
order by id ;
-- lixnu03 机器
create table tb_demo1 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/01/tb_demo1', 'linux03') 
order by id ;

  

查看zookeeper中的内容

[zk: localhost:2181(CONNECTED) 0] ls /
[a, zookeeper, clickhouse, DNS, datanode1, server1, hbase]
[zk: localhost:2181(CONNECTED) 1] ls /clickhouse
[tables, task_queue]
[zk: localhost:2181(CONNECTED) 2] ls /clickhouse/tables
[01]
[zk: localhost:2181(CONNECTED) 3] ls /clickhouse/tables/01
[tb_demo1]
[zk: localhost:2181(CONNECTED) 4] ls /clickhouse/tables/01/tb_demo1
[metadata, temp, mutations, log, leader_election, columns, blocks, nonincrement_block_numbers, replicas, quorum, block_numbers]
[zk: localhost:2181(CONNECTED) 5] ls /clickhouse/tables/01/tb_demo1/replicas
[linux02, linux03, linux01]

  

SELECT *
FROM system.zookeeper
WHERE path = '/' ;
在任何一台节点上,插入数据, 在其他节点上都能同步数据

2) 两个分片 , 一个分片有副本一个分片没有副本

-- lixnu01 机器
create table tb_demo2 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/01/tb_demo2', 'linux01') 
order by id ;
-- lixnu02 机器
create table tb_demo2 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/01/tb_demo2', 'linux02') 
order by id ;
-- lixnu03 机器
create table tb_demo2 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/01/tb_demo2', 'linux03') 
order by id ;
​
-------------------
​
-- lixnu01 机器
create table tb_demo2 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/01/tb_demo2', 'linux01') 
order by id ;
-- lixnu02 机器
create table tb_demo2 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/01/tb_demo2', 'linux02') 
order by id ;
-- lixnu03 机器
create table tb_demo2 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/02/tb_demo2', 'linux03') 
order by id ;
​
-- lixnu04 机器
create table tb_demo2 (
    id Int8 ,
    name String)engine=ReplicatedMergeTree('/clickhouse/tables/02/tb_demo2', 'linux03') 
order by id ;

7.3 分布式引擎

Distributed表引擎是分布式表的代名词,它自身不存储任何数据,而是作为数据分片的透明代理,能够自动路由数据至集群中的各个节点,所以Distributed表引擎需要和其他数据表引擎一起协同工作,

七 ClickHouse分布式

 

 

一般使用分布式表的目的有两种,

  • 一种是表存储多个副本并且有大量的并发操作,我们可以使用分布式表来分摊请求压力解决并发问题

  • 一种是表特别大有多个切片组成 ,并且每切片数据也可以存储数据副本

  • 本地表:通常以_local为后缀进行命名。本地表是承接数据的载体,可以使用非Distributed的任意表引擎,一张本地表对应了一个数据分片

  • 分布式表:通常以_all为后缀进行命名。分布式表只能使用Distributed表引擎,它与本地表形成一对多的映射关系,日后将通过分布式表代理操作多张本地表。

ENGINE = Distributed(cluster, database, table [,sharding_key]) 

 

  • cluster:集群名称,与集群配置中的自定义名称相对应。在对分布式表执行写入和查询的过程中,它会使用集群的配置信息来找到相应的host节点。

  • database和table:分别对应数据库和表的名称,分布式表使用这组配置映射到本地表。

  • sharding_key:分片键,选填参数。在数据写入的过程中,分布式表会依据分片键的规则,将数据分布到各个host节点的本地表。

 

7.3.1 没有副本

本示例是,使用某个集群 , 创建多分片无副本的表配置了一个集群 cluster1 集群中有三台机器ck1 ck2 ck3,没有副本,如果在这个集群上建表, 表数据会有三个切片 ,没有存储数据副本

<clickhouse_remote_servers>
<cluster1>
<!-- 集群名为cluster1 整个集群中每个表有三个分片,分别在lx01 lx02 lx03上 -->
 <shard>
 <replica>
 <host>linux01</host>
 <port>9000</port>
 </replica>
 </shard>
 <shard>
 <replica>
 <host>linux02</host>
 <port>9000</port>
 </replica>
 </shard>
 <shard>
 <replica>
 <host>linux03</host>
 <port>9000</port>
 </replica>
 </shard>
</cluster1>
 <cluster2>
<!-- 集群名为cluster2 一个切片 三个副本 -->
 <shard>  
 <replica>
 <host>linux01</host>
 <port>9000</port>
 </replica>
 <replica>
 <host>linux02</host>
 <port>9000</port>
 </replica>
 <replica>
 <host>linux03</host>
 <port>9000</port>
 </replica>
 </shard>
</cluster2>
    <!--集群三  多个分片  保留副本 注意一个主机只使用一次 -->
<cluster3>
 <shard>  
 <replica>
 <host>doit01</host>
 <port>9000</port>
 </replica>
 <replica>
 <host>doit02</host>
 <port>9000</port>
 </replica>
 </shard>
  <shard>  
 <replica>
 <host>doit03</host>
 <port>9000</port>
 </replica>
 <replica>
 <host>doit04</host>
 <port>9000</port>
 </replica>
 </shard>
</cluster3>
    
</clickhouse_remote_servers>

  

同步配置文件 到集群中
-- 创建本地表 
create table tb_demo3 on cluster cluster1(
id  Int8 ,
name String 
)engine=MergeTree() 
order by  id ;
-- 创建分布式表 
create table demo3_all on cluster cluster1 engine=Distributed('cluster1','default','tb_demo3',id) as tb_demo3 ;
--向分布式表中插入数据 ,数据会根据插入规则将数据插入到不同的分片中

  

7.3.2 有副本的配置

<!-- 配置集群2 , 集群中的表有两个分片 ,其中分片1 有两个副本 -->
<cluster2>
 <shard>
    <replica>
        <host>linux01</host>
        <port>9000</port>
    </replica>
    <replica>
        <host>linux02</host>
        <port>9000</port>
    </replica>
 </shard>
 <shard>
    <replica>
        <host>linux03</host>
        <port>9000</port>
    </replica>
 </shard>
</cluster2>

  

-- 创建本地表
create table tb_demo4 on cluster cluster2(
id  Int8 ,
name String 
)engine=MergeTree() 
order by  id ;
-- 创建分布式表
create table demo4_all on cluster cluster2 engine=Distributed('cluster2','default','tb_demo4',id) as tb_demo4 ;

第一 : 1) 数据1分片的时候 , 多个副本 可以不使用分布式表 第二: 2)有多个分片的时候使用分布式表 给分片分配数据

多个分片的表
多个副本表 
多分片 多副本
  一个节点在一个集群中只能使用一次

  

 

 

7.4 分布式DDL

ClickHouse支持集群模式,一个集群拥有1到多个节点。CREATE、ALTER、DROP、RENMAE及TRUNCATE这些DDL语句,都支持分布式执行。这意味着,如果在集群中任意一个节点上执行DDL语句,那么集群中的 每个节点都会以相同的顺序执行相同的语句。这项特性意义非凡,它就如同批处理命令一样,省去了需要依次去单个节点执行DDL的烦恼。将一条普通的DDL语句转换成分布式执行十分简单,只需加上ON CLUSTER cluster_name声明即可。例如,执行下面的语句后将会对 ch_cluster集群内的所有节点广播这条DDL语句:

-- 建表 on cluster cluster1
create table tb_demo3 on cluster cluster1(
id  Int8 ,
name String 
)engine=MergeTree() 
order by  id ;
-- 删除集群中所有的本地表或者是分布式表
drop table if exists tb_demo3 on cluster cluster1;
-- 修改集群中的表结构 
alter table t3 on cluster cluster1 add column age Int8 ;
-- 删除字段 
-- 删除分区 

  

7.5 分布式协同原理

副本协同的核心流程主要有INSERT、MERGE、MUTATION和ALTER四种,分别对应了数据写入、分区合并、数据修改和元数据修改。INSERT和ALTER查询是分布式执行的。借助 ZooKeeper的事件通知机制,多个副本之间会自动进行有效协同,但是它们不会使用ZooKeeper存储任何分区数据。而其他查询并不支持分布式执行,包括SELECT、CREATE、DROP、RENAME和ATTACH。例如,为了创建多个副本,我们需要分别登录每个ClickHouse节点。接下来,会依次介绍上述流程的工作机理。为了便于理解,我先来整体认识一下各个流程的介绍方法。

7.5.1 insert原理

 七 ClickHouse分布式

 

 

7.5.2 Merge原理

无论MERGE操作从哪个副本发起,其合并计划都会交由主副本来制定,和insert一样

 七 ClickHouse分布式

 

 

7.5.3 mutation原理

alter table x update name=zss where

alter table x delete where

当对ReplicatedMergeTree执行ALTER DELETE或者ALTER UPDATE操作的时候,即会进入MUTATION部分的逻辑,它的核心流程如图

七 ClickHouse分布式

 

 

7.5.4 alter原理

当对ReplicatedMergeTree执行ALTER操作进行元数据修改的时候,即会进入ALTER部

分的逻辑,例如增加、删除表字段等。

上一篇:elasticsearch-hadoop使用示例


下一篇:ClickHouse-1(简介)