分布式调度组件整合设计解析

前言

分布式调度组件从落地到如今已有一年多的时间,作为组件开发者在其中过程中也在不断思考该组件的实现提升点以及后续的功能拓展接入。

作为一个整合类型的组件设计,从使用者的角度来看,应该更多地掩盖整合前各种接入实现,专心关注在当前组件的使用过程。因此,整合过程中的第一要素,就是要拉平多个整合组件的差异,包括数据模型、功能实现、以及外部透出的呈现,保证不同底层实现的无缝切换。第二就是简化对接工作量,把常用配置项默认固化,减轻使用方的对接成本,专心关注到业务中去。

背景

分布式调度组件来源于常见的定时调度任务开放场景。

现有的服务应用大多都需要提供多实例部署能力,对应地,定时任务的开发和运行都需要考虑多个运行实例下的执行情况。常见的分布式定时调度解决方式大致分为两种类型:中心化调度以及去中心化调度。去中间化调度比较常见的就是Quarterz、Elastic Job等解决方案,其主要通过数据层面的共享,如Quarterz使用数据库,Elastic Job使用Zookeeper作为数据共享中心,各自节点实例自主抢占执行任务;中心化调度则拥有统一的调度中心服务。任务的注册、管理、执行都由调度中心进行存储与触发执行下发。相比之下,中心化调度拥有更全局的执行视野,能够更大程度地按照既定方案进行任务分发执行,对资源分配,执行策略,失败异常处理都有更加优异的处理手段。

考虑市面上成熟的中心化调度解决方案,同时适配阿里云环境与开源自建场景,决定对阿里云的SchedulerX与开源项目XXLJob进行整合,对二者进行统一封装,提供到研发人员使用,使其更多地关注业务实现层面,减轻重复的研发工作。

整合对象

设计目标

  • 呈现形式 :二方依赖包
  • 统一业务实现形式:统一的抽象类,并提供统一的触发参数以及返回结果封装
  • 自动扫描装配:借助Spring能够自动化地完成任务执行器的注册
  • 无代码改动的底层实现切换:仅允许配置项层面体现底层实现的差异

设计分析

1、统一整合装配实现的可行性分析

(1)XXLJob的装配实现分析

按照XXLJob的官方文档内容过,提供Bean模式的任务注册方式有两种:类层面或者是方法层面。

  • Bean模式-类层面

分布式调度组件整合设计解析

简单地说,就是开发者需要通过继承XXLJob提供的**JobHandler**类,实现指定的任务方法,再手动地往**XxlJobExecutor**类中调用**registJobHandler**方法进行手动添加。

  • Bean模式-方法层面

分布式调度组件整合设计解析
第二种装配思路就是通过注解进行注册驱动,对于在已被添加XxlJob()注解的方法,由XxlJob进行统一扫描等级注册。

从使用成本来讲,方法层面的Bean模式明显地更加轻便,只需要一个简单的注解便可进行声明,自动完成注册。但是整合组件的开发角度来看,方法层面的Bean模式过于简单,且注解驱动很难提供有效约束手段限制用户。此时,我们再把目光转到SchedulerX,寻找两者装配实现的共存方案。

(2)SchedulerX的装配实现分析

  • Java应用接入

    分布式调度组件整合设计解析
    由官网提供的代码实现样例来看,SchedulerX的实现方式也是实现JavaProcessor接口后,实现process方法。同时值得注意的是,实现业务类需要使用Component注解进行声明,说明SchedulerX的装配驱动是由Spring扫描Bean后再进行注册。

(3)可行性分析

回归设计目标的第二、三点:

  • 统一业务实现形式:统一的抽象类,并提供统一的触发参数以及返回结果封装
  • 自动扫描装配:借助Spring或手动步骤完成自动化地完成任务执行器的注册

    设计上,希望提供到用户的是一个简洁的抽象类,用户仅需要关注提供出去的统一实现类方法。以此做为基准,就需要梳理出组件还需要承担怎么样的职能和步骤,才能够使得任务逻辑在不同组件下执行。

    处于SchedulerX仅提供类层面的实现方案,因此XXLJob的实现选择上也只能同样地以类层面进行实现。

    从组件开发的层面,定制开发允许我们拥有更加高的*度进行二者的整合处理,于是,把视野放到装配机制,看看是否存在可行实现的突破口,或许能够在其中找到对应的处理方案,解决Java单继承的问题。

    SchedulerX的装配动作从代码样例来看是依赖Spring,借用@Component注解直接注册成为Bean。而XXLJob的装配实现需要用户手动把类实例添加了XxlJobExecutor,于是留给我们的操作空间十分广阔,可以*选择装配时机,装配方式,并且无Spring依赖,只针对类实例。

2、统一封装抽象类的设计

