Schedulerx2.0支持MapReduce模型

1. 前言

Schedulerx2.0提供了map模型,通过一个map方法就能将海量数据分布式到多台机器上分布式执行,随着业务方的深入使用,又提出了更多的需求,比如:

  • 监听所有子任务完成的事件
  • 处理所有子任务返回的订单号
  • 汇总结果进行工作流数据传输

2. 简介

MapReduce模型是Map模型的扩展,废弃了postProcess方法,新增reduce接口,需要实现MapReduceJobProcessor。

MapReduce模型只有一个reduce,所有子任务完成后会执行reduce方法,可以在reduce方法中返回该任务实例的执行结果,作为工作流的上下游数据传递。如果有子任务失败,reduce不会执行。

MapReduce模型,还能处理所有子任务的结果。子任务通过return ProcessResult(true, result)返回结果(比如返回订单号),reduce的时候,可以通过context拿到所有子任务的结果,进行相应的处理,不需要业务方自己做存储。注意:所有子任务结果会缓存在master节点,对内存有压力,建议子任务个数和result不要太大。

3. 接口

  • public ProcessResult process(JobContext context) throws Exception; (必选)
  • public ProcessResult map(List<? extends Object> taskList, String taskName); (必选)
  • public ProcessResult reduce(JobContext context); (必选)
  • public void kill(JobContext context); (可选)

4. 执行方式

和map模型一样,MapReduce模型,也支持如下执行方式:

  • 并行计算:支持子任务300以下,有子任务列表。
  • 内存网格:基于内存计算,子任务5W以下,速度快。
  • 网格计算:基于文件计算,子任务100W以下。

5. 原理

Schedulerx2.0中,MapReduce模型只有一个reduce,所有子任务完成后会执行reduce方法,原理如下图所示:
Schedulerx2.0支持MapReduce模型
可以在reduce方法中返回该任务实例的执行结果,作为工作流的上下游数据传递。
Reduce方法也会通过ProcessResult返回任务状态,只有所有子任务和reduce都返回true,才算这次实例成功。

6. 最佳实践

6.1 通过mapreduce进行工作流上下游传递

下面我举个例子,比如一个工作流JobA->JobB->JobC。JobA和JobC是java任务单机执行,JobB是网格计算MapReduce任务。代码如下:

public class TestJobA extends JavaProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        System.out.println("hello JobA");
        return new ProcessResult(true, String.valueOf(10));
    }

}
public class TestJobB extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) {
        String executorName = context.getTaskName();
        if (isRootTask(context)) {
            System.out.println("start root task");
            String upstreamData = context.getUpstreamData().get(0).getData();
            int dispatchNum = Integer.valueOf(upstreamData);
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i <= dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (executorName.equals("Level1Dispatch")) {
            String executor = (String)context.getTask();
            System.out.println(executor);
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true, "520");
    }

}
public class TestJobC extends JavaProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        System.out.println("hello JobC");
        String upstreamData = context.getUpstreamData().get(0).getData();
        System.out.print(upstreamData);
        return new ProcessResult(true);
    }

}

执行结果如下:
Schedulerx2.0支持MapReduce模型
jobA输出了10,jobB产生了0~10个msg并通过reduce输出520,jobC打印520。

6.2 Mapreduce处理所有子任务结果,由reduce汇总

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum = 10;
        if (context.getJobParameters() != null) {
            dispatchNum = Integer.valueOf(context.getJobParameters());
        }
        if (isRootTask(context)) {
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i <= dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            Thread.sleep(2000);
            return new ProcessResult(true, task);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        for (Entry<Long, String> result : context.getTaskResults().entrySet()) {
            System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());
        }
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }

}

reduce执行结果如下:

taskId:0, result:
taskId:1, result:msg_0
taskId:2, result:msg_1
taskId:3, result:msg_2
taskId:4, result:msg_3
taskId:5, result:msg_4
taskId:6, result:msg_5
taskId:7, result:msg_6
taskId:8, result:msg_7
taskId:9, result:msg_8
taskId:10, result:msg_9
taskId:11, result:msg_10

taskId=0表示的是根节点,一般不会返回结果,不需要管。

上一篇:Akka in Schedulerx2.0


下一篇:SchedulerX2.0支持一次性任务