ZooKeeper Leader选举机制源码分析(二)选举核心方法 lookForLeader()

选举核心方法 lookForLeader() 业务逻辑分析

知道了选举相关的重要类及成员变量的作用以后,接下来我们开始分析真正执行选举逻辑的方法lookForLeader():

1) 选举前的准备工作
2) 将自己作为初始leader投出去
3)循环交换投票直至选出Leader,循环交换投票过程中,根据收到的投票发送者状态不同,有下面三种情况:
3.1) 发送者状态为LOOKING:
3.1.1) 验证自己与大家的投票谁更适合做leader
3.1.2) 判断本轮选举是否可以结束了
3.2) 发送者状态为OBSERVING:
3.3) 发送者状态为FOLLOWING/LEADING:

leader只要成功发出去一个消息,整个集群对这个消息就不会丢,因为leader选举,会选zxid最大的那个服务器。如果没发出去就挂了,消息就丢失了

1) 选举前的准备工作

public Vote lookForLeader() throws InterruptedException {
	// ----------------------- 1 选举前的初始化工作 ---------------------
    try {
        // Java Management eXtensions,Oracle提供的分布式应用程序监控技术
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(
                self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }

    if (self.start_fle == 0) {
	    //系统启动开始时间
        self.start_fle = Time.currentElapsedTime();
    }
    try {
        // recvset,receive set,用于存放来自于外部的选票,一个entry代表一次投票
        // key为投票者的serverid,value为选票
        // 该集合相当于投票箱
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        // outofelection,out of election,退出选举
        // 其中存放的是非法选票,即投票者的状态不是looking
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
        // notTimeout,notification Timeout
        int notTimeout = finalizeWait;

        // ----------------------- 2 将自己作为初始leader投出去 ---------------------
        synchronized(this){
        ...

self.start_fle = Time.currentElapsedTime();

 为什么不用System.currentTimeMillis()?

因为系统时间是可以改的,不安全,并且系统时间返回的是毫秒,而currentElapsedTime是纳秒,更精确
ZooKeeper Leader选举机制源码分析(二)选举核心方法 lookForLeader()
获取相对于虚拟机的时间,就不会有系统时间的问题ZooKeeper Leader选举机制源码分析(二)选举核心方法 lookForLeader()
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

receive set,用于存放来自于外部的选票,一个entry代表一次投票
key为投票者的serverid,value为选票Vote
该集合相当于投票箱,票箱记录了集群中其他节点的投票结果
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

out of election,退出选举
其中存放的是非法选票,已经退出选举的Server发来的选票,即投票者的状态不是looking
int notTimeout = finalizeWait;

notification Timeout,通知超时时间,200毫秒
发出选票后收到回复允许等待的时间

2) 将自己作为初始leader投出去

 // ---------------------- 2 将自己作为初始化leader投出去 ----------------
            // notTimeout,notification timeout
            int notTimeout = finalizeWait;

            synchronized(this){
                // 逻辑时钟增一
                logicalclock.incrementAndGet();
                // 更新提案(更新自己的选票)
                // getInitId():返回当前server的id
                // getInitLastLoggedZxid():返回当前server的最大的zxid(最后一个zxid)
                // getPeerEpoch():返回当前Server的epoch
                // 将自己作为初始化leader,更新推荐信息
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            // 发送通知到队列
            sendNotifications();

logicalclock.incrementAndGet();
逻辑时钟加一
ZooKeeper Leader选举机制源码分析(二)选举核心方法 lookForLeader()
逻辑时钟可以这么理解:logicalclock代表选举逻辑时钟(类比现实中的第十八次全国人大、第十九次全国人大……),这个值从0开始递增,在同一次选举中,各节点的值基本相同,也有例外情况,比如在第18次选举中,某个节点A挂了,其他节点完成了Leader选举,但没过多久,该Leader又挂了,于是进入了第19次Leader选举,同时节点A此时恢复,加入到Leader选举中,那么节点A的logicallock为18,而其他节点的logicallock为19,针对这种情况,节点A的logicallock会被直接更新为19并参与到第19次Leader选举中。

updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());

更新当前server的推荐信息为当前server自己,注意该方法和logicalclock.incrementAndGet()一起是一个原子操作
synchronized void updateProposal(long leader, long zxid, long epoch){
    if(LOG.isDebugEnabled()){
        LOG.debug("Updating proposal: " + leader + " (newleader), 0x"
                + Long.toHexString(zxid) + " (newzxid), " + proposedLeader
                + " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
    }
    // 更新当前server的推荐信息
    // 上一章有说过,这三个字段是成员变量,记录当前Server所推荐的Leader信息
    proposedLeader = leader;
    proposedZxid = zxid;
    proposedEpoch = epoch;
}

getInitId():获取当前server的id

private long getInitId(){
	//判断是不是参与者,只有具有选举权的Server在选举时,才是参与者,否则是观察者OBSERVER
	//如果是参与者,返回当前Server的ServerId
    if(self.getLearnerType() == LearnerType.PARTICIPANT)
        return self.getId();
    else return Long.MIN_VALUE;
}

public enum LearnerType {
    PARTICIPANT, OBSERVER;
}

判断当前状态是否是参与者,即排除了观察者,不具有选举权的Server
具有选举权的Server在有Leader情况下才是Follower,在选举的情况下叫Participant,参与者

getInitLastLoggedZxid():获取当前server最后的(也是最大的)zxid,即事务Id

private long getInitLastLoggedZxid(){
	//同理判断是否具有选举权
    if(self.getLearnerType() == LearnerType.PARTICIPANT)
        return self.getLastLoggedZxid();
    else return Long.MIN_VALUE;
}

getPeerEpoch():获取当前server的epoch

private long getPeerEpoch(){
    if(self.getLearnerType() == LearnerType.PARTICIPANT)
    	try {
    		return self.getCurrentEpoch();
    	} catch(IOException e) {
    		RuntimeException re = new RuntimeException(e.getMessage());
    		re.setStackTrace(e.getStackTrace());
    		throw re;
    	}
    else return Long.MIN_VALUE;
}

sendNotifications();

将更新过的Ledaer推荐信息发送出去(将更新过的信息写入到一个发送队列,具体的发送逻辑不在这,上一章讲过,有专门的线程去处理)

/**
 * Send notifications to all peers upon a change in our vote
 */
private void sendNotifications() {
    // 遍历所有具有选举权的server
    for (QuorumServer server : self.getVotingView().values()) {
        long sid = server.id;

        // notmsg,notification msg
        ToSend notmsg = new ToSend(ToSend.mType.notification,//消息类型
                proposedLeader,//推荐的Leader的ServerId(myid)
                proposedZxid,//推荐的Leader的zxid
                logicalclock.get(),//此次选举的逻辑时钟
                QuorumPeer.ServerState.LOOKING,//当前Server的状态
                sid,    // 接受者的server id
                proposedEpoch);//推荐的Leader的epoch
        if(LOG.isDebugEnabled()){
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                  Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                  " (n.round), " + sid + " (recipient), " + self.getId() +
                  " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        //放入发送队列
        sendqueue.offer(notmsg);
    }
}

/**
 * Send notifications to all peers upon a change in our vote
 */
private void sendNotifications() {
    // 遍历所有具有选举权的server
    for (QuorumServer server : self.getVotingView().values()) {
        long sid = server.id;

        // notmsg,notification msg
        ToSend notmsg = new ToSend(ToSend.mType.notification,//消息类型
                proposedLeader,//推荐的Leader的ServerId(myid)
                proposedZxid,//推荐的Leader的zxid
                logicalclock.get(),//此次选举的逻辑时钟
                QuorumPeer.ServerState.LOOKING,//当前Server的状态
                sid,    // 接受者的server id
                proposedEpoch);//推荐的Leader的epoch
        if(LOG.isDebugEnabled()){
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                  Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                  " (n.round), " + sid + " (recipient), " + self.getId() +
                  " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        //放入发送队列
        sendqueue.offer(notmsg);
    }
}

遍历的是什么?
self.getVotingView().values(),返回的是所有具有选举权和被选举权的Server

public Map<Long,QuorumPeer.QuorumServer> getVotingView() {
    return QuorumPeer.viewToVotingView(getView());
}

/**
* A 'view' is a node's current opinion(评价,称呼) of
* the membership(成员) of the entire(整个的) ensemble(全体).
* 翻译:“view”是一个节点(Server)对整个系统成员的当前称呼。
*/
// 获取到zk集群中的所有server(包含participant与observer)
public Map<Long,QuorumPeer.QuorumServer> getView() {
   return Collections.unmodifiableMap(this.quorumPeers);
}

static Map<Long,QuorumPeer.QuorumServer> viewToVotingView(Map<Long,QuorumPeer.QuorumServer> view) {
    Map<Long,QuorumPeer.QuorumServer> ret = new HashMap<Long, QuorumPeer.QuorumServer>();
    // 将observer给排除出去,只获取参与者,即具有选举权的Server
    for (QuorumServer server : view.values()) {
        if (server.type == LearnerType.PARTICIPANT) {
            ret.put(server.id, server);
        }
    }
    return ret;
}

ToSend notmsg = new ToSend(…)

notification msg,通知消息,即将推荐的Leader信息封装成ToSend对象放入发送队列,有专门线程去发送该消息
sid代表了消息接收者的server id

3) 循环交换投票直至选出Leader

将自己作为初始leader投出去以后,接下来就会一直循环处理接收到的选票信息:

// ----------------------- 3 循环交换投票直至选出Leader ---------------------
/*
 * Loop in which we exchange notifications until we find a leader
 * 循环交换通知,直到找到Leader
 */

while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
    /*
     * Remove next notification from queue, times out after 2 times
     * the termination time
     */
    // recvqueue,receive queue,其中存放着接受到的所有外来的通知
    // 有专门线程去处理接收其他Server发来的通知,并将接收到的信息解析封装成Notification 放入recvqueue队列
    Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS);

    /*
     * Sends more notifications if haven't received enough.
     * Otherwise processes new notification.
     */
    if(n == null){
        if(manager.haveDelivered()){
            // 重新发送,目的是为了重新再接收
            sendNotifications();
        } else {
            // 重新连接zk集群中的每一个server
            manager.connectAll();
        }

        /*
         * Exponential backoff
         */
        int tmpTimeOut = notTimeout*2;
        notTimeout = (tmpTimeOut < maxNotificationInterval?
                tmpTimeOut : maxNotificationInterval);
        LOG.info("Notification time out: " + notTimeout);
    }
    else if(validVoter(n.sid) && validVoter(n.leader)) {
	   	//validVoter(n.sid):验证发送者的ServerId
	    //validVoter(n.leader):验证当前通知推荐的leader的ServerId
	...

while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){

循环交换通知,直到找到Leader(一但找到Leader,状态就不在是LOOKING) Notification n =
recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS);

receive queue,其中存放着接受到的所有外来的通知
有专门线程去处理接收其他Server发来的通知,并将接收到的信息解析封装成Notification 放入recvqueue队列

可以看到会有从recvqueue取出通知为空的情况

什么情况取出来是空呢?
假如广播出去8个,由于网络原因可能只收到3个,第四次取的时候就是空的
还有可能收到8个了,但是选举还没结束,再次取的时候也是空的
总之就是为了保证选举还没结束的时候,能继续收到其他Server的选票,并继续处理判断,直到选出Leader
if(manager.haveDelivered()){ //简单来说该方法就是判断是否和集群失联,返回false表示失联

manager:QuorumCnxManager,连接管理器,维护了服务器之间的TCP连接
haveDelivered:判断是否已经被交付,即检查所有队列是否为空,表示所有消息都已传递。

boolean haveDelivered() {
    for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
        LOG.debug("Queue size: " + queue.size());
        if (queue.size() == 0) {
            return true;
        }
    }

    return false;
}

queueSendMap就是之前说的连接管理器维护的发送给其他Server失败的消息副本的Map
只要有一个队列为0就返回true,后面就不看了,因为之前说过只要有一个队列为空,就说明当前Server与zk集群的连接没有问题
只有当所有队列都不为空,才说明当前Server与zk集群失联
sendNotifications();

如果**manager.haveDelivered()**返回true,表明当前Server和集群连接没有问题,所以重新发送当前Server推荐的Leader的选票通知,目的是为了重新再接收其他Server的回复
manager.connectAll();

如果**manager.haveDelivered()**返回false,表明当前Server和集群已经失联,所以重新连接zk集群中的每一个server

public void connectAll(){
    long sid;
    for(Enumeration<Long> en = queueSendMap.keys();
        en.hasMoreElements();){
        sid = en.nextElement();
        connectOne(sid);
    }      
}

为什么重连了,不需要重新发送通知了呢?
因为我失联了,但是发送队列中的消息是还再的,重新连接后会重新继续发送,而且其他Server在recvqueue.poll为null的时候,如果没有和集群失联,也会重新sendNotifications,所以这里是不需要的。

int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?tmpTimeOut : maxNotificationInterval);

重新发送通知或者重连集群后,将通知超时时间扩大两倍,如果超过最大通知时间,将超时时间置为最大时间
else if(validVoter(n.sid) && validVoter(n.leader)) {

如果从recvqueue取出的投票通知不为空,会先验证投票的发送者和推荐者是否合法,合法了再继续处理

// 验证指定server是否合法
private boolean validVoter(long sid) {
	//即判断是否具有选举权和被选举权
    return self.getVotingView().containsKey(sid);
}

3.1) 收到的投票发送者状态为LOOKING: 3.1.1) 验证自己与大家的投票谁更适合做leader

...
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
	...
    if(n == null){...}
    else if(validVoter(n.sid) && validVoter(n.leader)) {
        switch (n.state) {
            case LOOKING:
            	// 3.1.1) 验证自己与大家的投票谁更适合做leader
                // If notification > current, replace and send messages out
                // n.electionEpoch:外来通知所在选举的逻辑时钟
                // logicalclock.get():获取到当前server的逻辑时钟
                // 处理当前选举过时的情况:清空票箱,更新逻辑时钟
                if (n.electionEpoch > logicalclock.get()) {
                    // 更新当前server所在的选举的逻辑时钟
                    logicalclock.set(n.electionEpoch);
                    // 清空票箱
                    recvset.clear();
                    // 判断当前server与n谁更适合做leader,无论谁更适合,
                    // 都需要更新当前server的推荐信息,然后广播出去
                    if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                        getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                    } else {
                        updateProposal(getInitId(),
                            getInitLastLoggedZxid(),
                            getPeerEpoch());
                    }
                    sendNotifications();
                    // 处理算来n过时的情况:n对于当前选举没有任何用处,直接丢掉
                } else if (n.electionEpoch < logicalclock.get()) {
                    if(LOG.isDebugEnabled()){
                        LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                            + Long.toHexString(n.electionEpoch)
                            + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                    }
                    break;
                    // 处理n.electionEpoch 与 logicalclock.get() 相等的情况
                    // totalOrderPredicate()用于判断外来n与当前server所推荐的leader
                    // 谁更适合做新的leader
                } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                    proposedLeader, proposedZxid, proposedEpoch)) {
                    // 更新当前server的推荐信息
                    updateProposal(n.leader, n.zxid, n.peerEpoch);
                    // 广播出去
                    sendNotifications();
                }

                if(LOG.isDebugEnabled()){
                    LOG.debug("Adding vote: from=" + n.sid +
                        ", proposed leader=" + n.leader +
                        ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                        ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                }
                // 将外来n通知封装为一个选票,投放到“选票箱”
                recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
				// n.sid:通知发送者的ServerId
				// n.leader,n.zxid,n.peerEpoch:所推荐Leader的相关信息
				// n.electionEpoch:外来通知所在选举的逻辑时钟
                // ----------------------- 3.1.2) 判断本轮选举是否可以结束了 ---------------------
                if (termPredicate(recvset,
                    new Vote(proposedLeader, proposedZxid,
                            logicalclock.get(), proposedEpoch))) {
                ...

n.electionEpoch:外来通知所在选举的逻辑时钟
logicalclock.get():获取到当前server选举的逻辑时钟

正常情况,在选举的时候每一个Server的electionEpoch应该都是相同的,即他们是在同一轮选举,是通过当前currentEpoch+1获得的,不是同步获得的。也有例外情况,比如在第18次选举中,某个节点A挂了,其他节点完成了Leader选举,但没过多久,该Leader又挂了,于是进入了第19次Leader选举,同时节点A此时恢复,加入到Leader选举中,那么节点A的logicallock为18,而其他节点的logicallock为19,针对这种情况,节点A的logicallock会被直接更新为19并参与到第19次Leader选举中。

这个时候需要比较选票所在的选举的逻辑时钟和当前Server选举的逻辑时钟是否相等,通过比较n.electionEpoch和
logicalclock.get()的值,有三种情况:

什么情况外来投票大,或者小呢?
比如5个机器,已经选举好Leader了,有两个已经通知了,另外两个不知道,这个时候刚上任的Leader又突然挂了,还没通知到另外两个机器的时候,就会造成这种情况,已经通知的那两个epoch会再次重新选举的,逻辑时钟会再加一,即epoch会在加一,未通知的那两个epoch还没变
站在未通知的Server角度,在接收到已经通知的Server回复的时候,就会发现回复的通知epoch更大
站在已经通知的Server角度,在接受到未通知的Server发来的通知时,会发现自己比通知的epoch大

if (n.electionEpoch > logicalclock.get()) {…}

处理n.electionEpoch 比 logicalclock.get() 大的情况(外来投票epoch大)
自己已经过时了,选谁都没有意义,所以做如下操作:
logicalclock.set(n.electionEpoch):更新当前server所在的选举的逻辑时钟
recvset.clear():清空票箱,之前收集的投票都已经过时了,没意义了。
totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch()):判断外来n与当前server谁更适合做新的leader(注意不是当前Server所推荐的,而就是当前Server)
updateProposal(…):选择更适合的更新当前server的推荐信息
sendNotifications():将自己的选票广播出去
else if (n.electionEpoch < logicalclock.get()) {…}

处理n.electionEpoch 比 logicalclock.get() 小的情况(外来投票epoch小)
说明外来的过时了,它的选票没有意义,不做任何处理,直接break掉switch,重新进入循环,从recvqueue取下一个通知,继续处理
else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {…}

处理n.electionEpoch 与 logicalclock.get() 相等的情况,即他们在同一轮选举中
totalOrderPredicate(…):断言,判断外来n与当前server所推荐的leader谁更适合做新的leader,返回true,则n(外来的)更适合
如果返回true,即外来的更合适,则执行下面方法:
updateProposal():更新当前server的推荐信息
sendNotifications():广播出去
处理完上面情况后,如果没有break,即是外来选票的逻辑时钟更大,或者相等,代表外来选票有效,则将选票放入选票箱:

recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

将外来n通知封装为一个选票,投放到“选票箱”

特殊情况:当前服务器收到外来通知发现外来通知推荐的leader更适合以后,会更新自己的推荐信息并再次广播出去,这个时候recvqueue除了第一次广播推荐自己收到的回复外,还会收到新一轮广播的回复,对于其他Server而言有可能会回复两次通知,但对于本地Server是没有影响的,因为投票箱recvset是一个Map,key是发送消息的服务器的ServerId,每个Server只会记录一个投票,新的会覆盖旧的

接下来会尝试走《3.1.2 判断本轮选举是否可以结束了》这一步,但是如果刚开始选举,要达到相同选票过半才结束选举,所以肯定不会结束,里面的逻辑是不会走的,所以直接break掉switch,然后会循环到一开始从recvqueue取下一个通知,继续处理…

totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)

totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,getInitId(), getInitLastLoggedZxid(), getPeerEpoch())

判断谁更适合做leader
该方法返回true,表示外来的更适合,即new更适合

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
            Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    // 获取权重,observer的权重为0,如果为0是observer就return false
    if(self.getQuorumVerifier().getWeight(newId) == 0){
        return false;
    }
    // zxid:其为一个 64 位长度的 Long 类型,其中高 32 位表示 epoch,低 32 位表示 xid。
    // 先比较前32位,如果newEpoch > curEpoch,肯定newZxid > curZxid,直接返回true
    // 如果newEpoch 和 curEpoch相同
    // 在看Zxid,实际上比较的就是xid(前32位相等),如果newZxid > curZxid,直接返回true
    // 如果Zxid也相同,就比较ServerId
    return ((newEpoch > curEpoch) || 
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

3.1.2) 判断本轮选举是否可以结束了

...
case LOOKING:
	....
    // ----------------------- 3.1.2) 判断本轮选举是否可以结束了 ---------------------
    /*
 	 * 尝试通过现在已经收到的信息,判断是否已经足够确认最终的leader了,通过方法termPredicate() ,
 	 * 判断标准很简单:是否已经有超过半数的机器所推举的leader为当前自己所推举的leader.
 	 * 如果是,保险起见,最多再等待finalizeWait(默认200ms)的时间进行最后的确认,
 	 * 如果发现有了更新的leader信息,则把这个Notification重新放回recvqueue,显然,选举将继续进行。
 	 * 否则,选举结束,根据选举的leader是否是自己,设置自己的状态为LEADING或者OBSERVING或者FOLLOWING。
 	 */
    if (termPredicate(recvset,
        new Vote(proposedLeader, proposedZxid,
                logicalclock.get(), proposedEpoch))) {

        // Verify if there is any change in the proposed leader
        // 该循环有两个出口:
        // break:从该出口跳出,说明n的值不为null,说明在剩余的通知中找到了更适合做leader的通知
        // while()条件:从该出口跳出,说明n的值为null,说明在剩余的通知中没有比当前server所推荐的leader更适合的了
        while((n = recvqueue.poll(finalizeWait,
            TimeUnit.MILLISECONDS)) != null){
            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                proposedLeader, proposedZxid, proposedEpoch)){
                // 将更适合的n重新放回到recvqueue,以便对其进行重新投票
                recvqueue.put(n);
                break;
            }
        }

        // 若n为null,则说明当前server所推荐的leader就是最终的leader,
        // 则此时就可以进行收尾工作了
        if (n == null) {
            // 修改当前server的状态,非leader即following
            self.setPeerState((proposedLeader == self.getId()) ?
                ServerState.LEADING: learningState());
            // 形成最终选票
            Vote endVote = new Vote(proposedLeader,
                                proposedZxid,
                                logicalclock.get(),
                                proposedEpoch);
            // 清空recvqueue队列
            leaveInstance(endVote);
            return endVote;
        }
    }
    break;

if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) {…}

终止断言:判断当前Server所推荐的leader在票箱中的支持率是否过半

/**
 * 终止断言。给定一组选票,决定是否有足够的票数宣布选举结束。
 */
protected boolean termPredicate(
        HashMap<Long, Vote> votes,
        Vote vote) {

    HashSet<Long> set = new HashSet<Long>();

    // 遍历票箱:从票箱中查找与选票vote相同的选票
    for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())){
            set.add(entry.getKey());
        }
    }

    return self.getQuorumVerifier().containsQuorum(set);
}

org.apache.zookeeper.server.quorum.flexible.QuorumMaj#containsQuorum

/**
 * Verifies if a set is a majority.
 */
public boolean containsQuorum(Set<Long> set){
    return (set.size() > half);
}

half就是集群总数的一半
ZooKeeper Leader选举机制源码分析(二)选举核心方法 lookForLeader()
可以看到一定要大于一半,等于一半也不行,这也是服务器数量推荐奇数的原因。
基于该理论,由 5 台主机构成的集群,最多只允许 2 台宕机(至少要有3票)。而由 6 台构成的集群,其最多也只允许 2 台宕机(3票不过半,至少4票)。即,6 台与5 台的容灾能力是相同的。基于此容灾能力的原因,建议使用奇数台主机构成集群,以避免资源浪费。但从系统吞吐量上说,6 台主机的性能一定是高于 5 台的。所以使用 6 台主机并不是资源浪费。

已经过半了,但是recvqueue里面的通知还没处理完,还有可能有更适合的Leader通知

如果有更合适的,将通知重新加入recvqueue队列的尾部,并break退出循环,此时n != null ,不会进行收尾动作,会重新进行选举,最终还是会更新当前Server的推荐信息为这个更适合的Leader,并广播出去
如果没有,即n为null,则说明当前server所推荐的leader就是最终的leader,则此时就可以进行收尾工作了

if (termPredicate(recvset,new Vote(proposedLeader, proposedZxid,logicalclock.get(), proposedEpoch))) {
    // 该循环有两个出口:
    // break:从该出口跳出,说明n的值不为null,说明在剩余的通知中找到了更适合做leader的通知
    // while()条件:从该出口跳出,说明n的值为null,说明在剩余的通知中没有比当前server所推荐的leader更适合的了
    while((n = recvqueue.poll(finalizeWait,
        TimeUnit.MILLISECONDS)) != null){
        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
            proposedLeader, proposedZxid, proposedEpoch)){
            // 如果发现有更合适的
            // 将更适合的n重新放回到recvqueue,以便对其进行重新投票
            recvqueue.put(n);
            // put:将指定的元素插入此队列的尾部,必要时等待可用的空间。
            break;
        }
    }

    // 若n为null,则说明当前server所推荐的leader就是最终的leader,
    // 则此时就可以进行收尾工作了
    if (n == null) {
		...
    }
}
break;

