1 概述
在hadoop1.X中作业提交到JobTracker时,用了非常典型的事件模型,将源码抽象一下。
2 监听者
1个监听接口:需要关注的行为
2个监听者:具体实现类
public interface Listener { /** * 监听作业添加 * * @param job 作业对象 */ public void jobAdded(Job job); /** * 监听作业删除 * * @param job 作业对象 */ public void jobDeleted(Job job); } /** * 作业初始化监听器 * * 源码中调用jobTracker的initJob方法 * */ public class EagerTaskInitializationListener implements Listener { public void jobAdded(Job job) { System.out.println("作业被添加:" + job.getName() + " 执行操作1........"); } public void jobDeleted(Job job) { System.out.println("作业被删除:" + job.getName() + " 执行操作1........"); } } /** * 排序监听器 */ public class JobQueueJobInProgressListener implements Listener { public void jobAdded(Job job) { System.out.println("作业被添加:" + job.getName() + " 执行操作2........"); } public void jobDeleted(Job job) { System.out.println("作业被删除:" + job.getName() + " 执行操作2........"); } }
3 调度器
1个抽象调度器
1个具体调度器
public abstract class TaskScheduler { protected TaskTrackerManager taskTrackerManager; public void setTaskTrackerManager(TaskTrackerManager taskTrackerManager) { // 该对象就是JobTracker对象 this.taskTrackerManager = taskTrackerManager; } public void start() { } } public class JobQueueScheduler extends TaskScheduler { // 监听器1 private EagerTaskInitializationListener jobInProgressListener1; // 监听器2 private JobQueueJobInProgressListener jobInProgressListener2; /** * 初始化并注册监听器 */ public void start() { jobInProgressListener1 = new EagerTaskInitializationListener(); jobInProgressListener2 = new JobQueueJobInProgressListener(); // taskTrackerManager为父类中的对象,其实就是JobTracker taskTrackerManager.addJobInProgressListener(jobInProgressListener1); taskTrackerManager.addJobInProgressListener(jobInProgressListener2); } }
4 Tacker相关
1个抽象作业类
1个添加监听者的接口
1具体JobTracker
public class Job { private String name; public String getName() { return name; } public void setName(String name) { this.name = name; } } public interface TaskTrackerManager { // 添加监听者 public void addJobInProgressListener(Listener listener); } public class JobTracker implements TaskTrackerManager { // 调度器 private TaskScheduler scheduler; // 所有监听者 private List<Listener> jobInProgressListeners = new CopyOnWriteArrayList<Listener>(); /** * 启动时创建调度器 */ public JobTracker() { scheduler = new JobQueueScheduler(); } /** * 启动JobTracker */ public void startTracker() { // 将自身对象传给调度器 scheduler.setTaskTrackerManager(this); // 启动调度器 scheduler.start(); } /** * 作业被添加。通知所有注册的监听器 * * @param job 作业对象 */ public void addJob(Job job) { for (Listener listener : jobInProgressListeners) { listener.jobAdded(job); } } /** * 作业被删除。通知所有注册的监听器 * * @param job 作业对象 */ public void deleteJob(Job job) { for (Listener listener : jobInProgressListeners) { listener.jobDeleted(job); } } /** * 添加监听器 */ @Override public void addJobInProgressListener(Listener listener) { jobInProgressListeners.add(listener); } }
5 测试
public class TestJobTracker { public static void main(String[] args) { // 初始化 JobTracker jobTracker = new JobTracker(); jobTracker.startTracker(); // 添加作业 Job job = new Job(); job.setName("xy job"); jobTracker.addJob(job); // 删除作业 jobTracker.deleteJob(job); } }