ZooKeeper (6): Principles and Implementation of the Watch Mechanism

  ZooKeeper's watch mechanism lets clients discover data changes as soon as they happen, keeping their view of the data up to date.

  All of ZK's read operations can set a watch: getData, getChildren, and exists. Write operations cannot set watches.

  There are two kinds of watches: data watches and child watches. Creating, deleting, or setting a znode triggers them: exists and getData set data watches, while getChildren sets a child watch.

  The event types that may be observed are: NodeCreated, NodeDataChanged, NodeDeleted, and NodeChildrenChanged.
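  As a quick illustration of the API surface, a minimal sketch (the server address, the /demo path, and the handler bodies are assumptions for the example; error handling is omitted):

import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class WatchKindsDemo {
    public static void main(String[] args) throws Exception {
        // the watcher passed to the constructor becomes the default watcher
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000,
                event -> System.out.println("default watcher: " + event));

        // exists and getData register data watches
        zk.exists("/demo", true); // true = reuse the default watcher
        zk.getData("/demo", event -> { // or pass an explicit Watcher
            if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                System.out.println("/demo data changed");
            }
        }, null);

        // getChildren registers a child watch
        zk.getChildren("/demo", true);

        Thread.sleep(Long.MAX_VALUE); // keep the session alive for the demo
    }
}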

  ZK guarantees that as soon as a piece of data changes, every client that registered a watch on it gets notified. How does it do that?

  The underlying flow is actually quite simple: four steps:

    1. The client registers a Watcher with the server;
    2. Data on the server changes;
    3. The server notifies the client of the change;
    4. The client invokes the Watcher callback to handle the change (see the sketch below).
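  Relevant to step 4: the Watcher callback the client implements receives not only znode events but also connection-state events, delivered with type EventType.None. A minimal sketch of a callback that tells the two apart (the logging is illustrative):

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

// A watcher that distinguishes connection-state events from znode events.
public class LoggingWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.None) {
            // no znode involved: this is session state (SyncConnected, Disconnected, Expired, ...)
            System.out.println("session state: " + event.getState());
        } else {
            // a znode event: NodeCreated, NodeDataChanged, NodeDeleted, NodeChildrenChanged
            System.out.println(event.getType() + " on " + event.getPath());
        }
    }
}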

  Taking getData's data-change watch as the example, let's look in detail at how ZK handles a watch.

1. The client registers the watch with the server

  Only read operations can set a watch. We use getData as the example.

    // org.apache.zookeeper.ZooKeeper#getData(java.lang.String, boolean, org.apache.zookeeper.AsyncCallback.DataCallback, java.lang.Object)
/**
* The asynchronous version of getData.
*
* @see #getData(String, boolean, Stat)
*/
public void getData(String path, boolean watch, DataCallback cb, Object ctx) {
// the boolean flag decides whether to watch; the Watcher used is watchManager.defaultWatcher,
// i.e. the one passed in when the ZooKeeper instance was constructed
getData(path, watch ? watchManager.defaultWatcher : null, cb, ctx);
}
// to understand where that default watcher comes from, it is worth looking at the constructor
// org.apache.zookeeper.ZooKeeper
public ZooKeeper(
String connectString,
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly,
HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info(
"Initiating client connection, connectString={} sessionTimeout={} watcher={}",
connectString,
sessionTimeout,
watcher);
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
watchManager = defaultWatchManager();
// the default watcher: used when a watch is requested but no explicit Watcher is passed
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
hostProvider = aHostProvider;
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
this,
watchManager,
getClientCnxnSocket(),
canBeReadOnly);
cnxn.start();
}
// back to the point: this is the getData overload that performs the watch registration
/**
* The asynchronous version of getData.
*
* @see #getData(String, Watcher, Stat)
*/
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx) {
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch is registered via a DataWatchRegistration
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
// assemble the request and response and hand them to ClientCnxn
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
// the packet is merely added to the outgoing queue; the call returns immediately
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, clientPath, serverPath, ctx, wcb);
}
// org.apache.zookeeper.ClientCnxn#queuePacket
public Packet queuePacket(
RequestHeader h,
ReplyHeader r,
Record request,
Record response,
AsyncCallback cb,
String clientPath,
String serverPath,
Object ctx,
WatchRegistration watchRegistration) {
return queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, watchRegistration, null);
}
public Packet queuePacket(
RequestHeader h,
ReplyHeader r,
Record request,
Record response,
AsyncCallback cb,
String clientPath,
String serverPath,
Object ctx,
WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) {
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
// The synchronized block here is for two purposes:
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
// later packet will be notified.
synchronized (state) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
// add to the outgoing queue
outgoingQueue.add(packet);
}
}
// wake the send thread; from here we can treat the data as on its way to the server
// (in fact SendThread loops continuously, handling sends, heartbeats, and so on)
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
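  From the application's point of view, the registration path we just walked through boils down to a single call. A minimal sketch (the path, context object, and callback body are assumptions):

import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.ZooKeeper;

public class AsyncGetDataDemo {
    static void readAndWatch(ZooKeeper zk) {
        // watch = true registers watchManager.defaultWatcher, as shown above;
        // the call returns as soon as the packet is queued on outgoingQueue
        DataCallback cb = (rc, path, ctx, data, stat) ->
                System.out.println("rc=" + rc + " path=" + path
                        + " data=" + (data == null ? null : new String(data)));
        zk.getData("/demo", true, cb, "my-context");
    }
}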

2. The server registers the client's watch request

  Now for the server side of watch handling. It is in FinalRequestProcessor that the server first checks whether a watch was requested; if so, it stores the connection (cnxn) in the ZK database so that it can notify the client over that cnxn when the data changes later.

// org.apache.zookeeper.server.NIOServerCnxn#readPayload reads the bytes sent by the client
// org.apache.zookeeper.server.NIOServerCnxn#readRequest
// org.apache.zookeeper.server.FinalRequestProcessor#processRequest
public void processRequest(Request request) {
LOG.debug("Processing request:: {}", request); // request.addRQRec(">final");
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
}
ProcessTxnResult rc = zks.processTxn(request);
// ZOOKEEPER-558:
// In some cases the server does not close the connection (e.g., closeconn buffer
// was not being queued — ZOOKEEPER-558) properly. This happens, for example,
// when the client closes the connection. The server should still close the session, though.
// Calling closeSession() after losing the cnxn, results in the client close session response being dropped.
if (request.type == OpCode.closeSession && connClosedByClient(request)) {
// We need to check if we can close the session id.
// Sometimes the corresponding ServerCnxnFactory could be null because
// we are just playing diffs from the leader.
if (closeSession(zks.serverCnxnFactory, request.sessionId)
|| closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
return;
}
}
if (request.getHdr() != null) {
/*
* Request header is created only by the leader, so this must be
* a quorum request. Since we're comparing timestamps across hosts,
* this metric may be incorrect. However, it's still a very useful
* metric to track in the happy case. If there is clock drift,
* the latency can go negative. Note: headers use wall time, not
* CLOCK_MONOTONIC.
*/
long propagationLatency = Time.currentWallTime() - request.getHdr().getTime();
if (propagationLatency >= 0) {
ServerMetrics.getMetrics().PROPAGATION_LATENCY.add(propagationLatency);
}
}
if (request.cnxn == null) {
return;
}
ServerCnxn cnxn = request.cnxn;
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
String lastOp = "NA";
// Notify ZooKeeperServer that the request has finished so that it can
// update any request accounting/throttling limits
zks.decInProcess();
zks.requestFinished(request);
Code err = Code.OK;
Record rsp = null;
String path = null;
try {
if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
/*
* When local session upgrading is disabled, leader will
* reject the ephemeral node creation due to session expire.
* However, if this is the follower that issue the request,
* it will have the correct error code, so we should use that
* and report to user
*/
if (request.getException() != null) {
throw request.getException();
} else {
throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr()));
}
}
KeeperException ke = request.getException();
if (ke instanceof SessionMovedException) {
throw ke;
}
if (ke != null && request.type != OpCode.multi) {
throw ke;
} LOG.debug("{}", request); if (request.isStale()) {
ServerMetrics.getMetrics().STALE_REPLIES.add(1);
}
switch (request.type) {
case OpCode.ping: {
lastOp = "PING";
updateStats(request, lastOp, lastZxid);
cnxn.sendResponse(new ReplyHeader(-2, lastZxid, 0), null, "response");
return;
}
case OpCode.createSession: {
// ...
}
case OpCode.multi: {
// ...
}
case OpCode.multiRead: {
//...
}
case OpCode.create: {
lastOp = "CREA";
rsp = new CreateResponse(rc.path);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer: {
// ...
}
case OpCode.delete:
case OpCode.deleteContainer: {
//
}
case OpCode.setData: {
//
}
case OpCode.reconfig: {
//
}
case OpCode.setACL:
case OpCode.closeSession:
case OpCode.sync:
case OpCode.check:
case OpCode.exists: {
lastOp = "EXIS";
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
rsp = new ExistsResponse(stat);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
// deserialize the client's bytes into a GetDataRequest, which carries the watch flag
ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
path = getDataRequest.getPath();
// handleGetDataRequest takes care of the watch flag
rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
break;
}
case OpCode.setWatches: {
// setWatches is another way to register watches
lastOp = "SETW";
SetWatches setWatches = new SetWatches();
// TODO We really should NOT need this!!!!
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase()
.setWatches(
relativeZxid,
setWatches.getDataWatches(),
setWatches.getExistWatches(),
setWatches.getChildWatches(),
cnxn);
break;
}
case OpCode.getACL: {
//
}
case OpCode.getChildren: {
//
break;
}
case OpCode.getAllChildrenNumber: {
//
break;
}
case OpCode.getChildren2: {
//
break;
}
case OpCode.checkWatches: {
lastOp = "CHKW";
CheckWatchesRequest checkWatches = new CheckWatchesRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches);
WatcherType type = WatcherType.fromInt(checkWatches.getType());
path = checkWatches.getPath();
boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn);
if (!containsWatcher) {
String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
throw new KeeperException.NoWatcherException(msg);
}
requestPathMetricsCollector.registerRequest(request.type, checkWatches.getPath());
break;
}
case OpCode.removeWatches: {
lastOp = "REMW";
RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
path = removeWatches.getPath();
boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
if (!removed) {
String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
throw new KeeperException.NoWatcherException(msg);
}
requestPathMetricsCollector.registerRequest(request.type, removeWatches.getPath());
break;
}
case OpCode.getEphemerals: {
//
break;
}
}
} catch (SessionMovedException e) {
// session moved is a connection level error, we need to tear
// down the connection otw ZOOKEEPER-710 might happen
// ie client on slow follower starts to renew session, fails
// before this completes, then tries the fast follower (leader)
// and is successful, however the initial renew is then
// successfully fwd/processed by the leader and as a result
// the client and leader disagree on where the client is most
// recently attached (and therefore invalid SESSION MOVED generated)
cnxn.sendCloseSession();
return;
} catch (KeeperException e) {
err = e.code();
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process {}", request, e);
StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
LOG.error("Dumping request buffer: 0x{}", sb.toString());
err = Code.MARSHALLINGERROR;
}
ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
updateStats(request, lastOp, lastZxid);
try {
if (request.type == OpCode.getData && path != null && rsp != null) {
// Serialized read responses could be cached by the connection object.
// Cache entries are identified by their path and last modified zxid,
// so these values are passed along with the response.
GetDataResponse getDataResponse = (GetDataResponse) rsp;
Stat stat = null;
if (getDataResponse.getStat() != null) {
stat = getDataResponse.getStat();
}
cnxn.sendResponse(hdr, rsp, "response", path, stat);
} else {
cnxn.sendResponse(hdr, rsp, "response");
}
if (request.type == OpCode.closeSession) {
cnxn.sendCloseSession();
}
} catch (IOException e) {
LOG.error("FIXMSG", e);
}
}
// org.apache.zookeeper.server.FinalRequestProcessor#handleGetDataRequest
private Record handleGetDataRequest(Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
GetDataRequest getDataRequest = (GetDataRequest) request;
String path = getDataRequest.getPath();
// first fetch the DataNode without any permission check, for the checks below
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
// check ACL permissions
zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
Stat stat = new Stat();
// fetch the node data; if a watch was requested, pass the current connection along (here a NIOServerCnxn)
byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
return new GetDataResponse(b, stat);
}
// org.apache.zookeeper.server.ZKDatabase#getData
/**
* get data and stat for a path
* @param path the path being queried
* @param stat the stat for this path
* @param watcher the watcher function
* @return
* @throws KeeperException.NoNodeException
*/
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
return dataTree.getData(path, stat, watcher);
}
// org.apache.zookeeper.server.DataTree#getData
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
DataNode n = nodes.get(path);
byte[] data = null;
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
// copy the node's stat into the caller's stat
n.copyStat(stat);
// if a watcher was passed, register it; dataWatches manages all data watches
if (watcher != null) {
dataWatches.addWatch(path, watcher);
}
data = n.data;
}
updateReadStat(path, data == null ? 0 : data.length);
return data;
}
// org.apache.zookeeper.server.watch.WatchManager#addWatch
@Override
public synchronized boolean addWatch(String path, Watcher watcher) {
// connection liveness is checked everywhere to avoid inconsistent state
if (isDeadWatcher(watcher)) {
LOG.debug("Ignoring addWatch with closed cnxn");
return false;
}
// this method is synchronized, so a plain HashMap is safe as the watcher container:
// watchTable = new HashMap<String, Set<Watcher>>();
// each path can carry any number of watchers, hence a Set to hold them
Set<Watcher> list = watchTable.get(path);
if (list == null) {
// don't waste memory if there are few watches on a node
// rehash when the 4th entry is added, doubling size thereafter
// seems like a good compromise
list = new HashSet<Watcher>(4);
watchTable.put(path, list);
}
list.add(watcher);
// each connection can watch many paths
Set<String> paths = watch2Paths.get(watcher);
if (paths == null) {
// cnxns typically have many watches, so use default cap here
paths = new HashSet<String>();
watch2Paths.put(watcher, paths);
}
return paths.add(path);
}

  That completes server-side registration. Everything ends up managed by WatchManager, which tracks both directions: the paths watched by each connection, and the connections watching each path.
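  To make the two-way bookkeeping concrete, here is a stripped-down sketch of the pair of maps WatchManager maintains (illustrative only; the real class additionally handles dead watchers, statistics, and trigger logic):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.zookeeper.Watcher;

// Sketch of WatchManager's two-way index: path -> watchers and watcher -> paths.
// Keeping both directions makes trigger (by path) and cleanup (by connection) cheap.
public class TwoWayWatchIndex {
    private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
    private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>();

    public synchronized boolean addWatch(String path, Watcher watcher) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>(4)).add(watcher);
        return watch2Paths.computeIfAbsent(watcher, w -> new HashSet<>()).add(path);
    }

    // When a connection dies, remove every watch it registered.
    public synchronized void removeWatcher(Watcher watcher) {
        Set<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return;
        }
        for (String path : paths) {
            Set<Watcher> watchers = watchTable.get(path);
            if (watchers != null) {
                watchers.remove(watcher);
                if (watchers.isEmpty()) {
                    watchTable.remove(path);
                }
            }
        }
    }
}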

3. The server notifies the client of data changes

  A watch notifies the client when data changes, for instance on setData. Concretely, at commit time, right after the data has been stored, the server notifies the clients.

  This, too, happens in FinalRequestProcessor.

    public void processRequest(Request request) {
LOG.debug("Processing request:: {}", request); // request.addRQRec(">final");
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
}
// process the transaction
ProcessTxnResult rc = zks.processTxn(request);
//...
}
// org.apache.zookeeper.server.ZooKeeperServer#processTxn
// entry point for FinalRequestProcessor.java
public ProcessTxnResult processTxn(Request request) {
TxnHeader hdr = request.getHdr();
processTxnForSessionEvents(request, hdr, request.getTxn());
final boolean writeRequest = (hdr != null);
final boolean quorumRequest = request.isQuorum();
// return fast w/o synchronization when we get a read
if (!writeRequest && !quorumRequest) {
return new ProcessTxnResult();
}
synchronized (outstandingChanges) {
// apply the transaction to the in-memory database
ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn());
// request.hdr is set for write requests, which are the only ones
// that add to outstandingChanges.
if (writeRequest) {
long zxid = hdr.getZxid();
while (!outstandingChanges.isEmpty()
&& outstandingChanges.peek().zxid <= zxid) {
ChangeRecord cr = outstandingChanges.remove();
ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
if (cr.zxid < zxid) {
LOG.warn(
"Zxid outstanding 0x{} is less than current 0x{}",
Long.toHexString(cr.zxid),
Long.toHexString(zxid));
}
if (outstandingChangesForPath.get(cr.path) == cr) {
outstandingChangesForPath.remove(cr.path);
}
}
}
// do not add non quorum packets to the queue.
if (quorumRequest) {
getZKDatabase().addCommittedProposal(request);
}
return rc;
}
}
// org.apache.zookeeper.server.ZKDatabase#processTxn
/**
* the process txn on the data
* @param hdr the txnheader for the txn
* @param txn the transaction that needs to be processed
* @return the result of processing the transaction on this
* datatree/zkdatabase
*/
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
return dataTree.processTxn(hdr, txn);
}
// org.apache.zookeeper.server.DataTree#processTxn
public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
return this.processTxn(header, txn, false);
}
// as with getData, DataTree is the primary structure ZooKeeper uses to store data
// org.apache.zookeeper.server.DataTree#processTxn
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
ProcessTxnResult rc = new ProcessTxnResult();
try {
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
rc.zxid = header.getZxid();
rc.type = header.getType();
rc.err = 0;
rc.multiResult = null;
switch (header.getType()) {
case OpCode.create:
//
break;
case OpCode.create2:
//
break;
case OpCode.createTTL:
//
break;
case OpCode.createContainer:
//
break;
case OpCode.delete:
case OpCode.deleteContainer:
//...
break;
case OpCode.reconfig:
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
// store the data
rc.stat = setData(
setDataTxn.getPath(),
setDataTxn.getData(),
setDataTxn.getVersion(),
header.getZxid(),
header.getTime());
break;
case OpCode.setACL:
SetACLTxn setACLTxn = (SetACLTxn) txn;
rc.path = setACLTxn.getPath();
rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion());
break;
case OpCode.closeSession:
//..
break;
case OpCode.error:
// ...
break;
case OpCode.check:
CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
rc.path = checkTxn.getPath();
break;
case OpCode.multi:
//...
break;
}
} catch (KeeperException e) {
LOG.debug("Failed: {}:{}", header, txn, e);
rc.err = e.code().intValue();
} catch (IOException e) {
LOG.debug("Failed: {}:{}", header, txn, e);
}
/*
* Snapshots are taken lazily. When serializing a node, it's data
* and children copied in a synchronization block on that node,
* which means newly created node won't be in the snapshot, so
* we won't have mismatched cversion and pzxid when replaying the
* createNode txn.
*
* But there is a tricky scenario that if the child is deleted due
* to session close and re-created in a different global session
* after that the parent is serialized, then when replay the txn
* because the node is belonging to a different session, replay the
* closeSession txn won't delete it anymore, and we'll get NODEEXISTS
* error when replay the createNode txn. In this case, we need to
* update the cversion and pzxid to the new value.
*
* Note, such failures on DT should be seen only during
* restore.
*/
if (header.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue()) {
LOG.debug("Adjusting parent cversion for Txn: {} path: {} err: {}", header.getType(), rc.path, rc.err);
int lastSlash = rc.path.lastIndexOf('/');
String parentName = rc.path.substring(0, lastSlash);
CreateTxn cTxn = (CreateTxn) txn;
try {
setCversionPzxid(parentName, cTxn.getParentCVersion(), header.getZxid());
} catch (KeeperException.NoNodeException e) {
LOG.error("Failed to set parent cversion for: {}", parentName, e);
rc.err = e.code().intValue();
}
} else if (rc.err != Code.OK.intValue()) {
LOG.debug("Ignoring processTxn failure hdr: {} : error: {}", header.getType(), rc.err);
}
/*
* Things we can only update after the whole txn is applied to data
* tree.
*
* If we update the lastProcessedZxid with the first sub txn in multi
* and there is a snapshot in progress, it's possible that the zxid
* associated with the snapshot only include partial of the multi op.
*
* When loading snapshot, it will only load the txns after the zxid
* associated with snapshot file, which could cause data inconsistency
* due to missing sub txns.
*
* To avoid this, we only update the lastProcessedZxid when the whole
* multi-op txn is applied to DataTree.
*/
if (!isSubTxn) {
/*
* A snapshot might be in progress while we are modifying the data
* tree. If we set lastProcessedZxid prior to making corresponding
* change to the tree, then the zxid associated with the snapshot
* file will be ahead of its contents. Thus, while restoring from
* the snapshot, the restore method will not apply the transaction
* for zxid associated with the snapshot file, since the restore
* method assumes that transaction to be present in the snapshot.
*
* To avoid this, we first apply the transaction and then modify
* lastProcessedZxid. During restore, we correctly handle the
* case where the snapshot contains data ahead of the zxid associated
* with the file.
*/
if (rc.zxid > lastProcessedZxid) {
lastProcessedZxid = rc.zxid;
}
if (digestFromLoadedSnapshot != null) {
compareSnapshotDigests(rc.zxid);
} else {
// only start recording digest when we're not in fuzzy state
logZxidDigest(rc.zxid, getTreeDigest());
}
}
return rc;
}
// org.apache.zookeeper.server.DataTree#setData
public Stat setData(String path, byte[] data, int version, long zxid, long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
byte[] lastdata = null;
// mutate the node's data
synchronized (n) {
lastdata = n.data;
nodes.preChange(path, n);
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
nodes.postChange(path, n);
}
// now update if the path is in a quota subtree.
String lastPrefix = getMaxPrefixWithQuota(path);
long dataBytes = data == null ? 0 : data.length;
if (lastPrefix != null) {
this.updateCountBytes(lastPrefix, dataBytes - (lastdata == null ? 0 : lastdata.length), 0);
}
nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastdata));
updateWriteStat(path, dataBytes);
// only after the data is stored are the watch notifications fired
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
// notification goes through WatchManager, the same place the watches were registered
// org.apache.zookeeper.server.watch.WatchManager#triggerWatch
@Override
public WatcherOrBitSet triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
}
@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
Set<Watcher> watchers;
synchronized (this) {
// remove and collect every watcher on this path (watches are one-shot)
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
// drop this path from each watcher's reverse mapping
Set<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
// then call back the clients one by one
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
// invoke the watcher's process method; server-side, the watcher is the ServerCnxn
w.process(e);
}
//...
return new WatcherOrBitSet(watchers);
}
// now let's look at NIOServerCnxn's process method
// org.apache.zookeeper.server.NIOServerCnxn#process
@Override
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
sendResponse(h, e, "notification", null, null);
}

  To summarize: after the server has stored the data, it invokes the registered watchers one by one, all under the management of WatchManager.
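  Note a consequence that is visible in triggerWatch above: watchers are removed from watchTable before they are invoked, so every watch fires at most once. A client that wants continuous notifications must re-register inside its callback; a minimal sketch of that standard pattern (the path and handling are illustrative):

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// Re-registers itself on every data change, since server-side watches are one-shot.
public class ReRegisteringWatcher implements Watcher {
    private final ZooKeeper zk;
    private final String path;

    public ReRegisteringWatcher(ZooKeeper zk, String path) {
        this.zk = zk;
        this.path = path;
    }

    public void start() throws Exception {
        zk.getData(path, this, null); // initial registration
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDataChanged) {
            try {
                // reading again re-registers the watch for the next change
                byte[] data = zk.getData(path, this, null);
                System.out.println(path + " -> " + new String(data));
            } catch (Exception e) {
                e.printStackTrace(); // a real client would handle/retry
            }
        }
    }
}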

4. The client calls back into user code

  A dedicated event thread processes the results. The event thread's input, of course, has already been prepared by the socket handling that talks to the server.

        // org.apache.zookeeper.ClientCnxn.EventThread#run
@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
try {
isRunning = true;
while (true) {
// events are placed into waitingEvents by other threads
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
// handles both async callbacks and watch notifications
processEvent(event);
}
if (wasKilled) {
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
} LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId()));
}
// org.apache.zookeeper.ClientCnxn.EventThread#processEvent
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
// watcher callback; the application implements the handling logic
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
} else if (event instanceof LocalCallback) {
// async callback; the application implements the handling logic
LocalCallback lcb = (LocalCallback) event;
if (lcb.cb instanceof StatCallback) {
((StatCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
} else if (lcb.cb instanceof DataCallback) {
((DataCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
} else if (lcb.cb instanceof ACLCallback) {
((ACLCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
} else if (lcb.cb instanceof ChildrenCallback) {
((ChildrenCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
} else if (lcb.cb instanceof Children2Callback) {
((Children2Callback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
} else if (lcb.cb instanceof StringCallback) {
((StringCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
} else if (lcb.cb instanceof AsyncCallback.EphemeralsCallback) {
((AsyncCallback.EphemeralsCallback) lcb.cb).processResult(lcb.rc, lcb.ctx, null);
} else if (lcb.cb instanceof AsyncCallback.AllChildrenNumberCallback) {
((AsyncCallback.AllChildrenNumberCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, -1);
} else {
((VoidCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx);
}
} else {
Packet p = (Packet) event;
int rc = 0;
String clientPath = p.clientPath;
if (p.replyHeader.getErr() != 0) {
rc = p.replyHeader.getErr();
}
if (p.cb == null) {
LOG.warn("Somehow a null cb got to EventThread!");
} else if (p.response instanceof ExistsResponse
|| p.response instanceof SetDataResponse
|| p.response instanceof SetACLResponse) {
StatCallback cb = (StatCallback) p.cb;
if (rc == 0) {
if (p.response instanceof ExistsResponse) {
cb.processResult(rc, clientPath, p.ctx, ((ExistsResponse) p.response).getStat());
} else if (p.response instanceof SetDataResponse) {
cb.processResult(rc, clientPath, p.ctx, ((SetDataResponse) p.response).getStat());
} else if (p.response instanceof SetACLResponse) {
cb.processResult(rc, clientPath, p.ctx, ((SetACLResponse) p.response).getStat());
}
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetDataResponse) {
DataCallback cb = (DataCallback) p.cb;
GetDataResponse rsp = (GetDataResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp.getData(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof GetACLResponse) {
ACLCallback cb = (ACLCallback) p.cb;
GetACLResponse rsp = (GetACLResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp.getAcl(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof GetChildrenResponse) {
ChildrenCallback cb = (ChildrenCallback) p.cb;
GetChildrenResponse rsp = (GetChildrenResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp.getChildren());
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetAllChildrenNumberResponse) {
AllChildrenNumberCallback cb = (AllChildrenNumberCallback) p.cb;
GetAllChildrenNumberResponse rsp = (GetAllChildrenNumberResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp.getTotalNumber());
} else {
cb.processResult(rc, clientPath, p.ctx, -1);
}
} else if (p.response instanceof GetChildren2Response) {
Children2Callback cb = (Children2Callback) p.cb;
GetChildren2Response rsp = (GetChildren2Response) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp.getChildren(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof CreateResponse) {
StringCallback cb = (StringCallback) p.cb;
CreateResponse rsp = (CreateResponse) p.response;
if (rc == 0) {
cb.processResult(
rc,
clientPath,
p.ctx,
(chrootPath == null
? rsp.getPath()
: rsp.getPath().substring(chrootPath.length())));
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof Create2Response) {
Create2Callback cb = (Create2Callback) p.cb;
Create2Response rsp = (Create2Response) p.response;
if (rc == 0) {
cb.processResult(
rc,
clientPath,
p.ctx,
(chrootPath == null
? rsp.getPath()
: rsp.getPath().substring(chrootPath.length())),
rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof MultiResponse) {
MultiCallback cb = (MultiCallback) p.cb;
MultiResponse rsp = (MultiResponse) p.response;
if (rc == 0) {
List<OpResult> results = rsp.getResultList();
int newRc = rc;
for (OpResult result : results) {
if (result instanceof ErrorResult
&& KeeperException.Code.OK.intValue()
!= (newRc = ((ErrorResult) result).getErr())) {
break;
}
}
cb.processResult(newRc, clientPath, p.ctx, results);
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetEphemeralsResponse) {
EphemeralsCallback cb = (EphemeralsCallback) p.cb;
GetEphemeralsResponse rsp = (GetEphemeralsResponse) p.response;
if (rc == 0) {
cb.processResult(rc, p.ctx, rsp.getEphemerals());
} else {
cb.processResult(rc, p.ctx, null);
}
} else if (p.cb instanceof VoidCallback) {
VoidCallback cb = (VoidCallback) p.cb;
cb.processResult(rc, clientPath, p.ctx);
}
}
} catch (Throwable t) {
LOG.error("Unexpected throwable", t);
}
}
}

  As you can see, the event thread is quite simple: it merely pulls items off a queue and invokes the application logic. The real question, then, is who feeds that queue. Packets are queued once a response has been fully processed, so let's start with SendThread.

        // org.apache.zookeeper.ClientCnxn.SendThread#run
@Override
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
serverAddress = hostProvider.next(1000);
}
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent) {
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
if (state == States.AUTH_FAILED) {
eventThread.queueEventOfDeath();
}
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
String warnInfo = String.format(
"Client session timed out, have not heard from server in %dms for session id 0x%s",
clientCnxnSocket.getIdleRecv(),
Long.toHexString(sessionId));
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2
- clientCnxnSocket.getIdleSend()
- ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
// drive the actual socket I/O (sends and receives)
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
// closing so this is expected
LOG.warn(
"An exception was thrown while closing send thread for session 0x{}.",
Long.toHexString(getSessionId()),
e);
break;
} else {
LOG.warn(
"Session 0x{} for sever {}, Closing socket connection. "
+ "Attempting reconnect except it is a SessionExpiredException.",
Long.toHexString(getSessionId()),
serverAddress,
e);
// At this point, there might still be new packets appended to outgoingQueue.
// they will be handled in next connection or cleared up if closed.
cleanAndNotifyState();
}
}
}
synchronized (state) {
// When it comes to this point, it guarantees that later queued
// packet to outgoingQueue will be notified of death.
cleanup();
}
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
}
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
ZooTrace.logTraceMessage(
LOG,
ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
}
// org.apache.zookeeper.ClientCnxnSocketNIO#doTransport
@Override
void doTransport(
int waitTimeOut,
Queue<Packet> pendingQueue,
ClientCnxn cnxn) throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
updateSocketAddresses();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// perform the socket read/write
doIO(pendingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
}
selected.clear();
}
// org.apache.zookeeper.ClientCnxnSocketNIO#doIO
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount.getAndIncrement();
readLength();
} else if (!initialized) {
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
// hand the bytes to SendThread so callbacks can be dispatched
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
if (sockKey.isWritable()) {
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null)
&& (p.requestHeader.getType() != OpCode.ping)
&& (p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount.getAndIncrement();
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
// No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
// On initial connection, write the complete connect request
// packet, but then disable further writes until after
// receiving a successful connection response. If the
// session is expired, then the server sends the expiration
// response and immediately closes its end of the socket. If
// the client is simultaneously writing on its end, then the
// TCP stack may choose to abort with RST, in which case the
// client would never receive the session expired event. See
// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}
// convert the incoming ByteBuffer into a Packet
// org.apache.zookeeper.ClientCnxn.SendThread#readResponse
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
if (replyHdr.getXid() == -2) {
// -2 is the xid for pings
LOG.debug(
"Got ping response for session id: 0x{} after {}ms.",
Long.toHexString(sessionId),
((System.nanoTime() - lastPingSentNs) / 1000000));
return;
}
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null));
eventThread.queueEventOfDeath();
}
LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId));
return;
}
if (replyHdr.getXid() == -1) {
// -1 means notification
LOG.debug("Got notification session id: 0x{}", Long.toHexString(sessionId));
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response"); // convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if (serverPath.compareTo(chrootPath) == 0) {
event.setPath("/");
} else if (serverPath.length() > chrootPath.length()) {
event.setPath(serverPath.substring(chrootPath.length()));
} else {
LOG.warn(
"Got server path {} which is too short for chroot path {}.",
event.getPath(),
chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event);
LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
eventThread.queueEvent(we);
return;
}
// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
if (tunnelAuthInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia, "token");
zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
return;
}
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
}
packet = pendingQueue.remove();
}
/*
* Since requests are processed in order, we better get a response
* to the first request!
*/
try {
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid()
+ " with err " + replyHdr.getErr()
+ " expected Xid " + packet.requestHeader.getXid()
+ " for a packet with details: " + packet);
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
} LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);
} finally {
finishPacket(packet);
}
}
// org.apache.zookeeper.ClientCnxn#finishPacket
// @VisibleForTesting
protected void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
p.watchRegistration.register(err);
}
// Add all the removed watch events to the event queue, so that the
// clients will be notified with 'Data/Child WatchRemoved' event type.
if (p.watchDeregistration != null) {
Map<EventType, Set<Watcher>> materializedWatchers = null;
try {
materializedWatchers = p.watchDeregistration.unregister(err);
for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {
Set<Watcher> watchers = entry.getValue();
if (watchers.size() > 0) {
queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey());
// ignore connectionloss when removing from local
// session
p.replyHeader.setErr(Code.OK.intValue());
}
}
} catch (KeeperException.NoWatcherException nwe) {
p.replyHeader.setErr(nwe.code().intValue());
} catch (KeeperException ke) {
p.replyHeader.setErr(ke.code().intValue());
}
}
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
// when an async callback was registered, the packet is added to the EventThread's queue
eventThread.queuePacket(p);
}
}
// org.apache.zookeeper.ClientCnxn.EventThread#queuePacket
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void queuePacket(Packet packet) {
// add to the waitingEvents queue; the application callback then happens asynchronously,
// handled independently by the EventThread
if (wasKilled) {
synchronized (waitingEvents) {
if (isRunning) {
waitingEvents.add(packet);
} else {
processEvent(packet);
}
}
} else {
waitingEvents.add(packet);
}
}

  A simple principle, spelled out end to end.
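  To close, the four steps can be observed end to end with a small self-contained run (assumes a local server at 127.0.0.1:2181 and an existing /demo znode; connection handling is simplified):

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class WatchRoundTrip {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        CountDownLatch fired = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();

        // step 1: register the watch (the server stores our cnxn in WatchManager)
        zk.getData("/demo", event -> {
            if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                // step 4: the EventThread invokes this callback
                System.out.println("notified: " + event.getPath());
                fired.countDown();
            }
        }, null);

        // steps 2 and 3: a write triggers triggerWatch on the server,
        // which pushes the notification over the wire
        zk.setData("/demo", "new-value".getBytes(), -1);

        fired.await();
        zk.close();
    }
}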
