编译:辰山,阿里巴巴计算平台事业部 EMR 高级开发工程师,目前从事大数据存储方面的开发和优化工作
事务日志(Transaction log)是理解 Delta Lake 的一个关键点,很多 Delta Lake 的重要特性都是基于事务日志实现的,包括 ACID 事务性、可扩展元数据处理、时间回溯等等。本文将探讨什么是事务日志,如何在文件层面实现,以及怎样优雅地解决并发读写的问题。
什么是事务日志?
Delta Lake 的事务日志(简称 DeltaLog)是一种有序记录集,按序记录了 Delta Lake 表从生成伊始的所有事务操作。
事务日志有何作用?
单一信息源
Delta Lake 基于 Apache Spark 构建,用来支持多用户同时读写同一数据表。事务日志作为单一信息源——跟踪记录了用户所有的表操作,从而为用户提供了在任意时刻准确的数据视图。
当用户首次访问 Delta Lake 的表,或者对一张已打开的表提交新的查询但表中的数据在上一次访问之后已发生变化时,Spark 将会检查事务日志来确定该表经历了哪些事务操作,并将更新结果反馈给用户。这样的流程保证了用户所看到的数据版本永远保持与主分支一致,不会对同一个表产生有冲突的修改。
Delta Lake 原子性实现
原子性,作为 ACID 四个特性之一,保证了对数据湖的操作(例如 INSERT 或者 UPDATE)或者全部完成,或者全部不完成。如果没有原子性保证,那么很容易因为硬件或软件的错误导致表中的数据被部分修改,从而导致数据错乱。
事务日志提供了一种机制来保证 Delta Lake 的原子性。任何操作只要没有记录在事务日志中,都会被认为没有发生过。事务操作只有在完全执行成功后才会被记录到事务日志中,并且将事务日志作为单一信息源,这两者保证了数据的可靠性,保证用户可以安心处理 PB 级的数据。
事务日志如何工作?
将事务分解为原子提交
每当用户提交一个修改表的操作时(例如 INSERT, UPDATE 或 DELETE),Delta Lake 将该操作分解为包括如下所示的一系列离散的步骤:
- 添加文件:添加一个数据文件。
- 删除文件:删除一个数据文件。
- 更新元数据:更新表的元数据(例如修改表的名称、schema 或分区)。
- 设置事务:记录 Spark structured streaming 任务提交的 micro batch 的 ID。
- 更改协议:将事务日志切换到最新软件协议以支持新的特性。
- 提交信息:包含提交所需的信息——执行了什么操作、操作来源以及操作时间。
这些操作都会被记录在事务日志中,形成一系列原子的单元,称作提交。
例如,假设用户创建了一个事务,往表中新增一列,并且添加一些数据。Delta Lake 会将该事务分解成离散步骤,当事务完成后,将以下提交添加到事务日志中:
- 更新元数据:更改 schema 添加新列
- 添加文件:添加每个新的文件
事务日志在文件层面的实现
当用户创建一个 Delta Lake 的表时,会在 _delta_log 子目录下自动创建该表的事务日志。后续对表的修改操作都将被记录为有序的原子提交,写入事务日志中。每个提交都是一个 JSON 文件,序号从 000000.json 开始。之后的修改操作都将生成递增的文件序号,例如 000001.json、000002.json,以此类推。
举个例子,假如我们需要往数据文件 1.parquet 和 2.parquet 中添加新的记录,该事务会被自动写入到事务日志中,保存成 000000.json 文件。然后,我们又决定删除这些文件并且添加一个新的文件(3.parquet),这些操作将被记录成事务日志中的下一个新的提交 000001.json,如下图所示:
即使现在 1.parquet 和 2.parquet 已经不再是 Delta Lake 表中的数据,对它们的添加删除操作仍会记录在事务日志中,因为即便增删操作的作用最后相互抵消,但是这些操作是确实发生过的。Delta Lake 仍会保留这些原子提交,来保证当我们需要对事件进行审计,或者进行时间回溯查询表在某个历史时间点的视图时,我们都可以获得精确的结果。
另外,Spark 也不会从磁盘上删除这些文件,即使我们执行了删除了底层的数据文件的操作。用户可以通过 VACUMM 命令显示地删除不再需要的文件。
从 Checkpoint 文件快速重构状态
每隔 10 个提交,Delta Lake 会在 _delta_log 子目录下自动生成一个 Parquet 格式的 checkpoint 文件。
这些 checkpoint 文件保存了表在该时间点上的所有状态,而原生 Parquet 格式对 Spark 读取也比较友好和高效。换句话说,checkpoint 文件给 Spark 提供了一种捷径来重构表状态,避免低效地处理可能上千条的 JSON 格式的小文件。
为了同步提交进度,Spark 可以执行 listFrom 操作查看所有事务日志的文件,快速跳转到最新的 checkpoint 文件,这样只需处理该 checkpoint 之后的 JSON 提交即可。
下面详细阐述一下该工作流程,假设我们的提交一直创建到 000007.json,如下图所示,Spark 同步到该提交,也就是说已经将表的最新版本缓存在内存中。同时,其他提交者添加了新的提交一直创建到 000012.json。
为了包含这些新的事务并且更新我们的表状态,Spark 会运行 listFrom verion 7 操作来查看新的修改。
Spark 将会直接跳转到最新的 checkpoint 文件,而不是逐条处理所有的 JSON 文件,因为 checkpoint 文件包含了 commit #10 之前的所有表状态。现在,Spark 只需增量执行 0000011.json 和 0000012.json,来构建表的当前状态,然后将版本12缓存在内存中。通过这样的流程,Delta Lake 能够利用 Spark 来高效地维护任意时刻的表状态。
处理并发读写
我们已经阐述了事务日志的大致工作原理,接下来我们讨论一下如何处理并发。以上我们的示例基本覆盖了用户顺序提交事务的场景,或者说是没有冲突的场景。但如果 Delta Lake 处理并发读写会发生什么?
这个问题非常简单,由于 Delta Lake 是基于 Apache Spark 实现的,多个用户同时修改一个表完全是一种非常常见的场景,Delta Lake 使用了乐观锁来解决这个问题。
什么是乐观锁
乐观并发控制(又名“乐观锁”,Optimistic Concurrency Control,缩写“OCC”)是一种并发控制的方法,它假设多用户并发的事务在处理时不会彼此互相影响。乐观锁非常高效,因为在处理 PB 级大数据时,有很大概率不同用户处理的是数据的不同部分,乐观锁使得各事务能够在不产生锁的情况下处理各自影响的那部分数据。
举个例子,假设你和我在一起合作玩拼图游戏,只要我们负责拼不同的部分,比如你负责拼角,我负责拼边,那么我们完全可以同时处理我们各自负责的部分,最终能够以翻倍的速度完成拼图。只有当我们同时需要拿同一块拼图时才会发生冲突。这就是乐观锁的原理。
相对而言,有一些数据库使用了悲观锁,悲观锁假设了最坏的情况,就是说即使我们有 10,000 块拼图,也假设我们会同时拿同一块拼图,从而会造成大量的冲突情况。悲观锁规定同时只能有一个人对拼图进行操作,其他人不能同时操作拼图,这并不是一个完成拼图游戏的高效方式。
当然,即使使用乐观锁,还是会存在不同用户同时修改数据同一部分的场景,幸运的是,Delta Lake 有一套自己的协议来处理这个问题。
乐观处理冲突
为了提供 ACID 事务性,Delta Lake 有一套协议来规定 commit 如何排序(也就是数据库领域串行性 serializability 的概念),协议规定了如何处理同一时间点的有多个 commit。Delta Lake 通过互斥准则来处理这种场景,并且试图乐观处理冲突。协议允许 Delta Lake 实现 ACID 的隔离性准则(isolation),保证了经过多个并发提交后表的最终状态和单独顺序提交的结果是一致的。
通常来说,整个流程如下所示:
- 记录表的初始版本。
- 记录读写操作。
- 尝试提交。
- 如果另一个并发的提交已经成功,检查本次读的数据是否被修改。
- 重复以上步骤。
下图具体阐述了 Delta Lake 如何处理冲突,假设两个用户从同一个表中读取数据,然后同时去尝试添加数据。
- Delta Lake 在进行修改之前记录表的初始版本(version 0)。
- User 1 和 2 同时尝试添加新数据到表中,这样就会发生冲突,只有一个 commit 可以被接受并记录为 000001.json。
- Delta Lake 以互斥准则处理冲突,也就是说只有一个用户能够提交 000001.json,这里假设 User 1 的 commit 被接受,而 User 2 被拒绝。
- Delta Lake 将会乐观处理此冲突,而不是直接给 User 2 返回异常。检查是否有新的 commit 提交到该表,后台更新这些修改,然后基于更新后的表重试 User 2 的 commit (没有任何数据处理),最终成功生成 000002.json 的提交。
在大部分情况下,这种重试能够在后台无感知的完成,但也存在一些情况 Delta Lake 无法通过这种重试成功完成(例如 User 1 和 2 都同时删除同一个文件),在这种情况下将会返回异常给用户。
最后提一点,由于 Delta Lake 表的所有事务都直接保存在磁盘上,因此整个过程满足 ACID 的持久性(durability)特性,也就是说能够容忍诸如系统崩溃等错误情况。
其他应用场景
时间回溯
每张表的状态都是由记录在事务日志中中的所有 commit 所决定的,事务日志相当于提供了每一步操作的历史记录,详细记录了表从初始状态到当前状态的所有操作步骤。因此,我们可以通过遍历表从初始状态到某个时间点的所有 commit ,来重构出表在任意时间点的状态。这个强大的功能就是时间回溯,或者叫做数据版本控制。更多关于时间回溯的说明,可以参考 Introducing Delta Time Travel for Large Scale Data Lakes。
数据血缘和调试
事务日志确切地记录了 Delta Lake 表的所有修改,因此它能提供可信的数据血缘,这对治理、审计和合规目的很有用处。它也可以用来跟踪一些无意或者有错误的修改,从而能够回退到期望的版本。用户可以执行 DESCRIBE HISTORY 来查看指定修改附近的元数据。
总结
本文详细探讨了 Delta Lake 的事务日志,主要包含了以下几点:
- 事务日志是什么,怎样构建,以及提交(commit)如何在文件层面进行存储。
- 事务日志怎么作为单一信息源,实现 Delta Lake 的原子性特性。
- Delta Lake 如何计算表的状态,包括如何从最新 checkpoint 构建当前状态,以及怎样处理小文件问题。
- 通过 Apache Spark 来处理大规模元数据。
- 通过乐观锁实现并发读写。
- Delta Lake 如何通过互斥准则保证 commit 被正确排序,以及在冲突时如何解决。
原文链接:https://databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html
阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区数个Spark技术同学每日在线答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!