文章目录
- 2021SC@SDUSC
- 前言
- SessionTrackerImpl
- 属性
- run()线程机制
- initializeNextSessionId生成会话ID
- touchSession确定ticktime
- 基本方法
- 总结
2021SC@SDUSC
客户端创建 Socket 连接后,会尝试与服务器连接,如果连接成功,则与服务器之间形成Session。
前言
在ZooKeeper中,客户端和服务端之间的会话是怎样创建、检查更新、删除以及被如何维护的,将再接下来的文章中进行介绍和分析。
SessionTrackerImpl
ZooKeeper 在 SessionTrackerImpl 中实现了Session的各种操作:创建session,检查更新session、删除session等。而本篇文章将对该类进行简单概述。
SessionTrackerImpl 主要是来维护客户端和服务器之间的session。SessionTrackerImpl实现了SessionTracker,同时也继承了ZooKeeperCriticalThread。即SessionTrackerImpl也是一个线程。
/**
*按记号间隔分组跟踪会话。
*总是将tick的间隔取整,以提供某种宽限期。
*因此,会话将在由在给定时间间隔内过期的会话组成的批中过期。
*/
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {}
属性
在服务端通过 SessionTrackerImpl 和 ExpiryQueue 来保存Session会话信息。
SessionTrackerImpl属性:
// 存储着会话id
protected final ConcurrentHashMap<Long, SessionImpl> sessionsById = new ConcurrentHashMap<Long, SessionImpl>();
// 队列存储失效的会话
private final ExpiryQueue<SessionImpl> sessionExpiryQueue;
// 存储超时会话
private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
// 下一个会话id
private final AtomicLong nextSessionId = new AtomicLong();
// 失效的会话
private final SessionExpirer expirer;
ExpiryQueue属性:
// E即是session实例对象,long为失效时间
private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<E, Long>();
// 当前失效时间的session集合
private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<Long, Set<E>>();
// 下一次失效时间
private final AtomicLong nextExpirationTime = new AtomicLong();
// 失效时间间隔
private final int expirationInterval;
run()线程机制
SessionTrackerImpl的run()与之前文章NIOServerCnxnFactory源码分析中ConnectionExpirerThread内的run()类似。
public void run() {
try {
while (running) {
long waitTime = sessionExpiryQueue.getWaitTime();//等待时间
if (waitTime > 0) {// 如果等待时间大于0
Thread.sleep(waitTime);// 当前线程睡眠,等待时间到下一次失效时间
continue;
}
for (SessionImpl s : sessionExpiryQueue.poll()) {// 下次失效时间对应的session关闭
ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1);
setSessionClosing(s.sessionId);
expirer.expire(s);// 进行closeSession操作
}
}
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
initializeNextSessionId生成会话ID
初始化下一次会话的ID,以便客户端发起连接请求时能够获取不冲突且正确的会话ID。
/**
* 生成初始会话ID. 高阶1字节是serverId
* 下5个字节来自timestamp ,低阶2字节是0s。
* 使用“>>>8”,而不是“>>8”,以确保高位1字节完全由服务器Id决定
* @param id server Id
* @return the Session Id
*/
public static long initializeNextSessionId(long id) {
long nextSid;
nextSid = (Time.currentElapsedTime() << 24) >>> 8; // 生成下一次会话ID
nextSid = nextSid | (id << 56);
if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
++nextSid; // 这是一个不太可能的边缘情况
}
return nextSid;
}
touchSession确定ticktime
touchSession是每次接收到客户端的消息之后就进行的操作,其主要是延长对应的session的超时的时间.
public synchronized boolean touchSession(long sessionId, int timeout) {
SessionImpl s = sessionsById.get(sessionId);
if (s == null) {//对应session不存在
logTraceTouchInvalidSession(sessionId, timeout);//计算其对应的时间,移除原来位置的session,加入到新的session
return false;
}
if (s.isClosing()) {//对应session已经被标识为关闭
logTraceTouchClosingSession(sessionId, timeout);//计算其对应的时间,移除原来位置的session,加入到新的session
return false;
}
//更新会话失效时间
updateSessionExpiry(s, timeout);
return true;
}
基本方法
// 确定session的timeout和id
public long createSession(int sessionTimeout) {
long sessionId = nextSessionId.getAndIncrement();
trackSession(sessionId, sessionTimeout);
return sessionId;
}
// 删除会话
public synchronized void removeSession(long sessionId) {
LOG.debug("Removing session 0x{}", Long.toHexString(sessionId));
// 删除会话ID
SessionImpl s = sessionsById.remove(sessionId);
// 删除会话timeout
sessionsWithTimeout.remove(sessionId);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
ZooTrace.SESSION_TRACE_MASK,
"SessionTrackerImpl --- Removing session 0x" + Long.toHexString(sessionId));
}
if (s != null) {// 在会话失效队列中删除
sessionExpiryQueue.remove(s);
}
}
总结
本章对会话的维护进行了简单分析,下一章将对会话的创建进行细节分析。