首先,来看怎么构造一个org.apache.hadoop.yarn.client.api.YarnClient
1
2
3
4
5
|
class ClientHelper(conf: Configuration) extends
Logging {
val yarnClient = YarnClient.createYarnClient
info( "trying to connect to RM %s"
format conf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS))
yarnClient.init(conf);
yarnClient.start
|
!!!这个client还有个start方法,看来它跟RM很谈得来。的确,它实现了service这个接口。 好吧,它是一个服务。在YarnJobFactory中,我们用yarn-site.xml构造了一个YarnConfiguration对象,现在用它来初始化YarnClient,因为我们至少需要RM在哪,对不?
下边分几部分看submitApplication方法的实现
第一次调用YarnClient - 获取信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = { val app = yarnClient.createApplication
val newAppResponse = app.getNewApplicationResponse
var mem = memoryMb
var cpu = cpuCore
// If we are asking for memory more than the max allowed, shout out
if
(mem > newAppResponse.getMaximumResourceCapability().getMemory()) {
throw
new SamzaException( "You‘re asking for more memory (%s) than is allowed by YARN: %s"
format
(mem, newAppResponse.getMaximumResourceCapability().getMemory()))
}
// If we are asking for cpu more than the max allowed, shout out
if
(cpu > newAppResponse.getMaximumResourceCapability().getVirtualCores()) {
throw
new SamzaException( "You‘re asking for more CPU (%s) than is allowed by YARN: %s"
format
(cpu, newAppResponse.getMaximumResourceCapability().getVirtualCores()))
}
appId = Some(newAppResponse.getApplicationId)
|
首先通过yarnClient的createApplication方法获取一个YarnClientApplication对象。这是对RM的第一次请求,那么这次请求能得到什么信息呢?
通过这次请求得到的YarnClientApplication对象有两个方法:
- getApplicationSubmissionContext() ,
它返回一个 ApplicationSubmissionContext对象。“
ApplicationSubmissionContext
represents all of the information needed by theResourceManager
to launch theApplicationMaster
for an application.” - getNewApplicationResponse(), 它返回一个GetNewApplicationResponse对象。
鉴于YarnClient的createApplication方法没有任何参数,而YarnClient本身的状态中由用户指定的部分只是YarnConfiguration的内容,因此这个createApplication方法并不会告诉YARN客户端对资源的需求,因此它返回的app对象只包含了yarn的RM本身的信息。
在获取了app这个对象之后,submitApplication方法通过
1
|
val newAppResponse = app.getNewApplicationResponse |
从中取出了newAppResponse这个对象,然后从中取出了当前YARN集群最多支持的内存和CPU数目(TODO:这个值是当前可用的资源的值,还是整体上最大资源值)。然后对比给AM申请的container想要的内存和CPU,如果超出了YARN支持的最大值,就抛出异常。
否则,就把从newAppResponse中获取的applicationId赋给appId。看来在第一次请求时,YARN就给分配了appId,只是这个appId,并不和资源关联。
第二调用YarnClient - 提交job
如果资源足够,AM就可以提交,那就开始填写AM运行需要的资源,具体来说就是组装ApplicationSubmissionContext类的一个对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
name match { case
Some(name) => { appCtx.setApplicationName(name) }
case
None => { appCtx.setApplicationName(appId.toString) }
} env match { case
Some(env) => {
containerCtx.setEnvironment(env)
info( "set environment variables to %s for %s"
format (env, appId.get))
}
case
None => None
} // set the local package so that the containers and app master are provisioned with it val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath) val fileStatus = packagePath.getFileSystem(conf).getFileStatus(packagePath) packageResource.setResource(packageUrl) info( "set package url to %s for %s"
format (packageUrl, appId.get))
packageResource.setSize(fileStatus.getLen) info( "set package size to %s for %s"
format (fileStatus.getLen, appId.get))
packageResource.setTimestamp(fileStatus.getModificationTime) packageResource.setType(LocalResourceType.ARCHIVE) packageResource.setVisibility(LocalResourceVisibility.APPLICATION) resource.setMemory(mem) info( "set memory request to %s for %s"
format (mem, appId.get))
resource.setVirtualCores(cpu) info( "set cpu core request to %s for %s"
format (cpu, appId.get))
appCtx.setResource(resource) containerCtx.setCommands(cmds.toList) info( "set command to %s for %s"
format (cmds, appId.get))
containerCtx.setLocalResources(Collections.singletonMap( "__package" , packageResource))
appCtx.setApplicationId(appId.get) info( "set app ID to %s"
format appId.get)
appCtx.setAMContainerSpec(containerCtx) appCtx.setApplicationType(ClientHelper.applicationType) info( "submitting application request for %s"
format appId.get)
yarnClient.submitApplication(appCtx) |
这段代码设置了一个ApplicationSubmissionContext对象,然后再用yarnClient把它提交。这样就提交了一个YARN job。
这样YarnClient一共用了两次,初始一次请求,获取appID和YARN的资源上限的情况,第二次请求,真正提交job。
这段代码让我有些疑惑。首先appCtx大致分为两部分,一部分是job的信息,比如application type和application ID,另一部分和AM有关。和AM有关的部分又可以分成两块: 1. cpu和内存的大小,这两个资源组装在Resource这个类的对象里,由setResource设置到 appCtx中 2:运行container所需的命令和文件、环量变量,这部分设置在一个ContainerLaunchContext对象中,然后这个对象再被调置在appCtx中。疑惑的地方在于:为什么AM所需的资源要分成两部分呢?cpu和内存本就该是container申请的一部分呀?
看看API里关于containerLaunchContext类的说明,就更不明白了
ContainerLaunchContext
represents all of the information needed by theNodeManager
to launch a container.It includes details such as:
ContainerId
of the container.Resource
allocated to the container.- User to whom the container is allocated.
- Security tokens (if security is enabled).
LocalResource
necessary for running the container such as binaries, jar, shared-objects, side-files etc.- Optional, application-specific binary service data.
- Environment variables for the launched process.
- Command to launch the container.
好吧,“Resource
allocated
to the container.”,
这一条ContainerLanchContext并没有体现,在它提供的方法中并不能设置Resource。这不是骗人吗?
而appCtx却有单独的一个setAMContainerSpec 方法来设置Resource。那么在申请运行task所需的container时,如果说明其所需的资源呢?看来一定不是用了这个ContainerLaunchContext对象。
两个不同的协议
Samza AM为task申请container的代码在SamzaAppMasterTaskManager这个类里
1
2
3
4
5
6
7
8
9
|
protected
def requestContainers(memMb: Int, cpuCores: Int, containers: Int) {
info( "Requesting %d container(s) with %dmb of memory"
format (containers, memMb))
val capability = Records.newRecord(classOf[Resource])
val priority = Records.newRecord(classOf[Priority])
priority.setPriority( 0 )
capability.setMemory(memMb)
capability.setVirtualCores(cpuCores)
( 0
until containers).foreach(idx => amClient.addContainerRequest( new
ContainerRequest(capability, null , null , priority)))
} |
这里的amClient就是org.apache.hadoop.yarn.client.api.async.AMRMClientAsync类的对象。它用来和RM联系,处理container相关的事情。当AM请求container时,它就不用submitApplication中为AM设置container资源所需的那套动作了,而是使用ContainerRequest这类。而且ContainerRequest的构造方法中
1
|
public ContainerRequest(Resource capability, String[] nodes, String[] racks, Priority priority, boolean
relaxLocality)
|
使用了Resource做为参数。
可见为AM申请container和为task申请container走的过程的确不一样。毕竟,为AM的运行申请container是作为提交任务的一部分。最终发现两个是使用的不同的协议。提交任务时,使用的是这个协议:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
message ApplicationSubmissionContextProto { optional ApplicationIdProto application_id = 1 ;
optional string application_name = 2
[ default
= "N/A" ];
optional string queue = 3
[ default
= "default" ];
optional PriorityProto priority = 4 ;
optional ContainerLaunchContextProto am_container_spec = 5 ;
optional bool cancel_tokens_when_complete = 6
[ default
= true ];
optional bool unmanaged_am = 7
[ default
= false ];
optional int32 maxAppAttempts = 8
[ default
= 0 ];
optional ResourceProto resource = 9 ;
optional string applicationType = 10
[ default
= "YARN" ];
} message ContainerLaunchContextProto { repeated StringLocalResourceMapProto localResources = 1 ;
optional bytes tokens = 2 ;
repeated StringBytesMapProto service_data = 3 ;
repeated StringStringMapProto environment = 4 ;
repeated string command = 5 ;
repeated ApplicationACLMapProto application_ACLs = 6 ;
} |
ContainerLaunchContextProto里根本没有代表cpu和内存资源的ResourceProto,这个Protocol是在ApplicationSubmissionContextProto里。对照containerLaunchContext类的说明,的确显得很奇怪。
而申请container的请求,走的是
1
2
3
4
5
6
7
8
9
10
11
12
|
message ResourceRequestProto { optional PriorityProto priority = 1 ;
optional string resource_name = 2 ;
optional ResourceProto capability = 3 ;
optional int32 num_containers = 4 ;
optional bool relax_locality = 5
[ default
= true ];
} message ResourceProto { optional int32 memory = 1 ;
optional int32 virtual_cores = 2 ;
} |