elastic-job源码分析(三)Leader选举

对于一个分布式系统来说,为了保证数据的一致性,通常会选择一个主节点执行数据的写入,然后从节点同步主节点数据,elastic-job同样是一个道理。
首先看一下主节点的选举代码

public void registerStartUpInfo(final boolean enabled) {
        listenerManager.startAllListeners();
        // leader选举
        leaderService.electLeader();
        serverService.persistOnline(enabled);
        instanceService.persistOnline();
        shardingService.setReshardingFlag();
        monitorService.listen();
        if (!reconcileService.isRunning()) {
            reconcileService.startAsync();
        }
    }

也是在启动之前,注册作业启动信息的时候进行的leader选举。具体看下是如何操作的

public void electLeader() {
        log.debug("Elect a new leader now.");
        jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
        log.debug("Leader election completed.");
    }
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
        try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
            latch.start();
            latch.await();
            callback.execute();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            handleException(ex);
        }
    }

到第二块代码才真正的开始了leader的选举。这里用了curator框架提供的LeaderLatch进行选举,而LeaderLatch的方式则是一种抢占的方式来决定选主。各个节点通过在zookeeper上指定目录下建立临时顺序节点,然后对列表进行排序,排在第一个就是leader。上述代码中调用了start方法开始选主,然后调用await阻塞等待,一旦选举成功后会立即返回,但是先不从start看起,先从await看起,看一下要满足什么条件,然后再看选主过程。

public void await() throws InterruptedException, EOFException
    {
        synchronized(this)
        {
            while ( (state.get() == State.STARTED) && !hasLeadership.get() )
            {
                wait();
            }
        }
        if ( state.get() != State.STARTED )
        {
            throw new EOFException();
        }
    }

方法要想返回必须满足两个条件,一是state状态已经开启,另一个则是是否存在leader,这个变量默认值是false,也就是在某个地方会将其更改为true。带着这个疑问现在可以看start方法了:

public void start() throws Exception
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

        startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    internalStart();
                }
                finally
                {
                    startTask.set(null);
                }
            }
        }));
    }

代码首先更改了state的值更改为started,这里就是刚才await条件之一,更改完毕之后就代表现在正式开始选择leader了。紧接着设置了一个异步任务,等待client连接建立之后执行。

private synchronized void internalStart()
    {
        if ( state.get() == State.STARTED )
        {
            client.getConnectionStateListenable().addListener(listener);
            try
            {
                reset();
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                log.error("An error occurred checking resetting leadership.", e);
            }
        }
    }

这列主要是建立连接的监听器,方便client对server连接变化时做一些操作。其次调用了reset方法,

void reset() throws Exception
    {
        setLeadership(false);
        setNode(null);

        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( debugResetWaitLatch != null )
                {
                    debugResetWaitLatch.await();
                    debugResetWaitLatch = null;
                }

                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    setNode(event.getName());
                    if ( state.get() == State.CLOSED )
                    {
                        setNode(null);
                    }
                    else
                    {
                        getChildren();
                    }
                }
                else
                {
                    log.error("getChildren() failed. rc = " + event.getResultCode());
                }
            }
        };
        client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
    }

直接初始化选主状态为false,然后将上次设置得路径设置为空,然后生成了一个匿名内部类,实现了里面的回调函数用于节点建立之后进行回调,最后是在latch下建立临时顺序节点。节点的名称有一点特别首先看下图:
elastic-job源码分析(三)Leader选举
为什么是这种格式呢?看下面的一行代码

public String forPath(final String givenPath, byte[] data) throws Exception
    {
        if ( compress )
        {
            data = client.getCompressionProvider().compress(givenPath, data);
        }
		// 构建临时顺序节点
        final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));
        List<ACL> aclList = acling.getAclList(adjustedPath);
        client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);

        String returnPath = null;
        if ( backgrounding.inBackground() )
        {
            pathInBackground(adjustedPath, data, givenPath);
        }
        else
        {
            String path = protectedPathInForeground(adjustedPath, data, aclList);
            returnPath = client.unfixForNamespace(path);
        }
        return returnPath;
    }

代码格式在adjustPath方法里面进行了调整,

String adjustPath(String path) throws Exception
    {
    	// 前面设置节点的时候设置了withProtection
        if ( protectedMode.doProtected() )
        {
            ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
            // 这里会一次加上 _c_ 、uuid、-、最后时latch-
            String name = getProtectedPrefix(protectedMode.protectedId()) + pathAndNode.getNode();
            path = ZKPaths.makePath(pathAndNode.getPath(), name);
        }
        return path;
    }