收尾工作:

 // 若n为null,则说明当前server所推荐的leader就是最终的leader,
  // 则此时就可以进行收尾工作了
  if (n == null) {
      // 修改当前server的状态,非leader即following
      // 如果推荐的Leader就是我自己,修改我当前状态为LEADING
      // 如果不是我自己,判断自己是否是参与者,如果是则状态置为FOLLOWING,否则是OBSERVING
      self.setPeerState((proposedLeader == self.getId()) ?
          ServerState.LEADING: learningState());
      // 形成最终选票
      Vote endVote = new Vote(proposedLeader,
                          proposedZxid,
                          logicalclock.get(),
                          proposedEpoch);
      // 清空recvqueue队列
      leaveInstance(endVote);
      // 返回最终选票
      return endVote;
  }
  
  private ServerState learningState(){
  	if(self.getLearnerType() == LearnerType.PARTICIPANT){
  		LOG.debug("I'm a participant: " + self.getId());
  		return ServerState.FOLLOWING;
  	}
  	else{
  		LOG.debug("I'm an observer: " + self.getId());
  		return ServerState.OBSERVING;
  	}
  }
  
  private void leaveInstance(Vote v) {
      if(LOG.isDebugEnabled()){
          LOG.debug("About to leave FLE instance: leader="
              + v.getId() + ", zxid=0x" +
              Long.toHexString(v.getZxid()) + ", my id=" + self.getId()
              + ", my state=" + self.getPeerState());
      }
      recvqueue.clear();
  }

