OkHttp的源码分析(五)——连接拦截器ConnectInterceptor分析

我们知道,OkHttp是通过Socket连接的,但是到现在为止,我们还没看到和Socket连接相关的操作,本节内容主要就是为了看OkHttp是如何通过ConnectInterceptor做Socket连接的

目录

一、主流程

二、核心代码

三、流程梳理

四、总结


一、主流程

object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    //第一行
    val realChain = chain as RealInterceptorChain
    //第二行
    val exchange = realChain.call.initExchange(chain)
    //第三行
    val connectedChain = realChain.copy(exchange = exchange)
    //第四行
    return connectedChain.proceed(realChain.request)
  }
}

看到这个ConnectInterceptor的代码,是不是很简洁,但是并不简单

第一行:获得责任链,这个操作在其他的拦截器中也是有的

第二行:将责任链作为参数,通过RealCall的initExchange获得Exchange对象,这个Exchange对象是干嘛的?好神秘,字面意思也不好理解,我们看下Exchange的注释

/**
 * Transmits a single HTTP request and a response pair. This layers connection management and events
 * on [ExchangeCodec], which handles the actual I/O.
 */

大致翻译:传输单个 HTTP 请求和响应对。这层的连接管理和事件都在ExchangeCodec,它是处理实际 I/O 的

我们可以知道Exchange是通过ExchangeCodec负责管理连接的

第三行:将Exchange对象作为参数传入责任链中,其他参数拷贝,获取新的责任链

第四行:通过新的责任链返回响应数据

通过上面代码,我们知道,核心步骤是第二行的Exchange,接下来,我们以Exchange为入口

二、核心代码

先来看initExchange这里,这里负责创建或者在连接池里面找到一个可用的连接

  internal fun initExchange(chain: RealInterceptorChain): Exchange {
    synchronized(this) {
      check(expectMoreExchanges) { "released" }
      check(!responseBodyOpen)
      check(!requestBodyOpen)
    }
    //在RetryAndFollowUpInterceptor中创建的,前面章节有提到,这里在RealCall中用到
    val exchangeFinder = this.exchangeFinder!!
    //核心代码在这里
    //find方法,返回ExchangeCodec,负责请求和响应的编解码
    //有两个具体的实现,分别是Http2ExchangeCodec和Http1ExchangeCodec
    //Http2ExchangeCodec是支持HTTP/2的socket连接,Http1ExchangeCodec是支持HTTP/1.1的            
    //soceket连接
    val codec = exchangeFinder.find(client, chain)
    //创建Exchange,Exchange是管理连接的角色,通过ExchangeCodec处理I/O操作
    val result = Exchange(this, eventListener, exchangeFinder, codec)
    this.interceptorScopedExchange = result
    this.exchange = result
    synchronized(this) {
      this.requestBodyOpen = true
      this.responseBodyOpen = true
    }

    if (canceled) throw IOException("Canceled")
    return result
  }

上面关键代码已经注释,我主要关注socket连接部分,看下find方法

fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure(e.lastConnectException)
      throw e
    } catch (e: IOException) {
      trackFailure(e)
      throw RouteException(e)
    }
  }

通过findHealthyConnection方法,创建RealConnection,在RealConnection中完成socket连接,tls连接等操作,我们先来看findHealthyConnection方法

findHealthyConnection方法为了找到一个可用的连接,是一个死循环,有两种结果:要么找到一个可用的连接被找到,要么就抛出一个异常

private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )

      // Confirm that the connection is good.
      if (candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate
      }

      // If it isn't, take it out of the pool.
      candidate.noNewExchanges()

      // Make sure we have some routes left to try. One example where we may exhaust all the routes
      // would happen if we made a new connection and it immediately is detected as unhealthy.
      if (nextRouteToTry != null) continue

      val routesLeft = routeSelection?.hasNext() ?: true
      if (routesLeft) continue

      val routesSelectionLeft = routeSelector?.hasNext() ?: true
      if (routesSelectionLeft) continue

      throw IOException("exhausted all routes")
    }
  }

重点看下里面的findConnection方法

