检查点,保存点,与状态恢复
Flink是一个分布式数据处理系统,这种场景下,它需要处理各种异常,例如进程终止、机器故障、网络中断等。因为tasks在本地维护它们的state,Flink必须确保在出现故障的情况下,state不会丢失,并且保持一致性。
在这一节,我们会介绍Flink用于保证exactly-once state 一致性的检查点与恢复机制。我们也会讨论Flink独特的保存点功能。
一致性检查点(consistent checkpoints)
Flink的恢复机制基于应用状态的一致检查点。在有状态的流应用中,一个一致性检查点是:在所有tasks处理了一个(相同的)输入后,当前时间点每个task的state副本。在为application做一个一致性检查点时,遵循的一个基本算法步骤如下:
1.
暂停所有输入流的消费
2.
等待所有未被处理的data完全被处理,表示所有tasks已经处理了它们所有的输入数据
3.
做检查点:复制每个task的state到一个远端持久性存储。在所有tasks完成了它们的副本后,检查点完成
4.
恢复消费输入流
需要注意的是,Flink并不是实现的这个朴素机制。我们之后会介绍在Flink中更复杂的检查点算法。
下图展示了一致性检查点:
此应用有一个source task,消费一个递增数的流,如1,2,3等等。流中的数据被分区到一个基数流,一个偶数流。在一个sum
operator中,有两个task,分别用于累加基数与偶数。Source task 存储当前输入流的偏移量作为state。Sum
task 将当前的累加和作为state并存储。上图中,在输入偏移量为5时,Flink做了一个检查点,此时两个task的累加和分别为6和9。
从一致性检查点恢复
在流的执行过程中,Flink定期为application的state做一致性检查点。在发生故障时,Flink使用最新的检查点,以一致性地还原application的状态并重新开始执行。下图展示了恢复的过程:
应用在恢复时有三个步骤:
1.
重启整个application
2.
重置所有stateful tasks的状态为最近的检查点
3.
恢复所有tasks的处理
这个检查点与恢复机制可以为应用state提供exactly-once一致性,因为所有的状态都可以恢复到做时间点的那一刻。一个数据源是否可以重制它的输入流,取决于它的实现,以及流被消费的源头(如外部系统或是接口)。例如,event
log如Kafka可以提供一个流中当前偏移量之前的records。相反,若是一个流是从一个socket消费的,则无法被重置,因为sockets在消费完一个数据后,会将它丢弃。因此,一个application在仅当所有输入流是由可重置的data
sources 消费时,它才能够以exactly-once state 一致性的方式运行。
在一个application从一个检查点重新开始运行后,它的内部state会与做检查点时的状态完全一致。然后它开始消费并处理所有位于检查点与故障时间点之间的数据。尽管这里隐含的是Flink会处理某些记录两次(在故障前与故障后),这个机制仍达到了exactly-once
state 一致性,因为所有的operator的状态都被重置到了它未见到这些数据之前的时间点。
我们必须指出,Flink的检查点与恢复机制仅重置一个流应用的internal state。在恢复时,取决于应用的sink
operator的不同,一些records可能会多次释放给下游的流,例如一个event log,一个文件系统,或是一个数据库。对于某些存储系统,Flink的sink
函数可以提供exactly-once
输出,例如,在检查点完成时才提交释放的records。另一个适用于大部分存储系统的方法是:幂等更新(idempotent
updates)。
Flink检查点算法
上文提到,Flink的恢复机制基于的是一致性检查点。一个朴素的实现是:暂停,做检查点,然后恢复应用执行。但是这种“stop-the-world”的行为,即使对于能接受中等延时的应用来说,也是不切实际的。在Flink中,它基于Chandy-Lamport算法(用于做分布式快照)实现了检查点机制。此算法并不停止整个应用的运行,而是将做快照的操作从流处理解耦出来,这样一些tasks可以持续运行,而其他tasks可以持久化它们的状态。下面我们介绍一下此算法是如何工作的。
Flink的检查点算法使用了一个特殊的record类型,称为一个检查点分界(checkpoint
barrier)。类似于水印,检查点barriers由source
operator注入到常规的流记录中,并且无法被其他records 赶超。每个检查点barrier会携带一个检查点ID,用于辨别它属于哪个检查点,并且将一个流在逻辑上分成两部分。在一个barrier之前,对state的所有修改,包含于此barrier的检查点。若是在一个barrier之后对state的所有修改,则包含于下一个检查点。
我们使用一个简单的流处理应用解释此算法的步骤。此应用由两个source tasks组成,每个task都消费一个递增数字的流。Source
tasks的输出被分区到两个流中,分别是奇数流和偶数流。每个分区都由一个task处理,用于计算接收到的数字的总和,并将更新后的sum值传递给一个sink。此应用的示意图如下:
JobManager 向每个source task发送一条包含一个新checkpointID的消息,以初始化一个检查点,如下图:
当一个data source task 收到这条消息时,它会停止释放records,在state
backend触发一个它本地状态的一个检查点,via
all outgoing stream partition,广播(带有检查点ID的)检查点barrier
。state
backend 在它的状态检查点完成后,会提醒task,task会在JobManager承认(acknowledge)检查点。在所有barriers被发出去后,source继续它的常规操作。通过注入barrier到它的输出流,source函数决定了:在流的哪个位置做检查点。下图显示了在两个source
tasks 对它们本地状态做完检查点,并释放检查点barrier后的流应用。
由source tasks释放的检查点barriers,会被传输到与它们相连的tasks中。类似于水印,检查点barrier被广播到所有相连的并行tasks中,以确保每个task都能从它们的每个输入流收到一个barrier。当一个task收到一个新检查点的barrier时,它会等待barriers从它所有的输入分区到达。在它等待时,对于尚未提供barrier的流分区,它会继续处理这些流分区中的records。对于已经提供了barrier的流分区,records不会被立即处理,而是被放入缓存。这个等待所有barrier到达的过程称为分界校准(barrier
alignment),如下图所示:
一旦一个task从它所有输入分区中,收到了全部的barriers。它开始在state
backend初始化检查点,并广播检查点barrier到它所有的下游tasks,如下图:
在所有检查点barriers已经被释放后,task开始处理被缓存的记录。在所有被缓存的记录被释放后,task
继续处理它的输入流。下图显示了应用在这个时间点的运行状况:
最终,检查点barriers 到达一个sink task。当一个sink
task 收到一个barrier时,它会做一个barrier
调整(alignment),给它自己的状态做检查点,并向JobManager确认(acknowledge)它已收到barrier。JobManager在收到一个application的所有task发送的checkpoint
acknowledge后,它会记录:此application的检查点完成。下图显示了检查点算法的最后一步,完成的检查点可以用于从故障中恢复一个application。
做检查点的性能影响
Flink的检查点算法可以在不停止整个application的情况下,从流应用中生成一致性分布式的检查点。然而,它会增加application的处理延时(processing
latency)。Flink
实现了轻微调整,以在某些特定条件下缓解性能影响。
在一个task对它的状态做检查点时,它会阻塞,并缓存它的输入。因为state可以变的很大,并且检查点的操作需要通过网络写入数据到一个远端存储系统,所以做检查点的操作可能会很容易就花费几秒到几分钟,这对于延时敏感的application来说,延时过长了。在Flink的设计中,做一个检查点是由state
backend负责的。一个task的state如何精确的被复制,取决于state
backend的实现。例如,FileSystem
state backend与RocksDB state backend支持异步做检查点。当一个检查点被触发时,state
backend在本地创建一个检查点的副本。在本地副本创建完成后,task继续它的正常处理。一个后端线程会异步地复制本地快照到远端存储,并在它完成检查点后提醒task。异步检查点可以显著地降低一个task从暂停到继续处理数据,这中间的时间。另外,RocksDB
state backend也有增量检查点的功能,可以减少数据的传输量。
另一个用于减少检查点算法对处理延时影响的技术是:微调barrier排列步骤。若是一个应用需要非常短的延时,并且可以容忍at-least-once
状态保证。Flink可以被配置为在buffer
alignment时对所有到达的记录做处理,而不是将这些记录为已经到达的barrier缓存下来。对于一个检查点,在它所有的barriers都到达后,operator为它的状态做检查点,现在这里可能也会包括:本应属于下一个检查点的records对state
做的修改。在错误发生时,这些records会被再次处理,也就是说,这里检查点提供的是at-least-once
一致性保证,而不是excatly-once
一致性保证。
保存点(Savepoints)
Flink的恢复算法是基于state检查点。检查点是定期执行并且会根据配置的策略自动丢弃。因为检查点的目的是用于确保一个application可以在错误发生时,自动恢复并重启,所以在一个application被明确地取消(终止)后,检查点也会被删除。然而,state的一致性快照除了用于错误恢复,也可以用于很多其他地方。
Flink其中一个非常有价值并很有特点的功能是保存点(savepoints)。原理上,保存点与检查点用的是相同的算法创建的,所以保存点其实就是:检查点加上一些额外的元数据。Flink不会自动做一个保存点,所以一个用户(或是外部调度器)需要明确地触发创建保存点。Flink也不会自动清理保存点。更多有关触发与清除保存点的操作会在之后的章节里详述。
使用保存点
给定一个application与一个与它兼容的保存点,我们可以从此保存点启动一个application。这个可以将应用的state初始化为保存点的state,并且让application从保存点被创建时地地方开始运行。这个行为看起来与应用从故障恢复其实是完全一致的,所以故障恢复实际上仅仅只是一个特殊的用法。使用保存点可以在同一个集群上以同样的配置启动同样的application。从一个保存点启动一个application可以让你做更多的事情。
·
你可以启动从保存点启动一个不同的但是兼容的应用。所以,你可以修复应用逻辑中的bug,并尽可能多的重新处理流的数据源中的事件,以修复(repair)应用的结果。被修改的application也可以用于跑A/B
测试,或是不同的业务逻辑测试。需要注意的是:application与保存点必须是兼容的,也就是说,application必须能够从保存点中加载state。
·
可以启动同样一个application,使用不同的并行度并对application做扩展或是缩容
·
可以在一个不同的集群上启动同一个应用。这个可以允许你迁移一个应用到一个更新的Flink版本,或是一个不同的集群
·
可以使用保存点暂停一个应用,并在之后恢复它。这个可以使得释放资源给更优先的应用变得可能,或是数据数据并不是持续提供的
·
可以做一个检查点用于版本,并将一个应用的state归档
因为保存点是一个非常强大的功能,许多用户会定期做检查点,以让应用可以及时的还原到之前的某个时间点。我们见过的一个较为有趣的使用方法是,使用保存点,不断地将一个流应用迁移到更省成本的集群中去。
从一个保存点启动一个应用
所有上面提到的保存点的使用场景,均遵循同样的模式。首先,为一个运行中的应用做保存点,然后使用它的state启动一个应用。在这节,我们会描述:从一个保存点启动一个应用时,Flink如何初始化它的state。
一个应用由多个operators组成。每个operator可以定义一个或多个keyed以及operator
states。Operators以(一个或多个)task并行的方式执行。所以,一个典型的application包含多个state,这些state的分布跨越多个(可能执行在不同TaskManager进程中的)operator
tasks。
下图显示了一个有三个operators的应用,每个operator
有两个tasks。Operator
1 有一个operator
state(OS-1)。另一个operator(OP-2)有两个keyed
states(KS-1
和
KS-2)。当一个保存点被制作时,所有tasks的states被复制到一个持久性存储位置。
Figure 3-26. Taking a savepoint from an
application and restoring an application from a savepoint
在保存点中的state 副本,以operator标识符和state名字的方式,进行组织管理。Operator标识符与state
名字是必须的,它们用于将一个保存点的states数据映射到一个正在启动应用中的operators的states。当一个应用程序从一个保存点启动后,Flink将保存点的数据重新分布到对应的operators的tasks中。
需要注意的是,保存点并不包含operator tasks 的信息。因为当一个应用以不同的并行度启动时,它的tasks数量是可能会变的。我们在之前已经讨论过Flink扩展有状态operators时使用的策略。
如果一个修改后的应用是从一个保存点启动的,则在保存点中的一个state,只能被映射到包含了对应operator标识符和state
名字的一个operator的应用中。默认情况下,Flink会分配独一无二的operator标识符。然而,一个operator的标志符是基于它上游operators的标志符,确定性地生成的。所以,当一个operators的上游operators有变动时,它本身的标志符也会改变。例如,当一个operator加入或是被移除。事实上,使用默认operator
标志符的application,在如何不丢失状态的情况下进行更新的场景下,能力是非常有限的。所以我们强烈建议手动为operators分配独一无二的标志符,而不是依赖于Flink默认的分配。我们会在之后的章节详细讨论此方法。
总结
在这章我们讨论的Flink的high-level架构,以及它内部的网络栈,事件-时间处理模型,状态管理,以及故障恢复机制。这些信息在设计高级流模型、建立配置集群、操作流应用、以及推算性能时,都会十分有用的
Normal
0
false
false
false
EN-US
ZH-CN
X-NONE
/* Style Definitions */
table.MsoNormalTable
{mso-style-name:"Table Normal";
mso-tstyle-rowband-size:0;
mso-tstyle-colband-size:0;
mso-style-noshow:yes;
mso-style-priority:99;
mso-style-parent:"";
mso-padding-alt:0in 5.4pt 0in 5.4pt;
mso-para-margin-top:0in;
mso-para-margin-right:0in;
mso-para-margin-bottom:8.0pt;
mso-para-margin-left:0in;
line-height:107%;
mso-pagination:widow-orphan;
font-size:11.0pt;
font-family:"Calibri",sans-serif;
mso-ascii-font-family:Calibri;
mso-ascii-theme-font:minor-latin;
mso-hansi-font-family:Calibri;
mso-hansi-theme-font:minor-latin;
mso-bidi-font-family:"Times New Roman";
mso-bidi-theme-font:minor-bidi;}
References:
Vasiliki
Kalavri, F* Hueske. Stream Processing With Apache Flink. 2019