zk observer 节点

zk 集群中有3种节点:leader,follower,observer,其中 observer 节点没有投票权,即它不参与选举和写请求的投票。

比较 Follower 和 Observer 的代码:

//void org.apache.zookeeper.server.quorum.Observer.processPacket(QuorumPacket qp) throws IOException
protected void processPacket(QuorumPacket qp) throws IOException{
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break;
case Leader.PROPOSAL:
LOG.warn("Ignoring proposal");
break;
case Leader.COMMIT:
LOG.warn("Ignoring commit");
break;
case Leader.UPTODATE:
LOG.error("Received an UPTODATE message after Observer started");
break;
case Leader.REVALIDATE:
revalidate(qp);
break;
case Leader.SYNC:
((ObserverZooKeeperServer)zk).sync();
break;
case Leader.INFORM:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
Request request = new Request (null, hdr.getClientId(),
hdr.getCxid(),
hdr.getType(), null, null);
request.txn = txn;
request.hdr = hdr;
ObserverZooKeeperServer obs = (ObserverZooKeeperServer)zk;
obs.commitRequest(request);
break;
}
}
// void org.apache.zookeeper.server.quorum.Follower.processPacket(QuorumPacket qp) throws IOException
protected void processPacket(QuorumPacket qp) throws IOException{
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break;
case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
if (hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = hdr.getZxid();
fzk.logRequest(hdr, txn);
break;
case Leader.COMMIT:
fzk.commit(qp.getZxid());
break;
case Leader.UPTODATE:
LOG.error("Received an UPTODATE message after Follower started");
break;
case Leader.REVALIDATE:
revalidate(qp);
break;
case Leader.SYNC:
fzk.sync();
break;
}
}

可以看到,observer 不处理 leader 的 proposal 和 commit,但是它会接收 leader 的 inform,所以它是能听到 write 的。

// void org.apache.zookeeper.server.quorum.Leader.processAck(long sid, long zxid, SocketAddress followerAddr)
// 代码片段
commit(zxid); // 向 followers 发送 COMMIT
inform(p); // 向 observers 发送 INFORM
zk.commitProcessor.commit(p.request); // leader 提交
上一篇:web


下一篇:使用手机预览移动端项目(Vue)