private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    if (call.isCanceled()) throw IOException("Canceled")

    // Attempt to reuse the connection from the call.
    //1
    val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
    if (callConnection != null) {
      var toClose: Socket? = null
      synchronized(callConnection) {
        if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
          toClose = call.releaseConnectionNoEvents()
        }
      }

      // If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
      // because we already acquired it.
      if (call.connection != null) {
        check(toClose == null)
        return callConnection
      }

      // The call's connection was released.
      toClose?.closeQuietly()
      eventListener.connectionReleased(call, callConnection)
    }

    // We need a new connection. Give it fresh stats.
    refusedStreamCount = 0
    connectionShutdownCount = 0
    otherFailureCount = 0

    // Attempt to get a connection from the pool.
    //2
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
      val result = call.connection!!
      eventListener.connectionAcquired(call, result)
      return result
    }

    // Nothing in the pool. Figure out what route we'll try next.
    val routes: List<Route>?
    val route: Route
    if (nextRouteToTry != null) {
      // Use a route from a preceding coalesced connection.
      //3
      routes = null
      route = nextRouteToTry!!
      nextRouteToTry = null
    } else if (routeSelection != null && routeSelection!!.hasNext()) {
      // Use a route from an existing route selection.
      //4
      routes = null
      route = routeSelection!!.next()
    } else {
      // Compute a new route selection. This is a blocking operation!
      //5 第一次执行走这个分支
      var localRouteSelector = routeSelector
      if (localRouteSelector == null) {
        localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
        this.routeSelector = localRouteSelector
      }
      val localRouteSelection = localRouteSelector.next()
      routeSelection = localRouteSelection
      routes = localRouteSelection.routes

      if (call.isCanceled()) throw IOException("Canceled")

      // Now that we have a set of IP addresses, make another attempt at getting a connection from
      // the pool. We have a better chance of matching thanks to connection coalescing.
      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
        val result = call.connection!!
        eventListener.connectionAcquired(call, result)
        return result
      }

      route = localRouteSelection.next()
    }

    // Connect. Tell the call about the connecting call so async cancels work.
    //6
    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(newConnection.route())

    // If we raced another call connecting to this host, coalesce the connections. This makes for 3
    // different lookups in the connection pool!
    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
      val result = call.connection!!
      nextRouteToTry = route
      newConnection.socket().closeQuietly()
      eventListener.connectionAcquired(call, result)
      return result
    }

    synchronized(newConnection) {
      connectionPool.put(newConnection)
      call.acquireConnectionNoEvents(newConnection)
    }

    eventListener.connectionAcquired(call, newConnection)
    return newConnection
  }
  1. 先检查上一次连接是否可用,可用直接返回
  2. 上一次连接不可用,从连接池ConnectionPool中查找,看是否有可用连接
  3. 如果 ConnectionPool 中没有, 使用来自先前合并的连接中查找一个路由
  4. 如果3步骤中的路由找不到,则使用现有的routeSelection中的路由
  5. 如果4步骤中,也没有找到可用的路由,则通过RouteSelector得到一个路由集合routes,再次通过IP地址以及路由集合,查找可用连接,找到则返回,找不到,则通过localRouteSelection返回一个路由
  6. 通过3/4/5中某一步返回的路由,创建一个新的连接RealConnection(connectionPool, route),新连接通过connect方法,完成tunnel隧道建立,socket连接,TLS握手、加密等操作

这个过程涉及到了RouteSelector,Selection,Route这三个类,我们先看下他们之间的关系

OkHttp的源码分析(五)——连接拦截器ConnectInterceptor分析

 

先看下RouteSelector的类注释

/**
 * Selects routes to connect to an origin server. Each connection requires a choice of proxy server,
 * IP address, and TLS mode. Connections may also be recycled.
 */

翻译:选择连接到源服务器的路由。每个连接都需要选择代理服务器, IP 地址和 TLS 模式。连接也可以被回收。

那就很清晰了,是负责选择路由的

Selection可以简单理解一个router路由的集合,上面代码:

val localRouteSelection = localRouteSelector.next()

返回的就是RouteSelection,那接下来就是核心的Route类了,我们继续看一下

class Route(
  @get:JvmName("address") val address: Address,
  /**
   * Returns the [Proxy] of this route.
   *
   * **Warning:** This may disagree with [Address.proxy] when it is null. When
   * the address's proxy is null, the proxy selector is used.
   */
  @get:JvmName("proxy") val proxy: Proxy,
  @get:JvmName("socketAddress") val socketAddress: InetSocketAddress
) {
//省略
}

可以看出,Route是把address,proxy,soketAddress的封装,Selection是List<Route>的一个迭代器,负责遍历route。

RouteSelector的调用路径:

RouteSelector-->>nextProxy-->>resetNextInetSocketAddres

在resetNextInetSocketAddres中,最终获取的ip地址,是通过调用address.dns.lookup,dns是client.dns,是OkHttpclient的构建过程中传递进来Dns.System,在lookup方法里面,调用InetAddress.getAllByName 方法获取到对应域名的IP,也就是默认的Dns实现

val addresses = address.dns.lookup(socketHost)

到这里,完成了DNS解析,也找到了RealConnection,接下来就是socket连接了

直接看RealConnection的connect方法

省略前面
    while (true) {
      try {
        //如果此路由通过 HTTP 代理隧道传输 HTTPS,则返回 true
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
          if (rawSocket == null) {
            // We were unable to connect the tunnel but properly closed down our resources.
            break
          }
        } else {
          //socket连接
          connectSocket(connectTimeout, readTimeout, call, eventListener)
        }
        //各种协议连接在这一步完成;https和非https
        //https会走TLS协议连接,握手成功后,如果支持HTTP/2,优先走HTTP2;
        //非Https优先HTTP/2 ,不支持的话,走HTTP/1
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
        eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
        break
      } catch (e: IOException) {
        //省略
      }
    }

这里涉及到一个隧道建立的问题,可以看到在connectTunnel方法,这里先判断是否走隧道连接,如果不需要,就走socket连接,并不是上来就是socket连接;隧道连接的意义就是代理服务器只能转发,不能篡改数据,反之,则相反;

基本流程就到这里了,下面通过一个图来梳理下

三、流程梳理

OkHttp的源码分析(五)——连接拦截器ConnectInterceptor分析

 以上就是ConnectInterceptor的时序图,涉及到的一些核心类 ;在RealConnection通过newCodec返回ExchangeCodec对象,这个类有两个实现,一个是Http1ExchangeCodec,即HTTP/1.1协议,一个是Http2Exchangecodec,即HTTP/2协议

四、总结

  • ConnectInterceptor涉及DNS解析,TLS连接,socket连接几个核心连接过程
  • DNS解析在RouteSelector查找route过程中,通过address的dns来查找IP地址,而address的dns是在构建client的时候传进来Dns.System
  • 建立socket连接和隧道连接要经过判断,不是执行connect方法就是建立了socket连接
  • RealConnection.connectTls是TLS连接,涉及到握手、证书校验等

上一篇:Who is the Champion


下一篇:建立VLAN虚拟局域网