A Flink job that consumes data from Kafka, running in Flink on YARN mode, had previously been deployed to both the test and production environments without any problems. But after the test environment was restarted and the job was redeployed, it unexpectedly failed with the following error:
2019-07-01 15:19:25,984 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Coupon Sink (1/1) (28578957b82c7fccd680cc4fb5fbb7cd) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 8 for operator Source: Custom Source -> Sink: Coupon Sink (1/1).}
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 8 for operator Source: Custom Source -> Sink: Coupon Sink (1/1).
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://cxhadoop/flink/checkpoints/292e9f2140f8abc69acaadb99cfd4c58/chk-8/91154fad-3667-4dd3-9b1d-a503c0054207 in order to obtain the stream state handle
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://cxhadoop/flink/checkpoints/292e9f2140f8abc69acaadb99cfd4c58/chk-8/91154fad-3667-4dd3-9b1d-a503c0054207 in order to obtain the stream state handle
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Caused by: java.io.IOException: DataStreamer Exception:
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:695)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hdfs.protocol.HdfsConstants
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1413)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1357)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:587)
The log shows that something went wrong with the checkpoint files. Although there are many errors, two key messages stand out:
"Could not materialize checkpoint 8 for operator Source" and "Could not flush and close the file system output stream". As the Caused-by chain shows, the second error is the cause of the first, so it is not hard to conclude that creating the checkpoint files failed. Checking Flink's checkpoint-related configuration:
state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
state.checkpoints.dir: hdfs://cxhadoop/flink/checkpoints
state.checkpoints.num-retained: 20
# Default target directory for savepoints, optional.
#
state.savepoints.dir: hdfs://cxhadoop/flink/savepoints
One extra line stood out: state.checkpoints.num-retained. This setting caps the number of completed checkpoints retained in the checkpoint directory; once the cap is exceeded, new checkpoints could no longer be created. A look at the checkpoint directory showed that the number of entries had long since exceeded this limit, and this setting is what caused the Flink job to fail: every time the program started, it could not create new checkpoint files. So the fix is simply to comment out this setting and leave the number of retained checkpoints unlimited.
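The resulting flink-conf.yaml, mirroring the configuration shown above with the offending line commented out, would look roughly like this:

```yaml
state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
state.checkpoints.dir: hdfs://cxhadoop/flink/checkpoints
# state.checkpoints.num-retained: 20   # commented out: do not cap retained checkpoints
# Default target directory for savepoints, optional.
state.savepoints.dir: hdfs://cxhadoop/flink/savepoints
```

After changing the configuration, the cluster (or at least the affected job) needs to be restarted for the setting to take effect.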
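To make the semantics of the retention cap concrete, here is a minimal local-filesystem sketch of what "retain only the N newest checkpoints" means for a directory of chk-N subdirectories. The helper name `prune_checkpoints` is hypothetical, and this runs against a local temp directory rather than the HDFS path from the article; it only illustrates the retention idea, not Flink's actual cleanup code.

```python
import os
import re
import shutil
import tempfile

def prune_checkpoints(checkpoint_dir, num_retained):
    """Keep only the newest `num_retained` chk-N directories, deleting the rest.

    Hypothetical helper illustrating the retention semantics of
    state.checkpoints.num-retained; in the article's setup the
    directories live on HDFS, not the local filesystem.
    """
    pattern = re.compile(r"^chk-(\d+)$")
    entries = []
    for name in os.listdir(checkpoint_dir):
        m = pattern.match(name)
        if m:
            entries.append((int(m.group(1)), name))
    # Newer checkpoints have higher chk-N numbers, so sort descending.
    entries.sort(reverse=True)
    for _, name in entries[num_retained:]:
        shutil.rmtree(os.path.join(checkpoint_dir, name))
    return sorted((name for _, name in entries[:num_retained]),
                  key=lambda n: int(n.split("-")[1]))

# Demo: create chk-1 .. chk-25, then retain only the 20 newest.
demo_dir = tempfile.mkdtemp()
for i in range(1, 26):
    os.makedirs(os.path.join(demo_dir, f"chk-{i}"))
kept = prune_checkpoints(demo_dir, 20)
print(len(kept), kept[0])  # 20 retained, oldest survivor is chk-6
```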