文章目录
环境
hadoop :1.0.0
java :1.8.0_171
启动haoop,并配置远程调试
-
指定远程调试监听端口8888
export HADOOP_CLIENT_OPTS="-agentlib:jdwp=transport=dt_socket,address=8888,server=y,suspend=y" -
利用hadoop jar命令提交任务
hadoop jar XXX.jar main函数所在的类 -
配置idea
-
在org.apache.hadoop.util.RunJar的main函数入口下打断点
-
选择第三部配置的remote.hadoop.然后单击蜘蛛图标
-
最终结果如下:
代码解析
runJar
- 如果代码在打包的时候已经配置了mainClass,在提交jar包的时候可以不用添加mainClass的相关参数
Manifest manifest = jarFile.getManifest();
if (manifest != null) {
mainClassName = manifest.getMainAttributes().getValue("Main-Class");
}
jarFile.close();
- 创建临时文件夹和工作文件夹
File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
tmpDir.mkdirs();
if (!tmpDir.isDirectory()) {
System.err.println("Mkdirs failed to create " + tmpDir);
System.exit(-1);
}
final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
workDir.delete();
workDir.mkdirs();
if (!workDir.isDirectory()) {
System.err.println("Mkdirs failed to create " + workDir);
System.exit(-1);
}
- 添加钩子函数,在程序结束的时候清理文件
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
FileUtil.fullyDelete(workDir);
} catch (IOException e) {
}
}
});
- 解压jar包,使用动态代理得到mainClass,然后并执行
ClassLoader loader =
new URLClassLoader(classPath.toArray(new URL[0]));
Thread.currentThread().setContextClassLoader(loader);
Class<?> mainClass = Class.forName(mainClassName, true, loader);
Method main = mainClass.getMethod("main", new Class[] {Array.newInstance(String.class, 0).getClass()});
String[] newArgs = Arrays.asList(args).subList(firstArg, args.length).toArray(new String[0]);
try {
main.invoke(null, new Object[] { newArgs });
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
自定义mapreduce程序
- 自定义的mapreduce都会调用waitForCompletion()函数。其实现如下
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
submit();
}
if (verbose) {
jobClient.monitorAndPrintJob(conf, info);
} else {
info.waitForCompletion();
}
return isSuccessful();
}
public void submit() throws IOException, InterruptedException,
ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
// Connect to the JobTracker and submit the job
// connect()函数里初始化了 jobClient
connect();
// 这里正是提交任务
info = jobClient.submitJobInternal(conf);
super.setJobID(info.getID());
state = JobState.RUNNING;
}
这个函数只做两件事,第一件事是提交任务,第二件事等待任务结束。
观察submit()的实现,程序依靠JobClient类来实现连接到集群和提交任务。
- JobClient解析
JobClient的构造函数如下:
public JobClient(JobConf conf) throws IOException {
setConf(conf);
// init()函数通过通过动态代理获得jobSubmitClient的代理对象
init(conf);
}
构造函数里调用了init()方法,其实现如下:
public void init(JobConf conf) throws IOException {
String tracker = conf.get("mapred.job.tracker", "local");
tasklogtimeout = conf.getInt(
TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
this.ugi = UserGroupInformation.getCurrentUser();
if ("local".equals(tracker)) {
conf.setNumMapTasks(1);
// 使用本地模式
// LocalJobRunner实现了JobSubmissionProtocol接口,并没有调用JobTracker
this.jobSubmitClient = new LocalJobRunner(conf);
} else {
// 使用非本地模式
this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
}
}
由init()函数可知:如果mapred.job.tracker的配置值是local或者没有配置,则jobSubmitClient的实例是一个本地已经实现的LocalJobRunner。如果不是local,则jobSubmitClient只是一个RPC客户端,真正的实现是在远程的JobTracker。
- JobTracker解析
我们上面说到的东西我们画个图来总结一下:
- job 的 submit()函数调用connect函数
- connect函数创建jobclient
- jobclient对象在创建的时候,构造函数里调用init()函数
- init()函数根据配置信息选择创建LocalJobRunner还是Jobtracker的Rpc代理。
- job submit()函数通过函数jobclient的submitJobInternal提交任务。