我们知道,OkHttp是通过Socket连接的,但是到现在为止,我们还没看到和Socket连接相关的操作,本节内容主要就是为了看OkHttp是如何通过ConnectInterceptor做Socket连接的
目录
一、主流程
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
//第一行
val realChain = chain as RealInterceptorChain
//第二行
val exchange = realChain.call.initExchange(chain)
//第三行
val connectedChain = realChain.copy(exchange = exchange)
//第四行
return connectedChain.proceed(realChain.request)
}
}
看到这个ConnectInterceptor的代码,是不是很简洁,但是并不简单
第一行:获得责任链,这个操作在其他的拦截器中也是有的
第二行:将责任链作为参数,通过RealCall的initExchange获得Exchange对象,这个Exchange对象是干嘛的?好神秘,字面意思也不好理解,我们看下Exchange的注释
/** * Transmits a single HTTP request and a response pair. This layers connection management and events * on [ExchangeCodec], which handles the actual I/O. */
大致翻译:传输单个 HTTP 请求和响应对。这层的连接管理和事件都在ExchangeCodec,它是处理实际 I/O 的
我们可以知道Exchange是通过ExchangeCodec负责管理连接的
第三行:将Exchange对象作为参数传入责任链中,其他参数拷贝,获取新的责任链
第四行:通过新的责任链返回响应数据
通过上面代码,我们知道,核心步骤是第二行的Exchange,接下来,我们以Exchange为入口
二、核心代码
先来看initExchange这里,这里负责创建或者在连接池里面找到一个可用的连接
internal fun initExchange(chain: RealInterceptorChain): Exchange {
synchronized(this) {
check(expectMoreExchanges) { "released" }
check(!responseBodyOpen)
check(!requestBodyOpen)
}
//在RetryAndFollowUpInterceptor中创建的,前面章节有提到,这里在RealCall中用到
val exchangeFinder = this.exchangeFinder!!
//核心代码在这里
//find方法,返回ExchangeCodec,负责请求和响应的编解码
//有两个具体的实现,分别是Http2ExchangeCodec和Http1ExchangeCodec
//Http2ExchangeCodec是支持HTTP/2的socket连接,Http1ExchangeCodec是支持HTTP/1.1的
//soceket连接
val codec = exchangeFinder.find(client, chain)
//创建Exchange,Exchange是管理连接的角色,通过ExchangeCodec处理I/O操作
val result = Exchange(this, eventListener, exchangeFinder, codec)
this.interceptorScopedExchange = result
this.exchange = result
synchronized(this) {
this.requestBodyOpen = true
this.responseBodyOpen = true
}
if (canceled) throw IOException("Canceled")
return result
}
上面关键代码已经注释,我主要关注socket连接部分,看下find方法
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure(e.lastConnectException)
throw e
} catch (e: IOException) {
trackFailure(e)
throw RouteException(e)
}
}
通过findHealthyConnection方法,创建RealConnection,在RealConnection中完成socket连接,tls连接等操作,我们先来看findHealthyConnection方法
findHealthyConnection方法为了找到一个可用的连接,是一个死循环,有两种结果:要么找到一个可用的连接被找到,要么就抛出一个异常
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) {
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
// Confirm that the connection is good.
if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate
}
// If it isn't, take it out of the pool.
candidate.noNewExchanges()
// Make sure we have some routes left to try. One example where we may exhaust all the routes
// would happen if we made a new connection and it immediately is detected as unhealthy.
if (nextRouteToTry != null) continue
val routesLeft = routeSelection?.hasNext() ?: true
if (routesLeft) continue
val routesSelectionLeft = routeSelector?.hasNext() ?: true
if (routesSelectionLeft) continue
throw IOException("exhausted all routes")
}
}
重点看下里面的findConnection方法
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
if (call.isCanceled()) throw IOException("Canceled")
// Attempt to reuse the connection from the call.
//1
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
}
// If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
// because we already acquired it.
if (call.connection != null) {
check(toClose == null)
return callConnection
}
// The call's connection was released.
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
}
// We need a new connection. Give it fresh stats.
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// Attempt to get a connection from the pool.
//2
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
// Nothing in the pool. Figure out what route we'll try next.
val routes: List<Route>?
val route: Route
if (nextRouteToTry != null) {
// Use a route from a preceding coalesced connection.
//3
routes = null
route = nextRouteToTry!!
nextRouteToTry = null
} else if (routeSelection != null && routeSelection!!.hasNext()) {
// Use a route from an existing route selection.
//4
routes = null
route = routeSelection!!.next()
} else {
// Compute a new route selection. This is a blocking operation!
//5 第一次执行走这个分支
var localRouteSelector = routeSelector
if (localRouteSelector == null) {
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
}
val localRouteSelection = localRouteSelector.next()
routeSelection = localRouteSelection
routes = localRouteSelection.routes
if (call.isCanceled()) throw IOException("Canceled")
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. We have a better chance of matching thanks to connection coalescing.
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
route = localRouteSelection.next()
}
// Connect. Tell the call about the connecting call so async cancels work.
//6
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
// If we raced another call connecting to this host, coalesce the connections. This makes for 3
// different lookups in the connection pool!
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}
synchronized(newConnection) {
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}
eventListener.connectionAcquired(call, newConnection)
return newConnection
}
- 先检查上一次连接是否可用,可用直接返回
- 上一次连接不可用,从连接池
ConnectionPool
中查找,看是否有可用连接 - 如果
ConnectionPool
中没有, 使用来自先前合并的连接中查找一个路由 - 如果3步骤中的路由找不到,则使用现有的routeSelection中的路由
- 如果4步骤中,也没有找到可用的路由,则通过RouteSelector得到一个路由集合routes,再次通过IP地址以及路由集合,查找可用连接,找到则返回,找不到,则通过localRouteSelection返回一个路由
- 通过3/4/5中某一步返回的路由,创建一个新的连接RealConnection(connectionPool, route),新连接通过connect方法,完成tunnel隧道建立,socket连接,TLS握手、加密等操作
这个过程涉及到了RouteSelector,Selection,Route这三个类,我们先看下他们之间的关系
先看下RouteSelector的类注释
/** * Selects routes to connect to an origin server. Each connection requires a choice of proxy server, * IP address, and TLS mode. Connections may also be recycled. */
翻译:选择连接到源服务器的路由。每个连接都需要选择代理服务器, IP 地址和 TLS 模式。连接也可以被回收。
那就很清晰了,是负责选择路由的
Selection可以简单理解一个router路由的集合,上面代码:
val localRouteSelection = localRouteSelector.next()
返回的就是RouteSelection,那接下来就是核心的Route类了,我们继续看一下
class Route(
@get:JvmName("address") val address: Address,
/**
* Returns the [Proxy] of this route.
*
* **Warning:** This may disagree with [Address.proxy] when it is null. When
* the address's proxy is null, the proxy selector is used.
*/
@get:JvmName("proxy") val proxy: Proxy,
@get:JvmName("socketAddress") val socketAddress: InetSocketAddress
) {
//省略
}
可以看出,Route是把address,proxy,soketAddress的封装,Selection是List<Route>的一个迭代器,负责遍历route。
RouteSelector的调用路径:
RouteSelector-->>nextProxy-->>resetNextInetSocketAddres
在resetNextInetSocketAddres中,最终获取的ip地址,是通过调用address.dns.lookup,dns是client.dns,是OkHttpclient的构建过程中传递进来Dns.System,在lookup方法里面,调用InetAddress.getAllByName 方法获取到对应域名的IP,也就是默认的Dns实现
val addresses = address.dns.lookup(socketHost)
到这里,完成了DNS解析,也找到了RealConnection,接下来就是socket连接了
直接看RealConnection的connect方法
省略前面
while (true) {
try {
//如果此路由通过 HTTP 代理隧道传输 HTTPS,则返回 true
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our resources.
break
}
} else {
//socket连接
connectSocket(connectTimeout, readTimeout, call, eventListener)
}
//各种协议连接在这一步完成;https和非https
//https会走TLS协议连接,握手成功后,如果支持HTTP/2,优先走HTTP2;
//非Https优先HTTP/2 ,不支持的话,走HTTP/1
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
break
} catch (e: IOException) {
//省略
}
}
这里涉及到一个隧道建立的问题,可以看到在connectTunnel方法,这里先判断是否走隧道连接,如果不需要,就走socket连接,并不是上来就是socket连接;隧道连接的意义就是代理服务器只能转发,不能篡改数据,反之,则相反;
基本流程就到这里了,下面通过一个图来梳理下
三、流程梳理
以上就是ConnectInterceptor的时序图,涉及到的一些核心类 ;在RealConnection通过newCodec返回ExchangeCodec对象,这个类有两个实现,一个是Http1ExchangeCodec,即HTTP/1.1协议,一个是Http2Exchangecodec,即HTTP/2协议
四、总结
- ConnectInterceptor涉及DNS解析,TLS连接,socket连接几个核心连接过程
- DNS解析在RouteSelector查找route过程中,通过address的dns来查找IP地址,而address的dns是在构建client的时候传进来Dns.System
- 建立socket连接和隧道连接要经过判断,不是执行connect方法就是建立了socket连接
-
RealConnection.connectTls是TLS连接,涉及到握手、证书校验等