本文是博主阅读Flink官方文档以及《Flink基础教程》后结合自己理解所写,若有表达有误的地方欢迎大伙留言指出。
1. 前言
流式计算分为有状态和无状态两种情况,所谓状态就是计算过程中的中间值。对于无状态计算,会独立观察每个独立事件,并根据最后一个事件输出结果。什么意思?大白话举例:对于一个流式系统,接受到一系列的数字,当数字大于N则输出,这时候在此之前的数字的值、和等情况,压根不关心,只和最后这个大于N的数字相关,这就是无状态计算。什么是有状态计算了?想求过去一分钟内所有数字的和或者平均数等,这种需要保存中间结果的情况是有状态的计算。
当分布式计系统中引入状态计算时,就无可避免一致性的问题。为什么了?因为若是计算过程中出现故障,中间数据咋办了?若是不保存,那就只能重新从头计算了,不然怎么保证计算结果的正确性。这就是要求系统具有容错性了。
2. 一致性
谈到容错性,就没法避免一致性这个概念。所谓一致性就是:成功处理故障并恢复之后得到的结果与没有发生任何故障是得到的结果相比,前者的正确性。换句大白话,就是故障的发生是否影响得到的结果。在流处理过程,一致性分为3个级别[1]:
- at-most-once:至多一次。故障发生之后,计算结果可能丢失,就是无法保证结果的正确性;
- at-least-once:至少一次。计算结果可能大于正确值,但绝不会小于正确值,就是计算程序发生故障后可能多算,但是绝不可能少算;
- exactly-once:精确一次。系统保证发生故障后得到的计算结果的值和正确值一致;
Flink的容错机制保证了exactly-once,也可以选择at-least-once。Flink的容错机制是通过对数据流不停的做快照(snapshot)实现的。针对FLink的容错机制需要注意的是:要完全保证exactly-once,Flink的数据源系统需要有“重放”功能,什么意思了?且听下面慢慢道来。
3. 检查点(Checkpoint)
Flink做快照的过程是基于“轻量级异步快照”的算法,其核心思想就是在计算过程中保存中间状态和在数据流中对应的位置,至于如何实现的会后续的博客中会详细说明。这些保存的信息(快照)就相当于是系统的检查点(checkpoint)(类似于window系统发生死机等问题时恢复系统到某个时间点的恢复点),做snapshot也是做一个checkpoint。在系统故障恢复时,系统会从最新的一个checkpoint开始重新计算,对应的数据源也会在对应的位置“重放“。这里的“重放”可能会导致数据的二次输出,这点的处理也在后续的博客中说明。
3.1 屏障(Barriers)
在Flink做分布式快照过程中核心一个元素Barriers的使用。这些Barriers是在数据接入到Flink之初就注入到数据流中,并随着数据流向每个算子(operator,这里所说的算子不是指类似map()等具体意义上个的,指在JobGraph中优化后的“顶点”),这里需要说明的有两点:
- 算子对Barriers是免疫的,即Barriers是不参与计算的;
- Barriers和数据的相对位置是保持不变的,而且Barriers之间是线性递增的;
如下图所示,Barriers将将数据流分成了一个个数据集。值得提醒的是,当barriers流经算子时,会触发与checkpoint相关的行为,保存的barriers的位置和状态(中间计算结果)。
Update:checkpoint是由JobManager中的CheckpointCoordinator周期性触发,然后在Task侧生成barrier,具体为:在Source task(TaskManager中)中barrier会根据命令周期性的在原始数据中注入barrier,而对非source task则是遇到barrier做checkpoint,即非source task其做checkpoint的时间间隔也许不是周期的,影响因素较多。此外,每个算子做checkpoint的方式也许不同。
可以打个比方,在河上有个大坝(相当于算子),接上级通知(Flink中的JobManager)要统计水流量等信息,所以有人在上游定期(source task)放一根木头(barrier)到河中,当第一木头遇到大坝时,大坝就记下通过大坝木头的位置、水流量等相关情况,即做checkpoint(实际生活中不太可能),当第二木头遇到大坝时记下第一个木头和第二根木头之间的水流量等情况,不需要重开始计算。这里先不管故障了,不然就不好解释相同的水的“重放”问题了。
当一个算子有多个数据源时,又如何做checkpoint了?
如下图,从左往右一共4副图。当算子收到其中一个数据源的barriers,而未收到另一个数据源的barriers时(如左1图),会将先到barriers的数据源中的数据先缓冲起来,等待另一个barriers(如左2图),当收到两个barriers(如左3图)即接收到全部数据源的barrier时,会做checkpoint,保存barriers位置和状态,发射缓冲中的数据,释放一个对应的barriers。这里需要注意是,当缓存中数据没有被发射完时,是不会处理后续数据的,这样是为了保证数据的有序性。
这里其实有一点需要注意的是,因为系统设置checkpoint的方式是通过时间间隔的形式(enableCheckpointing(intervalTime)
),所以会导致一个问题:当一个checkpoint所需时间远大于两次checkpoint之间的时间间隔时,就很有可能会导致后续的checkpoint会失败,若是这样情况比较严重时会导致任务失败,这样Flink系统的容错性的优势就等不到保证了,所以需要合理设计checkpoint间隔时间。
3.2 状态(State)
如下图所示,在一次snapshot中,算子会在接受到其数据源的所有barriers的以后snapshot它们的状态,然后在发射barriers到输出流中,直到最后所有的sink算子都完成snapshot才算完成一次snapshot。其中,在准备发射的barriers形成之前,state 形式是可以改变的,之后就不可以了。state的存贮方式是可以配置的,如HDFS,默认是在JobManager的内存中。
3.3 异步快照(asynchronous state snapshot)
上述描述中,需要等待算子接收到所有barriers后,开始做snapshot,存储对应的状态后,再进行下一次snapshot,其状态的存储是同步的,这样可能会造成因snapshot引起较大延时。可以让算子在存储快照时继续处理数据,让快照存储异步在后台运行。为此,算子必须能生成一个 state 对象,保证后续状态的修改不会改变这个 state 对象。例如 RocksDB 中使用的 copy-on-write(写时复制)类型的数据结构,即异步状态快照。对异步状态快照,其可以让算子接受到barriers后开始在后台异步拷贝其状态,而不必等待所有的barriers的到来。一旦后台的拷贝完成,将会通知JobManager。只有当所有的sink接收到这个barriers,和所有的有状态的算子都确认完成状态的备份时,一次snapshot才算完成。如何实现的,这点后续博客将仔细分析。
Ref:
[1]《Flink基础教程》
[2]https://ci.apache.org/projects/flink/flink-docs-release-1.6/internals/stream_checkpointing.html