Could not flush and close the file system output stream

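A Flink job that consumes data from Kafka, running in Flink on YARN mode, had been deployed to both the test and production environments without any problems. After the test environment was restarted and the job was redeployed, however, it failed with the following error: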

2019-07-01 15:19:25,984 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Sink: Coupon Sink (1/1) (28578957b82c7fccd680cc4fb5fbb7cd) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 8 for operator Source: Custom Source -> Sink: Coupon Sink (1/1).}
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 8 for operator Source: Custom Source -> Sink: Coupon Sink (1/1).
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
	... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://cxhadoop/flink/checkpoints/292e9f2140f8abc69acaadb99cfd4c58/chk-8/91154fad-3667-4dd3-9b1d-a503c0054207 in order to obtain the stream state handle
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
	... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://cxhadoop/flink/checkpoints/292e9f2140f8abc69acaadb99cfd4c58/chk-8/91154fad-3667-4dd3-9b1d-a503c0054207 in order to obtain the stream state handle
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
	... 7 more
Caused by: java.io.IOException: DataStreamer Exception: 
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:695)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hdfs.protocol.HdfsConstants
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1413)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1357)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:587)
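
When reading a trace like the one above, it helps to follow the Caused by entries down to the innermost one, which here is the NoClassDefFoundError for org.apache.hadoop.hdfs.protocol.HdfsConstants. The following is a minimal sketch of that walk in plain Java. The class name RootCause and the exceptions built in main are hypothetical stand-ins that only mimic the nesting in the log; they are not Flink's own classes.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class RootCause {
    // Follows getCause() until the innermost exception is reached.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Hypothetical exceptions that mimic the nesting shown in the log.
        Throwable inner = new NoClassDefFoundError(
                "Could not initialize class org.apache.hadoop.hdfs.protocol.HdfsConstants");
        Throwable io = new IOException("DataStreamer Exception", inner);
        Throwable outer = new Exception(
                "Could not materialize checkpoint 8", new ExecutionException(io));

        // Prints the innermost exception of the chain.
        System.out.println(rootCause(outer));
    }
}
```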

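The log shows that the failure is in writing the checkpoint files. The trace is long, but two messages matter:

Could not materialize checkpoint 8 for operator Source: Custom Source -> Sink: Coupon Sink (1/1)
Could not flush and close the file system output stream

Following the Caused by chain, the second error is what triggers the first: the checkpoint cannot be materialized because its file cannot be flushed and closed on HDFS. So the problem lies in creating the checkpoint file. I then checked the checkpoint settings in the Flink configuration: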

state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
state.checkpoints.dir: hdfs://cxhadoop/flink/checkpoints
state.checkpoints.num-retained: 20

# Default target directory for savepoints, optional.
#
state.savepoints.dir: hdfs://cxhadoop/flink/savepoints

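The file contains an extra line, state.checkpoints.num-retained, which sets the maximum number of completed checkpoints that Flink retains. When I looked at the checkpoint directory, it already held far more entries than this value, and I concluded that this setting was what made the job fail: every time the job started, it could not create its checkpoint files. Commenting out the line fixed it. Note that with the line commented out, Flink falls back to its default for this option rather than retaining checkpoints without limit.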
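
The check described above, comparing the number of chk-N directories with the configured limit, can be sketched as follows. This is only an illustration under stated assumptions: it uses java.nio on a local path, so it assumes the job's checkpoint directory (the per-job directory that contains chk-8 in the log) is reachable as a local or mounted path. For a directory on HDFS the listing would have to go through the Hadoop client instead. The class name CheckpointDirCount and its arguments are hypothetical.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class CheckpointDirCount {
    // Counts sub-directories named chk-<something> directly under one job's checkpoint directory.
    static int countCheckpointDirs(Path jobCheckpointDir) throws IOException {
        int count = 0;
        try (DirectoryStream<Path> entries =
                     Files.newDirectoryStream(jobCheckpointDir, "chk-*")) {
            for (Path entry : entries) {
                if (Files.isDirectory(entry)) {
                    count++;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: local path of the job's checkpoint directory (assumption)
        // args[1]: the value of state.checkpoints.num-retained to compare against
        Path dir = Paths.get(args[0]);
        int retained = Integer.parseInt(args[1]);
        int found = countCheckpointDirs(dir);
        System.out.println(found + " chk-* directories found, num-retained = " + retained);
    }
}
```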
