dolphin scheduler(四)

TaskPriorityQueueImpl


前言

前面已经快速地讲了提交的任务被分发出去的时候是通过将任务推进队列*另一端进行消费的。介于master与worker端的一个服务对象,应该也是可以单独拿出来看一看。我认为它的主要类图就应该看的就是如下那几个~~。
dolphin scheduler(四)

一、TaskPriorityQueueImpl

这个队列就是实现了接口TaskPriorityQueue下的方法,并委托了PriorityBlockingQueue这个类帮它处理task信息,可以看一下比较的实现方法了解这个队列里的任务是基于什么规则排列任务的优先级。

@Service
public class TaskPriorityQueueImpl implements TaskPriorityQueue {
    /**
     * queue size
     */
    private static final Integer QUEUE_MAX_SIZE = 3000;

    /**
     * queue
     */
    private PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());

    /**
     * put task takePriorityInfo
     *
     * @param taskPriorityInfo takePriorityInfo
     * @throws Exception
     */
    @Override
    public void put(String taskPriorityInfo) throws Exception {
        queue.put(taskPriorityInfo);
    }

    /**
     * take taskInfo
     * @return taskInfo
     * @throws Exception
     */
    @Override
    public String take() throws Exception {
        return queue.take();
    }

    /**
     * queue size
     * @return size
     * @throws Exception
     */
    @Override
    public int size() throws Exception {
        return queue.size();
    }

    /**
     * TaskInfoComparator
     */
    private class TaskInfoComparator implements Comparator<String>{

        /**
         * compare o1 o2
         * @param o1 o1
         * @param o2 o2
         * @return compare result
         */
        @Override
        public int compare(String o1, String o2) {
            String s1 = o1;
            String s2 = o2;
            String[] s1Array = s1.split(UNDERLINE);
            if(s1Array.length > TASK_INFO_LENGTH){
                // warning: if this length > 5, need to be changed
                s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE) );
            }

            String[] s2Array = s2.split(UNDERLINE);
            if(s2Array.length > TASK_INFO_LENGTH){
                // warning: if this length > 5, need to be changed
                s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE) );
            }

            return s1.compareTo(s2);
        }
    }
}

二、TaskPriorityQueueConsumer

此类才是使用上面队列实例的类,它也是继承了thread类的一个子类,
核心方法也是在run方法下面,看一下。里面的内容。

public void run() {
        List<String> failedDispatchTasks = new ArrayList<>();
        while (Stopper.isRunning()) {
            try {
                int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
                failedDispatchTasks.clear();
                for (int i = 0; i < fetchTaskNum; i++) {
                    if (taskPriorityQueue.size() <= 0) {
                        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                        continue;
                    }
                    // if not task , blocking here
                    String taskPriorityInfo = taskPriorityQueue.take();
                    TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
                    boolean dispatchResult = dispatch(taskPriority.getTaskId());
                    if (!dispatchResult) {
                        failedDispatchTasks.add(taskPriorityInfo);
                    }
                }
                for (String dispatchFailedTask : failedDispatchTasks) {
                    taskPriorityQueue.put(dispatchFailedTask);
                }
            } catch (Exception e) {
                logger.error("dispatcher task error", e);
            }
        }
    }

就是从队列另一端拉取任务,然后将任务分发出去,如果分发失败则将任务塞回队列,等稍后继续尝试分发出去。

2.dispatcher方法

看一下它是如何分发任务的,他就是根据任务ID去数据库查询当前任务的实例详细信息,然后根据任务类型等条件构建任务执行上下文。接着又进一步包裹成执行上下文,并借助ExecutorDispatcher对象将此执行上下文分发出去。

 /**
     * dispatch task
     *
     * @param taskInstanceId taskInstanceId
     * @return result
     */
    private boolean dispatch(int taskInstanceId) {
        boolean result = false;
        try {
            TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
            ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());

            if (taskInstanceIsFinalState(taskInstanceId)) {
                // when task finish, ignore this task, there is no need to dispatch anymore
                return true;
            } else {
                result = dispatcher.dispatch(executionContext);
            }
        } catch (ExecuteException e) {
            logger.error("dispatch error", e);
        }
        return result;
    }

3.ExecutorDispatcher对象。

再继续追下去看一下到底干了啥,主要就是根据一定条件选择合适的worker 节点,然后将这个任务发给此worker,让它帮忙将任务提交到集群上运行。怎么选择合适的worker 不是这次关注的重点,继续去看一下怎么execute当前的任务的。

/**
     * task dispatch
     *
     * @param context context
     * @return result
     * @throws ExecuteException if error throws ExecuteException
     */
    public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
        /**
         * get executor manager
         */
        ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
        if(executorManager == null){
            throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
        }

        /**
         * host select
         */

        Host host = hostManager.select(context);
        if (StringUtils.isEmpty(host.getAddress())) {
            throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker , " +
                            "current task need to %s worker group execute",
                    context.getCommand(),context.getWorkerGroup()));
        }
        context.setHost(host);
        executorManager.beforeExecute(context);
        try {
            /**
             * task execute
             */
            return executorManager.execute(context);
        } finally {
            executorManager.afterExecute(context);
        }
    }

4.NettyExecutorManager对象。

这次就是得委托了NettyExecutorManager对象帮执行这个任务了,去看一下execute的代码具体逻辑。

@Override
    public Boolean execute(ExecutionContext context) throws ExecuteException {

        /**
         *  all nodes
         */
        Set<String> allNodes = getAllNodes(context);

        /**
         * fail nodes
         */
        Set<String> failNodeSet = new HashSet<>();

        /**
         *  build command accord executeContext
         */
        Command command = context.getCommand();

        /**
         * execute task host
         */
        Host host = context.getHost();
        boolean success = false;
        while (!success) {
            try {
                doExecute(host,command);
                success = true;
                context.setHost(host);
            } catch (ExecuteException ex) {
                logger.error(String.format("execute command : %s error", command), ex);
                try {
                    failNodeSet.add(host.getAddress());
                    Set<String> tmpAllIps = new HashSet<>(allNodes);
                    Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
                    if (remained != null && remained.size() > 0) {
                        host = Host.of(remained.iterator().next());
                        logger.error("retry execute command : {} host : {}", command, host);
                    } else {
                        throw new ExecuteException("fail after try all nodes");
                    }
                } catch (Throwable t) {
                    throw new ExecuteException("fail after try all nodes");
                }
            }
        }

        return success;
    }
忽略其他的继续追它的doExecute方法,就是借助nettyRemoteClient将command信息发送出去。
private void doExecute(final Host host, final Command command) throws ExecuteException {
        /**
         * retry count,default retry 3
         */
        int retryCount = 3;
        boolean success = false;
        do {
            try {
                nettyRemotingClient.send(host, command);
                success = true;
            } catch (Exception ex) {
                logger.error(String.format("send command : %s to %s error", command, host), ex);
                retryCount--;
                ThreadUtils.sleep(100);
            }
        } while (retryCount >= 0 && !success);

        if (!success) {
            throw new ExecuteException(String.format("send command : %s to %s error", command, host));
        }
    }
下一篇 看一下nettyclient发送任务给worker端的细节。
上一篇:Go例程与操作系统线程的区别


下一篇:Quartz基础使用