Elastic-Job源码解读

写在前面

目前公司使用的作业调度工具是Elastic-Job,版本2.1.5,三月份因为失效转移配置出过一次线上事故,排查问题的过程中粗略的读了一下源码,刚好借此机会深入理解一下Elastic-Job

总体架构

Elastic-Job源码解读
注:图片来自https://github.com/elasticjob/elastic-job-lite

概述

Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
Elastic-Job-Cloud使用Mesos + Docker的解决方案,额外提供资源治理、应用分发以及进程隔离等服务。(本文不讨论)

Elastic-Job核心组件:quartzZookeeper

  • quartz的角色是调度每台机器上的任务(即每台机器上的分片任务何时执行)
  • Zookeeper则是分布式调度中心

功能

Elastic-Job-Lite
分布式调度协调
弹性扩容缩容
失效转移
错过执行作业重触发
作业分片一致性
保证同一分片在分布式环境中仅一个执行实例
自诊断并修复分布式不稳定造成的问题
支持并行调度
支持作业生命周期操作
丰富的作业类型
Spring整合以及命名空间提供
运维平台

源码解读

任务初始化

JobScheduler

public class JobScheduler {
    
    /**
     * 两个固定的key值,存于Quartz的JobDetail#JobDataMap中
     */
    public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
    private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";
 
     ...
    
    private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) {
        // 添加作业实例
        JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance());
        // 作业配置
        this.liteJobConfig = liteJobConfig;
        // 注册中心
        this.regCenter = regCenter;
        List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
        // 提供了一个分布式任务开始或完成时的前置后置扩展点,这里用户可以用来执行一些任务开启以及完成时的特定逻辑
        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
        // 任务门面,一个任务一个jobFacade,封装了任务开启、任务失效转移、错过再执行、任务事件等
        jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus);
    }
    
    private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final List<ElasticJobListener> elasticJobListeners) {
        GuaranteeService guaranteeService = new GuaranteeService(regCenter, liteJobConfig.getJobName());
        for (ElasticJobListener each : elasticJobListeners) {
            if (each instanceof AbstractDistributeOnceElasticJobListener) {
                ((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService);
            }
        }
    }
    
    /**
     * 初始化作业.
     */
    public void init() {
        // 更新作业配置到ZK
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
        // 设置分片数
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
        // 创建作业调度控制器
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
        // 本地将job与jobScheduleController和注册中心关联起来,同时zk注册中心创建以jobName命名的节点
        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
        // 注册作业启动信息
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
        // 调度任务
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }

    /**
     * 配置Quartz
     * 这里体现的是Elastic Job和Quartz的融合,Quartz任务调度的具体语句是scheduler.scheduleJob(jobDetail, createTrigger(cron)),
     * 当任务设定时间到了之后,Quartz会去执行org.quartz.Job#execute(org.quartz.JobExecutionContext)方法,Elastic Job对应的Job实现是LiteJob
     *
     */
    private JobDetail createJobDetail(final String jobClass) {
        // Quartz描述调度任务的接口
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
        // 注意这个地方的jobFacade和LiteJob中jobFacade是一致的
        result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
        Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
        if (elasticJobInstance.isPresent()) {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
        } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
            try {
                // 注意这个地方的elasticJob和LiteJob中elasticJob是一致的,用于任务调度时判断任务的类型
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
            } catch (final ReflectiveOperationException ex) {
                throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
            }
        }
        return result;
    }
    
    ...

任务触发

作业实例内任务触发是通过Quartz来完成的,按照cron设定的时间定时触发,Quartz触发Job的实现LiteJob#execute方法

LiteJob

public final class LiteJob implements Job {


    /**
     * 注意JobScheduler中的两个静态属性,JobScheduler初始化时将这两个字段存于Quartz的JobDetail#JobDataMap中,
     * Quartz在初始化LiteJob时,会从JobDetail的JobDataMap中取到这两个值,具体可见org.quartz.simpl.PropertySettingJobFactory#newJob(org.quartz.spi.TriggerFiredBundle, org.quartz.Scheduler)
     * public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
     * private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";
     */
    @Setter
    private ElasticJob elasticJob;
    
    @Setter
    private JobFacade jobFacade;
    
    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        // Quartz任务调度的入口
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}

AbstractElasticJobExecutor

public abstract class AbstractElasticJobExecutor {

    ...省略代码
    
