前言
之前文章有讲过flink的有状态算子。有状态算子就是讲算子产生的中间结果存储在Flink的一些内存数据结构中,比如ValueState、MapState等等。这可以算一种计算机制。比如对流中的某个字段进行累加,我们必须要保存累加的中间结果,下一个事件才能知道加谁。
但是距离容灾还差一段距离。比如job失败了,程序都没有了,内存中也就没有状态数据了。这时候就需要检查点了。所以,检查点就是定期保存任务状态的机制。但是状态又保存到哪里了呢?之前说过的状态后端。
检查点是用于错误恢复的,但这是有条件的:
- 一方面是数据源必须是可重放(replay)的,主要有两类,一类是以kafka为代表的消息队列,另一类是如hdfs这样的文件系统(非分布式文件系统当然也是可以的)
- 另一方面是可以持久化存储的状态的系统,主要是分布式文件系统。
检查点配置项详解
package it.kenn.checkpoint;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* checkpoint 测试
*/
public class CKDemo {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//参数为生成checkpoint的时间,以及CheckpointingMode,默认为EXACTLY_ONCE,一般采用默认就可以,如果任务有超低延时需求,可以使用至少一次
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
//设置检查点模式也可以使用下面方式
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
//设置超时时间,超时时间是说,这个检查点设置了,但是过去500ms还没有完成就认为这个检查点超时了,然后把这个检查点终止,也就是不再使用这个检查点了
env.getCheckpointConfig().setCheckpointTimeout(500);
/**设置检查点尝试之间的最小间隔。此设置定义检查点协调器在可能触发另一个检查点(相对于最大并发检查点数)后多久可以触发另一个检查点
* 比如下面的设置,下一个检查点将在上一个检查点完成后,不超过5s内启动
* 这个参数跟检查点生成的间隔时间是有点冲突的,这个参数不考虑上面提到的检查点持续时间和检查点间隔。或者说设置下面的参数以后,检查点自动就变成5s了,
* 如果觉得5s不够可以再将间隔设置的大一点,但是不能小于5s了
* 设置检查点的interval和设置检查点之间的间隔时间有啥不同呢?
* interval是物理时间的间隔,即时间只要过去1s了,就会生成一个检查点。但是设置检查点之间的间隔是说检查点完成1s了就会设置间隔,这个是跟检查点完成时间相关的
* 比如存储检查点的系统比较慢,完成一个检查点平均10s,然后下面检查点之间的间隔设置为5s,那么两个检查点生成的时间间隔就是15s
* 再简单点说,interval设置的是两个检查点生成时刻的间隔,而下面参数设置的是第一个检查点结束和第二个检查点创建(还没有结束)之间的间隔
**/
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
//设置检查点最大并发数.比如设置为1,表示只能有一个检查点,当这个检查点完成以后才有可能产生下一个检查点。这也是默认参数。如果定义了上面的设置,则不能再定义这条设置项
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//设置外部检查点。可以将检查点的元数据信息定期写入外部系统,这样当job失败时,检查点不会被清除。这样如果job失败,可以从检查点恢复job。这条很重要,下面一节会详细说明
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//检查点和保存点都可以用于容灾,这个配置是说如果有一个时间更近的保存点,我也不用,而更加愿意使用检查点进行容灾
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
//设置使用非对齐检查点。它可以极大减少背压情况下检查点的时间,但是只有在精确一次的检查点并且允许的最大检查点并发数量为1的情况下才能使用
env.getCheckpointConfig().enableUnalignedCheckpoints();
}
}
检查点保存配置
默认情况下,状态(state)保存在TM的内存中,而检查点(checkpoint)保存在JM的内存中。检查点是为本应用的job任务容灾准备的,即如果应用仍在跑的,但是某个job失败了可以通过检查点进行恢复。但如果应用被取消了,或者应用都宕机了,检查点的数据也就不存在了(因为是存在JM中,应有董没了,内存中的数据肯定也没有了)。要想应用宕机,检查点仍然存在就要设置将检查点存一份在外部系统中。
ExternalizedCheckpointCleanup
模式配置当取消应用时,检查点该如何操作。
-
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
表示取消作业是保留检查点,这里说的取消是正常取消不是任务失败。如果重启任务,检查点不会自动清除,如果需要清除则需要手动清除。
-
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
表示取消作业时删除检查点,如果任务失败,检查点不会删除。也就是说任务失败可以从检查点恢复任务。
检查点目录结构
检查点保存目录可以在flink-conf.yml文件中进行全局配置也可以对某个应用进行单独配置
# yml中的配置是全局的,即每个应用都用这个路径
state.checkpoints.dir: hdfs:///checkpoints/
//在代码中对某个应用进行配置,这其实就是在设置状态后端
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));
在配置的目录下,以jobid再分目录,每个jobid对应的目录下存放本应用的所有检查点。
从检查点恢复应用
检查点的数据已经持久化到分布式文件系统中了。如果重启应用的时候想恢复状态可以执行下面命令
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
总结
- 检查点是用来存储状态(state)的,默认情况下保存在JM中,如果任务取消或者失败,检查点就丢了
- 可以通过设置检查点的外部存储选项将检查点保存在外部分布式文件系统中。
- 如果用于容灾,只将检查点保存在外部系统中也是不够的,需要flink处理的数据是可重放数据源来的,比如消息队列和各种文件系统
- 重启任务从检查点恢复需要指定检查点的存放路径