简介:本文翻译自 Altinity 针对 ClickHouse 的系列技术文章。面向联机分析处理(OLAP)的开源分析引擎 ClickHouse,因其优良的查询性能,PB级的数据规模,简单的架构,被国内外公司广泛采用。本系列技术文章,将详细展开介绍 ClickHouse。
前言
本文翻译自 Altinity 针对 ClickHouse 的系列技术文章。面向联机分析处理(OLAP)的开源分析引擎 ClickHouse,因其优良的查询性能,PB 级的数据规模,简单的架构,被国内外公司广泛采用。
阿里云 EMR-OLAP 团队,基于开源 ClickHouse 进行了系列优化,提供了开源 OLAP 分析引擎 ClickHouse 的云上托管服务。EMR ClickHouse 完全兼容开源版本的产品特性,同时提供集群快速部署、集群管理、扩容、缩容和监控告警等云上产品功能,并且在开源的基础上优化了 ClickHouse 的读写性能,提升了 ClickHouse 与 EMR 其他组件快速集成的能力。访问 ClickHouse - E-MapReduce - 阿里云 了解详情。
译者:何源(荆杭),阿里云计算平台事业部高级产品专家
(图源Altinity,侵删)在 ClickHouse 中处理实时更新
目录
- ClickHouse 更新的简短历史
- 用例
- 实现更新
- 结论
- 后续
在 OLAP 数据库中,可变数据通常不受欢迎。ClickHouse 也不欢迎可变数据。像其他一些 OLAP 产品一样,ClickHouse 最初甚至不支持更新。后来添加了更新功能,但是像其他许多功能一样,都是以“ClickHouse 方式”添加的。
即使是现在,ClickHouse 更新也是异步的,因此很难在交互式应用程序中使用。尽管如此,在许多用例中,用户需要对现有数据进行修改,并期望立即看到效果。ClickHouse 能做到吗?当然可以。
ClickHouse 更新的简短历史
早在 2016 年,ClickHouse 团队就发布了一篇题为“如何在 ClickHouse 中更新数据”的文章。当时 ClickHouse 并不支持数据修改,只能使用特殊的插入结构来模拟更新,并且数据必须按分区丢弃。
为满足 GDPR 的要求,ClickHouse 团队在 2018 年提供了 UPDATE 和 DELETE。后续文章ClickHouse 中的更新和删除目前仍然是 Altinity 博客中阅读量最多的文章之一。这种异步、非原子性的更新以 ALTER TABLE UPDATE 语句的形式实现,并且可能会打乱大量数据。这对于批量操作和不频繁的更新是很有用的,因为它们不需要即时的结果。尽管“正常”的 SQL 更新每年都妥妥地出现在路线图中,但依然没能在 ClickHouse 中实现。如果需要实时更新行为,我们必须使用其他方法。让我们考虑一个实际的用例,并比较在 ClickHouse 中的不同实现方法。
用例
考虑一个生成各种报警的系统。用户或机器学习算法会不时查询数据库,以查看新的报警并进行确认。确认操作需要修改数据库中的报警记录。一旦得到确认,报警将从用户的视图中消失。这看起来像是一个 OLTP 操作,与 ClickHouse 格格不入。
由于我们无法使用更新,因此只能转而插入修改后的记录。一旦数据库中有两条记录,我们就需要一种有效的方法来获取最新的记录。为此,我们将尝试 3 种不同的方法:
- ReplacingMergeTree
- 聚合函数
- AggregatingMergeTree
ReplacingMergeTree
我们首先创建一个用来存储报警的表。
CREATE TABLE alerts( tenant_id UInt32, alert_id String, timestamp DateTime Codec(Delta, LZ4), alert_data String, acked UInt8 DEFAULT 0, ack_time DateTime DEFAULT toDateTime(0), ack_user LowCardinality(String) DEFAULT '' ) ENGINE = ReplacingMergeTree(ack_time) PARTITION BY tuple() ORDER BY (tenant_id, timestamp, alert_id);
为简单起见,将所有报警特定列都打包到一个通用的“alert_data”列中。但是可以想象到,报警可能包含数十甚至数百列。此外,在我们的示例中,“alert_id”是一个随机字符串。
请注意 ReplacingMergeTree 引擎。ReplacingMergeTee 是一个特殊的表引擎,它借助 ORDER BY 语句按主键替换数据——具有相同键值的新版本行将替换旧版本行。在我们的用例中,“行数据的新旧程度”由“ack_time”列确定。替换是在后台合并操作中进行的,它不会立即发生,也不能保证会发生,因此查询结果的一致性是个问题。不过,ClickHouse 有一种特殊的语法来处理这样的表,我们在下面的查询中就会用到该语法。
在运行查询之前,我们先用一些数据填充这个表。我们为 1000 个租户生成 1000 万个报警:
INSERT INTO alerts(tenant_id, alert_id, timestamp, alert_data) SELECT toUInt32(rand(1)%1000+1) AS tenant_id, randomPrintableASCII(64) as alert_id, toDateTime('2020-01-01 00:00:00') + rand(2)%(3600*24*30) as timestamp, randomPrintableASCII(1024) as alert_data FROM numbers(10000000);
接下来,我们确认 99% 的报警,为“acked”、“ack_user”和“ack_time”列提供新值。我们只是插入一个新行,而不是更新。
INSERT INTO alerts (tenant_id, alert_id, timestamp, alert_data, acked, ack_user, ack_time) SELECT tenant_id, alert_id, timestamp, alert_data, 1 as acked, concat('user', toString(rand()%1000)) as ack_user, now() as ack_time FROM alerts WHERE cityHash64(alert_id) % 99 != 0;
如果我们现在查询这个表,会看到如下结果:
SELECT count() FROM alerts ┌──count()─┐ │ 19898060 │ └──────────┘ 1 rows in set. Elapsed: 0.008 sec.
表中显然既有已确认的行,也有未确认的行。所以替换还没有发生。为了查看“真实”数据,我们必须添加 FINAL 关键字。
SELECT count() FROM alerts FINAL ┌──count()─┐ │ 10000000 │ └──────────┘ 1 rows in set. Elapsed: 3.693 sec. Processed 19.90 million rows, 1.71 GB (5.39 million rows/s., 463.39 MB/s.)
现在计数是正确了,但是看看查询时间增加了多少!使用 FINAL 后,ClickHouse 执行查询时必须扫描所有的行,并按主键合并它们。这样能得到正确答案,但造成了大量开销。让我们看看,只筛选未确认的行会不会有更好的效果。
SELECT count() FROM alerts FINAL WHERE NOT acked ┌─count()─┐ │ 101940 │ └─────────┘ 1 rows in set. Elapsed: 3.570 sec. Processed 19.07 million rows, 1.64 GB (5.34 million rows/s., 459.38 MB/s.)
尽管计数显著减少,但查询时间和处理的数据量还是一样。筛选无助于加快查询速度。随着表增大,成本可能会更加巨大。它不能扩展。
注:为了提高可读性,所有查询和查询时间都像在“clickhouse-client”中运行一样显示。实际上,我们尝试了多次查询,以确保结果一致,并使用“clickhouse-benchmark”实用程序进行确认。
好吧,查询整个表没什么帮助。我们的用例还能使用 ReplacingMergeTree 吗?让我们随机选择一个 tenant_id,然后选择所有未确认的记录——想象用户正在查看监控视图。我喜欢 Ray Bradbury,那就选 451 好了。由于“alert_data”的值只是随机生成的,因此我们将计算一个校验和,用来确认多种方法的结果相同:
SELECT count(), sum(cityHash64(*)) AS data FROM alerts FINAL WHERE (tenant_id = 451) AND (NOT acked) ┌─count()─┬─────────────────data─┐ │ 90 │ 18441617166277032220 │ └─────────┴──────────────────────┘ 1 rows in set. Elapsed: 0.278 sec. Processed 106.50 thousand rows, 119.52 MB (383.45 thousand rows/s., 430.33 MB/s.)
太快了!我们只用了 278 毫秒就查询了所有未确认的数据。为什么这次很快?区别就在于筛选条件。“tenant_id”是某个主键的一部分,所以 ClickHouse 可以在 FINAL 之前筛选数据。在这种情况下,ReplacingMergeTree 就变得高效了。
我们也试试用户筛选器,并查询由特定用户确认的报警数量。列的基数是相同的——我们有 1000 个用户,可以试试 user451。
SELECT count() FROM alerts FINAL WHERE (ack_user = 'user451') AND acked ┌─count()─┐ │ 9725 │ └─────────┘ 1 rows in set. Elapsed: 4.778 sec. Processed 19.04 million rows, 1.69 GB (3.98 million rows/s., 353.21 MB/s.)
这个速度非常慢,因为没有使用索引。ClickHouse 扫描了全部 1904 万行。请注意,我们不能将“ack_user”添加到索引,因为它将破坏 ReplacingMergeTree 语义。不过,我们可以用 PREWHERE 进行一个巧妙的处理:
SELECT count() FROM alerts FINAL PREWHERE (ack_user = 'user451') AND acked ┌─count()─┐ │ 9725 │ └─────────┘ 1 rows in set. Elapsed: 0.639 sec. Processed 19.04 million rows, 942.40 MB (29.80 million rows/s., 1.48 GB/s.)
PREWHERE 是一个特别的妙招,能让 ClickHouse 以不同方式应用筛选器。通常情况下 ClickHouse 是足够智能的,可以自动将条件移动到 PREWHERE,因此用户不必在意。这次没有发生,幸好我们检查过了。
聚合函数
ClickHouse 因支持各种聚合函数而闻名,最新版本可支持 100 多种。结合 9 个聚合函数组合子(参见 Combinators | ClickHouse Documentation),这为有经验的用户提供了很高的灵活性。对于此用例,我们不需要任何高级函数,仅使用以下 3 个函数:“argMax”、“max”和“any”。
可以使用“argMax”聚合函数执行针对第 451 个租户的相同查询,如下所示:
SELECT count(), sum(cityHash64(*)) data FROM ( SELECT tenant_id, alert_id, timestamp, argMax(alert_data, ack_time) alert_data, argMax(acked, ack_time) acked, max(ack_time) ack_time_, argMax(ack_user, ack_time) ack_user FROM alerts GROUP BY tenant_id, alert_id, timestamp ) WHERE tenant_id=451 AND NOT acked; ┌─count()─┬─────────────────data─┐ │ 90 │ 18441617166277032220 │ └─────────┴──────────────────────┘ 1 rows in set. Elapsed: 0.059 sec. Processed 73.73 thousand rows, 82.74 MB (1.25 million rows/s., 1.40 GB/s.)
同样的结果,同样的行数,但性能是之前的 4 倍!这就是 ClickHouse 聚合的效率。缺点在于,查询变得更加复杂。但是我们可以让它变得更简单。
请注意,当确认报警时,我们只更新以下 3 列:
- acked: 0 => 1
- ack_time: 0 => now()
- ack_user: ‘’ => ‘user1’
在所有 3 种情况下,列值都会增加!因此,我们可以使用“max”代替略显臃肿的“argMax”。由于我们不更改“alert_data”,因此不需要对此列进行任何实际聚合。ClickHouse 有一个很好用的“any”聚合函数,可以实现这一点。它可以在没有额外开销的情况下选取任何值:
SELECT count(), sum(cityHash64(*)) data FROM ( SELECT tenant_id, alert_id, timestamp, any(alert_data) alert_data, max(acked) acked, max(ack_time) ack_time, max(ack_user) ack_user FROM alerts GROUP BY tenant_id, alert_id, timestamp ) WHERE tenant_id=451 AND NOT acked; ┌─count()─┬─────────────────data─┐ │ 90 │ 18441617166277032220 │ └─────────┴──────────────────────┘ 1 rows in set. Elapsed: 0.055 sec. Processed 73.73 thousand rows, 82.74 MB (1.34 million rows/s., 1.50 GB/s.)
查询变简单了,而且更快了一点!原因就在于使用“any”函数后,ClickHouse 不需要对“alert_data”列计算“max”!
AggregatingMergeTree
AggregatingMergeTree 是 ClickHouse 最强大的功能之一。与物化视图结合使用时,它可以实现实时数据聚合。既然我们在之前的方法中使用了聚合函数,那么能否用 AggregatingMergeTree 使其更加完善呢?实际上,这并没有什么改善。
我们一次只更新一行,所以一个组只有两行要聚合。对于这种情况,AggregatingMergeTree 不是最好的选择。不过我们有个小技巧。我们知道,报警总是先以非确认状态插入,然后再变成确认状态。用户确认报警后,只有 3 列需要修改。如果我们不重复其他列的数据,可以节省磁盘空间并提高性能吗?
让我们创建一个使用“max”聚合函数来实现聚合的表。我们也可以用“any”代替“max”,但列必须是可以设置为空的——“any”会选择一个非空值。
DROP TABLE alerts_amt_max; CREATE TABLE alerts_amt_max ( tenant_id UInt32, alert_id String, timestamp DateTime Codec(Delta, LZ4), alert_data SimpleAggregateFunction(max, String), acked SimpleAggregateFunction(max, UInt8), ack_time SimpleAggregateFunction(max, DateTime), ack_user SimpleAggregateFunction(max, LowCardinality(String)) ) Engine = AggregatingMergeTree() ORDER BY (tenant_id, timestamp, alert_id);
由于原始数据是随机的,因此我们将使用“alerts”中的现有数据填充新表。我们将像之前一样分两次插入,一次是未确认的报警,另一次是已确认的报警:
INSERT INTO alerts_amt_max SELECT * FROM alerts WHERE NOT acked; INSERT INTO alerts_amt_max SELECT tenant_id, alert_id, timestamp, '' as alert_data, acked, ack_time, ack_user FROM alerts WHERE acked;
请注意,对于已确认的事件,我们会插入一个空字符串,而不是“alert_data”。我们知道数据不会改变,我们只能存储一次!聚合函数将填补空白。在实际应用中,我们可以跳过所有不变的列,让它们获得默认值。
有了数据后,我们先检查数据大小:
SELECT table, sum(rows) AS r, sum(data_compressed_bytes) AS c, sum(data_uncompressed_bytes) AS uc, uc / c AS ratio FROM system.parts WHERE active AND (database = 'last_state') GROUP BY table ┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬──────────────ratio─┐ │ alerts │ 19039439 │ 20926009562 │ 21049307710 │ 1.0058921003373666 │ │ alerts_amt_max │ 19039439 │ 10723636061 │ 10902048178 │ 1.0166372782501314 │ └────────────────┴──────────┴─────────────┴─────────────┴────────────────────┘
好吧,由于有随机字符串,我们几乎没有压缩。但是,由于我们不必存储“alerts_data”两次,所以相较于不聚合,聚合后数据规模可以缩小一半。
现在我们试试对聚合表进行查询:
SELECT count(), sum(cityHash64(*)) data FROM ( SELECT tenant_id, alert_id, timestamp, max(alert_data) alert_data, max(acked) acked, max(ack_time) ack_time, max(ack_user) ack_user FROM alerts_amt_max GROUP BY tenant_id, alert_id, timestamp ) WHERE tenant_id=451 AND NOT acked; ┌─count()─┬─────────────────data─┐ │ 90 │ 18441617166277032220 │ └─────────┴──────────────────────┘ 1 rows in set. Elapsed: 0.036 sec. Processed 73.73 thousand rows, 40.75 MB (2.04 million rows/s., 1.13 GB/s.)
多亏了 AggregatingMergeTree,我们处理的数据更少(之前是 82MB,现在是 40MB),效率更高。
实现更新
ClickHouse 会尽最大努力在后台合并数据,从而删除重复的行并执行聚合。然而,有时强制合并是有意义的,例如为了释放磁盘空间。这可以通过 OPTIMIZE FINAL 语句来实现。OPTIMIZE 操作速度慢、代价高,因此不能频繁执行。让我们看看它对查询性能有什么影响。
OPTIMIZE TABLE alerts FINAL Ok. 0 rows in set. Elapsed: 105.675 sec. OPTIMIZE TABLE alerts_amt_max FINAL Ok. 0 rows in set. Elapsed: 70.121 sec.
执行 OPTIMIZE FINAL 后,两个表的行数相同,数据也相同。
┌─table──────────┬────────r─┬───────────c─┬──────────uc─┬────────────ratio─┐ │ alerts │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │ │ alerts_amt_max │ 10000000 │ 10616223201 │ 10859490300 │ 1.02291465565429 │ └────────────────┴──────────┴─────────────┴─────────────┴──────────────────┘
不同方法之间的性能差异变得不那么明显了。汇总表如下:
结论
ClickHouse 提供了丰富的工具集来处理实时更新,如 ReplacingMergeTree、CollapsingMergeTree(本文未提及)、AggregatingMergeTree 和聚合函数。所有这些方法都具有以下三个共性:
- 通过插入新版本来“修改”数据。ClickHouse 中的插入速度非常快。
- 有一些有效的方法来模拟类似于 OLTP 数据库的更新语义。
- 然而,实际的修改并不会立即发生。
具体方法的选择取决于应用程序的用例。对用户来说,ReplacingMergeTree 是直截了当的,也是最方便的方法,但只适用于中小型的表,或者数据总是按主键查询的情况。使用聚合函数可以提供更高的灵活性和性能,但需要大量的查询重写。最后,AggregatingMergeTree 可以节约存储空间,只保留修改过的列。这些都是 ClickHouse DB 设计人员的好工具,可根据具体需要来应用。
后续
您已经了解了在 ClickHouse 中处理实时更新相关内容,本系列还包括其他内容:
- 使用新的 TTL move,将数据存储在合适的地方
- 在 ClickHouse 物化视图中使用 Join
- ClickHouse 聚合函数和聚合状态
- ClickHouse 中的嵌套数据结构
原文链接
本文为阿里云原创内容,未经允许不得转载。