    /**
     * 执行作业.
     */
    public final void execute() {
        try {
            jobFacade.checkJobExecutionEnvironment();
        } catch (final JobExecutionEnvironmentException cause) {
            jobExceptionHandler.handleException(jobName, cause);
        }
        ShardingContexts shardingContexts = jobFacade.getShardingContexts();
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
        }
        // 如果任务正在执行中,将分配给当前作业实例的分片都记录为misfire
        if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_FINISHED, String.format(
                        "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, 
                        shardingContexts.getShardingItemParameters().keySet()));
            }
            return;
        }
        try {
            // 作业前置扩展
            jobFacade.beforeJobExecuted(shardingContexts);
            //CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            //CHECKSTYLE:ON
            jobExceptionHandler.handleException(jobName, cause);
        }
        // 1、执行任务
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
        // 2、错过再执行
        while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
            jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
            execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
        }
        // 3、失效转移
        jobFacade.failoverIfNecessary();
        try {
            // 作业后置扩展
            jobFacade.afterJobExecuted(shardingContexts);
            //CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            //CHECKSTYLE:ON
            jobExceptionHandler.handleException(jobName, cause);
        }
    }
    
    private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
        if (shardingContexts.getShardingItemParameters().isEmpty()) {
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), JobStatusTraceEvent.State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
            }
            return;
        }
        // 记录任务启动
        jobFacade.registerJobBegin(shardingContexts);
        String taskId = shardingContexts.getTaskId();
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(taskId, JobStatusTraceEvent.State.TASK_RUNNING, "");
        }
        try {
            // 执行作业
            process(shardingContexts, executionSource);
        } finally {
            // TODO 考虑增加作业失败的状态,并且考虑如何处理作业失败的整体回路
            jobFacade.registerJobCompleted(shardingContexts);
            if (itemErrorMessages.isEmpty()) {
                if (shardingContexts.isAllowSendJobEvent()) {
                    jobFacade.postJobStatusTraceEvent(taskId, JobStatusTraceEvent.State.TASK_FINISHED, "");
                }
            } else {
                if (shardingContexts.isAllowSendJobEvent()) {
                    jobFacade.postJobStatusTraceEvent(taskId, JobStatusTraceEvent.State.TASK_ERROR, itemErrorMessages.toString());
                }
            }
        }
    }
    
    private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
        Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
        // 单分片,当前作业实例只有一个分片
        if (1 == items.size()) {
            int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
            JobExecutionEvent jobExecutionEvent =  new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
            process(shardingContexts, item, jobExecutionEvent);
            return;
        }
        final CountDownLatch latch = new CountDownLatch(items.size());
        // 多分片情况,一个分片一个线程
        for (final int each : items) {
            final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
            if (executorService.isShutdown()) {
                return;
            }
            executorService.submit(new Runnable() {
                
                @Override
                public void run() {
                    try {
                        process(shardingContexts, each, jobExecutionEvent);
                    } finally {
                        latch.countDown();
                    }
                }
            });
        }
        try {
            // 协调多分片同步完成
            latch.await();
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobExecutionEvent(startEvent);
        }
        log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
        JobExecutionEvent completeEvent;
        try {
            // 单分片任务处理,实际会转到用户自定义的执行内容
            process(new ShardingContext(shardingContexts, item));
            completeEvent = startEvent.executionSuccess();
            log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
            if (shardingContexts.isAllowSendJobEvent()) {
                jobFacade.postJobExecutionEvent(completeEvent);
            }
            // CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            // CHECKSTYLE:ON
            completeEvent = startEvent.executionFailure(cause);
            jobFacade.postJobExecutionEvent(completeEvent);
            itemErrorMessages.put(item, ExceptionUtil.transform(cause));
            jobExceptionHandler.handleException(jobName, cause);
        }
    }
    
    protected abstract void process(ShardingContext shardingContext);
}

SimpleJobExecutor

这里仅以SimpleJobExecutor为例,AbstractElasticJobExecutor还有两个实现ScriptJobExecutor、DataflowJobExecutor,内容雷同不赘述。

public final class SimpleJobExecutor extends AbstractElasticJobExecutor {
    
    private final SimpleJob simpleJob;
    
    public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
        super(jobFacade);
        this.simpleJob = simpleJob;
    }
    
    @Override
    protected void process(final ShardingContext shardingContext) {
        // 用户自定义的任务执行内容
        simpleJob.execute(shardingContext);
    }
}

分片策略

/**
 * 基于平均分配算法的分片策略.
 * 
 * <p>
 * 如果分片不能整除, 则不能整除的多余分片将依次追加到序号小的服务器.
 * 如: 
 * 1. 如果有3台服务器, 分成9片, 则每台服务器分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8].
 * 2. 如果有3台服务器, 分成8片, 则每台服务器分到的分片是: 1=[0,1,6], 2=[2,3,7], 3=[4,5].
 * 3. 如果有3台服务器, 分成10片, 则每台服务器分到的分片是: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8].
 * </p>
 * 
 * @author zhangliang
 */