面向用户端的实现封装,考虑使用抽象类,限制用户继承后的可实例化。提供任务运行业务实现方法,并且,需要设计实现方法的请求参数以及返回内容,保证外部调用时能够传递进入必要的请求返回和返回准确的运行结果。

public abstract class AbstractGtsSchedulerTaskProcessor {

    /**
     * 获取任务标识id
     *
     * @return 任务标识id
     */
    public abstract String getTaskId();

    /**
     * 任务业务执行方法
     *
     * @param parameter 任务执行入参
     * @return 任务执行结果
     * @throws Exception 执行异常
     */
    public abstract GtsSchedulerTaskResult process(GtsSchedulerTaskParameter parameter) throws Exception;

 定义抽象类AbstractGtsSchedulerTaskProcessor,其中提供两个抽象方法需要提供给用户进行实现。

  • 方法getTaskId是用于给当前定义的定时任务提供唯一的可识别标识。
  • 方法process是提供给开发者的业务执行逻辑实现。
  • 请求参数使用GtsSchedulerTaskParameter作为统一请求。
public class GtsSchedulerTaskParameter {

    /**
     * 任务执行入参
     */
    private String customData;

    /**
     * 分片序号
     */
    private Long shardingNo;
    
    /**
     * 分片总数
     */
    private Long shardingTotal;
}
  • 返回参数使用GtsSchedulerTaskResult作为执行结果的统一返回
public class GtsSchedulerTaskResult {

    /**
     * 任务执行状态
     */
    private GtsSchedulerTaskStatusEnum status;

    /**
     * 错误信息
     */
    private String errorMessage;

    /**
     * 执行结果返回
     */
    private String result;

    public static GtsSchedulerTaskResult successResult(String result) {
        return new GtsSchedulerTaskResult()
                .setStatus(GtsSchedulerTaskStatusEnum.SUCCESS).setResult(result);
    }

    public static GtsSchedulerTaskResult successResult() {
        return new GtsSchedulerTaskResult()
                .setStatus(GtsSchedulerTaskStatusEnum.SUCCESS);
    }

    public static GtsSchedulerTaskResult failResult() {
        return new GtsSchedulerTaskResult()
                .setStatus(GtsSchedulerTaskStatusEnum.FAIL);
    }

    public static GtsSchedulerTaskResult failResult(String errorMessage) {
        return new GtsSchedulerTaskResult()
                .setStatus(GtsSchedulerTaskStatusEnum.FAIL).setErrorMessage(errorMessage);
    }
}

3、适配器模式的使用

上面完成了统一封装实现的基本设计,就需要来解决两种调用中心的注册装配问题。

两种调用中心执行方法与我们现有的统一封装设计是不匹配的,需要有中间步骤进行请求参数和返回结果转换。因此,引入适配器模式,使用适配类作为对接XXLJob和SchedulerX的实际对象,再通过调用用户自身实现的AbstractGtsSchedulerTaskProcessor子类方法完成实际任务的调用,并在其中加入请求参数与返回接口转换的动作逻辑。

以下为XXLJob的适配器:

public class XxlJobTaskAdaptor extends IJobHandler implements TaskConvert<String, ReturnT<String>> {

    private final AbstractGtsSchedulerTaskProcessor callback;

    public XxlJobTaskAdaptor(AbstractGtsSchedulerTaskProcessor callback) {
        this.callback = callback;
    }

    /**
     * XXLJob实际调用执行方法
     *
     * @param param
     * @return
     * @throws Exception
     */
    @Override
    public ReturnT<String> execute(String param) {

        GtsSchedulerTaskParameter gtsTaskParameter = convertContext(param);
        try {
            GtsSchedulerTaskResult result = callback.process(gtsTaskParameter);
            return convertResult(result);
        } catch (Exception e) {
            log.error("[Task Execute inner error]task id:{}", callback.getTaskId(), e);
            return new ReturnT<>(ReturnT.FAIL_CODE, ExceptionUtils.getFullStackTrace(e));
        }
    }

    /**
     * 任务框架入参转换到业务入参
     *
     * @param param
     * @return
     */
    @Override
    public GtsSchedulerTaskParameter convertContext(String param) {
       // 任务框架入参转换到业务入参
    }

    /**
     * 业务结果转换到任务框架结果
     *
     * @param result
     * @return
     */
    @Override
    public ReturnT<String> convertResult(GtsSchedulerTaskResult result) {
        // 业务结果转换到任务框架结果
    }

4、Starter驱动配置读取与自动装配

再次把目光回放到设计目标的第三、四点:

  • 自动扫描装配:借助Spring能够自动化地完成任务执行器的注册
  • 无代码改动的底层实现切换:仅允许配置项层面体现底层实现的差异

(1)装配注册