3.2) 发送者状态为OBSERVING:

观察者是不参与Leader选举的,所以收到这样的选票不做任何处理

case OBSERVING: 
    LOG.debug("Notification from observer: " + n.sid);
    break;

3.3) 发送者状态为FOLLOWING/LEADING:

首先要清楚两点:
当一个Server接收到其它Server的通知后,无论自己处于什么状态,其都会向那个Server发送自己的通知
一个Server若能够接收到其它Server的通知,说明该Server不是Observer,而是Participant。因为sendNotifications()方法中是不给Observer发送的
这个n.state,是收到外来通知的那个发送者的Server的状态

zk 集群中的每一台主机,在不同的阶段会处于不同的状态。每一台主机具有四种状态。

LOOKING:选举状态
FOLLOWING:Follower 的正常工作状态
OBSERVING:Observer 的正常工作状态
LEADING:Leader 的正常工作状态

代码里面有处理Status为OBSERVING的通知:
为什么Observer会发通知?:
首先没读其他代码,所以不是很清楚,但是可以推测一下,如果新增Observer,在启动的时候,它怎么知道谁是Leader?肯定是发通知,别人会告诉它,只不过这个逻辑代码不在选举代码这里
case OBSERVING 与 else if(validVoter(n.sid) && validVoter(n.leader)) { 有矛盾,肯定不会走case OBSERVING,已经在else if里面过滤了,为什么这么写?:
有可能是为了解决线程安全问题,为了程序健壮性

while ((self.getPeerState() == ServerState.LOOKING) &&(!stop))//只要当前状态是LOOKING,即没有选出Leader会一直循环
{
    // recvqueue,receive queue,其中存放着接受到的所有外来的通知
    Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS);

    if(n == null){
		...
    }
    else if(validVoter(n.sid) && validVoter(n.leader)) {
        switch (n.state) {
            case LOOKING:
			...
            case OBSERVING: 
                LOG.debug("Notification from observer: " + n.sid);
                break;
            // ----------------------- 3.3) 发送者状态为FOLLOWING/LEADING -----------------------
            // -----------------------     处理无需选举的情况 ---------------------
            // 首先要清楚两点:
            // 1) 当一个Server接收到其它Server的通知后,无论自己处于什么状态,
            //    其都会向那个Server发送自己的通知
            // 2) 一个Server若能够接收到其它Server的通知,说明该Server不是Observer
            //     而是Participant。因为sendNotifications()方法中是不给Observer发送的

            // 有两种场景会出现leader或follower给当前server发送通知:
            // 1)有新Server要加入一个正常运行的集群时,这个新的server在启动时,
            //    其状态为looking,要查找leader,其向外发送通知。此时的leader、
            //    follower的状态肯定不是looking,而分别是leading、following状态。
            //    当leader、follower接收到通知后,就会向其发送自己的通知
            //	  此时,当前Server选举的逻辑时间与其它follower或leader的epoch相同,也有可能不同
            //
            // 2)当其它Server已经在本轮选举中选出了新的leader,但还没有通知到当前Server
            //    所以当前Server的状态仍保持为looking而其它Server中的部分主机状态可能
            //    已经是leading或following了
            //    此时,当前Server选举的逻辑时间与其它follower或leader的epoch肯定相同

            // 经过分析可知,最终的两种场景是:
            // 1)当前Server选举的逻辑时间与其它follower或leader的epoch相同
            // 2)当前Server选举的逻辑时间与其它follower或leader的epoch不同
            case FOLLOWING:
            case LEADING:
            /*
             * Consider all notifications from the same epoch together.
             * 把来自同一时代的所有通知放在一起考虑。
             */
            if(n.electionEpoch == logicalclock.get()){
                recvset.put(n.sid, new Vote(n.leader,
                                              n.zxid,
                                              n.electionEpoch,
                                              n.peerEpoch));

                // 判断当前server是否应该退出本轮选举了
                // 其首先判断n所推荐的leader在当前Server的票箱中支持率是否过半
                // 若过半,再判断n所推荐的leader在outofelection中的状态是否合法
                // 若合法,则可以退出本轮选举了
                if(ooePredicate(recvset, outofelection, n)) {
                    // 收尾工作
                    self.setPeerState((n.leader == self.getId()) ?
                            ServerState.LEADING: learningState());

                    Vote endVote = new Vote(n.leader, 
                            n.zxid, 
                            n.electionEpoch, 
                            n.peerEpoch);
                    leaveInstance(endVote);
                    return endVote;
                }
            }

            /*
             * Before joining an established ensemble, verify
             * a majority is following the same leader.
             * 在加入一个既定的团队之前,要确认大多数人都是跟随同一个领导。
             */
            outofelection.put(n.sid, new Vote(n.version,
                                                n.leader,
                                                n.zxid,
                                                n.electionEpoch,
                                                n.peerEpoch,
                                                n.state));

            // 若n所推荐的leader在你们通知所形成的集合中的支持率过半,则
            // 我就知道谁是leader了,我就可以退出选举了
            if(ooePredicate(outofelection, outofelection, n)) {
                synchronized(this){
                    logicalclock.set(n.electionEpoch);
                    self.setPeerState((n.leader == self.getId()) ?
                            ServerState.LEADING: learningState());
                }
                Vote endVote = new Vote(n.leader,
                                        n.zxid,
                                        n.electionEpoch,
                                        n.peerEpoch);
                leaveInstance(endVote);
                return endVote;
            }
            break;
        default:
            LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                    n.state, n.sid);
            break;
        }
    } else {
		...
    }
}