public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
    
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        if (jobInstances.isEmpty()) {
            return Collections.emptyMap();
        }
        // 整除的部分,每台机器平均分配
        Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount);
        // 不能整除的部分,从第一台机器开始,一台一个,直到分完为止
        addAliquant(jobInstances, shardingTotalCount, result);
        return result;
    }
    
    private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1);
        int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
        int count = 0;
        for (JobInstance each : shardingUnits) {
            List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                shardingItems.add(i);
            }
            result.put(each, shardingItems);
            count++;
        }
        return result;
    }
    
    private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
        int aliquant = shardingTotalCount % shardingUnits.size();
        int count = 0;
        // 从第一个开始,分配不均的分别加到各台机器上,直到分完为止
        for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
            if (count < aliquant) {
                entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
            }
            count++;
        }
    }
}

分布式

Elastic Job使用zk作为注册中心,作业实例信息、分片信息、配置信息、作业运行状态等均已节点方式存于zk。Elastic Job是通过zk的节点变更事件完成分布式任务协同,节点新增、变更、移除等事件会实时同步给分布式环境中的每个作业实例,Elastic Job提供了多种监听器来处理这些事件,监听器父类AbstractJobListener以及TreeCacheListener
Elastic-Job源码解读

TreeCacheListener

zk提供的节点变更监听接口

/**
 * Listener for {@link TreeCache} changes
 */
public interface TreeCacheListener
{
    /**
     * 监听zk事件变更
     * Called when a change has occurred
     *
     * @param client the client
     * @param event  describes the change
     * @throws Exception errors
     */
    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception;
}

AbstractJobListener

Elastic Job封装的作业监听器

public abstract class AbstractJobListener implements TreeCacheListener {
    
    @Override
    public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
        ChildData childData = event.getData();
        if (null == childData) {
            return;
        }
        String path = childData.getPath();
        if (path.isEmpty()) {
            return;
        }
        dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
    }
    
    // 抽象方法,子类监听器按需实现
    protected abstract void dataChanged(final String path, final Type eventType, final String data);
}

JobCrashedJobListener

失效转移监听器

class JobCrashedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            // 1失效转移开启、2注册中心事件-节点移除,也就是一台服务器下线、3是instance路径,即jobName/instances路径
            if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
                // path,jobName/instances/ip-@-@pid
                // jobInstanceId是这个样子的ip-@-@pid
                String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
                // 如果jobInstanceId和当前机器一致,直接跳过
                if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                    return;
                }
                // 获取失效转移的分片,对应zk目录jobName/sharding/分片号/failover,失效转移分片对应的实例id
                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
                if (!failoverItems.isEmpty()) {
                    // 如果有jobInstanceId的失效转移分片
                    for (int each : failoverItems) {
                        // 把分片存放到目录leader/failover/items
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                } else {
                    // 获取如果jobInstanceId没有失效转移分片对应的分片,然后存放到目录leader/failover/items/分片号,执行分片分片失效转移
                    // 从这里看只要是服务器宕机就一定要执行时效转移逻辑了,其实也不是,
                    // shardingService.getShardingItems(jobInstanceId)会判断服务器是否还可用,不可用的话返回的分片集合就是空的
                    // 但是,针对dump对内存导致的服务器短暂的不可用,则有可能出现错误,我们的任务异常启动就出现这里
                    for (int each : shardingService.getShardingItems(jobInstanceId)) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                }
            }
        }
    }

FailoverSettingsChangedJobListener

失效转移配置变更监听器,从控制台关闭失效转移时的处理逻辑,如果是开启的话本地无需处理

class FailoverSettingsChangedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (configNode.isConfigPath(path) && Type.NODE_UPDATED == eventType && !LiteJobConfigurationGsonFactory.fromJson(data).isFailover()) {
                failoverService.removeFailoverInfo();
            }
        }
    }

其他

JobRegistry 任务管理,一个JVM一个单例,记录任务和注册中心对应关心、任务状态、任务实例
SchedulerFacade 任务调度门面类,一个任务对应一个
JobNodeStorage 作业节点访问

ShardingNode zk节点名称构建规则
JobNodePath 作业节点构建

总结

文章以任务初始化、任务触发、分片策略、分布式为切入点讲述Elastic Job的源码,一方面自己总结记录、另一方面希望可以帮助到其他的开发者快读理解Elastic Job工作原理。

上一篇:eclipse下清除项目的svn信息


下一篇:Java源码英翻中网页演示