OkHttp源码分析 ConnecionPool

在分析拦截器ConnectInterceptor时,我也顺带分析过连接池,但是当时并没有特别清楚okthttp的连接池,经过这几天在探究,我打算在梳理一下okhttp的连接池ConnectionPool。

一:创建ConnecionPool及初始化

首先我们来看ConnectionPool的创建的地方。在OkHttpClient.Builder的构造方法里面,对ConnectionPool进行了初始化。

public static final class Builder {

    ......

    public Builder() {

        ......
      /**
       * 连接池
       */
      connectionPool = new ConnectionPool();
        ......
    }

从这里可以想到,ConnectionPool是与OkHttpClient绑定的

再来看ConnectionPool的构造函数及成员变量

public final class ConnectionPool {
  /**
   * Background threads are used to cleanup expired connections. There will be at most a single
   * thread running per connection pool. The thread pool executor permits the pool itself to be
   * garbage collected.
   * 后台线程用于清理过期的连接。 每个连接池最多只能运行一个线程。 线程池执行器允许池本身被垃圾收集
   * 注意executor是静态的,也就是说它是ConnectionPool所有的对象公用的
   */
  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  /** The maximum number of idle connections for each address. */
  /**
   * 最大的空闲连接数
   */
  private final int maxIdleConnections;
  /**
   * 每个空闲连接的存活时间的纳秒数
   */
  private final long keepAliveDurationNs;

  /**
   * 连接池中维护了一个双端队列Deque来保存可复用连接RealConnection
   */
  private final Deque<RealConnection> connections = new ArrayDeque<>();

  /**
   * 连接地址要避免的失败路由黑名单
   */
  final RouteDatabase routeDatabase = new RouteDatabase();

  //是否是清理过期连接的标记位
  boolean cleanupRunning;

  /**
   * Create a new connection pool with tuning parameters appropriate for a single-user application.
   * The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
   * this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
   */
  /**
   * 默认最大连接数是5个
   * 默认每个连接的存活时间为5分钟
   */
  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }

  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }
  1. 先看构造函数  创建了一个默认最大连接数是5个,最大连接存活时间为5分钟
  2. 成员变量 Executor executor   注意executor 是一个静态的变量,也就是此线程池是所有的ConnectionPool对象共用的,exector线程池用于清理无用的连接。配合cleanupRunning标志,保证每个ConnectionPool对象最多只有一个清理无用连接的线程在运行。
  3. connections连接队列  连接池维护的一个队列,用于保存可复用的RealConnection连接
  4. macIdleConnections   最大的空闲连接数
  5. keepAliveDurationNs  每个空闲连接存活的最长时间

在OkHttpClient创建时,就创建了ConnectionPool实例对象。且已经和OkHttpClient绑定

二:ConnectionPool的get方法

get方法是从连接池里面获取一个连接供使用。

/**
   * Returns a recycled connection to {@code address}, or null if no such connection exists. The
   * route is null if the address has not yet been routed.
   * 通过遍历connections(用于存放用于存放RealConnection的ArrayDeque队列),
   * 调用RealConnection的isEligible()方法判断其是否可用,如果可用就会调用streamAllocation的acquire()方法,
   * 并返回connection。
   */
  @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      /**
       * isEligible方法内部主要通过判断当前Connection拥有的StreamAllocation是否超过的限制,或者
       * 当前Connection是否不允许分配stream等等途径,进而判断当前Connection是否有效。
       */
      if (connection.isEligible(address, route)) {
        //调用acquire,将StreamAllocation的弱引用添加到RealConnection的allocations中
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }

get方法主要是遍历connections,找到一个合适的连接,判断一个连接是否可以使用,通过isEligible方法实现。isEligible方法将在RealConnection中详细介绍。

找到合适的连接会通过StreamAllocation的acquire方法,将RealConnection和StreamAllocation绑定。然后返回这个合适的连接。

如果没有找到则返回null。

 

下面看看StreamAllocation的acquire方法

  public void acquire(RealConnection connection, boolean reportedAcquired) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();
    //将RealConnection赋值成员变量connection.表示当前的连接分配给当前的流
    this.connection = connection;
    this.reportedAcquired = reportedAcquired;
    //创建StreamAllocationReference对象并添加到allocations集合中
    // StreamAllocationReference是用于保存StreamAllocation的弱引用
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
  }