有两种场景会出现leader或follower给当前server发送通知:

1)有新Server要加入一个正常运行的集群时,这个新的server在启动时,其状态为looking,要查找leader,其向外发送通知。此时的leader、follower的状态肯定不是looking,而分别是leading、following状态。当leader、follower接收到通知后,就会向其发送自己的通知
此时,当前Server选举的逻辑时间与其它follower或leader的epoch相同,也有可能不同
2)当其它Server已经在本轮选举中选出了新的leader,但还没有通知到当前Server所以当前Server的状态仍保持为looking而其它Server中的部分主机状态可能已经是leading或following了
此时,当前Server选举的逻辑时间与其它follower或leader的epoch肯定相同 经过分析可知,即最终的两种场景是:

当前Server选举的逻辑时间与其它follower或leader的epoch相同
当前Server选举的逻辑时间与其它follower或leader的epoch不同

epoch相同情况

case FOLLOWING:
case LEADING:
/*
 * Consider all notifications from the same epoch together.
 * 把来自同一时代的所有通知放在一起考虑。
 */
if(n.electionEpoch == logicalclock.get()){
    recvset.put(n.sid, new Vote(n.leader,
                                  n.zxid,
                                  n.electionEpoch,
                                  n.peerEpoch));

    // 判断当前server是否应该退出本轮选举了
    // 其首先判断n所推荐的leader在当前Server的票箱中支持率是否过半
    // 若过半,再判断n所推荐的leader在outofelection中的状态是否合法
    // 若合法,则可以退出本轮选举了
    if(ooePredicate(recvset, outofelection, n)) {
        // 收尾工作
        self.setPeerState((n.leader == self.getId()) ?
                ServerState.LEADING: learningState());

        Vote endVote = new Vote(n.leader, 
                n.zxid, 
                n.electionEpoch, 
                n.peerEpoch);
        leaveInstance(endVote);
        return endVote;
    }
}

