在分析拦截器ConnectInterceptor时,我也顺带分析过连接池,但是当时并没有特别清楚okthttp的连接池,经过这几天在探究,我打算在梳理一下okhttp的连接池ConnectionPool。
一:创建ConnecionPool及初始化
首先我们来看ConnectionPool的创建的地方。在OkHttpClient.Builder的构造方法里面,对ConnectionPool进行了初始化。
public static final class Builder {
......
public Builder() {
......
/**
* 连接池
*/
connectionPool = new ConnectionPool();
......
}
从这里可以想到,ConnectionPool是与OkHttpClient绑定的
。
再来看ConnectionPool的构造函数及成员变量
public final class ConnectionPool {
/**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
* 后台线程用于清理过期的连接。 每个连接池最多只能运行一个线程。 线程池执行器允许池本身被垃圾收集
* 注意executor是静态的,也就是说它是ConnectionPool所有的对象公用的
*/
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
/** The maximum number of idle connections for each address. */
/**
* 最大的空闲连接数
*/
private final int maxIdleConnections;
/**
* 每个空闲连接的存活时间的纳秒数
*/
private final long keepAliveDurationNs;
/**
* 连接池中维护了一个双端队列Deque来保存可复用连接RealConnection
*/
private final Deque<RealConnection> connections = new ArrayDeque<>();
/**
* 连接地址要避免的失败路由黑名单
*/
final RouteDatabase routeDatabase = new RouteDatabase();
//是否是清理过期连接的标记位
boolean cleanupRunning;
/**
* Create a new connection pool with tuning parameters appropriate for a single-user application.
* The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
* this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
*/
/**
* 默认最大连接数是5个
* 默认每个连接的存活时间为5分钟
*/
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
- 先看构造函数 创建了一个默认最大连接数是5个,最大连接存活时间为5分钟
- 成员变量 Executor executor 注意executor 是一个静态的变量,也就是此线程池是所有的ConnectionPool对象共用的,exector线程池用于清理无用的连接。配合cleanupRunning标志,保证每个ConnectionPool对象最多只有一个清理无用连接的线程在运行。
- connections连接队列 连接池维护的一个队列,用于保存可复用的RealConnection连接
- macIdleConnections 最大的空闲连接数
- keepAliveDurationNs 每个空闲连接存活的最长时间
在OkHttpClient创建时,就创建了
ConnectionPool实例对象。且已经和OkHttpClient绑定。
二:ConnectionPool的get方法
get方法是从连接池里面获取一个连接供使用。
/**
* Returns a recycled connection to {@code address}, or null if no such connection exists. The
* route is null if the address has not yet been routed.
* 通过遍历connections(用于存放用于存放RealConnection的ArrayDeque队列),
* 调用RealConnection的isEligible()方法判断其是否可用,如果可用就会调用streamAllocation的acquire()方法,
* 并返回connection。
*/
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
/**
* isEligible方法内部主要通过判断当前Connection拥有的StreamAllocation是否超过的限制,或者
* 当前Connection是否不允许分配stream等等途径,进而判断当前Connection是否有效。
*/
if (connection.isEligible(address, route)) {
//调用acquire,将StreamAllocation的弱引用添加到RealConnection的allocations中
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
get方法主要是遍历connections,找到一个合适的连接,判断一个连接是否可以使用,通过isEligible方法实现。isEligible方法将在RealConnection中详细介绍。
找到合适的连接会通过StreamAllocation的acquire方法,将RealConnection和StreamAllocation绑定。然后返回这个合适的连接。
如果没有找到则返回null。
下面看看StreamAllocation的acquire方法
public void acquire(RealConnection connection, boolean reportedAcquired) {
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();
//将RealConnection赋值成员变量connection.表示当前的连接分配给当前的流
this.connection = connection;
this.reportedAcquired = reportedAcquired;
//创建StreamAllocationReference对象并添加到allocations集合中
// StreamAllocationReference是用于保存StreamAllocation的弱引用
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
在acquire方法里面将RealConnection赋值成员变量connection.表示当前的连接分配给当前的流。并且创建StreamAllocationReference对象并添加到allocations集合中。
每一个RealConnection对象在于StreamAllocation对象绑定时,会将StreamAllocation的弱引用添加到allocations集合中,通过allocations的大小,可以知道Connection拥有的StreamAllocation数。
通过get方法我们可以查询当前连接池是否有可用的RealConnection。如果有会调用StreamAllocation的acquire方法,将RealConnection赋值给StreamAllocation的成员变量connection,并且会将StreamAllocation的弱引用添加到allocations集合中,这样就将连接RealConnection分配给流streamAllocation,建立绑定关系。如果没有找到则返回null。
三:ConnectionPool的put方法
put方法是将RealConnection对象存入连接池
/**
* put()方法在将连接添加到连接池之前,会先执行清理任务,通过判断cleanupRunning是否在执行,
* 如果当前清理任务没有执行,则更改cleanupRunning标识,并执行清理任务cleanupRunnable。
*
* @param connection
*/
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
/**
* cleanupRunning 清理是否在执行标志
*/
if (!cleanupRunning) {
cleanupRunning = true;
// executor是线程池,此处异步执行清理任务cleanupRunnable
executor.execute(cleanupRunnable);
}
//将连接添加到连接池
connections.add(connection);
}
put方法直接将Connection对象添加到connections队列。不过这里需要注意,当cleanupRunning为false,就会向线程池添加一个cleanuoRunnable,进行清理操作。清理操作下面会详细介绍。
四:清理无用的连接
在put方法中我们只是清理工作是由cleanupRunning来完成的,我们来具体看看cleanupRunning
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
//循环调用cleanup进行清除过期连接的任务
while (true) {
//cleanup方法对连接池进行清理,返回进行下次清理的间隔时间
long waitNanos = cleanup(System.nanoTime());
//如果返回的数据间隔为-1,则会接受循环
if (waitNanos == -1) return;
//如果大于1会调用wait进行等待
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
//进行等待
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
cleanupRunning是一个死循环的任务,只要cleanup方法不返回-1,就会一直执行。
当cleanup方法没有返回-1.当前的Runnable就会进入睡眠状态。真正的操作是在cleanup方法里。
下面来看cleanup方法:
long cleanup(long now) {
// 正在使用的连接数
int inUseConnectionCount = 0;
// 空闲的连接数
int idleConnectionCount = 0;
// 空闲时间最长的连接
RealConnection longestIdleConnection = null;
// 最大的空闲时间,初始化为Long的最小值,用于记录所有空闲连接中空闲最久的时间
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
// 遍历所有的连接,标记不活跃的连接
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
// 查询此连接内部的StreanAllocation的引用数量
// 如果遍历到的连接正在使用,则跳过,continue继续遍历下一个
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;//正在使用的连接数
continue;
}
// 当前连接处于空闲,空闲连接数++
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
// 计算空闲时间
long idleDurationNs = now - connection.idleAtNanos;
// 空闲时间如果超过最大空闲时间
if (idleDurationNs > longestIdleDurationNs) {
// 重新赋值最大空闲时间
longestIdleDurationNs = idleDurationNs;
// 赋值空闲最久的连接
longestIdleConnection = connection;
}
}
/**
* keepAliveDurationNs 持续连接时间
* maxIdleConnections 最大的空闲连接数
*/
/**
* 如果最大空闲时间超过每个连接存活的最大时间
* 或者
* 空闲连接数大于设定的空闲连接数
* 则删除空闲最久的连接
*/
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
connections.remove(longestIdleConnection);
/**
* 如果空闲连接数大于0,则计算空闲时间最长的连接离最大空闲时间还需要的时间,并返回此时间
*/
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
/**
* 如果以上都不满足则判断正在使用连接是否大于0 ,是则返回设定的最长连接时间
*/
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
/**
* 以上都不是则返回-1,并将cleanupRunning置为false
*/
} else {
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
return 0;
}
首先通过foar循环找到当前正在使用的Connection
数量和未使用的Connection
数量,还有最大空闲时间和最大空闲时间的 Connection,判断当前Connection是否正在使用通过方法pruneAndGetAllocationCount返回的 Connection分配的流StreamAllocation的计数是否大于0。
找到的最大空闲时间跟默认的最大空闲时间比较,如果大于的话,表示当前Connection
已经超过了最大的空闲时间,或者空闲连接数大于大于设定的空闲连接数, 应该被回收,所以我们看到它会被remove掉:
/**
* keepAliveDurationNs 持续连接时间
* maxIdleConnections 最大的空闲连接数
*/
/**
* 如果最大空闲时间超过每个连接存活的最大时间
* 或者
* 空闲连接数大于设定的空闲连接数
* 则删除空闲最久的连接
*/
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
connections.remove(longestIdleConnection);
/**
* 如果空闲连接数大于0,则计算空闲时间最长的连接离最大空闲时间还需要的时间,并返回此时间
*/
}
如果当前已经有了空闲的Connection
,那么就会keepAliveDurationNs - longestIdleDurationNs
,表示当前的cleanupRunnable
还需要睡眠keepAliveDurationNs - longestIdleDurationNs
时间,至于为什么是这么多,大家可以想一想,这里就不做过多的解释。
其次,就是如果当前还有正在使用的Connection
,那么当前的cleanupRunnable
还需要沉睡keepAliveDurationNs
。这个上面的意思差不多。
如果当前既没有空闲的Connection
,又没有正在使用的Connection
,那么表示当前的ConnectionPool
已经空了,可以被回收了。
下面在来看看pruneAndGetAllocationCount方法:
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
List<Reference<StreamAllocation>> references = connection.allocations;
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
/**
* 当所有的弱引用不为空则继续执行循环,不执行下面代码
*/
if (reference.get() != null) {
i++;
continue;
}
/**
* 当发现有弱引用为null
*/
// We've discovered a leaked allocation. This is an application bug.
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
/**
* 删除弱引用
*/
references.remove(i);
/**
* 设置成连接无法在此基础上创建新流
*/
connection.noNewStreams = true;
// If this was the last allocation, the connection is eligible for immediate eviction.
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
return references.size();
}
}
遍历StreamAllocation
弱引用链表allocations,移除为空的引用,遍历结束后返回链表中弱引用的数量 ,StreamAllocation
在列表中的数量就是就是物理socket被引用的次数
StreamAllocation
被高层反复执行aquire
与release
。这两个函数在执行过程中其实是在一直在改变Connection
中的 List<WeakReference<StreamAllocation>>
allocations大小。
1: 当allocations保存分配的流StreamAllocation计数为0时,则返回0;
2:当allocations保存分配的流StreamAllocation计数为大于0;且引用的StreamAllocation不为null。则返回分配流的个数。
3:当 allocations保存分配的流StreamAllocation计数为大于0; 且存在引用的StreamAllocation为null。则删除弱引用,判断删除后弱引用是否为null。为空则设置connection.idleAtNanos = now - keepAliveDurationNs,会从连接池删除此Connection。
清理闲置连接的核心主要是引用计数器
List<Reference<StreamAllocation>>
和选择排序的算法
以及excutor
的清理线程池。
下面还将分析StreamAllocation和RealConnection
finley_feng 发布了20 篇原创文章 · 获赞 1 · 访问量 336 私信 关注