在acquire方法里面将RealConnection赋值成员变量connection.表示当前的连接分配给当前的流。并且创建StreamAllocationReference对象并添加到allocations集合中。

每一个RealConnection对象在于StreamAllocation对象绑定时,会将StreamAllocation的弱引用添加到allocations集合中,通过allocations的大小,可以知道Connection拥有的StreamAllocation数。

通过get方法我们可以查询当前连接池是否有可用的RealConnection。如果有会调用StreamAllocation的acquire方法,将RealConnection赋值给StreamAllocation的成员变量connection,并且会将StreamAllocation的弱引用添加到allocations集合中,这样就将连接RealConnection分配给流streamAllocation,建立绑定关系。如果没有找到则返回null。

 三:ConnectionPool的put方法

put方法是将RealConnection对象存入连接池

 /**
   * put()方法在将连接添加到连接池之前,会先执行清理任务,通过判断cleanupRunning是否在执行,
   * 如果当前清理任务没有执行,则更改cleanupRunning标识,并执行清理任务cleanupRunnable。
   *
   * @param connection
   */
  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    /**
     * cleanupRunning 清理是否在执行标志
     */
    if (!cleanupRunning) {
      cleanupRunning = true;
      // executor是线程池,此处异步执行清理任务cleanupRunnable
      executor.execute(cleanupRunnable);
    }
    //将连接添加到连接池
    connections.add(connection);
  }

put方法直接将Connection对象添加到connections队列。不过这里需要注意,当cleanupRunning为false,就会向线程池添加一个cleanuoRunnable,进行清理操作。清理操作下面会详细介绍。

四:清理无用的连接

在put方法中我们只是清理工作是由cleanupRunning来完成的,我们来具体看看cleanupRunning

private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      //循环调用cleanup进行清除过期连接的任务
      while (true) {
        //cleanup方法对连接池进行清理,返回进行下次清理的间隔时间
        long waitNanos = cleanup(System.nanoTime());
        //如果返回的数据间隔为-1,则会接受循环
        if (waitNanos == -1) return;
        //如果大于1会调用wait进行等待
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              //进行等待
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

cleanupRunning是一个死循环的任务,只要cleanup方法不返回-1,就会一直执行。

当cleanup方法没有返回-1.当前的Runnable就会进入睡眠状态。真正的操作是在cleanup方法里。

下面来看cleanup方法:

long cleanup(long now) {
    // 正在使用的连接数
    int inUseConnectionCount = 0;
    // 空闲的连接数
    int idleConnectionCount = 0;
    // 空闲时间最长的连接
    RealConnection longestIdleConnection = null;
    // 最大的空闲时间,初始化为Long的最小值,用于记录所有空闲连接中空闲最久的时间
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
      // 遍历所有的连接,标记不活跃的连接
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        // If the connection is in use, keep searching.
        // 查询此连接内部的StreanAllocation的引用数量
        // 如果遍历到的连接正在使用,则跳过,continue继续遍历下一个
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;//正在使用的连接数
          continue;
        }
        // 当前连接处于空闲,空闲连接数++
        idleConnectionCount++;

        // If the connection is ready to be evicted, we're done.
        // 计算空闲时间
        long idleDurationNs = now - connection.idleAtNanos;
        // 空闲时间如果超过最大空闲时间
        if (idleDurationNs > longestIdleDurationNs) {
          // 重新赋值最大空闲时间
          longestIdleDurationNs = idleDurationNs;
          // 赋值空闲最久的连接
          longestIdleConnection = connection;
        }
      }
      /**
       * keepAliveDurationNs 持续连接时间
       * maxIdleConnections 最大的空闲连接数
       */
      /**
       * 如果最大空闲时间超过每个连接存活的最大时间
       * 或者
       * 空闲连接数大于设定的空闲连接数
       * 则删除空闲最久的连接
       */
      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        // We've found a connection to evict. Remove it from the list, then close it below (outside
        // of the synchronized block).
        connections.remove(longestIdleConnection);
        /**
         * 如果空闲连接数大于0,则计算空闲时间最长的连接离最大空闲时间还需要的时间,并返回此时间
         */
      } else if (idleConnectionCount > 0) {
        // A connection will be ready to evict soon.
        return keepAliveDurationNs - longestIdleDurationNs;
        /**
         * 如果以上都不满足则判断正在使用连接是否大于0 ,是则返回设定的最长连接时间
         */
      } else if (inUseConnectionCount > 0) {
        // All connections are in use. It'll be at least the keep alive duration 'til we run again.
        return keepAliveDurationNs;
        /**
         * 以上都不是则返回-1,并将cleanupRunning置为false
         */
      } else {
        // No connections, idle or in use.
        cleanupRunning = false;
        return -1;
      }
    }

    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
    return 0;
  }