recvset.put(n.sid,new Vote(…));
如果是相同逻辑时钟的选举的选票通知,则将其封装成选票放入票箱,注意此时虽然选票的状态不是FOLLOWING就是LEADING,但是因为是处于同一逻辑时钟的选票,所以认为是有效的
**if(ooePredicate(recvset, outofelection, n)) {…}
**
判断当前server是否应该退出本轮选举了 recvset:票箱
outofelection:其中存放的是非法选票,即投票者的状态不是looking的选票 怎么判断?
首先判断n所推荐的leader在当前Server的票箱中支持率是否过半(即第一个参数的集合中判断是否过半)
若过半,再判断n所推荐的leader在outofelection中的状态是否合法(从第二个参数的集合中判断是否合法)
若合法,则可以退出本轮选举了

如果 ooePredicate返回true,说明当前server退出本轮选举了,执行收尾工作:改变状态、生成最终选票、清空队列
如果 ooePredicate返回false,继续往下走,处理epoch不同情况的情况

如果是上面场景分析的第二种场景,这个时候recvset往往可能已经有很多票了,不可能是空的,有一定概率在这个时候就能选出Leader了,所以epoch相同的这段代码就是针对场景2的一种优化,加快选出Leader

epoch不同情况

注意epoch相同的时候会先处理相同的情况,此时若还没有决定Leader,还会继续处理epoch不同的情况,此时其实是针对上面说的场景1

outofelection.put(n.sid, new Vote(n.version,
                                    n.leader,
                                    n.zxid,
                                    n.electionEpoch,
                                    n.peerEpoch,
                                    n.state));

// 若n所推荐的leader在你们(“你们”代表LEADING和FOLLOWING的Server)通知所形成的集合中的支持率过半,则
// 我就知道谁是leader了,我就可以退出选举了
if(ooePredicate(outofelection, outofelection, n)) {
    synchronized(this){
        logicalclock.set(n.electionEpoch);
        self.setPeerState((n.leader == self.getId()) ?
                ServerState.LEADING: learningState());
    }
    Vote endVote = new Vote(n.leader,
                            n.zxid,
                            n.electionEpoch,
                            n.peerEpoch);
    leaveInstance(endVote);
    return endVote;
}
break;

outofelection.put(n.sid, new Vote(xxxx));

将状态为FOLLOWING/LEADING的Server发来的放到outofelection非法选票集合里
if(ooePredicate(outofelection, outofelection, n)) {…}

场景1中新Server要加入一个正常运行的集群,选举的逻辑时钟肯定不一样,所以假设还没达到条件,会重新循环,再从队列取通知继续处理,在继续放到outofelection里…outofelection里选票越来越多。

若n所推荐的leader在你们(“你们”代表LEADING和FOLLOWING的Server)通知所形成的集合中的支持率过半,则我就知道谁是leader了,我就可以退出选举了

protected boolean ooePredicate(HashMap<Long,Vote> recv, 
                                HashMap<Long,Vote> ooe, 
                                Notification n) {
    // 首先判断n所推荐的leader在recv中的支持率是否过半,
    // 若过半,则执行checkLeader()。
    // 而checkLeader()方法用于判断leader的状态是否合法
    return (termPredicate(recv, new Vote(n.version, 
                                         n.leader,
                                         n.zxid, 
                                         n.electionEpoch, 
                                         n.peerEpoch, 
                                         n.state))
            && checkLeader(ooe, n.leader, n.electionEpoch));
    
}

termPredicate(recv, new Vote(n.xx…))
判断n选票在集合recv中是否过半,该方法上面讲过
checkLeader(…)
如果满足过半条件,才会执行该方法
用于判断leader的状态是否合法

/**
 * 翻译:在这种情况下有个一leader已经选举了出来,并且有法定Server支持该leader,
 * 我们必须检查这个leader是否投票并已确认过其领导。我们需要这种检查,以避免server
 * 反复地选择一个已经崩溃并且不再领导的leader。
 */
protected boolean checkLeader(
        HashMap<Long, Vote> votes,
        long leader,
        long electionEpoch){
	//先默认true,后面排除法
    boolean predicate = true;

    /*
     * If everyone else thinks I'm the leader, I must be the leader.
     * The other two checks are just for the case in which I'm not the
     * leader. If I'm not the leader and I haven't received a message
     * from leader stating that it is leading, then predicate is false.
     */

    if(leader != self.getId()){   
    	// 若推荐的leader是别人,我不是Leader
        if(votes.get(leader) == null) predicate = false;
        //如果在votes,即outofelection中不存在,肯定是false
        else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false;
        //如果存在,但是状态不是LEADING,也是false
    } else if(logicalclock.get() != electionEpoch) {
    	// 如果每个人都认为我是领导,那我就是领导。  
	    // 若推荐的leader是当前server,则判断为的逻辑时钟和推荐的epoch是否一样,不一样肯定是false
        predicate = false;
    } 

    return predicate;
}

上一篇:java云平台开发技术,成功定级腾讯T3-2


下一篇:分布式系统的可靠协调系统——Zookeeper