目录
前言
Flink容错机制的核心部分是绘制分布式数据流和操作员状态的一致快照。这些快照充当一致的检查点,如果发生故障,系统可以回退到这些检查点。Flink绘制这些快照的机制在“分布式数据流的轻量级异步快照”中进行了介绍。它受 用于分布式快照的标准Chandy-Lamport算法的启发, 并且专门针对Flink的执行模型进行了量身定制。
问题描述
flink任务,从kafka中获取数据,经过处理,写入到另外的一个kafka中,开启了checkpoint,配置如下:
CheckpointConfig config = env.getCheckpointConfig();
// 任务流取消和故障时会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 设置checkpoint的周期, 每隔3000 ms进行启动一个检查点
config.setCheckpointInterval(3 * 60 * 1000);
// 设置模式为AT_LEAST_ONCE,降低性能损耗
config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
config.setMinPauseBetweenCheckpoints(500);
// 检查点必须在2分钟内完成,或者被丢弃【checkpoint的超时时间】
config.setCheckpointTimeout(2 * 60 * 1000);
// 同一时间只允许进行一个检查点
config.setMaxConcurrentCheckpoints(1);
通过flink web页面发现checkpoint总是失败,checkpoint超时(时间>CheckpointTimeout)。
问题定位
在解决这个问题之前,我们先应该了解下flink checkpoint的基本原理
checkpoint的基本原理
- jobManager发起checkpoint
- source Task将Barriers 注入到数据流中向下流动
- 中间operator从所有的输入通道中接收到Barriers后(对齐),制作快照,给jobmanager发送ack消息,同时将Barriers发送到其所有输出
- 最后sink完成checpoint后,整个checkpoint完成
注:flink提供了俩种语义,Exactly Once和At Least Once语义,俩者之间不同点在于,在并行度下,operator存在多个输入端,operator从其中一个输入端接收到Barriers后,会存在俩种情况
- Exactly Once:停止处理数据,等待所有输入端Barriers到达
- At Least Once:继续处理数据,不会阻塞处理
思路
通过原理我们可以知道,排除配置问题(状态后端配置错误,因为统一配置,所以基本不会存在配置错误)外,影响checkpoint时间的因素Barriers对齐时间,
影响Barriers对齐其实本质是数据的流动问题,而影响数据流动的因素有俩个
- 反压:导致数据流动堵塞
- 数据倾斜:导致某一个输入端的Barriers到达慢,对齐时间长(对齐需要所有输入端都到达才可以
现象
我们通过flink web管理台里可以看到,如下图的失败详情,可以看到一个并行度为24的算子,17和23这俩个subtask,迟迟没有对齐,
我们打开对应的任务监控,如下图,发现了数据倾斜问题,那么到底是什么原因导致的数据倾斜呢,
在flink任务里可以看到这个东西,这个代表的就是flink的分区器,
flink提供了如下的分区器策略
- ForwardPartitioner:FORWARD
- ShufflePartitioner,SHUFFLE
- RebalancePartitioner:REBALANCE
- RescalePartitioner:RESCALE
- KeyGroupStreamPartitioner:HASH
- CustomPartitionerWrapper:CUSTOM
Keyby:hash,最容易造成数据倾斜,通过上边的图,我们基本可以判断,是keyby导致到数据倾斜
问题解决
既然直接知道是keyby导致的数据倾斜,在keyby里key后缀增加随机数,使得数据均匀分布
.keyBy(new KeySelector<JSONObject, String>() {
@Override
public String getKey(JSONObject jsonObject) throws Exception {
String eventcode = jsonObject.getString(Constants.EVENT_CODE);
return eventcode + RandomUtils.nextInt(0, 128);
}
})
问题解决!