首先通过foar循环找到当前正在使用的Connection数量和未使用的Connection数量,还有最大空闲时间和最大空闲时间的 Connection,判断当前Connection是否正在使用通过方法pruneAndGetAllocationCount返回的 Connection分配的流StreamAllocation的计数是否大于0。

找到的最大空闲时间跟默认的最大空闲时间比较,如果大于的话,表示当前Connection已经超过了最大的空闲时间,或者空闲连接数大于大于设定的空闲连接数, 应该被回收,所以我们看到它会被remove掉:

/**
       * keepAliveDurationNs 持续连接时间
       * maxIdleConnections 最大的空闲连接数
       */
      /**
       * 如果最大空闲时间超过每个连接存活的最大时间
       * 或者
       * 空闲连接数大于设定的空闲连接数
       * 则删除空闲最久的连接
       */
      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        // We've found a connection to evict. Remove it from the list, then close it below (outside
        // of the synchronized block).
        connections.remove(longestIdleConnection);
        /**
         * 如果空闲连接数大于0,则计算空闲时间最长的连接离最大空闲时间还需要的时间,并返回此时间
         */
      }

如果当前已经有了空闲的Connection,那么就会keepAliveDurationNs - longestIdleDurationNs,表示当前的cleanupRunnable还需要睡眠keepAliveDurationNs - longestIdleDurationNs时间,至于为什么是这么多,大家可以想一想,这里就不做过多的解释。
  其次,就是如果当前还有正在使用的Connection,那么当前的cleanupRunnable还需要沉睡keepAliveDurationNs。这个上面的意思差不多。
  如果当前既没有空闲的Connection,又没有正在使用的Connection,那么表示当前的ConnectionPool已经空了,可以被回收了。

下面在来看看pruneAndGetAllocationCount方法:

  private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);
      /**
       * 当所有的弱引用不为空则继续执行循环,不执行下面代码
       */
      if (reference.get() != null) {
        i++;
        continue;
      }
      /**
       * 当发现有弱引用为null
       */
      // We've discovered a leaked allocation. This is an application bug.
      StreamAllocation.StreamAllocationReference streamAllocRef =
          (StreamAllocation.StreamAllocationReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
      /**
       * 删除弱引用
       */
      references.remove(i);
      /**
       * 设置成连接无法在此基础上创建新流
       */
      connection.noNewStreams = true;

      // If this was the last allocation, the connection is eligible for immediate eviction.
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }

    return references.size();
  }
}

遍历StreamAllocation 弱引用链表allocations,移除为空的引用,遍历结束后返回链表中弱引用的数量 ,StreamAllocation 在列表中的数量就是就是物理socket被引用的次数

StreamAllocation被高层反复执行aquirerelease。这两个函数在执行过程中其实是在一直在改变Connection中的 List<WeakReference<StreamAllocation>> allocations大小。

1:   当allocations保存分配的流StreamAllocation计数为0时,则返回0;

2:当allocations保存分配的流StreamAllocation计数为大于0;且引用的StreamAllocation不为null。则返回分配流的个数。

3:当 allocations保存分配的流StreamAllocation计数为大于0; 且存在引用的StreamAllocation为null。则删除弱引用,判断删除后弱引用是否为null。为空则设置connection.idleAtNanos = now - keepAliveDurationNs,会从连接池删除此Connection。

清理闲置连接的核心主要是引用计数器List<Reference<StreamAllocation>> 和 选择排序的算法以及excutor的清理线程池。

 下面还将分析StreamAllocation和RealConnection

OkHttp源码分析 ConnecionPoolOkHttp源码分析 ConnecionPool finley_feng 发布了20 篇原创文章 · 获赞 1 · 访问量 336 私信 关注
上一篇:常见的transformation 和 Action


下一篇:jmeter连接配置带跳板机(SSH)的mysql服务器