The monitorApplication method in org.apache.spark.deploy.yarn.Client.scala:
    /**
     * Report the state of an application until it has exited, either successfully or
     * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
     * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED,
     * or KILLED).
     *
     * @param appId ID of the application to monitor.
     * @param returnOnRunning Whether to also return the application state when it is RUNNING.
     * @param logApplicationReport Whether to log details of the application report every iteration.
     * @return A pair of the yarn application state and the final application state.
     */
    def monitorApplication(
        appId: ApplicationId,
        returnOnRunning: Boolean = false,
        logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
      val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
      var lastState: YarnApplicationState = null
      while (true) {
        Thread.sleep(interval)
        val report: ApplicationReport =
          try {
            getApplicationReport(appId)
          } catch {
            case e: ApplicationNotFoundException =>
              logError(s"Application $appId not found.")
              return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
            case NonFatal(e) =>
              logError(s"Failed to contact YARN for application $appId.", e)
              return (YarnApplicationState.FAILED, FinalApplicationStatus.FAILED)
          }
        val state = report.getYarnApplicationState

        if (logApplicationReport) {
          logInfo(s"Application report for $appId (state: $state)")

          // If DEBUG is enabled, log report details every iteration
          // Otherwise, log them every time the application changes state
          if (log.isDebugEnabled) {
            logDebug(formatReportDetails(report))
          } else if (lastState != state) {
            logInfo(formatReportDetails(report))
          }
        }

        if (lastState != state) {
          state match {
            case YarnApplicationState.RUNNING =>
              reportLauncherState(SparkAppHandle.State.RUNNING)
            case YarnApplicationState.FINISHED =>
              // reportLauncherState(SparkAppHandle.State.FINISHED)
              report.getFinalApplicationStatus match {
                case FinalApplicationStatus.FAILED =>
                  reportLauncherState(SparkAppHandle.State.FAILED)
                case FinalApplicationStatus.KILLED =>
                  reportLauncherState(SparkAppHandle.State.KILLED)
                case _ =>
                  reportLauncherState(SparkAppHandle.State.FINISHED)
              }
            case YarnApplicationState.FAILED =>
              reportLauncherState(SparkAppHandle.State.FAILED)
            case YarnApplicationState.KILLED =>
              reportLauncherState(SparkAppHandle.State.KILLED)
            case _ =>
          }
        }

        if (state == YarnApplicationState.FINISHED ||
            state == YarnApplicationState.FAILED ||
            state == YarnApplicationState.KILLED) {
          cleanupStagingDir(appId)
          return (state, report.getFinalApplicationStatus)
        }

        if (returnOnRunning && state == YarnApplicationState.RUNNING) {
          return (state, report.getFinalApplicationStatus)
        }

        lastState = state
      }

      // Never reached, but keeps compiler happy
      throw new SparkException("While loop is depleted! This should never happen...")
    }
The part of interest is:
    if (lastState != state) {
      state match {
        case YarnApplicationState.RUNNING =>
          reportLauncherState(SparkAppHandle.State.RUNNING)
        case YarnApplicationState.FINISHED =>
          // reportLauncherState(SparkAppHandle.State.FINISHED)
          report.getFinalApplicationStatus match {
            case FinalApplicationStatus.FAILED =>
              reportLauncherState(SparkAppHandle.State.FAILED)
            case FinalApplicationStatus.KILLED =>
              reportLauncherState(SparkAppHandle.State.KILLED)
            case _ =>
              reportLauncherState(SparkAppHandle.State.FINISHED)
          }
        case YarnApplicationState.FAILED =>
          reportLauncherState(SparkAppHandle.State.FAILED)
        case YarnApplicationState.KILLED =>
          reportLauncherState(SparkAppHandle.State.KILLED)
        case _ =>
      }
    }
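When the YARN state is FINISHED, the original code does not break the outcome down any further. Comment out the original reportLauncherState(SparkAppHandle.State.FINISHED) call and replace it with: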
    report.getFinalApplicationStatus match {
      case FinalApplicationStatus.FAILED =>
        reportLauncherState(SparkAppHandle.State.FAILED)
      case FinalApplicationStatus.KILLED =>
        reportLauncherState(SparkAppHandle.State.KILLED)
      case _ =>
        reportLauncherState(SparkAppHandle.State.FINISHED)
    }
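This is needed because a YARN application in the FINISHED state can carry any of several final statuses: KILLED, FAILED, and SUCCEEDED are all possible.
If only a single FINISHED state is passed back to SparkLauncher's SparkAppHandle, our own code has no way to tell whether the Spark job succeeded or failed. All it knows is that the job has ended.
So the finished state has to be broken down further. Then, when you submit jobs through SparkLauncher, you can monitor them and raise an alert when one fails.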
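As a rough illustration of the monitoring described above, the following sketch registers a SparkAppHandle.Listener and raises an alert when the job ends as FAILED or KILLED. It assumes the spark-launcher artifact is on the classpath and that the patched Client is deployed. The names LaunchAndAlert, sendAlert, and needsAlert, as well as the jar path and main class, are hypothetical placeholders rather than anything from Spark itself.

```scala
import java.util.concurrent.CountDownLatch

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LaunchAndAlert {

  // Hypothetical alert hook: swap in mail, a webhook, a pager, etc.
  def sendAlert(message: String): Unit =
    System.err.println(s"[ALERT] $message")

  // With the patched Client, a failed or killed YARN application is
  // reported as FAILED or KILLED here instead of FINISHED.
  def needsAlert(state: SparkAppHandle.State): Boolean =
    state == SparkAppHandle.State.FAILED || state == SparkAppHandle.State.KILLED

  def main(args: Array[String]): Unit = {
    val done = new CountDownLatch(1)

    val listener = new SparkAppHandle.Listener {
      override def stateChanged(handle: SparkAppHandle): Unit = {
        val state = handle.getState
        println(s"Application ${handle.getAppId} is now $state")
        if (state.isFinal) {
          if (needsAlert(state)) {
            sendAlert(s"Spark job ${handle.getAppId} ended as $state")
          }
          done.countDown()
        }
      }

      // Not needed for this sketch.
      override def infoChanged(handle: SparkAppHandle): Unit = ()
    }

    new SparkLauncher()
      .setAppResource("/path/to/your-job.jar") // placeholder path
      .setMainClass("com.example.YourJob")     // placeholder class
      .setMaster("yarn-cluster")               // Spark 1.6 style master URL
      .startApplication(listener)

    // Block until the application reaches a final state.
    done.await()
  }
}
```

Keeping the decision in a small pure function such as needsAlert makes it easy to check separately from the launcher. Without the patch, the FAILED and KILLED branches would not be reached for applications whose YARN state is FINISHED.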
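This bug exists in Spark 1.6.0, and the Spark builds shipped with CDH 5.7 through CDH 5.9 all have it. Newer versions have fixed it.
If you are on a CDH build of Spark, patch the code yourself, recompile and repackage it, and deploy your own Spark on YARN service.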