Flink checkpoint失败

 

目录

前言

问题描述

问题定位

checkpoint的基本原理

思路

现象

问题解决

前言

Flink容错机制的核心部分是绘制分布式数据流和操作员状态的一致快照。这些快照充当一致的检查点,如果发生故障,系统可以回退到这些检查点。Flink绘制这些快照的机制在“分布式数据流的轻量级异步快照”中进行了介绍。它受 用于分布式快照的标准Chandy-Lamport算法的启发, 并且专门针对Flink的执行模型进行了量身定制。

问题描述

flink任务,从kafka中获取数据,经过处理,写入到另外的一个kafka中,开启了checkpoint,配置如下:

        CheckpointConfig config = env.getCheckpointConfig();

        // 任务流取消和故障时会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 设置checkpoint的周期, 每隔3000 ms进行启动一个检查点
        config.setCheckpointInterval(3 * 60 * 1000);

        // 设置模式为AT_LEAST_ONCE,降低性能损耗
        config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

        // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
        config.setMinPauseBetweenCheckpoints(500);

        // 检查点必须在2分钟内完成,或者被丢弃【checkpoint的超时时间】
        config.setCheckpointTimeout(2 * 60 * 1000);

        // 同一时间只允许进行一个检查点
        config.setMaxConcurrentCheckpoints(1);

通过flink web页面发现checkpoint总是失败,checkpoint超时(时间>CheckpointTimeout)。

问题定位

在解决这个问题之前,我们先应该了解下flink checkpoint的基本原理

checkpoint的基本原理

  1. jobManager发起checkpoint
  2. source Task将Barriers 注入到数据流中向下流动
  3. 中间operator从所有的输入通道中接收到Barriers后(对齐),制作快照,给jobmanager发送ack消息,同时将Barriers发送到其所有输出
  4. 最后sink完成checpoint后,整个checkpoint完成

注:flink提供了俩种语义,Exactly Once和At Least Once语义,俩者之间不同点在于,在并行度下,operator存在多个输入端,operator从其中一个输入端接收到Barriers后,会存在俩种情况

  1. Exactly Once:停止处理数据,等待所有输入端Barriers到达
  2. At Least Once:继续处理数据,不会阻塞处理

 

Flink checkpoint失败

思路

通过原理我们可以知道,排除配置问题(状态后端配置错误,因为统一配置,所以基本不会存在配置错误)外,影响checkpoint时间的因素Barriers对齐时间,

影响Barriers对齐其实本质是数据的流动问题,而影响数据流动的因素有俩个

  1. 反压:导致数据流动堵塞
  2. 数据倾斜:导致某一个输入端的Barriers到达慢,对齐时间长(对齐需要所有输入端都到达才可以

现象

我们通过flink web管理台里可以看到,如下图的失败详情,可以看到一个并行度为24的算子,17和23这俩个subtask,迟迟没有对齐,

Flink checkpoint失败

我们打开对应的任务监控,如下图,发现了数据倾斜问题,那么到底是什么原因导致的数据倾斜呢,

Flink checkpoint失败

在flink任务里可以看到这个东西,这个代表的就是flink的分区器,

Flink checkpoint失败

flink提供了如下的分区器策略

  1. ForwardPartitioner:FORWARD
  2. ShufflePartitioner,SHUFFLE
  3. RebalancePartitioner:REBALANCE
  4. RescalePartitioner:RESCALE
  5. KeyGroupStreamPartitioner:HASH
  6. CustomPartitionerWrapper:CUSTOM

Keyby:hash,最容易造成数据倾斜,通过上边的图,我们基本可以判断,是keyby导致到数据倾斜

 

问题解决

 

既然直接知道是keyby导致的数据倾斜,在keyby里key后缀增加随机数,使得数据均匀分布

.keyBy(new KeySelector<JSONObject, String>() {
    @Override
    public String getKey(JSONObject jsonObject) throws Exception {

        String eventcode = jsonObject.getString(Constants.EVENT_CODE);

        return eventcode + RandomUtils.nextInt(0, 128);
    }
})

问题解决!

Flink checkpoint失败

 

上一篇:二、InnoDB存储引擎


下一篇:Spark2.x精通:Checkpoint源码深度剖析