从源码层面,总结下Hadoop客户端提交作业的流程:
1. 选择使用分布式环境通信协议,还是本地调试通信协议
org.apache.hadoop.mapreduce.Job#connect
2. 上传作业代码jobjar, libjar等,从本地文件系统到HDFS中去。
copyAndConfigureFiles(job, submitJobDir);
3. 拆分输入文件,生成splits
org.apache.hadoop.mapreduce.JobSubmitter#writeSplits()
a. 调用job.getInputFormat().getSplits()
b. 根据split size (通常就是HDFS block size), 将文件拆分成多个逻辑上的Split。
每个Split要记录它在逻辑文件中的字节起始位置, 和这个Split所在的HDFS chunk存储在了哪些HDFS datanode上:
c. 将所有Split写入HDFS上的同一个全局文件(<jobSubmitDir>/job.split)中。
这个全局文件的头部是:org.apache.hadoop.mapreduce.split.JobSplitWriter#SPLIT_FILE_HEADER ("SPL") + split version (1)
在这个全局文件内部,为每个Split, 依次写入 split serializer class name + 序列化了的业务数据,
org.apache.hadoop.mapreduce.split.JobSplitWriter#writeNewSplits
(
jobSubmitDir:
local: /tmp/hadoop/mapred/staging/<job id>
hdfs: /tmp/hadoop-yarn/staging/<user>/.staging/<job id>
)
d. 将所有Split的描述信息写入<jobSubmitDir>/job.splitmetainfo中。包括每个Split在job.split全局文件中的偏移量、长度,存储这个Split的datanode的地址
org.apache.hadoop.mapreduce.split.JobSplitWriter#writeJobSplitMetaInfo
4. 将客户端内存中初始化了的作业配置job.configuration写入HDFS <submitJobDir>/job.xml
5. 将作业包装成org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext,调用org.apache.hadoop.yarn.client.api.impl.YarnClientImpl#submitApplication()提交作业给Yarn (走Hadoop RPC)。