一、背景
目前 yarn 集群 360 个FLink实时作业,90% 都是使用 flink1.13.3 + cdc2.1 ,在12月17号发现一个流任务:xxx_mysql_kafka 运行无异常,但是一直不往 Kafka 发送最新数据
二、问题排查
1、 根据该任务 application id 查看具体运行在那个 yarn 节点上
- 堡垒机登录该节点,切换到 yarn 用户下,使用 jstat -gcutl pid 查看该 jvm 进程,发现频繁 young gc 且 full gc 次数过多,达到 30 次
查看到 FUll GC 非常频繁
- 使用 jmap 将该 jvm 进程的 heap 内存 dump 下来,用 mat 进行分析:/usr/java/jdk1.8.0_144/bin/jmap -dump:format=b,file=3512.hprof 3512,将 dump 文件导入 mat 中
- 根据 mat 提示信息,定位问题代码,发现出现问题的对象是 ResultSetImpl ,这是从 MySQL 读取数据用到的对象。
使用 idea 打开 flink-cdc-connector 的代码,分支切换到 release-2.1 ,查看 MySqlSnapshotSplitReadTask 类的代码,问题出现下面这一段代码,flink-cdc 从 MySQL 读取数据,然后发送到下游,异常的是这一批次读取出来的对象有 50w+ 条,这应该是出问题的具体原因
5、排查这一批次分片读取 50W 数据的原因,任务启动时设置的 chunk.size 为 2w,理论上应该每批次读取 2w 才对,查看 flink-cdc 源码,具体的分片操作在 ChunkSplitter类的 splitTableIntoChunks 函数
因为抽取的表主键类型是 bigint ,所以触发了 flink-cdc 的分片优化逻辑,虽然我们配置的 chunk.size 是 2w,但是经过 flink-cdc 内部优化,优化后的 chunk.size 为 2w * 120 = 240w 左右
具体的分片优化逻辑如下:
a. 计算该表数据的分布因子:最大主键id - 最小主键 id = 主键最大最小差
b. 主键最大最小差 / 该表数据量 = 数据分布因子
c. 数据分布因子 * 任务启动输入的 chunk.size = 优化后的分片大小
出问题表的计算过程:(202112061064180438(最大主键) - 202112061000977284(最小主键) / 529696(该表数据量) = 119.3197(数据分布因子),最终的分片大小为 119.3197*2w = 240w,
因为该表数据的最大id与最小id差值较大,但是该表数据量只有 50w+,导致 flink-cdc 做了一个反向优化,最后出问题了
查看该任务的 jobmnager 日志,验证了上面的猜测,该表的最终分片大小为:2443667
-
出现问题的具体原因已经找到,解决方案
a. 降低 split-key.even-distribution.factor.upper-bound 比例,默认为 1000,可调至 10,这样最大批次的数据量为 10*chunk.size b. 不使用增量抽数模式:snapshot :scan.incremental.snapshot.enabled = false