ZooKeeper源码分析之Session(一)

文章目录

2021SC@SDUSC

客户端创建 Socket 连接后,会尝试与服务器连接,如果连接成功,则与服务器之间形成Session。

前言

在ZooKeeper中,客户端和服务端之间的会话是怎样创建、检查更新、删除以及被如何维护的,将再接下来的文章中进行介绍和分析。

SessionTrackerImpl

ZooKeeper 在 SessionTrackerImpl 中实现了Session的各种操作:创建session,检查更新session、删除session等。而本篇文章将对该类进行简单概述。
SessionTrackerImpl 主要是来维护客户端和服务器之间的session。SessionTrackerImpl实现了SessionTracker,同时也继承了ZooKeeperCriticalThread。即SessionTrackerImpl也是一个线程。

/**
  *按记号间隔分组跟踪会话。
  *总是将tick的间隔取整,以提供某种宽限期。
  *因此,会话将在由在给定时间间隔内过期的会话组成的批中过期。 
 */
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {}

属性

在服务端通过 SessionTrackerImpl 和 ExpiryQueue 来保存Session会话信息。
SessionTrackerImpl属性:

    // 存储着会话id
    protected final ConcurrentHashMap<Long, SessionImpl> sessionsById = new ConcurrentHashMap<Long, SessionImpl>();
    // 队列存储失效的会话
    private final ExpiryQueue<SessionImpl> sessionExpiryQueue;
    // 存储超时会话
    private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
    // 下一个会话id
    private final AtomicLong nextSessionId = new AtomicLong();
    // 失效的会话
    private final SessionExpirer expirer;

ExpiryQueue属性:

   // E即是session实例对象,long为失效时间
   private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<E, Long>();
   // 当前失效时间的session集合
   private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<Long, Set<E>>();
   // 下一次失效时间
   private final AtomicLong nextExpirationTime = new AtomicLong();
   // 失效时间间隔
   private final int expirationInterval;
   

run()线程机制

SessionTrackerImpl的run()与之前文章NIOServerCnxnFactory源码分析中ConnectionExpirerThread内的run()类似。

    public void run() {
        try {
            while (running) {
                long waitTime = sessionExpiryQueue.getWaitTime();//等待时间
                if (waitTime > 0) {// 如果等待时间大于0
                    Thread.sleep(waitTime);// 当前线程睡眠,等待时间到下一次失效时间
                    continue;
                }

                for (SessionImpl s : sessionExpiryQueue.poll()) {// 下次失效时间对应的session关闭
                    ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1);
                    setSessionClosing(s.sessionId);
                    expirer.expire(s);// 进行closeSession操作
                }
            }
        } catch (InterruptedException e) {
            handleException(this.getName(), e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }

initializeNextSessionId生成会话ID

初始化下一次会话的ID,以便客户端发起连接请求时能够获取不冲突且正确的会话ID。

    /**
     * 生成初始会话ID. 高阶1字节是serverId
     * 下5个字节来自timestamp ,低阶2字节是0s。
     * 使用“>>>8”,而不是“>>8”,以确保高位1字节完全由服务器Id决定
     * @param id server Id
     * @return the Session Id
     */
    public static long initializeNextSessionId(long id) {
        long nextSid;
        nextSid = (Time.currentElapsedTime() << 24) >>> 8; // 生成下一次会话ID
        nextSid = nextSid | (id << 56);
        if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
            ++nextSid;  // 这是一个不太可能的边缘情况
        }
        return nextSid;
    }

touchSession确定ticktime

touchSession是每次接收到客户端的消息之后就进行的操作,其主要是延长对应的session的超时的时间.

    public synchronized boolean touchSession(long sessionId, int timeout) {
        SessionImpl s = sessionsById.get(sessionId);

        if (s == null) {//对应session不存在
            logTraceTouchInvalidSession(sessionId, timeout);//计算其对应的时间,移除原来位置的session,加入到新的session
            return false;
        }

        if (s.isClosing()) {//对应session已经被标识为关闭
            logTraceTouchClosingSession(sessionId, timeout);//计算其对应的时间,移除原来位置的session,加入到新的session
            return false;
        }
        //更新会话失效时间
        updateSessionExpiry(s, timeout);
        return true;
    }

基本方法

    // 确定session的timeout和id
    public long createSession(int sessionTimeout) {
        long sessionId = nextSessionId.getAndIncrement();
        trackSession(sessionId, sessionTimeout);
        return sessionId;
    }

    // 删除会话
    public synchronized void removeSession(long sessionId) {
        LOG.debug("Removing session 0x{}", Long.toHexString(sessionId));
        // 删除会话ID
        SessionImpl s = sessionsById.remove(sessionId);
        // 删除会话timeout
        sessionsWithTimeout.remove(sessionId);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(
                LOG,
                ZooTrace.SESSION_TRACE_MASK,
                "SessionTrackerImpl --- Removing session 0x" + Long.toHexString(sessionId));
        }
        if (s != null) {// 在会话失效队列中删除
            sessionExpiryQueue.remove(s);
        }
    }

总结

本章对会话的维护进行了简单分析,下一章将对会话的创建进行细节分析。

上一篇:YOLO添加Focal loss


下一篇:final关键字