文章目录
- 1.前言
- 2. 基于事故现场对问题进行分析
- 2.1 日志分析
- 2.2 单独测试Topology代码试图重现问题
- 3. 源码解析
- 3.1 Client模式和Cluster模式下客户端的提交和启动过程
- 客户端提交时在两种模式下的处理逻辑
- ApplicationMaster启动时在两种模式下的处理逻辑
- 3.2 两种模式下的下层角色关系和启动以后的总体运行流程
- TaskScheduler和SchedulerBackend的创建
- DriverEndpoint的处理逻辑
- YarnSchedulerEndpoint的处理逻辑
- 3.3 SparkDriver和ApplicationMaster的通信
- ApplicationMaster的构建
- 3.4 ApplicationMaster和Yarn ResourceManager的通信
- 3.5 Executor的启动以及与Driver通信的建立
- 3.6 Spark的操作与Yarn上的Container状态的对应关系
- 3.7 动态分配的整个通信过程
- Driver根据物理执行计划生成Executor的放置的暗示信息
- Driver向ApplicationMaster发送Task的相关信息
- ApplicationMaster接收Driver的Task资源请求信息
- ApplicationMaster根据Task的请求信息向Yarn请求资源
- 启动Executor并将Task调度上去
- 3.8 Application执行结束以后关闭Executor
- 4. 事故后续处理
1.前言
我们线上一个分钟级的调度任务在正常情况下可以在1分钟之内完成。但是在最近经常超时失败,超时时间达到10min以上。从客户端看到的日志是SparkContext构造时与ApplicationMaster连接超时,似乎是连接问题导致的。后来发现并不是这样。
本文详细记录我们对该问题的处理过程,原理探究以及最后的解决方案。
抛开事故本身,我们从这次事故以及历次事故中得到的经验是:
- 找到根本原因很重要: 找出根本原因是系统越来越稳定、越来越好维护的唯一途径。
- 找到根本原因很耗时:一个开源系统,动辄几百台机器,一个错误往往处于一个很长的工作流中,涉及到很多的不同角色,这些不同的角色之间的关系需要梳理清楚,有些日志才能看懂。
- 并不总能在问题首次发生时找到原因: 在问题首次发生的时候,由于日志缺乏、现场缺乏等原因,我们往往无法定位到根本原因,这个特别正常。但是,着并不意味着我们无事可做,我们必须为在下一次问题出现的时候找到原因做好准备,比如,缩小怀疑范围的情况下添加关键日志,梳理一个问题再次发生的时候保存现场的流程(堆栈,内存dump,日志备份)等。
- 根本原因和表面的异常很可能南辕北辙:对于一个复杂的开源系统,一次事故的根本原因往往和最初呈现出来的现象不同,甚至南辕北辙;所以,在定位到根本原因以前进行合理的推测并采取行动去验证很重要,但是,不看一行日志、不看一行代码、不做一点儿测试的空对空的推测和讨论毫无意义,浪费时间。
- 即使不能找到根本原因,也可以在根本原因层面消除问题: 在我们的case中,我们没有找到机器具体发生了什么问题,但是,至少我们能确定是这台机器的问题并且关闭这台机器,那么这个操作是在根本原因层面消除了问题,也是在这个事故中我们能采取的最好的办法
- 最好能确定在第二次发生问题的时候定位到原因:在事故首次发生的时候,由于日志不足、现场丢失而无法定位根本原因,但是基于事后对代码的分析缩小了怀疑范围,通过增加的日志提供了更多信息,通过梳理好的事故处理流程来准确保存现场,我们因此能够确保在事故第二次发生的时候定位到根本原因,那么对这一次事故的处理就已经非常完美了。不苛求在事故首次发生的时候就找到根本原因,这也不现实。
- 影响稳定性的往往只有有限的几个bug:一个复杂的开源系统可能有无数的bug,但是在我们的使用场景下、工作负载下显著影响业务正常运行的bug可能并不是很多。在这些bug出现的时候准确地发现他们并解决,系统SLA可能会显著提升。
- 必须严格区分因果关系。对于一个不正常现象,比如负载变高,连接超时等等,必须清楚地区分出这是导致问题的原因还是问题导致的结果。这个区分不好,会误导我们往错误的方向越走越远,离根本原因越来越远。
2. 基于事故现场对问题进行分析
2.1 日志分析
下面讲述了我们对这个事故的分析过程。请注意,我们对整个事故的分析、定位的过程依赖了很多Spark和Yarn本身的知识,尤其是对照日志、代码进行事故定位,这些知识很多事在后来进行事故复盘的时候获取的。由此可见,解决事故和分析事故根本原因其实是两件不同的事情。
我们的分钟级Spark任务由Airflow进行调度,采用Client模式运行,即,ssh到指定的一台机器上运行Driver进程,Driver进程与ResourceManager通信,提交应用程序,Yarn会在集群中启动ApplicationMaster,然后Driver与ApplicationMaster通信,进行资源调度等等。
我们从客户端看到的问题是,SparkContext在构造的时候发生超时:
我们首先在Driver端看到的异常日志如下:
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:567)
at org.apache.spark.ExecutorAllocationManager.updateAndSyncNumExecutorsTarget(ExecutorAllocationManager.scala:344)
at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:295)
at org.apache.spark.ExecutorAllocationManager$$anon$2.run(ExecutorAllocationManager.scala:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
... 11 more
...
[2024-04-25 09:00:40,857] {ssh_operator.py:133} INFO - 24/04/25 09:00:40 WARN netty.NettyRpcEnv: Ignored failure: org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from null in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
[2024-04-25 09:00:40,897] {ssh_operator.py:133} INFO - 24/04/25 09:00:40 WARN cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
[2024-04-25 09:00:55,897] {ssh_operator.py:133} INFO - 24/04/25 09:00:55 WARN cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
从日志可以看到,Driver由于2min的通信超时,启动两分钟以后就失败并退出了。
我们首先排除了Yarn本身的问题:
- SparkContext的超时问题在历史上偶尔出现,我们统计发现,只要SparkContext的构造超时,ApplicationMaster一定运行在某一台指定的机器上
- 并且, 我们运行这个关键任务的Yarn集群没有其它任务运行,是一个独立的隔离的Yarn集群,因此不会出现资源不够用、资源预留导致即使有可用资源也不给其它应用分配资源的情况
- 在事故发生的时候,我们立刻向Yarn提交了测试应用,应用正常运行并成功结束。
- 后面我们从日志可以看到,所有Container的状态都停留在ALLOCATED状态,即资源已经分配,只要ApplicationMaster进行换一次allocate()调用来认领这些已经分配的Container,这些Container的状态就会切换到ACQUIRED的预期状态。
- 如果ApplicationMaster本身没有Hang住,Driver和ApplicationMaster通信就不会Timeout
种种分析都将问题指向ApplicationMaster。
这个Driver端的RPC的默认超时时间是120s,似乎看起来是网络通信问题。但是,在很多情况下,Timeout也有可能是服务端GC导致了STW(Stop the World)从而无法响应客户端请求导致的。但是120s的STW却看起来不是特别合理。后面我们看到,这个Timeout很可能是ApplicaitionMaster的并发控制导致的,即这个RPC请求在ApplicaitonMaster端所调用的方法无法获得某个锁,而ApplicaitonMaster端获得锁的线程又出于某种原因无法结束。
在Client模式下,Driver端通过CoarseGrainedSchedulerBackend(区别Executor端的CoarseGrainedExecutorBackend)与ApplicationMaster进行通信。 我们看了一下CoarseGrainedSchedulerBackend.requestTotalExecutors()中,找到了ApplicationMaster对应的Container的日志,看看是否能有有用信息,对应日志如下所示:
24/04/25 08:57:17 INFO client.RMProxy: Connecting to ResourceManager at rccp102-8h.iad3.prod.conviva.com/10.6.6.123:8030
24/04/25 08:57:17 INFO yarn.YarnRMClient: Registering the ApplicationMaster
24/04/25 08:57:17 INFO util.Utils: Using initial executors = 112, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instan
ces
24/04/25 08:57:17 INFO yarn.YarnAllocator: Will request 112 executor container(s), each with 5 core(s) and 5940 MB memory (including 540 MB of overhead)
24/04/25 08:57:17 INFO yarn.YarnAllocator: Submitted 112 unlocalized container requests.
24/04/25 08:57:17 INFO yarn.ApplicationMaster: Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals
24/04/25 08:59:18 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. rccp102-8h.iad3.prod.conviva.com:43611
24/04/25 08:59:18 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
24/04/25 08:59:18 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. rccp102-8h.iad3.prod.conviva.com:43611
24/04/25 09:09:08 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
24/04/25 09:09:08 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED
对于历史上成功执行的同一任务的日志如下:
24/06/12 15:10:14 INFO yarn.YarnAllocator: Submitted 112 unlocalized container requests.
24/06/12 15:10:14 INFO yarn.ApplicationMaster: Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals
24/06/12 15:10:15 INFO yarn.YarnAllocator: Launching container container_1715242483615_49376_01_000002 on host rccp103-10a.iad3.prod.conviva.com for executor with ID 1
24/06/12 15:10:15 INFO yarn.YarnAllocator: Launching container container_1715242483615_49376_01_000003 on host rccp103-10a.iad3.prod.conviva.com for executor with ID 2
24/06/12 15:10:15 INFO yarn.YarnAllocator: Launching container container_1715242483615_49376_01_000004 on host rccp103-10a.iad3.prod.conviva.com for executor with ID 3
对比失败应用和成功应用,可以看到,失败应用的ApplicationMaster在启动2min以后就知道了Driver已经不在了。但是随后跟ResourceManager的通信过程中有10分钟左右(08:59 ~ 09:09)没有任何的日志。正常情况下,应该打印Launching container
日志正常启动Container。
这10分钟的停顿时间很容易让我们认为是Full GC导致的停顿,因此Driver在跟ApplicationMaster通信的时候也超时了。但是我们并没有为ApplicationMaster打开GC日志。凭借一般经验,发生full gc似乎也不太可能,ApplicationMaster的代码是Spark的代码,没有任何的用户业务层逻辑,因此内存消耗应该是很稳定的。
所以,此时整个事件的基本轮廓是:
-
Driver会向Yarn 提交应用,Yarn启动对应的ApplicationMaster,这个Spark 的ApplicationMaster的Class是提交应用的时候指定的,是Spark自己定制的ApplicationMaster,其定义在
org.apache.spark.deploy.yarn.ApplicationMaster
中; -
随后,在ResourceManager端,我们看到这个Application的所有Container的状态都变成了ACQUIRED状态,这是Container被Kill以前的最后一个健康的状态:
2024-04-25 08:57:17,960 INFO org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: container_1700028334993_233460_01_000035 Container Transitioned from ALLOCATED to ACQUIRED 2024-04-25 08:57:17,960 INFO org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: container_1700028334993_233460_01_000036 Container Transitioned from ALLOCATED to ACQUIRED
-
ApplicationMaster在Yarn上的某个节点启动起来以后,Driver在方法
CoarseGrainedSchedulerBackend.requestTotalExecutors()
中通过RPC与ApplicationMaster通信,这个RPC请求的超时时间是2min。2min超时,通信失败,Driver自行退出。org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
-
2min以后,ApplicationMaster感知到了Driver的退出(底层的RPC连接断开),因此ApplicationMaster主动触发和Driver之间的断开连接:
24/04/25 08:59:18 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. rccp102-8h.iad3.prod.conviva.com:43611
-
又过了10分钟,即ApplicationMaster启动12分钟以后(距离上次的活跃时间是10min),ApplicationMaster进程也退出了。这个退出在ResourceManager端的日志如下。
2024-04-25 09:09:07,974 INFO org.apache.hadoop.yarn.util.AbstractLivelinessMonitor: Expired:appattempt_1700028334993_233460_000001 Timed out after 600 secs
显然,这个退出是由于ApplicationMaster的Liveness没有了心跳,因此ResourceManager主动将这个ApplicationMaster杀死,并试图启动第二个ApplicationMaster。这个ApplicationMaster的静默时间是由
yarn.am.liveness-monitor.expiry-interval-ms
配置的,默认是10min,与实际的现象吻合。
那么, 心跳是在哪里发送的呢? ApplicationMaster的Heartbeat消息并没有专门的接口发送给Yarn,而是通过以下接口进行辅助发送的:- 注册ApplicationMaster的registerApplicationMaster接口
- 用来获取资源分配结果的allocate()接口
- 结束ApplicationMaster的finishApplicationMaster()接口
由于registerApplicationMaster()和finishApplicationMaster()都只是在AM的生命周期开始和结束的时候才会调用,因此,频繁不断的心跳其实是通过allocate()接口来实现的。
allocate()接口是ApplicationMaster用来将相应的资源请求发送给Yarn,Yarn会将该调用视为一次心跳,同时在响应中放置新分配的Container的信息给ApplicationMaster。可以看出来,allocate()接口的调用是可以随时、随意进行的,也可以不携带任何资源请求信息,但是必须独立、不断调用以维持自己在ResourceManager中的liveness。
ResourceManager用来监控ApplicationMaster的存活,是在AbstractLivelinessMonitor中实现的,一个独立线程,不断检查所有的ApplicationMaster的最近心跳是否超过配置时间,查过了则会Kill这个ApplicationMaster。心跳的更新是通过receivedPing()方法进行的:
public synchronized void receivedPing(O ob) { //only put for the registered objects if (running.containsKey(ob)) { running.put(ob, clock.getTime()); } }
在allocate()方法被调用的时候,会调用receivePing()方法来更新最近的心跳时间:
@Override public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { ........ this.amLivelinessMonitor.receivedPing(appAttemptId);
所以,我们可以看到,从Driver这一端,因为连接到ApplicationMaster而无法响应,从而在2min以后就退出了。从ResourceManager这一端,同样因为10min都没有再收到ApplicationMaster的心跳而认为ApplicationMaster不再存活,因此将ApplicationMaster kill掉。所以基本可以认为ApplicationMaster的代码在某个位置hang住了。
后面我们会讲到,ApplicationMaster通过调用YarnAllocator. allocateResources() 进行资源请求的生成和申请,以及从Yarn中获取资源分配的结果:
--------------------------------------- YarnAllocator ------------------------------------
def allocateResources(): Unit = synchronized {
/**
* 这个方法没有任何返回值,主要是通过YarnClient发送amClient.addContainerRequest,这个请求里面的ContainerRequerst是根据当前的本地性要求的统计带有locality信息的。
* 真正获取container分配结果的是方法 amClient.allocate
*/
updateResourceRequests() // 这个方法会打印日志 Submitted 112 unlocalized container requests,在出问题的机器上打印了
val progressIndicator = 0.1f
/**
* 在updateResourceRequests()方法中调用了addContainerRequest()以后,会调用allocate()方法
* 调用了allocate()以后,RM端container的状态会变成acquired的状态
*/
val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()
if (allocatedContainers.size > 0) {
// 这会打印日志 Launching container 和 Received %d containers from YARN, launching executors on
// 在出问题的机器上都没打印
handleAllocatedContainers(allocatedContainers.asScala)
}
- YarnAllocator. allocateResources() 会先通过
YarnAllocator.updateResourceRequests()
来生成对应的资源请求,但是这些请求会暂存在AMRMClient
, - 随后,Spark端的
YarnAllocator. allocateResources()
中会调用Yarn的APIAMRMClient.allocate()
进行资源的请求 -
AMRMClient.allocate()
方法调用发送资源请求以后,Yarn会在对应的response中返回新分配的Container,这些Container的状态都会因此从ALLOCATED进入ACQUIRED状态,代表这个Container已经被ApplicationMaster确认。 - Spark随后从
AMRMClient.allocate()
的Response中取出已经分配了资源的Container(对应了Yarn端已经进入ACQUIRED状态的Container),和Container对应的NodeManager通信,启动对应的Container:handleAllocatedContainers(allocatedContainers.asScala)
但是事故发生的时候,所有CONTAINER的状态停留在ACQUIRED状态,无法正常进入RUNNING状态,我们据此基本上可以认为,allocate()由于ApplicationMaster端被阻塞在某个地方,而没有完成handleAllocatedContainers()方法的调用以在NodeManager上启动Container。
关于ApplicationMaster的allocateResources()的调用时机,该方法会在ApplicationMaster启动的时候被调用一次,随后就会有一个专门的ReporterThread定期反复调用allocateResources()。
下面的日志说明,在ApplicationMaster启动的时候,YarnAllocator. allocateResources()
-> YarnAllocator.updateResourceRequests()
-> YarnAllocator.updateResourceRequests()
方法的确调用了一次。
24/04/25 08:57:17 INFO yarn.YarnAllocator: Submitted 112 unlocalized container requests.
同时,所有的Container都进入了ACQUIRED状态,说明Spark的ApplicationMaster在启动的时候已经通过调用AMRMClient.allocate()
方法将资源请求发送给了Yarn。因为这时候的ApplicationMaster刚启动,由于Yarn这一端的资源分配是异步的,所以即使立刻调用了allocate()方法,肯定也不会有Container已经分配成功从而进入ACQUIRED状态。所以,我们断定,所有Container既然能够进入ACQUIRED状态,说明后来的ReporterThread
启动成功,并且调用成功过一次AMRMClient.allocate()
。的确,下面的日志说明, ReporterThread
线程的确正常启动:
24/04/25 08:57:17 INFO yarn.ApplicationMaster: Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals
在allocateResources()方法中,会先通过updateResourceRequests()方法生成对应的Resource Request请求,这些请求会保存在Yarn的AMRMClient的本地暂存,然后,通过调用Yarn的allocate()方法,会将暂存的Resource Request发送给Yarn:
def allocateResources(): Unit = synchronized {
updateResourceRequests() // 这个方法会打印日志 Submitted 112 unlocalized container requests,在出问题的机器上打印了
val progressIndicator = 0.1
val allocateResponse = amClient.allocate(progressIndicator)
从这段代码我们可以得到一个重要信息:amClient.allocate() 的调用与updateResourceRequests()无关,即:无论updateResourceRequests()是否生成了新的资源请求放在amClient的本地缓存中,amClient.allocate() 都会被调用。这意味着:即使集群当前没有资源请求,amClient.allocate()也会不断被调用,因为,即使没有资源请求,ApplicationMaster也需要持续保持心跳。
在我们无法梳理清楚Driver端和ApplicationMaster通信的2min超时和所有Container无法从ACQUIRED状态进入RUNNING状态的关联关系的时候,我们突然惊讶地发现了synchronized关键字,即allocateResources()方法是一个同步方法。那么,如果allocateResources()方法出于某种原因Hang住,并且Driver端的调用也会触发这个ApplicationMaster对象的某一个synchronized方法,Driver端就会超时。
通过查看代码,我们确认,Driver端在hang住的API调用的位置,对应在ApplicationMaster上的确和allocateResources()会争抢同一把锁:
Driver端通过RequestExecutors消息向ApplicationMaster请求资源。ApplicationMaster收到RequestExecutors消息以后,会调用requestTotalExecutorsWithPreferredLocalities()方法,并且,这个方法的确是同步方法:
------------------------------------- ApplicationMaster --------------------------------------------
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case r: RequestExecutors => // 申请Executor的请求在这里接收
Option(allocator) match { // 如果已经构建了allocator
case Some(a) => // 只要一收到请求,requestTotalExecutorsWithPreferredLocalities
// 方法理就应该打印日志Driver requested a total number of
if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {
------------------------------------- ApplicationMaster --------------------------------------------
def requestTotalExecutorsWithPreferredLocalities(
requestedTotal: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int],
nodeBlacklist: Set[String]): Boolean = synchronized {
所以,整个hang住的流程基本就锁定在ApplicationMaster这一端的allocateResources()方法,这个synchronized
方法的无法退出,可以全部合理解释下面的问题:
- 为什么所有的Container的状态停留在ACQUIRED状态,无法进入RUNNING状态,即,为什么allocateResources()方法中val allocateResponse = amClient.allocate(progressIndicator)这一行代码被调用以后的代码比如handleAllocatedContainers()似乎hang住了导致allocateResources()无法退出;
- 为什么allocateResources()无法退出会导致Driver连接ApplicationMaster超时,因为ApplicationMaster端处理Driver连接的线程显然跟ReporterThread线程不是同一个线程。
在TODO这篇文章中,我们详细分析了以allocateResources()方法为入口的ApplicationMaster进行资源调度的基本流程。由于日志的缺失,我们根本无从断定ApplicationMaster在方法allocateResources()
中hang住的具体位置,但是在分析了allocateResources()整个代码,我们强烈怀疑故障发生在处理分配成功的Container和我们的请求进行匹配比对的过程:
---------------------------------------- YarnAllocator ---------------------------------------
/**
* Handle containers granted by the RM by launching executors on them.
*
* Due to the way the YARN allocation protocol works, certain healthy race conditions can result
* in YARN granting containers that we no longer need. In this case, we release them.
*
* Visible for testing.
* 在这里会打印 Launching container container_1714042499037_5294_01_000002 on host
*/
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
// Match incoming requests by host
.........
// Match remaining by rack
val remainingAfterRackMatches = new ArrayBuffer[Container]
for (allocatedContainer <- remainingAfterHostMatches) {
/**
* SparkRackResolver.
*/
val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
matchContainerToRequest(allocatedContainer, rack, containersToUse,
remainingAfterRackMatches)
}
......
// Assign remaining that are neither node-local nor rack-local
/**
* 如果Container分配成功,运行到这里,会打印 Launching container container_1714042499037_5294_01_000002 on host。但是失败的应用没有打印这个日志
*/
runAllocatedContainers(containersToUse)
}
这行代码的调用堆栈是YarnAllocator. allocateResources()
-> YarnAllocator.handleAllocatedContainers()
,即发生在已经通过Yarn的API AMRMClient.allocate() 获取了成功分配的Container以后对这个Container的处理过程,这个过程发生在方法handleAllocatedContainers()中,主要包括:
- 将Yarn分配的Container和我们的资源请求进行一次比对,以确定这些Container是分配给我们那些资源请求的,比对成功的(即这个Container对应了我们的这个资源请求),就可以删除这个资源请求避免重复申请。
- 对于比对成功的Container,ApplicationMaster就可以与这个Container对应的NodeManager通信以启动这个Container。
这个比对的过程是逐渐降级进行的,即先对比Host上匹配的Host-Local Request ,然后比对Rack上匹配的Rack-Local Request,最后再比对没有locality需求的资源请求。在Rack层的比对需要依赖系统的拓扑图解析出Rack信息。
我们怀疑这里的原因是,这个过程会涉及到读取系统的拓扑文件topology.map
,我司的DevOps后来发现Ubuntu 16(这台机器的)的一个普遍的bug,这个bug的表现是系统长时间运行会导致系统读写文件变满,通过dd命令进行测试,能够确认读写文件不正常,但是由于我不是操作系统和硬件专家,因此并不清楚这个问题是否是导致ApplicationMaster hang住。下文会详细介绍Spark端对系统拓扑结构进行分析的基本流程。
2.2 单独测试Topology代码试图重现问题
为了确认是否是系统的拓扑脚本导致的超时,我们单独提取了Spark中获取集群拓扑的解析过程的代码,在事发机器上单独运行看看是否会block住。我们查看了这一块儿的Spark逻辑,看到,在这块代码里,Spark作为调用者,其实是使用了Yarn的Library来完成的。因为Yarn在资源分配的时候其实也是Topology-Aware 的 。
SparkResolver的resolve()方法是调用Yarn的RackResolver来分析整个的系统拓扑,这里主要是为了获取这个host的rack:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.log4j.{Level, Logger}
/**
* Wrapper around YARN's [[RackResolver]]. This allows Spark tests to easily override the
* default behavior, since YARN's class self-initializes the first time it's called, and
* future calls all use the initial configuration.
*/
private[yarn] class SparkRackResolver {
def resolve(conf: Configuration, hostName: String): String = {
RackResolver.resolve(conf, hostName).getNetworkLocation()
}
在Yarn这一端,系统的拓扑解析过程需要实现接口 DNSToSwitchMapping,其中最重要的接口是resolve(),传入一系列的主机(或者IP)名称,返回这些主机(IP)的网路位置信息:
public interface DNSToSwitchMapping {
public List<String> resolve(List<String> names);
public void reloadCachedMappings();
public void reloadCachedMappings(List<String> names);
}
在Yarn中,DNSToSwitchMapping的实现类是通过net.topology.node.switch.mapping.impl配置的,默认情况下,Yarn提供的内置实现是ScriptBasedMapping:
package org.apache.hadoop.yarn.util;
......
import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.LimitedPrivate({"YARN", "MAPREDUCE"})
public class RackResolver {
....
public synchronized static void init(Configuration conf) {
....
Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
conf.getClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
ScriptBasedMapping.class, // 默认的DNSToSwitchMapping实现类
DNSToSwitchMapping.class);
ScriptBasedMapping的行为,是通过在java中执行net.topology.script.file.name所配置的外部脚本文件,来返回系统的拓扑关系。这个配置没有默认值,但是在Cloudera的Yarn中,net.topology.script.file.name配置为topology.py,这个脚本输入对应的hostname的list,返回这些hostname的网络信息。
实际上,ClouderaManager是根据用户配置的集群拓扑关系,生成集群拓扑描述文件topology.map,一个记录了所有Host/IP的rack信息的XML文件,如下所示。在某个Host无法在拓扑描述文件中找到对应信息的情况下,会返回default
:
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<topology>
<node name="lashadoop-17-rack17.hadoop.prod.com" rack="/rack17"/>
<node name="10.72.2.22" rack="/default"/>
<node name="lashadoop-17-rack18.hadoop.prod.com" rack="/rack18"/>
<node name="10.72.1.33" rack="/default"/>
</topology>
我们在下面的代码中调用了Yarn的RackResolver