一、概述
当程序出现问题需要恢复 State 数据的时候,只有程序提供支持才可以实现 State 的容错。
State 的容错需要依靠 CheckPoint 机制,这样才可以保证 Exactly-once 这种语义。但是注
意的是,它只能保证 Flink 系统内的 Exactly-once,比如 Flink 内置支持的算子。针对 Source
和 Sink 组件,如果想要保证 Exactly-one 的话,则这些组件本身支持这种语义。
1)CheckPoint 原理
Flink 中基于异步轻量级的分布式快照技术提供的 Checkpoint 容错机制, 分布式快照可以将
同一时间点 Task/Operator 的状态数据全局统一快照处理,包括 Keyed State 和 Operator State。
Flink 会在输入的数据集上间隔性地生成checkpoint barrier,通过栅栏(barrier)将间隔时间段内的数据
划分到相应的 checkpoint 中。如下图:
从检查点(CheckPoint)恢复如下图:
2)CheckPoint 参数和设置
默认情况下 Flink 不开启检查点的,用户需要在程序中通过调用方法配置和开启检查
点,另外还可以调整其他相关参数;
※ Checkpoint 开启和时间间隔指定:
开启检查点并且指定检查点时间间隔为 1000ms,根据实际情况自行选择,如果状态比较大,则建议适当增加该值
streamEnv.enableCheckpointing(1000);
※ exactly-once 和 at-least-once 语义选择:
exactly-once 语义保证整个应用内端到端的数据一致性,这种情况比较适合于数据要求比较高,
不允许出现丢数据或者数据重复,与此同时,Flink 的性能也相对较弱
at-least-once 语义更适合于实时和吞吐量要求非常高但对数据的一致性要求不高的场景
如下通过 setCheckpointingMode() 方法来设定语义模式,默认情况下使用的是 exactly-once 模式
streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACT LY_ONCE);
//或者
streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LE AST_ONCE)
※ Checkpoint 超时时间:
超时时间指定了每次 Checkpoint 执行过程中的上限时间范围,一旦 Checkpoint 执行时间超过该阀值,
Flink 将会中断 Checkpoint 过程,并按照超时处理。该指标可以通过 setCheckpointTimeout 方法设定,默认为 10 分钟
streamEnv.getCheckpointConfig.setCheckpointTimeout(50000)
※ 检查点之间最小时间间隔:
该参数主要目的是设定两个 Checkpoint 之间的最小时间间隔,防止出现例如状态数据过大而导致 Checkpoint 执行时间过长,从而到导致 Checkpoint 积压过多,最终 Flink 应用密集地触发Checkpoint 操作,会占用大量计算资源而影响到整个应用的性能
streamEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(600)
※ 最大并行执行的检查点数量:
通过 setMaxConcurrentCheckpoints() 方法设定能够最大同时执行的 Checkpoint 数量。默认情况下只有一个检查点可以运行,根据用户指定的数量可以同时出发多个 Checkpoint,进而提升 Checkpoint 整体的效率
streamEnv.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
※ 是否删除 Checkpoint 中 保存的数据:
设置为 RETAIN_ON_CANCELLATION:表示一旦 Flink 处理程序被 cancel 后,会保留 CheckPoint 数据,
以便根据实际需要恢复到指定的 Checkpoint
设置为 ELETE_ON_CANCELLATION:表示一旦 Flink 处理程序被 cancel 后,会删除 CheckPoint 数据,
只有 Job 执行失败的时候才会保存 CheckPoint
//删除
streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
//保留
streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
※ TolerableCheckpointFailureNumber:
设置可以容忍的检查的失败数,超过这个数量则系统自动关闭和停止任务
streamEnv.getCheckpointConfig.setTolerableCheckpointFailureNumber(1)