前言:
前一篇文章中我们通过对节点创建请求的分析,大致了解了客户端发送请求的一个大概过程。
简单来说就是Zookeeper封装request成为Packet,并将Packet添加到outgoingQueue队列中,后续SendThread不断轮询队列中的包,并通过SocketChannel发送到服务端,SocketChannel监听读事件,获取到服务端的响应response后,将response添加到Packet中,返回客户端。
有了之前分析节点创建请求分析的经验,我们以此类推,可以继续分析下其他重要的请求。主要就是创建会话、删除节点、更新节点数据、读取节点数据,我们依次来分析。
1.创建会话
1.1 会话创建时机
// SendThread.run
class SendThread extends ZooKeeperThread {
@Override
public void run() {
...
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
// 如果未创建连接,则需要先创建一个对服务端的长连接
if (!clientCnxnSocket.isConnected()) {
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
break;
}
// 通过hostProvider选择一个合适的服务端ip:port(集群模式下有多个ip:port)
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
serverAddress = hostProvider.next(1000);
}
// 在这里创建连接,见下
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
...
}
private void startConnect(InetSocketAddress addr) throws IOException {
saslLoginFailed = false;
state = States.CONNECTING;
setName(getName().replaceAll("\\(.*\\)",
"(" + addr.getHostName() + ":" + addr.getPort() + ")"));
...
logStartConnect(addr);
// 创建连接,具体见1.1.1
clientCnxnSocket.connect(addr);
}
}
SendThread会先检查当前连接是否已创建,若未创建,则先创建连接,创建连接交由clientCnxnSocket执行
1.1.1 ClientCnxnSocketNIO.connect() 创建连接
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
void connect(InetSocketAddress addr) throws IOException {
SocketChannel sock = createSock();
try {
// 在这里
registerAndConnect(sock, addr);
} catch (IOException e) {
LOG.error("Unable to open socket to " + addr);
sock.close();
throw e;
}
initialized = false;
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
throws IOException {
// 注册连接事件后,使用SocketChannel创建连接,这里是三次握手时间
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
boolean immediateConnect = sock.connect(addr);
if (immediateConnect) {
// 连接成功后,客户端发送首个事件,具体见1.2
sendThread.primeConnection();
}
}
}
总结下来,Zookeeper客户端创建连接的时机就是:Zookeeper.java创建完成后,SendThread启动后,会默认检查是否已创建连接,若无,则主动发起一次连接。
1.2 SendThread.primeConnection() 发送连接事件
class SendThread extends ZooKeeperThread {
void primeConnection() throws IOException {
LOG.info("Socket connection established to "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", initiating session");
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0;
// 创建连接请求,具体见1.2.1
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);
synchronized (outgoingQueue) {
// watch相关,非本文重点,先忽略
if (!disableAutoWatchReset) {
...
}
// 权限认证相关,非重点,先忽略
for (AuthData id : authInfo) {
outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
OpCode.auth), null, new AuthPacket(0, id.scheme,
id.data), null, null));
}
// 将Connect请求封装到Packet中,并添加到outgoingQueue中
outgoingQueue.addFirst(new Packet(null, null, conReq,
null, null, readOnly));
}
clientCnxnSocket.enableReadWriteOnly();
if (LOG.isDebugEnabled()) {
LOG.debug("Session establishment request sent on "
+ clientCnxnSocket.getRemoteSocketAddress());
}
}
}
1.2.1 ConnectRequest 连接请求
public class ConnectRequest implements Record {
private int protocolVersion;
private long lastZxidSeen;
private int timeOut;
private long sessionId;
private byte[] passwd;
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeInt(protocolVersion,"protocolVersion");
a_.writeLong(lastZxidSeen,"lastZxidSeen");
a_.writeInt(timeOut,"timeOut");
a_.writeLong(sessionId,"sessionId");
a_.writeBuffer(passwd,"passwd");
a_.endRecord(this,tag);
}
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
protocolVersion=a_.readInt("protocolVersion");
lastZxidSeen=a_.readLong("lastZxidSeen");
timeOut=a_.readInt("timeOut");
sessionId=a_.readLong("sessionId");
passwd=a_.readBuffer("passwd");
a_.endRecord(tag);
}
}
1.3 获取创建连接请求的响应
按照之前文章的分析,解析响应最终在SendThread.readResponse()方法中执行
class SendThread extends ZooKeeperThread {
...
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
...
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
packet = pendingQueue.remove();
}
try {
...
// 获取响应头replyHdr中的基本信息
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
// 反序列化为ConnectResponse,具体见1.3.1
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
finishPacket(packet);
}
}
}
1.3.1 ConnectResponse 连接事件响应体
public class ConnectResponse implements Record {
private int protocolVersion;
private int timeOut;
// 最重要的就是这个sessionID,是服务端分配的唯一的一个会话ID,用于标识会话
private long sessionId;
private byte[] passwd;
...
}
总结:创建会话的请求和响应分别是ConnectRequest、ConnectResponse,分别如下图所示
2.删除节点(delete)
与创建节点类似,具体过程不再分析,我们直接看删除请求和响应体
2.1 DeleteRequest 删除请求体
public class DeleteRequest implements Record {
// 主要就是传入需要删除的路径
private String path;
private int version;
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeString(path,"path");
a_.writeInt(version,"version");
a_.endRecord(this,tag);
}
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
path=a_.readString("path");
version=a_.readInt("version");
a_.endRecord(tag);
}
删除请求主要就是传入需要删除的路径path,请求协议图示如下:
2.2 删除节点响应
我们会发现没有DeleteResponse这个类,删除节点不需要再返回具体response,通过ReplyHeader既可以判断是否请求成功
3.更新节点请求(setData)
与创建节点类似,具体过程不再分析,我们直接看更新节点请求和其响应体
3.1 SetDataRequest 更新节点数据请求
public class SetDataRequest implements Record {
// 把需要更新的节点的路径和值传入即可
private String path;
private byte[] data;
private int version;
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeString(path,"path");
a_.writeBuffer(data,"data");
a_.writeInt(version,"version");
a_.endRecord(this,tag);
}
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
path=a_.readString("path");
data=a_.readBuffer("data");
version=a_.readInt("version");
a_.endRecord(tag);
}
}
针对需要修改的节点,把节点路径和需要修改的值传入即可,具体协议图如下所示:
3.2 SetDataResponse 更新节点数据响应
public class SetDataResponse implements Record {
private org.apache.zookeeper.data.Stat stat;
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeRecord(stat,"stat");
a_.endRecord(this,tag);
}
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
stat= new org.apache.zookeeper.data.Stat();
a_.readRecord(stat,"stat");
a_.endRecord(tag);
}
}
// Stat.java
public class Stat implements Record {
private long czxid;
private long mzxid;
private long ctime;
private long mtime;
private int version;
private int cversion;
private int aversion;
private long ephemeralOwner;
private int dataLength;
private int numChildren;
private long pzxid;
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeLong(czxid,"czxid");
a_.writeLong(mzxid,"mzxid");
a_.writeLong(ctime,"ctime");
a_.writeLong(mtime,"mtime");
a_.writeInt(version,"version");
a_.writeInt(cversion,"cversion");
a_.writeInt(aversion,"aversion");
a_.writeLong(ephemeralOwner,"ephemeralOwner");
a_.writeInt(dataLength,"dataLength");
a_.writeInt(numChildren,"numChildren");
a_.writeLong(pzxid,"pzxid");
a_.endRecord(this,tag);
}
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
czxid=a_.readLong("czxid");
mzxid=a_.readLong("mzxid");
ctime=a_.readLong("ctime");
mtime=a_.readLong("mtime");
version=a_.readInt("version");
cversion=a_.readInt("cversion");
aversion=a_.readInt("aversion");
ephemeralOwner=a_.readLong("ephemeralOwner");
dataLength=a_.readInt("dataLength");
numChildren=a_.readInt("numChildren");
pzxid=a_.readLong("pzxid");
a_.endRecord(tag);
}
}
总结:更新节点后,主要信息存放在Stat中。具体协议图如下:
4.获取节点数据(getData)
与创建节点类似,具体过程不再分析,我们直接看读取节点请求和其响应体
4.1 GetDataRequest 获取节点数据请求
public class GetDataRequest implements Record {
// 主要就是将路径传入
private String path;
private boolean watch;
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeString(path,"path");
a_.writeBool(watch,"watch");
a_.endRecord(this,tag);
}
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
path=a_.readString("path");
watch=a_.readBool("watch");
a_.endRecord(tag);
}
}
在获取节点信息时,请求体主要就是节点路径,具体协议信息如下:
4.2 GetDataResponse 获取节点数据请求
public class GetDataResponse implements Record {
// 数据存入data数据中
private byte[] data;
private org.apache.zookeeper.data.Stat stat;
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeBuffer(data,"data");
a_.writeRecord(stat,"stat");
a_.endRecord(this,tag);
}
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
data=a_.readBuffer("data");
stat= new org.apache.zookeeper.data.Stat();
a_.readRecord(stat,"stat");
a_.endRecord(tag);
}
}
获取节点数据后,节点内容存放在data[]中,节点状态信息存在stat中,具体协议如下: