1. 前言
Schedulerx2.0提供了map模型,通过一个map方法就能将海量数据分布式到多台机器上分布式执行,随着业务方的深入使用,又提出了更多的需求,比如:
- 监听所有子任务完成的事件
- 处理所有子任务返回的订单号
- 汇总结果进行工作流数据传输
2. 简介
MapReduce模型是Map模型的扩展,废弃了postProcess方法,新增reduce接口,需要实现MapReduceJobProcessor。
MapReduce模型只有一个reduce,所有子任务完成后会执行reduce方法,可以在reduce方法中返回该任务实例的执行结果,作为工作流的上下游数据传递。如果有子任务失败,reduce不会执行。
MapReduce模型,还能处理所有子任务的结果。子任务通过return ProcessResult(true, result)返回结果(比如返回订单号),reduce的时候,可以通过context拿到所有子任务的结果,进行相应的处理,不需要业务方自己做存储。注意:所有子任务结果会缓存在master节点,对内存有压力,建议子任务个数和result不要太大。
3. 接口
- public ProcessResult process(JobContext context) throws Exception; (必选)
- public ProcessResult map(List<? extends Object> taskList, String taskName); (必选)
- public ProcessResult reduce(JobContext context); (必选)
- public void kill(JobContext context); (可选)
4. 执行方式
和map模型一样,MapReduce模型,也支持如下执行方式:
- 并行计算:支持子任务300以下,有子任务列表。
- 内存网格:基于内存计算,子任务5W以下,速度快。
- 网格计算:基于文件计算,子任务100W以下。
5. 原理
Schedulerx2.0中,MapReduce模型只有一个reduce,所有子任务完成后会执行reduce方法,原理如下图所示:
可以在reduce方法中返回该任务实例的执行结果,作为工作流的上下游数据传递。
Reduce方法也会通过ProcessResult返回任务状态,只有所有子任务和reduce都返回true,才算这次实例成功。
6. 最佳实践
6.1 通过mapreduce进行工作流上下游传递
下面我举个例子,比如一个工作流JobA->JobB->JobC。JobA和JobC是java任务单机执行,JobB是网格计算MapReduce任务。代码如下:
public class TestJobA extends JavaProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
System.out.println("hello JobA");
return new ProcessResult(true, String.valueOf(10));
}
}
public class TestJobB extends MapReduceJobProcessor {
@Override
public ProcessResult process(JobContext context) {
String executorName = context.getTaskName();
if (isRootTask(context)) {
System.out.println("start root task");
String upstreamData = context.getUpstreamData().get(0).getData();
int dispatchNum = Integer.valueOf(upstreamData);
List<String> msgList = Lists.newArrayList();
for (int i = 0; i <= dispatchNum; i++) {
msgList.add("msg_" + i);
}
return map(msgList, "Level1Dispatch");
} else if (executorName.equals("Level1Dispatch")) {
String executor = (String)context.getTask();
System.out.println(executor);
return new ProcessResult(true);
}
return new ProcessResult(false);
}
public ProcessResult reduce(JobContext context) throws Exception {
return new ProcessResult(true, "520");
}
}
public class TestJobC extends JavaProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
System.out.println("hello JobC");
String upstreamData = context.getUpstreamData().get(0).getData();
System.out.print(upstreamData);
return new ProcessResult(true);
}
}
执行结果如下:
jobA输出了10,jobB产生了0~10个msg并通过reduce输出520,jobC打印520。
6.2 Mapreduce处理所有子任务结果,由reduce汇总
@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
String taskName = context.getTaskName();
int dispatchNum = 10;
if (context.getJobParameters() != null) {
dispatchNum = Integer.valueOf(context.getJobParameters());
}
if (isRootTask(context)) {
System.out.println("start root task");
List<String> msgList = Lists.newArrayList();
for (int i = 0; i <= dispatchNum; i++) {
msgList.add("msg_" + i);
}
return map(msgList, "Level1Dispatch");
} else if (taskName.equals("Level1Dispatch")) {
String task = (String)context.getTask();
Thread.sleep(2000);
return new ProcessResult(true, task);
}
return new ProcessResult(false);
}
@Override
public ProcessResult reduce(JobContext context) throws Exception {
for (Entry<Long, String> result : context.getTaskResults().entrySet()) {
System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());
}
return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
}
}
reduce执行结果如下:
taskId:0, result:
taskId:1, result:msg_0
taskId:2, result:msg_1
taskId:3, result:msg_2
taskId:4, result:msg_3
taskId:5, result:msg_4
taskId:6, result:msg_5
taskId:7, result:msg_6
taskId:8, result:msg_7
taskId:9, result:msg_8
taskId:10, result:msg_9
taskId:11, result:msg_10
taskId=0表示的是根节点,一般不会返回结果,不需要管。