Clickhouse-copier Data Replication and Migration Case Study
Background
A new environment was going live, so data had to be migrated from the old environment to the new one.
Approach evaluation
Option 1: Copy the data directories
First, look at ClickHouse's directory layout on the file system (the paths are configured in /etc/clickhouse-server/config.xml):
- The data directory holds the data: one directory per database, and inside it one subdirectory per table.
- The metadata directory holds the metadata, i.e. the database and table definitions. Specifically:
- database.sql is the DDL that creates the database (ATTACH DATABASE default ENGINE = Ordinary)
- database/table.sql is the DDL that creates the table (ATTACH TABLE …).
Because these DDL files use ATTACH statements, simply copying the data and metadata directories (excluding system) to the new cluster is enough to migrate the data. A test with a small table confirmed that this works.
Procedure
- On the source cluster's disks, archive the data and metadata of the relevant databases or tables
- Copy the archives into the corresponding directories on the target cluster
- Restart clickhouse-server (a minimal shell sketch of these steps follows)
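The sketch below only illustrates the flow above. It assumes the default /var/lib/clickhouse layout, a placeholder database mydb, and a placeholder target host new_ck_host; adjust paths, names, and ownership to your environment.
# On a source node: archive one database's data and metadata (default ClickHouse paths assumed)
cd /var/lib/clickhouse
tar czf /tmp/mydb_backup.tar.gz data/mydb metadata/mydb metadata/mydb.sql
# Ship the archive to the target node (new_ck_host is a placeholder)
scp /tmp/mydb_backup.tar.gz new_ck_host:/tmp/
# On the target node: unpack into the ClickHouse directory, fix ownership, then restart
tar xzf /tmp/mydb_backup.tar.gz -C /var/lib/clickhouse
chown -R clickhouse:clickhouse /var/lib/clickhouse/data/mydb /var/lib/clickhouse/metadata/mydb /var/lib/clickhouse/metadata/mydb.sql
systemctl restart clickhouse-server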
Option 2: Use the remote table function
Besides querying regular tables, ClickHouse can use table functions to build special "tables"; among them, the remote function can query a table on another ClickHouse instance.
Usage is simple:
- SELECT * FROM remote('addresses_expr', db, table, 'user', 'password') LIMIT 10;
This feature can therefore be used for data migration:
- INSERT INTO <local_database>.<local_table> SELECT * FROM remote('remote_clickhouse_addr', <remote_database>, <remote_table>, '<remote_user>', '<remote_password>')
Procedure
- From system.tables on the source cluster, query the database, table, DDL, partitions, table engine, and other details
- On the target cluster, run the DDL to create the table, then run the migration statement above to copy the data
- Repeat the previous step for every table (a sketch of one iteration follows)
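A minimal sketch of one iteration, run from a target-cluster node with clickhouse-client. The source host old_ck_host, the credentials, and mydb.mytable are placeholders; the CREATE TABLE DDL itself would come from create_table_query in system.tables on the source.
# Pull one table's data from the source cluster via the remote() table function
clickhouse-client --query "
  INSERT INTO mydb.mytable
  SELECT * FROM remote('old_ck_host:9000', mydb, mytable, 'user', 'password')"
# Rough consistency check on row counts afterwards
clickhouse-client --query "SELECT count() FROM mydb.mytable"
clickhouse-client --query "SELECT count() FROM remote('old_ck_host:9000', mydb, mytable, 'user', 'password')"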
Option 3: Clickhouse-copier (the approach adopted)
clickhouse-copier is the official data migration tool, used to migrate data between clusters.
Configuration is described in the official documentation.
How Clickhouse-copier works
It reads two configuration files: zookeeper.xml, the ZooKeeper connection settings, and task.xml, the migration task definition stored in ZooKeeper.
- Connects to ZooKeeper and receives:
- copy jobs;
- the status of copy jobs.
- Reads the task.xml written to ZooKeeper and carries out the work.
- clickhouse-copier tracks /task/path/description in ZooKeeper and applies changes to it. Note: the content of description is the task.xml configuration.
- Each running process selects the "closest" shard of the source cluster, then copies the data to the destination cluster, resharding it where necessary.
Configuration
zookeeper.xml
Create zookeeper.xml at a path of your choice on the server where clickhouse-copier is installed.
The file consists of two parts: the logging level settings and the ZooKeeper configuration of the source-side cluster. It is read when clickhouse-copier starts.
<yandex>
<logger>
<level>trace</level>
<size>100M</size>
<count>3</count>
</logger>
<zookeeper>
<node index="1">
<host>zkip1</host>
<port>12181</port>
</node>
<node index="2">
<host>zkip2</host>
<port>12181</port>
</node>
<node index="3">
<host>zkip3</host>
<port>12181</port>
</node>
</zookeeper>
</yandex>
task.xml
This file is written to a ZooKeeper node before clickhouse-copier is run.
<yandex>
<remote_servers>
<!-- Source cluster configuration -->
<src_manyun_ch_cluster>
<!-- Shard 1 -->
<shard>
<internal_replication>true</internal_replication>
<!-- Replica 1 -->
<replica>
<host>src_ck_ip1</host>
<port>9000</port>
<user>root</user>
<password>pass</password>
</replica>
<!-- Replica 2 -->
<replica>
<host>src_ck_ip2</host>
<port>9000</port>
<user>root</user>
<password>pass</password>
</replica>
</shard>
<!-- Shard 2 -->
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>src_ck_ip3</host>
<port>9000</port>
<user>root</user>
<password>pass</password>
</replica>
<replica>
<host>src_ck_ip4</host>
<port>9000</port>
<user>root</user>
<password>pass</password>
</replica>
</shard>
</src_manyun_ch_cluster>
<!-- Destination cluster configuration -->
<des_manyun_ch_cluster>
<!-- Shard 1 -->
<shard>
<internal_replication>true</internal_replication>
<!-- Replica 1 -->
<replica>
<host>des_ck_ip1</host>
<port>9000</port>
<user>root</user>
<password>pass</password>
</replica>
<!-- Replica 2 -->
<replica>
<host>des_ck_ip2</host>
<port>9000</port>
<user>root</user>
<password>pass</password>
</replica>
</shard>
<!-- Shard 2 -->
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>des_ck_ip3</host>
<port>9000</port>
<user>root</user>
<password>pass</password>
</replica>
<replica>
<host>des_ck_ip4</host>
<port>9000</port>
<user>root</user>
<password>pass</password>
</replica>
</shard>
<!-- Shard 3 -->
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>des_ck_ip5</host>
<port>9000</port>
<user>root</user>
<password>pass</password>
</replica>
<replica>
<host>des_ck_ip6</host>
<port>9000</port>
<user>root</user>
<password>pass</password>
</replica>
</shard>
</des_manyun_ch_cluster>
</remote_servers>
<!-- Maximum number of copier worker processes -->
<max_workers>8</max_workers>
<!-- Tables to copy -->
<tables>
<!-- table_test_job is a user-defined label, used only to distinguish different table copy tasks -->
<table_test_job>
<!-- Pull side: source table location -->
<cluster_pull>src_manyun_ch_cluster</cluster_pull>
<database_pull>source_db_name</database_pull>
<table_pull>source_table_name</table_pull>
<!-- Push side: destination table location -->
<cluster_push>des_manyun_ch_cluster</cluster_push>
<database_push>dest_db_name</database_push>
<table_push>dest_table_name</table_push>
<!-- Engine definition for the destination table; if the table does not exist on the destination cluster, it is created from this definition -->
<engine> ENGINE=ReplicatedMergeTree('/clickhouse/tables/{shard}/db_name/table_name', '{replica}')
PARTITION BY toYYYYMM(time)
ORDER BY (device_guid, point_code, time, device_type)
SETTINGS index_granularity = 8192
</engine>
<!-- Sharding key used when inserting into the destination cluster, routing each row to a specific destination shard -->
<sharding_key>02</sharding_key>
<!-- Optional filter condition applied when querying the source data -->
<!--
<where_condition> CounterID != 0 </where_condition>
-->
<!-- Specific partitions to copy; if omitted, all partitions are copied by default. The partition values are the partition column values of the corresponding table in system.parts -->
<!--
<enabled_partitions></enabled_partitions>
-->
<!-- Copy only partition 202111 -->
<enabled_partitions>
<partition>'202111'</partition>
</enabled_partitions>
</table_test_job>
</tables>
</yandex>
Write task.xml to ZooKeeper
On a node of the corresponding ZooKeeper cluster, run:
# Create the parent znodes for the task
./zkCli.sh -server localhost:12181 create /clickhouse/copier_task
./zkCli.sh -server localhost:12181 create /clickhouse/copier_task/task
# Create the task description
./zkCli.sh -server localhost:12181 create /clickhouse/copier_task/task/description "`cat task.xml`"
# View the task description
./zkCli.sh -server localhost:12181 get /clickhouse/copier_task/task/description
# Update the task description
./zkCli.sh -server localhost:12181 set /clickhouse/copier_task/task/description "`cat task.xml`"
Execution
Run clickhouse-copier from a directory of your choice on the server where it is installed.
If anything is unclear, run clickhouse-copier --help.
The command below reads the task.xml stored at the ZooKeeper path /clickhouse/copier_task/task/description and writes its logs under ./logs in the current directory.
Start the task
Run it in the background:
clickhouse-copier --config zookeeper.xml --task-path /clickhouse/copier_task/task --base-dir ./logs &
Log analysis
- First, the task generates the read_shard_n paths used to read the specified table's data from each shard. Task log:
2021.12.23 19:47:44.967318 [ 2676812 ] {} <Debug> ClusterCopier: Will copy 2 partitions from shard N2 (having a replica 172.16.0.122:9000, pull table ck_default.point_data_local of cluster src_manyun_ch_cluster
2021.12.23 19:50:24.851663 [ 2676811 ] {} <Debug> ClusterCopier: There are 2 destination partitions in shard N1 (having a replica 172.16.0.18:9000, pull table ck_default.point_data_local of cluster src_manyun_ch_cluster
- The data is then split, using the table's partition_key together with cityHash64 over all ORDER BY columns modulo 10 as the split condition. Task log:
2021.12.23 19:50:24.855281 [ 2676811 ] {} <Debug> ClusterCopier: Checking shard N1 (having a replica 172.16.0.18:9000, pull table ck_default.point_data_local of cluster src_manyun_ch_cluster for partition 202110 piece 0 existence, executing query: SELECT 1 FROM _local.`.read_shard_0.des_manyun_ch_cluster.ck_default.point_data_test_local` WHERE (toYYYYMM(time) = (202110 AS partition_key)) AND (cityHash64(`device_guid`, `point_code`, `time`, `device_type`) % 10 = 0 ) LIMIT 1
Note: the splitting is done entirely through the .split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece tables; it runs asynchronously and the data is streamed, so no data is written directly to disk at this stage.
- Corresponding to these 10 pieces, there are 10 helper tables on the destination side, named <destination table>_piece_n (n = 0, 1, 2, …, 9; here point_data_test_local_piece_0 through _piece_9). Task log:
2021.12.23 19:50:43.641097 [ 2676807 ] {} <Debug> StorageDistributed (.read_shard_0.des_manyun_ch_cluster.ck_default.point_data_test_local): Auto-increment is 0
2021.12.23 19:50:43.641393 [ 2676807 ] {} <Debug> StorageDistributed (.split.des_manyun_ch_cluster.ck_default.point_data_test_local): Auto-increment is 0
2021.12.23 19:50:43.641728 [ 2676807 ] {} <Debug> StorageDistributed (.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_0): Auto-increment is 0
2021.12.23 19:50:43.641986 [ 2676807 ] {} <Debug> StorageDistributed (.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_1): Auto-increment is 0
2021.12.23 19:50:43.642230 [ 2676807 ] {} <Debug> StorageDistributed (.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_2): Auto-increment is 0
2021.12.23 19:50:43.642467 [ 2676807 ] {} <Debug> StorageDistributed (.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_3): Auto-increment is 0
2021.12.23 19:50:43.642702 [ 2676807 ] {} <Debug> StorageDistributed (.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_4): Auto-increment is 0
2021.12.23 19:50:43.642936 [ 2676807 ] {} <Debug> StorageDistributed (.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_5): Auto-increment is 0
2021.12.23 19:50:43.643168 [ 2676807 ] {} <Debug> StorageDistributed (.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_6): Auto-increment is 0
2021.12.23 19:50:43.643398 [ 2676807 ] {} <Debug> StorageDistributed (.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_7): Auto-increment is 0
2021.12.23 19:50:43.643619 [ 2676807 ] {} <Debug> StorageDistributed (.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_8): Auto-increment is 0
2021.12.23 19:50:43.643857 [ 2676807 ] {} <Debug> StorageDistributed (.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_9): Auto-increment is 0
2021.12.23 19:50:43.648644 [ 2676807 ] {} <Debug> ClusterCopier: Processing /clickhouse/copier_task/task/tables/des_manyun_ch_cluster.ck_default.point_data_test_local/202112/piece_0/shards/1
The log lines below also show that clickhouse-copier performs the data migration with INSERT INTO … SELECT:
2021.12.23 19:50:45.481535 [ 2676807 ] {} <Debug> ClusterCopier: Executing INSERT query: INSERT INTO _local.`.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_0` VALUES
2021.12.23 20:01:08.933373 [ 2676807 ] {} <Debug> ClusterCopier: Executing INSERT query: INSERT INTO _local.`.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_1` VALUES
2021.12.23 20:11:34.144447 [ 2676807 ] {} <Debug> ClusterCopier: Executing INSERT query: INSERT INTO _local.`.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_2` VALUES
2021.12.23 20:22:05.581291 [ 2676807 ] {} <Debug> ClusterCopier: Executing INSERT query: INSERT INTO _local.`.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_3` VALUES
2021.12.23 20:32:42.431329 [ 2676807 ] {} <Debug> ClusterCopier: Executing INSERT query: INSERT INTO _local.`.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_4` VALUES
2021.12.23 20:43:09.202856 [ 2676807 ] {} <Debug> ClusterCopier: Executing INSERT query: INSERT INTO _local.`.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_5` VALUES
2021.12.23 20:53:40.903325 [ 2676807 ] {} <Debug> ClusterCopier: Executing INSERT query: INSERT INTO _local.`.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_6` VALUES
2021.12.23 21:04:11.449778 [ 2676807 ] {} <Debug> ClusterCopier: Executing INSERT query: INSERT INTO _local.`.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_7` VALUES
2021.12.23 21:14:42.446846 [ 2676807 ] {} <Debug> ClusterCopier: Executing INSERT query: INSERT INTO _local.`.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_8` VALUES
2021.12.23 21:25:12.636855 [ 2676807 ] {} <Debug> ClusterCopier: Executing INSERT query: INSERT INTO _local.`.split.des_manyun_ch_cluster.ck_default.point_data_test_local_piece_9` VALUES
- After the copy completes, the data in the <destination table>_piece_n tables is ATTACHed into the target table. Task log:
2021.12.23 21:35:42.672750 [ 2676807 ] {} <Debug> ClusterCopier: Trying to move partition 202109 piece 0 to original table
2021.12.23 21:35:54.482164 [ 2676807 ] {} <Information> ClusterCopier: Number of nodes that executed ALTER query successfully : 6
2021.12.23 21:35:54.482228 [ 2676807 ] {} <Debug> ClusterCopier: Trying to move partition 202109 piece 1 to original table
2021.12.23 21:36:06.144835 [ 2676807 ] {} <Information> ClusterCopier: Number of nodes that executed ALTER query successfully : 6
2021.12.23 21:36:06.144906 [ 2676807 ] {} <Debug> ClusterCopier: Trying to move partition 202109 piece 2 to original table
2021.12.23 21:36:18.104823 [ 2676807 ] {} <Information> ClusterCopier: Number of nodes that executed ALTER query successfully : 6
2021.12.23 21:36:18.104898 [ 2676807 ] {} <Debug> ClusterCopier: Trying to move partition 202109 piece 3 to original table
2021.12.23 21:36:30.188603 [ 2676807 ] {} <Information> ClusterCopier: Number of nodes that executed ALTER query successfully : 6
2021.12.23 21:36:30.188673 [ 2676807 ] {} <Debug> ClusterCopier: Trying to move partition 202109 piece 4 to original table
2021.12.23 21:36:42.229121 [ 2676807 ] {} <Information> ClusterCopier: Number of nodes that executed ALTER query successfully : 6
2021.12.23 21:36:42.229197 [ 2676807 ] {} <Debug> ClusterCopier: Trying to move partition 202109 piece 5 to original table
2021.12.23 21:36:55.973762 [ 2676807 ] {} <Information> ClusterCopier: Number of nodes that executed ALTER query successfully : 6
2021.12.23 21:36:55.973846 [ 2676807 ] {} <Debug> ClusterCopier: Trying to move partition 202109 piece 6 to original table
2021.12.23 21:36:55.973902 [ 2676807 ] {} <Debug> ClusterCopier: Executing ALTER query: ALTER TABLE ck_default.point_data_test_local ATTACH PARTITION 202109 FROM ck_default.point_data_test_local_piece_6
2021.12.23 21:37:12.261403 [ 2676807 ] {} <Information> ClusterCopier: Number of nodes that executed ALTER query successfully : 6
2021.12.23 21:37:12.261475 [ 2676807 ] {} <Debug> ClusterCopier: Trying to move partition 202109 piece 7 to original table
2021.12.23 21:37:12.261500 [ 2676807 ] {} <Debug> ClusterCopier: Executing ALTER query: ALTER TABLE ck_default.point_data_test_local ATTACH PARTITION 202109 FROM ck_default.point_data_test_local_piece_7
2021.12.23 21:37:27.623009 [ 2676807 ] {} <Information> ClusterCopier: Number of nodes that executed ALTER query successfully : 6
2021.12.23 21:37:27.623069 [ 2676807 ] {} <Debug> ClusterCopier: Trying to move partition 202109 piece 8 to original table
2021.12.23 21:37:27.623087 [ 2676807 ] {} <Debug> ClusterCopier: Executing ALTER query: ALTER TABLE ck_default.point_data_test_local ATTACH PARTITION 202109 FROM ck_default.point_data_test_local_piece_8
2021.12.23 21:37:40.992927 [ 2676807 ] {} <Information> ClusterCopier: Number of nodes that executed ALTER query successfully : 6
2021.12.23 21:37:40.992987 [ 2676807 ] {} <Debug> ClusterCopier: Trying to move partition 202109 piece 9 to original table
2021.12.23 21:37:40.993029 [ 2676807 ] {} <Debug> ClusterCopier: Executing ALTER query: ALTER TABLE ck_default.point_data_test_local ATTACH PARTITION 202109 FROM ck_default.point_data_test_local_piece_9
2021.12.23 21:37:55.531416 [ 2676807 ] {} <Information> ClusterCopier: Number of nodes that executed ALTER query successfully : 6
2021.12.23 21:37:55.536704 [ 2676807 ] {} <Information> ClusterCopier: It took 6297.173648588 seconds to copy partition 202109: 377.38 GB uncompressed bytes, 4.39 billion rows and 5356530 source blocks are copied
- Finally, the intermediate <destination table>_piece_n helper tables are all dropped. Task log:
2021.12.23 21:37:55.536783 [ 2676807 ] {} <Debug> ClusterCopier: Removing helping tables
2021.12.23 21:37:55.536796 [ 2676807 ] {} <Debug> ClusterCopier: Removing helping tables piece 0
2021.12.23 21:37:55.536835 [ 2676807 ] {} <Debug> ClusterCopier: Execute distributed DROP TABLE: DROP TABLE IF EXISTS ck_default.point_data_test_local_piece_0
2021.12.23 21:37:55.538885 [ 2676807 ] {} <Information> ClusterCopier: DROP TABLE query was successfully executed on 6 nodes.
2021.12.23 21:37:55.538944 [ 2676807 ] {} <Debug> ClusterCopier: Removing helping tables piece 1
2021.12.23 21:37:55.538961 [ 2676807 ] {} <Debug> ClusterCopier: Execute distributed DROP TABLE: DROP TABLE IF EXISTS ck_default.point_data_test_local_piece_1
2021.12.23 21:37:55.540567 [ 2676807 ] {} <Information> ClusterCopier: DROP TABLE query was successfully executed on 6 nodes.
2021.12.23 21:37:55.540596 [ 2676807 ] {} <Debug> ClusterCopier: Removing helping tables piece 2
2021.12.23 21:37:55.540626 [ 2676807 ] {} <Debug> ClusterCopier: Execute distributed DROP TABLE: DROP TABLE IF EXISTS ck_default.point_data_test_local_piece_2
2021.12.23 21:37:55.542177 [ 2676807 ] {} <Information> ClusterCopier: DROP TABLE query was successfully executed on 6 nodes.
2021.12.23 21:37:55.542206 [ 2676807 ] {} <Debug> ClusterCopier: Removing helping tables piece 3
2021.12.23 21:37:55.542220 [ 2676807 ] {} <Debug> ClusterCopier: Execute distributed DROP TABLE: DROP TABLE IF EXISTS ck_default.point_data_test_local_piece_3
2021.12.23 21:37:55.543788 [ 2676807 ] {} <Information> ClusterCopier: DROP TABLE query was successfully executed on 6 nodes.
2021.12.23 21:37:55.543831 [ 2676807 ] {} <Debug> ClusterCopier: Removing helping tables piece 4
2021.12.23 21:37:55.543847 [ 2676807 ] {} <Debug> ClusterCopier: Execute distributed DROP TABLE: DROP TABLE IF EXISTS ck_default.point_data_test_local_piece_4
2021.12.23 21:37:55.545196 [ 2676807 ] {} <Information> ClusterCopier: DROP TABLE query was successfully executed on 6 nodes.
2021.12.23 21:37:55.545225 [ 2676807 ] {} <Debug> ClusterCopier: Removing helping tables piece 5
2021.12.23 21:37:55.545253 [ 2676807 ] {} <Debug> ClusterCopier: Execute distributed DROP TABLE: DROP TABLE IF EXISTS ck_default.point_data_test_local_piece_5
2021.12.23 21:37:55.546966 [ 2676807 ] {} <Information> ClusterCopier: DROP TABLE query was successfully executed on 6 nodes.
2021.12.23 21:37:55.546997 [ 2676807 ] {} <Debug> ClusterCopier: Removing helping tables piece 6
2021.12.23 21:37:55.547013 [ 2676807 ] {} <Debug> ClusterCopier: Execute distributed DROP TABLE: DROP TABLE IF EXISTS ck_default.point_data_test_local_piece_6
2021.12.23 21:37:55.548483 [ 2676807 ] {} <Information> ClusterCopier: DROP TABLE query was successfully executed on 6 nodes.
2021.12.23 21:37:55.548532 [ 2676807 ] {} <Debug> ClusterCopier: Removing helping tables piece 7
2021.12.23 21:37:55.548549 [ 2676807 ] {} <Debug> ClusterCopier: Execute distributed DROP TABLE: DROP TABLE IF EXISTS ck_default.point_data_test_local_piece_7
2021.12.23 21:37:55.550149 [ 2676807 ] {} <Information> ClusterCopier: DROP TABLE query was successfully executed on 6 nodes.
2021.12.23 21:37:55.550182 [ 2676807 ] {} <Debug> ClusterCopier: Removing helping tables piece 8
2021.12.23 21:37:55.550213 [ 2676807 ] {} <Debug> ClusterCopier: Execute distributed DROP TABLE: DROP TABLE IF EXISTS ck_default.point_data_test_local_piece_8
2021.12.23 21:37:55.551712 [ 2676807 ] {} <Information> ClusterCopier: DROP TABLE query was successfully executed on 6 nodes.
2021.12.23 21:37:55.551741 [ 2676807 ] {} <Debug> ClusterCopier: Removing helping tables piece 9
2021.12.23 21:37:55.551769 [ 2676807 ] {} <Debug> ClusterCopier: Execute distributed DROP TABLE: DROP TABLE IF EXISTS ck_default.point_data_test_local_piece_9
2021.12.23 21:37:55.581328 [ 2676807 ] {} <Information> ClusterCopier: DROP TABLE query was successfully executed on 6 nodes.
2021.12.23 21:37:55.581638 [ 2676807 ] {} <Debug> MemoryTracker: Peak memory usage (for query): 2.25 GiB.
2021.12.23 21:37:55.581696 [ 2676807 ] {} <Debug> MemoryTracker: Peak memory usage (for query): 2.25 GiB.
At this point the migration task is complete.
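After the task finishes, it is worth comparing row counts for the copied partition between the two clusters. The check below is a sketch built on the remote() table function from option 2, using the placeholder hosts, credentials, and table names from the task.xml and logs above (comma-separated addresses are treated as separate shards, so each count covers the whole cluster through one replica per shard); replace the partition value with whichever partition you copied.
# Total rows of the copied partition on the source cluster
clickhouse-client --query "
  SELECT count() FROM remote('src_ck_ip1:9000,src_ck_ip3:9000', ck_default, point_data_local, 'root', 'pass')
  WHERE toYYYYMM(time) = 202111"
# Total rows of the same partition on the destination cluster
clickhouse-client --query "
  SELECT count() FROM remote('des_ck_ip1:9000,des_ck_ip3:9000,des_ck_ip5:9000', ck_default, point_data_test_local, 'root', 'pass')
  WHERE toYYYYMM(time) = 202111"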
Additional notes
To ensure that a failed task can be resumed, clickhouse-copier keeps some state information in ZooKeeper.
Besides the description node created by hand above, three more nodes are generated automatically. tables records the table copy tasks, matching what is configured in task.xml:
[zk: localhost:12181(CONNECTED) 7] ls /clickhouse/copier_task/task/tables
[des_manyun_ch_cluster.ck_default.point_data_test_local]
task_active_workers records the workers currently executing the task:
[zk: localhost:12181(CONNECTED) 7] ls /clickhouse/copier_task/task/task_active_workers
des_manyun_ch_cluster.ck_default.point_data_test_local_copy
task_active_workers_version records which stage the task has reached:
[zk: localhost:12181(CONNECTED) 11] get /clickhouse/copier_task/task/task_active_workers_version
/clickhouse/copier_task/task/tables/des_manyun_ch_cluster.ck_default.point_data_test_local/202109/piece_9/shards/2