在getProtectedPrefix时会依次加上_c_、uuid、-、latch-、最后那串有序数字则是zookeeper对于在latch下面建立临时顺序节点的时候临时自动编号节点。接着上面建节点说当节点建立成功的时候会回调函数,函数中调用了一个getChildren方法,接着往下走

private void getChildren() throws Exception
    {
        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    checkLeadership(event.getChildren());
                }
            }
        };
        client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
    }

跟之前的代码差不多,一个一名内部类实现了回调函数,下面使用client获取了latchPath下面的子节点,这个地方很关键,之前说过各个client会在指定目录下创建临时顺序节点,然后会对节点进行排序,排序的规则是取节点最后十位zk自动编号的数字。

private void checkLeadership(List<String> children) throws Exception
    {
        if ( debugCheckLeaderShipLatch != null )
        {
            debugCheckLeaderShipLatch.await();
        }
		// 获取本节点的全路径临时顺序节点
		// /schedulerJobTest/leader/election/latch/_c_72d42538-e234-46af-bc63-d22167559ac0-latch-0000000031
        final String localOurPath = ourPath.get();
        // 子节点排序后的集合,主要是通过sorter的fixForSorting截取后十位例如0000000031进行排序比较
        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
        // 本节点不存在时重新启动设置
        if ( ourIndex < 0 )
        {
            log.error("Can't find our node. Resetting. Index: " + ourIndex);
            reset();
        }
        // 等于0说明自己处在第一位,是leader,设置为true的代码中会唤醒wait等待的线程。
        else if ( ourIndex == 0 )
        {
            setLeadership(true);
        }
        // 自己不是leader,但是要监听上一位当上一位节点被删除时重新进行选主
        else
        {
            String watchPath = sortedChildren.get(ourIndex - 1);
            Watcher watcher = new Watcher()
            {
                @Override
                public void process(WatchedEvent event)
                {
                    if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
                    {
                        try
                        {
                            getChildren();
                        }
                        catch ( Exception ex )
                        {
                            ThreadUtils.checkInterrupted(ex);
                            log.error("An error occurred checking the leadership.", ex);
                        }
                    }
                }
            };
			// 回调接口,当client getData操作执行后回调,调用reset重新进行启动过程
            BackgroundCallback callback = new BackgroundCallback()
            {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                {
                	// 如果不存在该节点调用reset重新启动
                    if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                    {
                        // previous node is gone - reset
                        reset();
                    }
                }
            };
            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
            client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
        }
    }

上述代码是选举leader的关键代码,梳理一下步骤:

  1. 获取本client在latchPath下建立节点的全路径。
  2. 对latchPath下的节点进行排序。
  3. 根据排序后的结果查找本client的临时顺序节点的序号,看是否是leader(排在第一位),如果是leader设置hasLeadership(前面代码提到过,这个条件为true结束await)
  4. 如果不是leader,在获取前一个节点数据时设置监听节点的删除动作,并且在获取数据结束后回调判断节点是否存在,如果不存在则reset重新启动。

好了到这里LeaderLatch的选举原理差不多就到这里了。接下来继续看这块代码:

public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
        try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
            // 开始leader选举
            latch.start();

            // 节点选举成功后会立即返回
            latch.await();
            // 被选举为leader之后调用回调函数,设置leader信息
            callback.execute();

        } catch (Exception ex) {
            handleException(ex);
            log.error("选举leader发生异常");
        }
    }

成为leader之后会立即从await中返回,然后调用回调函数,在函数中会先检查是否存在leader节点,没有的话就将本机的设置为leader信息。

class LeaderElectionExecutionCallback implements LeaderExecutionCallback {

        @Override
        public void execute() {
            // 如果还没有设置leader信息则设置,如果有leader信息说明leader成功了
           if (!hasLeader()) {
               jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
           }
        }
    }

这里是在启动的时候进行leader选举。除此之外当节点数据发生变化时,具体数据监听器在ElectionListenerManager类:一是不存在leader并且有server节点状态更改为enable,二是leader节点被移除,并且当前节点正在运行中,处于可用状态。符合这两种状况的任何一种都会重新进行leader选举。
所以leader选举大概三个地方,一是启动的时候,二三就是上面的两种情况了。

上一篇:基础篇--ES基础数据操作


下一篇:对话 Kibana 之父:如果需要,你该自己动手编写工具