选举核心方法 lookForLeader() 业务逻辑分析
知道了选举相关的重要类及成员变量的作用以后,接下来我们开始分析真正执行选举逻辑的方法lookForLeader():
1) 选举前的准备工作
2) 将自己作为初始leader投出去
3)循环交换投票直至选出Leader,循环交换投票过程中,根据收到的投票发送者状态不同,有下面三种情况:
3.1) 发送者状态为LOOKING:
3.1.1) 验证自己与大家的投票谁更适合做leader
3.1.2) 判断本轮选举是否可以结束了
3.2) 发送者状态为OBSERVING:
3.3) 发送者状态为FOLLOWING/LEADING:
leader只要成功发出去一个消息,整个集群对这个消息就不会丢,因为leader选举,会选zxid最大的那个服务器。如果没发出去就挂了,消息就丢失了
1) 选举前的准备工作
public Vote lookForLeader() throws InterruptedException {
// ----------------------- 1 选举前的初始化工作 ---------------------
try {
// Java Management eXtensions,Oracle提供的分布式应用程序监控技术
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
//系统启动开始时间
self.start_fle = Time.currentElapsedTime();
}
try {
// recvset,receive set,用于存放来自于外部的选票,一个entry代表一次投票
// key为投票者的serverid,value为选票
// 该集合相当于投票箱
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
// outofelection,out of election,退出选举
// 其中存放的是非法选票,即投票者的状态不是looking
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
// notTimeout,notification Timeout
int notTimeout = finalizeWait;
// ----------------------- 2 将自己作为初始leader投出去 ---------------------
synchronized(this){
...
self.start_fle = Time.currentElapsedTime();
为什么不用System.currentTimeMillis()?
因为系统时间是可以改的,不安全,并且系统时间返回的是毫秒,而currentElapsedTime是纳秒,更精确
获取相对于虚拟机的时间,就不会有系统时间的问题
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
receive set,用于存放来自于外部的选票,一个entry代表一次投票
key为投票者的serverid,value为选票Vote
该集合相当于投票箱,票箱记录了集群中其他节点的投票结果
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
out of election,退出选举
其中存放的是非法选票,已经退出选举的Server发来的选票,即投票者的状态不是looking
int notTimeout = finalizeWait;
notification Timeout,通知超时时间,200毫秒
发出选票后收到回复允许等待的时间
2) 将自己作为初始leader投出去
// ---------------------- 2 将自己作为初始化leader投出去 ----------------
// notTimeout,notification timeout
int notTimeout = finalizeWait;
synchronized(this){
// 逻辑时钟增一
logicalclock.incrementAndGet();
// 更新提案(更新自己的选票)
// getInitId():返回当前server的id
// getInitLastLoggedZxid():返回当前server的最大的zxid(最后一个zxid)
// getPeerEpoch():返回当前Server的epoch
// 将自己作为初始化leader,更新推荐信息
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
// 发送通知到队列
sendNotifications();
logicalclock.incrementAndGet();
逻辑时钟加一
逻辑时钟可以这么理解:logicalclock代表选举逻辑时钟(类比现实中的第十八次全国人大、第十九次全国人大……),这个值从0开始递增,在同一次选举中,各节点的值基本相同,也有例外情况,比如在第18次选举中,某个节点A挂了,其他节点完成了Leader选举,但没过多久,该Leader又挂了,于是进入了第19次Leader选举,同时节点A此时恢复,加入到Leader选举中,那么节点A的logicallock为18,而其他节点的logicallock为19,针对这种情况,节点A的logicallock会被直接更新为19并参与到第19次Leader选举中。
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
更新当前server的推荐信息为当前server自己,注意该方法和logicalclock.incrementAndGet()一起是一个原子操作
synchronized void updateProposal(long leader, long zxid, long epoch){
if(LOG.isDebugEnabled()){
LOG.debug("Updating proposal: " + leader + " (newleader), 0x"
+ Long.toHexString(zxid) + " (newzxid), " + proposedLeader
+ " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
}
// 更新当前server的推荐信息
// 上一章有说过,这三个字段是成员变量,记录当前Server所推荐的Leader信息
proposedLeader = leader;
proposedZxid = zxid;
proposedEpoch = epoch;
}
getInitId():获取当前server的id
private long getInitId(){
//判断是不是参与者,只有具有选举权的Server在选举时,才是参与者,否则是观察者OBSERVER
//如果是参与者,返回当前Server的ServerId
if(self.getLearnerType() == LearnerType.PARTICIPANT)
return self.getId();
else return Long.MIN_VALUE;
}
public enum LearnerType {
PARTICIPANT, OBSERVER;
}
判断当前状态是否是参与者,即排除了观察者,不具有选举权的Server
具有选举权的Server在有Leader情况下才是Follower,在选举的情况下叫Participant,参与者
getInitLastLoggedZxid():获取当前server最后的(也是最大的)zxid,即事务Id
private long getInitLastLoggedZxid(){
//同理判断是否具有选举权
if(self.getLearnerType() == LearnerType.PARTICIPANT)
return self.getLastLoggedZxid();
else return Long.MIN_VALUE;
}
getPeerEpoch():获取当前server的epoch
private long getPeerEpoch(){
if(self.getLearnerType() == LearnerType.PARTICIPANT)
try {
return self.getCurrentEpoch();
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
else return Long.MIN_VALUE;
}
sendNotifications();
将更新过的Ledaer推荐信息发送出去(将更新过的信息写入到一个发送队列,具体的发送逻辑不在这,上一章讲过,有专门的线程去处理)
/**
* Send notifications to all peers upon a change in our vote
*/
private void sendNotifications() {
// 遍历所有具有选举权的server
for (QuorumServer server : self.getVotingView().values()) {
long sid = server.id;
// notmsg,notification msg
ToSend notmsg = new ToSend(ToSend.mType.notification,//消息类型
proposedLeader,//推荐的Leader的ServerId(myid)
proposedZxid,//推荐的Leader的zxid
logicalclock.get(),//此次选举的逻辑时钟
QuorumPeer.ServerState.LOOKING,//当前Server的状态
sid, // 接受者的server id
proposedEpoch);//推荐的Leader的epoch
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
//放入发送队列
sendqueue.offer(notmsg);
}
}
/**
* Send notifications to all peers upon a change in our vote
*/
private void sendNotifications() {
// 遍历所有具有选举权的server
for (QuorumServer server : self.getVotingView().values()) {
long sid = server.id;
// notmsg,notification msg
ToSend notmsg = new ToSend(ToSend.mType.notification,//消息类型
proposedLeader,//推荐的Leader的ServerId(myid)
proposedZxid,//推荐的Leader的zxid
logicalclock.get(),//此次选举的逻辑时钟
QuorumPeer.ServerState.LOOKING,//当前Server的状态
sid, // 接受者的server id
proposedEpoch);//推荐的Leader的epoch
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
//放入发送队列
sendqueue.offer(notmsg);
}
}
遍历的是什么?
self.getVotingView().values(),返回的是所有具有选举权和被选举权的Server
public Map<Long,QuorumPeer.QuorumServer> getVotingView() {
return QuorumPeer.viewToVotingView(getView());
}
/**
* A 'view' is a node's current opinion(评价,称呼) of
* the membership(成员) of the entire(整个的) ensemble(全体).
* 翻译:“view”是一个节点(Server)对整个系统成员的当前称呼。
*/
// 获取到zk集群中的所有server(包含participant与observer)
public Map<Long,QuorumPeer.QuorumServer> getView() {
return Collections.unmodifiableMap(this.quorumPeers);
}
static Map<Long,QuorumPeer.QuorumServer> viewToVotingView(Map<Long,QuorumPeer.QuorumServer> view) {
Map<Long,QuorumPeer.QuorumServer> ret = new HashMap<Long, QuorumPeer.QuorumServer>();
// 将observer给排除出去,只获取参与者,即具有选举权的Server
for (QuorumServer server : view.values()) {
if (server.type == LearnerType.PARTICIPANT) {
ret.put(server.id, server);
}
}
return ret;
}
ToSend notmsg = new ToSend(…)
notification msg,通知消息,即将推荐的Leader信息封装成ToSend对象放入发送队列,有专门线程去发送该消息
sid代表了消息接收者的server id
3) 循环交换投票直至选出Leader
将自己作为初始leader投出去以后,接下来就会一直循环处理接收到的选票信息:
// ----------------------- 3 循环交换投票直至选出Leader ---------------------
/*
* Loop in which we exchange notifications until we find a leader
* 循环交换通知,直到找到Leader
*/
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
// recvqueue,receive queue,其中存放着接受到的所有外来的通知
// 有专门线程去处理接收其他Server发来的通知,并将接收到的信息解析封装成Notification 放入recvqueue队列
Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if(n == null){
if(manager.haveDelivered()){
// 重新发送,目的是为了重新再接收
sendNotifications();
} else {
// 重新连接zk集群中的每一个server
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
else if(validVoter(n.sid) && validVoter(n.leader)) {
//validVoter(n.sid):验证发送者的ServerId
//validVoter(n.leader):验证当前通知推荐的leader的ServerId
...
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
循环交换通知,直到找到Leader(一但找到Leader,状态就不在是LOOKING) Notification n =
recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS);receive queue,其中存放着接受到的所有外来的通知
有专门线程去处理接收其他Server发来的通知,并将接收到的信息解析封装成Notification 放入recvqueue队列
可以看到会有从recvqueue取出通知为空的情况
什么情况取出来是空呢?
假如广播出去8个,由于网络原因可能只收到3个,第四次取的时候就是空的
还有可能收到8个了,但是选举还没结束,再次取的时候也是空的
总之就是为了保证选举还没结束的时候,能继续收到其他Server的选票,并继续处理判断,直到选出Leader
if(manager.haveDelivered()){ //简单来说该方法就是判断是否和集群失联,返回false表示失联
manager:QuorumCnxManager,连接管理器,维护了服务器之间的TCP连接
haveDelivered:判断是否已经被交付,即检查所有队列是否为空,表示所有消息都已传递。
boolean haveDelivered() {
for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
LOG.debug("Queue size: " + queue.size());
if (queue.size() == 0) {
return true;
}
}
return false;
}
queueSendMap就是之前说的连接管理器维护的发送给其他Server失败的消息副本的Map
只要有一个队列为0就返回true,后面就不看了,因为之前说过只要有一个队列为空,就说明当前Server与zk集群的连接没有问题
只有当所有队列都不为空,才说明当前Server与zk集群失联
sendNotifications();
如果**manager.haveDelivered()**返回true,表明当前Server和集群连接没有问题,所以重新发送当前Server推荐的Leader的选票通知,目的是为了重新再接收其他Server的回复
manager.connectAll();
如果**manager.haveDelivered()**返回false,表明当前Server和集群已经失联,所以重新连接zk集群中的每一个server
public void connectAll(){
long sid;
for(Enumeration<Long> en = queueSendMap.keys();
en.hasMoreElements();){
sid = en.nextElement();
connectOne(sid);
}
}
为什么重连了,不需要重新发送通知了呢?
因为我失联了,但是发送队列中的消息是还再的,重新连接后会重新继续发送,而且其他Server在recvqueue.poll为null的时候,如果没有和集群失联,也会重新sendNotifications,所以这里是不需要的。
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?tmpTimeOut : maxNotificationInterval);
重新发送通知或者重连集群后,将通知超时时间扩大两倍,如果超过最大通知时间,将超时时间置为最大时间
else if(validVoter(n.sid) && validVoter(n.leader)) {
如果从recvqueue取出的投票通知不为空,会先验证投票的发送者和推荐者是否合法,合法了再继续处理
// 验证指定server是否合法
private boolean validVoter(long sid) {
//即判断是否具有选举权和被选举权
return self.getVotingView().containsKey(sid);
}
3.1) 收到的投票发送者状态为LOOKING: 3.1.1) 验证自己与大家的投票谁更适合做leader
...
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
...
if(n == null){...}
else if(validVoter(n.sid) && validVoter(n.leader)) {
switch (n.state) {
case LOOKING:
// 3.1.1) 验证自己与大家的投票谁更适合做leader
// If notification > current, replace and send messages out
// n.electionEpoch:外来通知所在选举的逻辑时钟
// logicalclock.get():获取到当前server的逻辑时钟
// 处理当前选举过时的情况:清空票箱,更新逻辑时钟
if (n.electionEpoch > logicalclock.get()) {
// 更新当前server所在的选举的逻辑时钟
logicalclock.set(n.electionEpoch);
// 清空票箱
recvset.clear();
// 判断当前server与n谁更适合做leader,无论谁更适合,
// 都需要更新当前server的推荐信息,然后广播出去
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
// 处理算来n过时的情况:n对于当前选举没有任何用处,直接丢掉
} else if (n.electionEpoch < logicalclock.get()) {
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
// 处理n.electionEpoch 与 logicalclock.get() 相等的情况
// totalOrderPredicate()用于判断外来n与当前server所推荐的leader
// 谁更适合做新的leader
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
// 更新当前server的推荐信息
updateProposal(n.leader, n.zxid, n.peerEpoch);
// 广播出去
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
// 将外来n通知封装为一个选票,投放到“选票箱”
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// n.sid:通知发送者的ServerId
// n.leader,n.zxid,n.peerEpoch:所推荐Leader的相关信息
// n.electionEpoch:外来通知所在选举的逻辑时钟
// ----------------------- 3.1.2) 判断本轮选举是否可以结束了 ---------------------
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
...
n.electionEpoch:外来通知所在选举的逻辑时钟
logicalclock.get():获取到当前server选举的逻辑时钟
正常情况,在选举的时候每一个Server的electionEpoch应该都是相同的,即他们是在同一轮选举,是通过当前currentEpoch+1获得的,不是同步获得的。也有例外情况,比如在第18次选举中,某个节点A挂了,其他节点完成了Leader选举,但没过多久,该Leader又挂了,于是进入了第19次Leader选举,同时节点A此时恢复,加入到Leader选举中,那么节点A的logicallock为18,而其他节点的logicallock为19,针对这种情况,节点A的logicallock会被直接更新为19并参与到第19次Leader选举中。
这个时候需要比较选票所在的选举的逻辑时钟和当前Server选举的逻辑时钟是否相等,通过比较n.electionEpoch和
logicalclock.get()的值,有三种情况:
什么情况外来投票大,或者小呢?
比如5个机器,已经选举好Leader了,有两个已经通知了,另外两个不知道,这个时候刚上任的Leader又突然挂了,还没通知到另外两个机器的时候,就会造成这种情况,已经通知的那两个epoch会再次重新选举的,逻辑时钟会再加一,即epoch会在加一,未通知的那两个epoch还没变
站在未通知的Server角度,在接收到已经通知的Server回复的时候,就会发现回复的通知epoch更大
站在已经通知的Server角度,在接受到未通知的Server发来的通知时,会发现自己比通知的epoch大
if (n.electionEpoch > logicalclock.get()) {…}
处理n.electionEpoch 比 logicalclock.get() 大的情况(外来投票epoch大)
自己已经过时了,选谁都没有意义,所以做如下操作:
logicalclock.set(n.electionEpoch):更新当前server所在的选举的逻辑时钟
recvset.clear():清空票箱,之前收集的投票都已经过时了,没意义了。
totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch()):判断外来n与当前server谁更适合做新的leader(注意不是当前Server所推荐的,而就是当前Server)
updateProposal(…):选择更适合的更新当前server的推荐信息
sendNotifications():将自己的选票广播出去
else if (n.electionEpoch < logicalclock.get()) {…}
处理n.electionEpoch 比 logicalclock.get() 小的情况(外来投票epoch小)
说明外来的过时了,它的选票没有意义,不做任何处理,直接break掉switch,重新进入循环,从recvqueue取下一个通知,继续处理
else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {…}
处理n.electionEpoch 与 logicalclock.get() 相等的情况,即他们在同一轮选举中
totalOrderPredicate(…):断言,判断外来n与当前server所推荐的leader谁更适合做新的leader,返回true,则n(外来的)更适合
如果返回true,即外来的更合适,则执行下面方法:
updateProposal():更新当前server的推荐信息
sendNotifications():广播出去
处理完上面情况后,如果没有break,即是外来选票的逻辑时钟更大,或者相等,代表外来选票有效,则将选票放入选票箱:
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
将外来n通知封装为一个选票,投放到“选票箱”
特殊情况:当前服务器收到外来通知发现外来通知推荐的leader更适合以后,会更新自己的推荐信息并再次广播出去,这个时候recvqueue除了第一次广播推荐自己收到的回复外,还会收到新一轮广播的回复,对于其他Server而言有可能会回复两次通知,但对于本地Server是没有影响的,因为投票箱recvset是一个Map,key是发送消息的服务器的ServerId,每个Server只会记录一个投票,新的会覆盖旧的
接下来会尝试走《3.1.2 判断本轮选举是否可以结束了》这一步,但是如果刚开始选举,要达到相同选票过半才结束选举,所以肯定不会结束,里面的逻辑是不会走的,所以直接break掉switch,然后会循环到一开始从recvqueue取下一个通知,继续处理…
totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)
totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,getInitId(), getInitLastLoggedZxid(), getPeerEpoch())
判断谁更适合做leader
该方法返回true,表示外来的更适合,即new更适合
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
// 获取权重,observer的权重为0,如果为0是observer就return false
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
// zxid:其为一个 64 位长度的 Long 类型,其中高 32 位表示 epoch,低 32 位表示 xid。
// 先比较前32位,如果newEpoch > curEpoch,肯定newZxid > curZxid,直接返回true
// 如果newEpoch 和 curEpoch相同
// 在看Zxid,实际上比较的就是xid(前32位相等),如果newZxid > curZxid,直接返回true
// 如果Zxid也相同,就比较ServerId
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
3.1.2) 判断本轮选举是否可以结束了
...
case LOOKING:
....
// ----------------------- 3.1.2) 判断本轮选举是否可以结束了 ---------------------
/*
* 尝试通过现在已经收到的信息,判断是否已经足够确认最终的leader了,通过方法termPredicate() ,
* 判断标准很简单:是否已经有超过半数的机器所推举的leader为当前自己所推举的leader.
* 如果是,保险起见,最多再等待finalizeWait(默认200ms)的时间进行最后的确认,
* 如果发现有了更新的leader信息,则把这个Notification重新放回recvqueue,显然,选举将继续进行。
* 否则,选举结束,根据选举的leader是否是自己,设置自己的状态为LEADING或者OBSERVING或者FOLLOWING。
*/
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
// 该循环有两个出口:
// break:从该出口跳出,说明n的值不为null,说明在剩余的通知中找到了更适合做leader的通知
// while()条件:从该出口跳出,说明n的值为null,说明在剩余的通知中没有比当前server所推荐的leader更适合的了
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
// 将更适合的n重新放回到recvqueue,以便对其进行重新投票
recvqueue.put(n);
break;
}
}
// 若n为null,则说明当前server所推荐的leader就是最终的leader,
// 则此时就可以进行收尾工作了
if (n == null) {
// 修改当前server的状态,非leader即following
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
// 形成最终选票
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
// 清空recvqueue队列
leaveInstance(endVote);
return endVote;
}
}
break;
if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) {…}
终止断言:判断当前Server所推荐的leader在票箱中的支持率是否过半
/**
* 终止断言。给定一组选票,决定是否有足够的票数宣布选举结束。
*/
protected boolean termPredicate(
HashMap<Long, Vote> votes,
Vote vote) {
HashSet<Long> set = new HashSet<Long>();
// 遍历票箱:从票箱中查找与选票vote相同的选票
for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())){
set.add(entry.getKey());
}
}
return self.getQuorumVerifier().containsQuorum(set);
}
org.apache.zookeeper.server.quorum.flexible.QuorumMaj#containsQuorum
/**
* Verifies if a set is a majority.
*/
public boolean containsQuorum(Set<Long> set){
return (set.size() > half);
}
half就是集群总数的一半
可以看到一定要大于一半,等于一半也不行,这也是服务器数量推荐奇数的原因。
基于该理论,由 5 台主机构成的集群,最多只允许 2 台宕机(至少要有3票)。而由 6 台构成的集群,其最多也只允许 2 台宕机(3票不过半,至少4票)。即,6 台与5 台的容灾能力是相同的。基于此容灾能力的原因,建议使用奇数台主机构成集群,以避免资源浪费。但从系统吞吐量上说,6 台主机的性能一定是高于 5 台的。所以使用 6 台主机并不是资源浪费。
已经过半了,但是recvqueue里面的通知还没处理完,还有可能有更适合的Leader通知
如果有更合适的,将通知重新加入recvqueue队列的尾部,并break退出循环,此时n != null ,不会进行收尾动作,会重新进行选举,最终还是会更新当前Server的推荐信息为这个更适合的Leader,并广播出去
如果没有,即n为null,则说明当前server所推荐的leader就是最终的leader,则此时就可以进行收尾工作了
if (termPredicate(recvset,new Vote(proposedLeader, proposedZxid,logicalclock.get(), proposedEpoch))) {
// 该循环有两个出口:
// break:从该出口跳出,说明n的值不为null,说明在剩余的通知中找到了更适合做leader的通知
// while()条件:从该出口跳出,说明n的值为null,说明在剩余的通知中没有比当前server所推荐的leader更适合的了
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
// 如果发现有更合适的
// 将更适合的n重新放回到recvqueue,以便对其进行重新投票
recvqueue.put(n);
// put:将指定的元素插入此队列的尾部,必要时等待可用的空间。
break;
}
}
// 若n为null,则说明当前server所推荐的leader就是最终的leader,
// 则此时就可以进行收尾工作了
if (n == null) {
...
}
}
break;
收尾工作:
// 若n为null,则说明当前server所推荐的leader就是最终的leader,
// 则此时就可以进行收尾工作了
if (n == null) {
// 修改当前server的状态,非leader即following
// 如果推荐的Leader就是我自己,修改我当前状态为LEADING
// 如果不是我自己,判断自己是否是参与者,如果是则状态置为FOLLOWING,否则是OBSERVING
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
// 形成最终选票
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock.get(),
proposedEpoch);
// 清空recvqueue队列
leaveInstance(endVote);
// 返回最终选票
return endVote;
}
private ServerState learningState(){
if(self.getLearnerType() == LearnerType.PARTICIPANT){
LOG.debug("I'm a participant: " + self.getId());
return ServerState.FOLLOWING;
}
else{
LOG.debug("I'm an observer: " + self.getId());
return ServerState.OBSERVING;
}
}
private void leaveInstance(Vote v) {
if(LOG.isDebugEnabled()){
LOG.debug("About to leave FLE instance: leader="
+ v.getId() + ", zxid=0x" +
Long.toHexString(v.getZxid()) + ", my id=" + self.getId()
+ ", my state=" + self.getPeerState());
}
recvqueue.clear();
}
3.2) 发送者状态为OBSERVING:
观察者是不参与Leader选举的,所以收到这样的选票不做任何处理
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
3.3) 发送者状态为FOLLOWING/LEADING:
首先要清楚两点:
当一个Server接收到其它Server的通知后,无论自己处于什么状态,其都会向那个Server发送自己的通知
一个Server若能够接收到其它Server的通知,说明该Server不是Observer,而是Participant。因为sendNotifications()方法中是不给Observer发送的
这个n.state,是收到外来通知的那个发送者的Server的状态
zk 集群中的每一台主机,在不同的阶段会处于不同的状态。每一台主机具有四种状态。
LOOKING:选举状态
FOLLOWING:Follower 的正常工作状态
OBSERVING:Observer 的正常工作状态
LEADING:Leader 的正常工作状态
代码里面有处理Status为OBSERVING的通知:
为什么Observer会发通知?:
首先没读其他代码,所以不是很清楚,但是可以推测一下,如果新增Observer,在启动的时候,它怎么知道谁是Leader?肯定是发通知,别人会告诉它,只不过这个逻辑代码不在选举代码这里
case OBSERVING 与 else if(validVoter(n.sid) && validVoter(n.leader)) { 有矛盾,肯定不会走case OBSERVING,已经在else if里面过滤了,为什么这么写?:
有可能是为了解决线程安全问题,为了程序健壮性
while ((self.getPeerState() == ServerState.LOOKING) &&(!stop))//只要当前状态是LOOKING,即没有选出Leader会一直循环
{
// recvqueue,receive queue,其中存放着接受到的所有外来的通知
Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS);
if(n == null){
...
}
else if(validVoter(n.sid) && validVoter(n.leader)) {
switch (n.state) {
case LOOKING:
...
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
// ----------------------- 3.3) 发送者状态为FOLLOWING/LEADING -----------------------
// ----------------------- 处理无需选举的情况 ---------------------
// 首先要清楚两点:
// 1) 当一个Server接收到其它Server的通知后,无论自己处于什么状态,
// 其都会向那个Server发送自己的通知
// 2) 一个Server若能够接收到其它Server的通知,说明该Server不是Observer
// 而是Participant。因为sendNotifications()方法中是不给Observer发送的
// 有两种场景会出现leader或follower给当前server发送通知:
// 1)有新Server要加入一个正常运行的集群时,这个新的server在启动时,
// 其状态为looking,要查找leader,其向外发送通知。此时的leader、
// follower的状态肯定不是looking,而分别是leading、following状态。
// 当leader、follower接收到通知后,就会向其发送自己的通知
// 此时,当前Server选举的逻辑时间与其它follower或leader的epoch相同,也有可能不同
//
// 2)当其它Server已经在本轮选举中选出了新的leader,但还没有通知到当前Server
// 所以当前Server的状态仍保持为looking而其它Server中的部分主机状态可能
// 已经是leading或following了
// 此时,当前Server选举的逻辑时间与其它follower或leader的epoch肯定相同
// 经过分析可知,最终的两种场景是:
// 1)当前Server选举的逻辑时间与其它follower或leader的epoch相同
// 2)当前Server选举的逻辑时间与其它follower或leader的epoch不同
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch together.
* 把来自同一时代的所有通知放在一起考虑。
*/
if(n.electionEpoch == logicalclock.get()){
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
// 判断当前server是否应该退出本轮选举了
// 其首先判断n所推荐的leader在当前Server的票箱中支持率是否过半
// 若过半,再判断n所推荐的leader在outofelection中的状态是否合法
// 若合法,则可以退出本轮选举了
if(ooePredicate(recvset, outofelection, n)) {
// 收尾工作
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify
* a majority is following the same leader.
* 在加入一个既定的团队之前,要确认大多数人都是跟随同一个领导。
*/
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
// 若n所推荐的leader在你们通知所形成的集合中的支持率过半,则
// 我就知道谁是leader了,我就可以退出选举了
if(ooePredicate(outofelection, outofelection, n)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
n.state, n.sid);
break;
}
} else {
...
}
}
有两种场景会出现leader或follower给当前server发送通知:
1)有新Server要加入一个正常运行的集群时,这个新的server在启动时,其状态为looking,要查找leader,其向外发送通知。此时的leader、follower的状态肯定不是looking,而分别是leading、following状态。当leader、follower接收到通知后,就会向其发送自己的通知
此时,当前Server选举的逻辑时间与其它follower或leader的epoch相同,也有可能不同
2)当其它Server已经在本轮选举中选出了新的leader,但还没有通知到当前Server所以当前Server的状态仍保持为looking而其它Server中的部分主机状态可能已经是leading或following了
此时,当前Server选举的逻辑时间与其它follower或leader的epoch肯定相同 经过分析可知,即最终的两种场景是:当前Server选举的逻辑时间与其它follower或leader的epoch相同
当前Server选举的逻辑时间与其它follower或leader的epoch不同
epoch相同情况
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch together.
* 把来自同一时代的所有通知放在一起考虑。
*/
if(n.electionEpoch == logicalclock.get()){
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
// 判断当前server是否应该退出本轮选举了
// 其首先判断n所推荐的leader在当前Server的票箱中支持率是否过半
// 若过半,再判断n所推荐的leader在outofelection中的状态是否合法
// 若合法,则可以退出本轮选举了
if(ooePredicate(recvset, outofelection, n)) {
// 收尾工作
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
recvset.put(n.sid,new Vote(…));
如果是相同逻辑时钟的选举的选票通知,则将其封装成选票放入票箱,注意此时虽然选票的状态不是FOLLOWING就是LEADING,但是因为是处于同一逻辑时钟的选票,所以认为是有效的
**if(ooePredicate(recvset, outofelection, n)) {…}
**
判断当前server是否应该退出本轮选举了 recvset:票箱
outofelection:其中存放的是非法选票,即投票者的状态不是looking的选票 怎么判断?
首先判断n所推荐的leader在当前Server的票箱中支持率是否过半(即第一个参数的集合中判断是否过半)
若过半,再判断n所推荐的leader在outofelection中的状态是否合法(从第二个参数的集合中判断是否合法)
若合法,则可以退出本轮选举了
如果 ooePredicate返回true,说明当前server退出本轮选举了,执行收尾工作:改变状态、生成最终选票、清空队列
如果 ooePredicate返回false,继续往下走,处理epoch不同情况的情况
如果是上面场景分析的第二种场景,这个时候recvset往往可能已经有很多票了,不可能是空的,有一定概率在这个时候就能选出Leader了,所以epoch相同的这段代码就是针对场景2的一种优化,加快选出Leader
epoch不同情况
注意epoch相同的时候会先处理相同的情况,此时若还没有决定Leader,还会继续处理epoch不同的情况,此时其实是针对上面说的场景1
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
// 若n所推荐的leader在你们(“你们”代表LEADING和FOLLOWING的Server)通知所形成的集合中的支持率过半,则
// 我就知道谁是leader了,我就可以退出选举了
if(ooePredicate(outofelection, outofelection, n)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
outofelection.put(n.sid, new Vote(xxxx));
将状态为FOLLOWING/LEADING的Server发来的放到outofelection非法选票集合里
if(ooePredicate(outofelection, outofelection, n)) {…}场景1中新Server要加入一个正常运行的集群,选举的逻辑时钟肯定不一样,所以假设还没达到条件,会重新循环,再从队列取通知继续处理,在继续放到outofelection里…outofelection里选票越来越多。
若n所推荐的leader在你们(“你们”代表LEADING和FOLLOWING的Server)通知所形成的集合中的支持率过半,则我就知道谁是leader了,我就可以退出选举了
protected boolean ooePredicate(HashMap<Long,Vote> recv,
HashMap<Long,Vote> ooe,
Notification n) {
// 首先判断n所推荐的leader在recv中的支持率是否过半,
// 若过半,则执行checkLeader()。
// 而checkLeader()方法用于判断leader的状态是否合法
return (termPredicate(recv, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state))
&& checkLeader(ooe, n.leader, n.electionEpoch));
}
termPredicate(recv, new Vote(n.xx…))
判断n选票在集合recv中是否过半,该方法上面讲过
checkLeader(…)
如果满足过半条件,才会执行该方法
用于判断leader的状态是否合法
/**
* 翻译:在这种情况下有个一leader已经选举了出来,并且有法定Server支持该leader,
* 我们必须检查这个leader是否投票并已确认过其领导。我们需要这种检查,以避免server
* 反复地选择一个已经崩溃并且不再领导的leader。
*/
protected boolean checkLeader(
HashMap<Long, Vote> votes,
long leader,
long electionEpoch){
//先默认true,后面排除法
boolean predicate = true;
/*
* If everyone else thinks I'm the leader, I must be the leader.
* The other two checks are just for the case in which I'm not the
* leader. If I'm not the leader and I haven't received a message
* from leader stating that it is leading, then predicate is false.
*/
if(leader != self.getId()){
// 若推荐的leader是别人,我不是Leader
if(votes.get(leader) == null) predicate = false;
//如果在votes,即outofelection中不存在,肯定是false
else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false;
//如果存在,但是状态不是LEADING,也是false
} else if(logicalclock.get() != electionEpoch) {
// 如果每个人都认为我是领导,那我就是领导。
// 若推荐的leader是当前server,则判断为的逻辑时钟和推荐的epoch是否一样,不一样肯定是false
predicate = false;
}
return predicate;
}