  • SchedulerX的装配方式是直接使用@Component注解然后通过getBeanName进行执行实例的获取
public static JobProcessor create(String type) {
        JobProcessor jobProcessor = null;
        String className = ConfigUtil.getWorkerConfig().getString(WorkerConstants.WORKER_EXECUTOR_PREFIX + type);
        if (className != null) {
            try {
                if (SpringContext.context != null && "java".equals(type)) {
                    jobProcessor = SpringContext.getBean(className, SchedulerxWorker.CUSTOMER_CLASS_LOADER);
                } else {
                    jobProcessor = ReflectionUtil.getInstanceByClassName(className, SchedulerxWorker.CUSTOMER_CLASS_LOADER);
                }
            } catch (Throwable e) {
                LOGGER.error("", e);
            }
        }
        return jobProcessor;
    }

在考虑统一使用Spring装配的环境下,我们仅需要考虑XXLJob的装配方式了。

对于用户的自行实现任务类,我们需要进行扫描获取全部的实现类,然后手动创建适配器XxlJobTaskAdaptor后进行手动调用registJobHandler方法。

@Configuration
@EnableConfigurationProperties(XxlJobProperties.class)
@ConditionalOnClass(XxlJobExecutor.class)
@ConditionalOnExpression("'${gts.schedule.type}'.equalsIgnoreCase('xxlJob')")
public class XxlJobConfiguration {

    // 使用AutoWeird把全局的抽象实现Bean进行集中获取
    @Getter
    @Autowired(required = false)
    public List<AbstractGtsSchedulerTaskProcessor> taskProcessorList = new ArrayList<>();

    @Bean
    public XxlJobExecutor xxlJobExecutor(XxlJobProperties properties) {
        XxlJobExecutor xxlJobExecutor = new XxlJobExecutor();
        String logPath = System.getProperty("user.home") + "/logs/xxlJob";
        xxlJobExecutor.setLogPath(logPath);
        xxlJobExecutor.setPort(properties.getPort());
        xxlJobExecutor.setAdminAddresses(properties.getAddress());
        xxlJobExecutor.setAppname(properties.getAppName());
        xxlJobExecutor.setIp(properties.getIp());
        xxlJobExecutor.setLogRetentionDays(properties.getLogRetentionDays());
        xxlJobExecutor.setAccessToken(properties.getAccessToken());

        // super start
        try {
            xxlJobExecutor.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return xxlJobExecutor;
    }

    @PostConstruct
    public void init() {
        // 手动注册IJobHandler
        if (!CollectionUtils.isEmpty(taskProcessorList)) {
            for (AbstractGtsSchedulerTaskProcessor taskProcessor : taskProcessorList) {
                XxlJobExecutor.registJobHandler(taskProcessor.getTaskId(), new XxlJobTaskAdaptor(taskProcessor));
            }
        }
    }
}

(2)配置项切换

处理拉平了任务调用实现以及任务装配,代码层面的差异化内容已经得以磨平了,剩下的就是两个调度中心客户端启动时需要的配置项内容的差异。在经过对比后发现两者的内容还是存在一定的业务出入,所以没有打算对配置内容字段进行统一,还是拆分不同的properties类进行独立装配。

  • XXLJob配置properties类
@ConfigurationProperties(prefix = "gts.schedule.xxl-job")
@ConditionalOnExpression("'${gts.schedule.type}'.equalsIgnoreCase('xxlJob')")
public class XxlJobProperties {

    /**
     * 使用xxl-job时,调度中心部署跟地址 :如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";
     */
    private String address;


    /**
     * xxl-job中 执行器AppName:执行器心跳注册分组依据;为空则关闭自动注册
     */
    private String appName;

    /**
     * xxl-job 中ip
     */
    private String ip;

    // ...

}

同时,由于使用组件类型是互斥的,我们通过使用@ConditionalOnExpression注解对两个组件的Configuration进行选择性装配,保证生效的实现符合当前使用的调度中心类型。

(3)Starter配置

最后,为了实现二方包引入能够在SpringBoot功能实现自动装配,需要配置spring.factories文件

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.aliyun.gts.bpaas.schedule.TaskRegisterAutoConfiguration

TaskRegisterAutoConfiguration类作为Starter自动装配的驱动入口。

后记

分布式调度组件本质上来说并非有很大的技术实现难点,但是它代表的是如何通过有效的整合,抹平不同对接方的差异,减轻多种同功能类型组件的对接工作量的实践过程。希望大家注意到:整个组件的实践思路,从现状分析,抽象设计,实现开发一系列步骤下来,我们都始终关注紧扣着设计目标。清晰的目标和边界才是驱动优异实践落地的明灯。

上一篇:阿里分布式任务调度SchedulerX2.0支持Dataworks任务


下一篇:汉诺塔问题