Flink的常见异常众多,不可能面面俱到,所以想到哪儿写到哪儿,有漏掉的之后再补充。
部署和资源问题
(0) JDK版本过低
这不是个显式错误,但是JDK版本过低很有可能会导致Flink作业出现各种莫名其妙的问题,因此在生产环境中建议采用JDK 8的较高update(我们使用的是181)。
(1) Could not build the program from JAR file
该信息不甚准确,因为绝大多数情况下都不是JAR包本身有毛病,而是在作业提交过程中出现异常退出了。因此需要查看本次提交产生的客户端日志(默认位于$FLINK_HOME/logs目录下),再根据其中的信息定位并解决问题。
(2) ClassNotFoundException/NoSuchMethodError/IncompatibleClassChangeError/...
一般都是因为用户依赖第三方包的版本与Flink框架依赖的版本有冲突导致。
(3) Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
就是字面意思,YARN集群内没有足够的资源启动Flink作业。检查一下当前YARN集群的状态、正在运行的YARN App以及Flink作业所处的队列,释放一些资源或者加入新的资源。
(4) java.util.concurrent.TimeoutException: Slot allocation request timed out
slot分配请求超时,是因为TaskManager申请资源时无法正常获得,按照上一条的思路检查即可。
(5) org.apache.flink.util.FlinkException: The assigned slot was removed
TaskManager的Container因为使用资源超限被kill掉了。首先需要保证每个slot分配到的内存量足够,特殊情况下可以手动配置SlotSharingGroup来减少单个slot*享Task的数量。如果资源没问题,那么多半就是程序内部发生了内存泄露。建议仔细查看TaskManager日志,并按处理JVM OOM问题的常规操作来排查。
(6) java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id timed out
TaskManager心跳超时。有可能是TaskManager已经失败,如果没有失败,那么有可能是因为网络不好导致JobManager没能收到心跳信号,或者TaskManager忙于GC,无法发送心跳信号。JobManager会重启心跳超时的TaskManager,如果频繁出现此异常,应该通过日志进一步定位问题所在。
作业问题
(1)org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
该异常几乎都是由于程序业务逻辑有误,或者数据流里存在未处理好的脏数据导致的,继续向下追溯异常栈一般就可以看到具体的出错原因,比较常见的如POJO内有空字段,或者抽取事件时间的时间戳为null等。
(2) java.lang.IllegalStateException: Buffer pool is destroyed || Memory manager has been shut down
很多童鞋拿着这两条异常信息来求助,但实际上它们只是表示BufferPool、MemoryManager这些Flink运行时组件被销毁,亦即作业已经失败。具体的原因多种多样,根据经验,一般是上一条描述的情况居多(即Could not forward element to next operator错误会伴随出现),其次是JDK版本问题。具体情况还是要根据TaskManager日志具体分析。
(3) akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://...]] after [10000 ms]
Akka超时导致,一般有两种原因:一是集群负载比较大或者网络比较拥塞,二是业务逻辑同步调用耗时的外部服务。如果负载或网络问题无法彻底缓解,需考虑调大akka.ask.timeout参数的值(默认只有10秒);另外,调用外部服务时尽量异步操作(Async I/O)。
(4) java.io.IOException: Too many open files
这个异常我们应该都不陌生,首先检查系统ulimit -n的文件描述符限制,再注意检查程序内是否有资源(如各种连接池的连接)未及时释放。值得注意的是,Flink使用RocksDB状态后端也有可能会抛出这个异常,此时需修改flink-conf.yaml中的state.backend.rocksdb.files.open参数,如果不限制,可以改为-1。
(5) org.apache.flink.api.common.function.InvalidTypesException: The generic type parameters of '' are missing
在Flink内使用Java Lambda表达式时,由于类型擦除造成的副作用,注意调用returns()方法指定被擦除的类型。
检查点和状态问题
(1) Received checkpoint barrier for checkpoint before completing current checkpoint . Skipping current checkpoint
在当前检查点还未做完时,收到了更新的检查点的barrier,表示当前检查点不再需要而被取消掉,一般不需要特殊处理。
(2) Checkpoint expired before completing
首先应检查CheckpointConfig.setCheckpointTimeout()方法设定的检查点超时,如果设的太短,适当改长一点。另外就是考虑发生了反压或数据倾斜,或者barrier对齐太慢。
(3) org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible
我们知道Flink的状态是按key组织并保存的,如果程序逻辑内改了keyBy()逻辑或者key的序列化逻辑,就会导致检查点/保存点的数据无法正确恢复。所以如果必须要改key相关的东西,就弃用之前的状态数据吧。
(4) org.apache.flink.util.StateMigrationException: The new serializer for a MapState requires state migration in order for the job to proceed. However, migration for MapState currently isn't supported
在1.9之前的Flink版本中,如果我们使用RocksDB状态后端,并且更改了自用MapState的schema,恢复作业时会抛出此异常,表示不支持更改schema。这个问题已经在FLINK-11947解决,升级版本即可。