一、问题描述:
任务逻辑是通过实时读取Kafka数据,一分钟计算一次数据,并利用Flink StreamingFileSink将数据落地到HDFS文件中。为了应对大促剧增的数据量,对当前运行稳定的集群进行了扩容处理,任务重启后发现写入的hdfs文件一直处于inprogress状态无法滚动生成正式文件。
任务运行一段时间可能会出现如下错误:
二、解决过程:
- 开始是猜想可能是并行度过多,导致产生大量临时文件,文件句柄太多,关闭耗时过久导致文件一分钟内一直无法完成合并?将并行度调整到1,发现问题并没有解决。
- 又猜想是因为调整了checkpoint参数,禁用掉checkpoint失败后触发Flink任务重启策略导致checkpoint受影响无法完成临时文件合并?将代码回退,发现问题还没有解决。
- 通过观察文件目录,发现临时文件名前缀,跟已合并文件名的前缀一致,猜测可能是文件已存在,临时文件无法完成合并为同名文件?备份并清理掉历史数据,发现文件正常生成, 问题解决!!。
此解决方法为问题发生之后临时急救方案,主要适用于当前任务不依赖历史数据,数据可以清理的任务。剖析其深度原因之后可从根本上避免此类问题。详细请继续阅读下列原因深度剖析。
三、原因深度剖析:
3.1、StreamingFileSink原理简介
- StreamingFileSink是一个Flink连接器,用来将分区文件写入到支持 Flink FileSystem 接口的文件系统中。StreamingFileSink 会将数据写入到桶中。由于输入流可能是*的,因此每个桶中的数据被划分为多个有限大小的文件。如何分桶是可以配置的,默认使用基于时间的分桶策略,这种策略每个小时创建一个新的桶,桶中包含的文件将记录所有该小时内从流中接收到的数据。桶目录中的实际输出数据会被划分为多个部分文件(part file),每一个接收桶数据的 Sink Subtask ,至少包含文件(part file)的一部分。多余的文件(part file)将根据滚动策略重新创建,滚动策略是可以配置的。默认的策略是根据文件大小和超时时间来滚动文件。超时时间指打开文件的最长持续时间,以及文件关闭前的最长非活动时间。本描述来自官网翻译,翻译不准确的地方还请各位多多批评指正!
提示:使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成后,桶中临时文件转成正式文件。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。
本图为Flink 官网(官网地址:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html)对于StreamingFileSink的图示,可以很形象的描述其落地原理。
为了在下游系统中使用 StreamingFileSink 的输出,我们需要了解输出文件的命名规则和生命周期。由上图可知,文件(part file)可以处于以下三种状态之一:
1).In-progress :
当前文件正在写入中
2).Pending :
当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
3).Finished :
在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态,处于 Finished 状态的文件不会再被修改,可以被下游系统安全地读取。
Flink目前对于Hdfs-Sink 有两种实现方式,即BucketingSink以及StreamingFileSink。StreamingFileSink是在BucketingSink之后推出的。主要区别在于StreamingFileSink可以用于故障恢复,保证exactly-once,但是要求hadoop版本必须在2.7以上,因为用到了hdfs的truncate方法。BucketingSink相对用法比较简单,并且没有版本要求。StreamingFileSink的exactly-once主要基于Flink checkpoint提供的hook来实现的两阶段提交模式来保证的,这也是为什么官网提示使用时一定要打开checkpoint开关的原因。上述描述的桶物理上对应一个文件夹、subtask表示Flink同一任务的不同子任务,换言之,就是不同并行度。数据流中读到一个元素,根据项目的BucketAssigner可以计算出该元素属于哪个分区,通过状态管理器可以获取到该分区下目前最大的正在写的文件编号是多少?然后写到对应的文件中。官网图比较抽象,根据源码及对官网描述的理解,本人画了一张更加详细的StreamingFileSink示意图如下:
3.2、 回到问题,part-0-561.xxx,文件名中数字来自哪里?为什么每个文件生成之后其编号是累加的?为什么会重启之后数字又会重新开始编号? 文件中数字来自哪里?
-
文件中数字来自哪里?为什么每个文件生成之后其编号是累加的?
上述描述桶物理映射就是一个文件夹名称,源码对桶的解释为:桶就是StreamingFileSink输出的目录组织。对于StreamingFileSink中的每个传入元素,通过用户指定的bucketsassigner,决定该元素应写入哪个bucket。每个新生成的文件名都是由前缀,Flink子任务的数量,文件编号,后缀组成。这就是文件中数字的由来。源码显示,由名为partCounter的一个变量+1得到的结果就是文件中的数字。 -
为什么重启之后数字又会重新开始编号?
由于StreamingFileSink基于checkpoint实现Exactly-Once,那么其必须实现Flink中关于checkpoint的两个接口CheckpointedFunction(快照拍摄功能接口), CheckpointListener (快照拍摄状态监听接口)源码如下:
既然实现了这两个接口,那么就需要关注快照初始化时做了什么操作?快照开始拍摄时做了什么?快照拍摄完成之后做了什么?从源码的下列三个方法作为入口剖析:
快照初始化时,根据子任务的数量创建了若干个桶管理器,并初始化了桶管理器的一些属性(详细属性可以见源码org.apache.flink.streaming.api.functions.sink.filesystem.Buckets),其中有个属性maxPartCounter,用来记录当前正在写的,或者最新写完文件的编号,快照初始化时将其值初值为0并保存在一个ListState中,每个桶对应一个maxPartCounter。通过源码追踪发现该值最终传入bucket并赋值给partCounter,用于默认文件命名。源码跳转路径为:
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink#initializeState→
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper#StreamingFileSinkHelper→
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets#initializeState→
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets#initializeActiveBuckets→
org.apache.flink.streaming.api.functions.sink.filesystem.BucketFactory#restoreBucket→
org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl#restoreBucket→
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket#restore→
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket#Bucket(int, BucketID, org.apache.flink.core.fs.Path, long, org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter<IN,BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy<IN,BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.FileLifeCycleListener<BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)
this.partCounter = initialPartCounter(maxPartCounter);
初始化做了什么?分析源码发现,初始化时,创建了一个桶并初始化了桶的一些属性,其中桶编号的获取用到属性maxPartCounter,见名知意,其作用是用来记录当前part的最大编号,其值初始化为0。而桶编号就是每个临时文件名的前缀。
数据流中每读到一个元素,通过用户设置的分桶策略找到该元素对应的桶,并将其写入文件,并且跟上次的partcount对比,获取当前的最大值并将最大值进行更新。
拍摄快照时,先将maxPartCounter的状态清空,然后仅记录当前Checkpoint编号的每个桶的maxPartCounter值,当前checkpoint成功,那么每个桶的最新文件编号及被记录在当前的装填*下次获取。
快照拍摄完成之后,会将临时文件合并为Finished状态的文件,其中bucketWriter就跟文件生成相关,其文件名就是涉及到上述描述的文件编号。
-
为什么启动一段时间后可能会报错?
报错的直接原因是因为任务手动停止任务时,没有手动保存快照即savepoint,任务可能停止在两个checkpoint时间段之间,下次任务重启之后,任务从最后一个成功的checkpoint点继续执行任务,此时如果继续写文件会发现文件已被合并,就会报 File does not exist 错误信息,原因如下图所示:
savepoint与checkpoint的工作原理一致,只不过检查点是自动触发的,而savepoint需要命令行触发或者web控制台触发。和checkpoint一样,savepoint也保存到稳定存储当中,用户可以从savepoint重启作业,而不用从头开始。checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场.
四、解决方案:
在维护任务,手动停止任务时,一定要保存快照。扩容及代码维护之后,要指定快照重启任务就可以从根本上避免该问题的产生。
五、总结
问题的根本原因:在手动停止任务时,StreamingFileSink依赖Checkpoint状态来记录当前checkpoint id对应最新生成文件的编号,下一个checkpoint id有新数据读取到时,会根据上一次状态记录的文件最大编号的值累加得到新元素对应文件的文件名,在停止任务时,没有保存快照,导致最后一次chekcpoint成功生成的文件编号没有被记录而丢失,下次任务重启时不指定快照重启,快照会重新进行初始化,文件名中编号又被初始化为0,临时文件在合并为Finished状态时,发现同一目录下已存在同样的文件,而无法进行覆盖导致文件一直处于正在写入状态。所以,当把本目录下历史数据清除掉之后,所有写入的文件重新从0开始编号,能正常完成文件的写入。
教训:在后续Flink任务中,如果涉及到有状态记录,chekcpoint等操作,在停止任务时一定不能暴力停止,一定要保存快照,平滑执行停止操作,让其状态能安全保存。否则,可能有些累计求值的数据会永久丢失,需要重置Kafka offset才能恢复。
知其然知其所以然:本文章分析方法同样适用于Flink Kafka sink,Kafka sink保持Exactly-Once原理也是基于两阶段事务提交方式实现的,大家有兴趣可以利用同样的分析方法去阅读Flink FlinkKafkaProducer源码,甚至后续有其他sink操作,需要具备容错机制,也可以参考此处Flink源码去实现。