当我们把zookeeper服务启动时,首先需要做的一件事就是leader选举,zookeeper中leader选举的算法有3种,包括LeaderElection算法、AuthFastLeaderElection算法以及FastLeaderElection算法,其中FastLeadElection算法是默认的,当然,我们也可以在配置文件中修改配置项:electionAlg。
1、当zookeeper服务启动时,在类QuorumPeerMain中的入口函数main,主线程启动:
public class QuorumPeerMain {
private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMain.class); private static final String USAGE = "Usage: QuorumPeerMain configfile"; protected QuorumPeer quorumPeer; /**
* To start the replicated server specify the configuration file name on
* the command line.
* @param args path to the configfile
*/
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
2、然后便是QuorumPeer重写Thread.start方法,启动:
quorumPeer.start();
quorumPeer.join();
在类QuorumPeer中
@Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
loadDataBase();
cnxnFactory.start();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
startLeaderElection();
super.start();
}
3、可以从上面的源码中看到,quorumPeer线程启动后,首先做的是数据恢复,它会读取保存在磁盘中的数据:
private void loadDataBase() {
try {
//从本地文件中恢复db
zkDb.loadDataBase(); // load the epochs
/*
从最新的zxid恢复epoch变量
其中zxid为long型,前32位代表epoch值,后32位代表zxid值,
这个zxid(ZooKeeper Transaction Id),即事务id,zookeeper每次更,zxid都会增大
因此越大代表数据越新
*/
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
try {
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
} catch(FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
currentEpoch = epochOfZxid;
//....
4、然后便是初始化选举,一开始选举自己,默认使用的算法是FastLeaderElection:
synchronized public void startLeaderElection() {
try {
/*
先投自己
*/
if (getPeerState() == ServerState.LOOKING) {
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
} // if (!getView().containsKey(myid)) {
// throw new RuntimeException("My id " + myid + " not in the peer list");
//}
if (electionType == 0) {
try {
udpSocket = new DatagramSocket(myQuorumAddr.getPort());
responder = new ResponderThread();
responder.start();
} catch (SocketException e) {
throw new RuntimeException(e);
}
}
this.electionAlg = createElectionAlgorithm(electionType);
}
5、然后便是绑定选举端口,FastLeaderElection初始化:
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null; //TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
qcm = new QuorumCnxManager(this);
/*
绑定选举端口,等待集群其它机器连接
*/
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
//基于TCP的选举算法
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
6、QuorumPeer线程启动:
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1; /*
业务层发送队列,业务对象ToSend
业务层接收队列,业务对象Notification
*/
sendqueue = new LinkedBlockingQueue<ToSend>();
recvqueue = new LinkedBlockingQueue<Notification>();
this.messenger = new Messenger(manager); }
在FastLeaderElection.java文件中:
Messenger(QuorumCnxManager manager) { this.ws = new WorkerSender(manager); this.wsThread = new Thread(this.ws,
"WorkerSender[myid=" + self.getId() + "]");
this.wsThread.setDaemon(true); this.wr = new WorkerReceiver(manager); this.wrThread = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
this.wrThread.setDaemon(true);
}
7、在进行选举的过程中,每台zookeeper server服务器有以下四种状态:LOOKING、FOLLOWING、LEADING、OBSERVING,其中出于OBSERVING状态的server不参加投票过程,只有出于LOOKING状态的机子才参加投票过程,一旦投票结束,server的状态就会变成FOLLOWER或者LEADER。
下面先说一下leader选举过程:
步骤1:对于处于LOOKING状态的server来说,首先判断一个被称为逻辑时钟值(logicalclock),如果收到的logicalclock的值大于当前server自身的logicalclock值,说明这是更新的一次选举,此时需要更新自身server的logicalclock值,并且将之前收到的来自其他server的投票结果清空,然后判断是否需要更新自身的投票,判断的标准是先看epoch值的大小,然后再判断zxid的大小,最后再看server id的大小(当然,针对这种情况,server肯定会更新自身的投票,因为当前server的epoch值小于收到的epoch值嘛),然后将自身的投票广播给其他server。
在FastLeaderElection.java文件中:
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
} /*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/ return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
步骤2:如果是自身的logicalclock值大于接收的logicalclock值,那么就直接break;如果刚好相等, 就根据epoch、zxid以及server id来判断是否需要更新,然后再把自己的投票广播给其他server,最后要把收到投票加入到当前server接收的投票队伍中。
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
在FastLeaderElection.java文件的lookForLeader函数中:
case LOOKING:
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
//清空之前收到的投票结果
recvset.clear();
//判断是否需要更新自身投票
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
if(LOG.isDebugEnabled()){
LOG.debug(
"Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
//广播
sendNotifications();
} if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
} //加入投票队伍
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
步骤3:服务器判断投票是否结束,结束的条件是:是否某个leader得到了半数以上的server的支持,如果是,则尝试再等一会儿(200ms)看是否收到更新数据,如果没有收到,则设置自身的角色(follower Or leader),然后退出选举流程,否则继续。
FastLeaderElection.java文件中;
//判断投票是否结束
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
} /*
* First make the views consistent. Sometimes peers will have different
* zxids for a server depending on timing.
*/
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
voteSet.addAck(entry.getKey());
}
} return voteSet.hasAllQuorums();
}
在lookForLeader函数中:
//判读投票是否结束
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) { // Verify if there is any change in the proposed leader
//再等一会儿,看是否有新的投票
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
} /*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
//如果没有发生新的投票,则结束选举过程
//设置自身状态
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState()); Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
步骤4:以上我们讨论的是数据发送server的状态是LOOKING状态,如果数据发送方的状态是FOLLOWING或是LEADING状态,那么如果logicalclock相同,则将数据保存到recvset中,如果对方server自称是leader的话,那么就判断是否有半数以上的server支持它,如果是,则设置自身选举状态并且退出选举;
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
//当前server与发送方server的logicalclock相同
if(n.electionEpoch == logicalclock.get()){
//加入到recvset中
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState()); Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
步骤5:如果收到的数据的logicalclock值与当前server的logicalclock不相等,那么说明在另外一个选举中已经有了选举结果,于是加入outofelection集合中,并且在outofelection集合中判断时候支持过半,如果是,则更新自身的投票,并且设置自身的状态:
outofelection.put(n.sid, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
总结:这就是zookeeper的FastLeaderElection选举的大致过程。
参考博客: