万字长文带你拥抱OkHttp源码

文章目录

什么是Okhttp?

  • Okhttp是由Squera公司基于Http协议封装开发的一个网络请求框架,是目前使用最多的网络框架之一,用于替代HttpUrlConnection,它比起原生的HttpURLConnection,是有过之而无不及,OkHttp现在已经称为广大Android开发者首选的网络通信库。

Okhttp有自己的官网:OKHttp官网

如果想了解源码也可以在GitHub上下载。地址是:https://github.com/square/okhttp

Okhttp的使用流程

步骤一:在gradle中添加Okhttp的依赖:

implementation 'com.squareup.okhttp3:okhttp:5.0.0-alpha.2'

步骤二:创建一个OkHttpClient对象实例client:

OkHttpClient okHttpClient = new OkHttpClient.Builder().build();

步骤三:建立一个请求对象Request:

Request request = new Request.Builder()
                .url("https://www.baidu.com/")
                .build();

步骤四:调用client的newCall()方法传入request建立一个call对象并且调用call()对象的execute或者enqueue方法进行网络请求:

步骤五:对于返回的response进行解析:

//步骤四
okHttpClient.newCall(request).enqueue(new okhttp3.Callback() {
    //步骤五
    @Override
    public void onFailure(okhttp3.Call call, IOException e) {

    }
	//步骤五
    @Override
    public void onResponse(okhttp3.Call call, okhttp3.Response response) throws IOException {

    }
});
  • 上面介绍的是最简单的一个Okhttp的请求建立方式,如果想了解更全面的Okhttp使用方式,请跳转到Okhttp官网:OKHttp官网

说完使用方式,我们就要开始对Okhttp的源码开始分析了。

Okhttp的请求流程源码分析:

在学习Okhttp的请求流程源码之前,我们要先了解一个类,就是OkHttpClient()类。

OkHttpClient()类

  • 在OkHttp的使用当中,第一步我们就创建了OkHttpClient对象,OkHttpClient类中包含了很多属性,这些属性是我们在建立Http连接,TCP连接中需要用到的属性,比如:进程调度器,代理,默认支持的Http协议版本等等…下面我们就看一下OkhttpCilent中包含了哪些属性:
public static final class Builder {
    Dispatcher dispatcher;//分发器
    @Nullable Proxy proxy;//代理
    List<Protocol> protocols;//应用层协议
    List<ConnectionSpec> connectionSpecs;//传输层协议
    final List<Interceptor> interceptors = new ArrayList<>();//应用拦截器
    final List<Interceptor> networkInterceptors = new ArrayList<>();//网络拦截器
    EventListener.Factory eventListenerFactory;//http请求回调监听
    ProxySelector proxySelector;//代理选择
    CookieJar cookieJar;//cookie
    @Nullable Cache cache;//网络缓存
    @Nullable InternalCache internalCache;//内部缓存
    SocketFactory socketFactory;//socket 工厂
    @Nullable SSLSocketFactory sslSocketFactory;//安全套接层socket 工厂,用于HTTPS
    @Nullable CertificateChainCleaner certificateChainCleaner;//验证确认响应证书,适用 HTTPS 请求连接的主机名
    HostnameVerifier hostnameVerifier;//主机名字确认
    CertificatePinner certificatePinner;//证书链
    Authenticator proxyAuthenticator;//代理身份验证
    Authenticator authenticator;//本地身份验证
    ConnectionPool connectionPool;//连接池,复用连接
    Dns dns;//域名
    boolean followSslRedirects;//安全套接层重定向
    boolean followRedirects;//本地重定向
    boolean retryOnConnectionFailure;//错误重连
    int callTimeout;//请求超时,它包括dns解析、connect、read、write和服务器处理的时间
    int connectTimeout;//connect超时
    int readTimeout;//read超时
    int writeTimeout;//write超时
    int pingInterval;//ping超时

    //这里是配置默认的参数
    public Builder() {
      dispatcher = new Dispatcher();
      protocols = DEFAULT_PROTOCOLS;//Protocol.HTTP_2和Protocol.HTTP_1_1
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      if (proxySelector == null) {
        proxySelector = new NullProxySelector();
      }
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      callTimeout = 0;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
    }

    //这里通过另外一个OkHttpClient配置参数
    Builder(OkHttpClient okHttpClient) {
      this.dispatcher = okHttpClient.dispatcher;
      this.proxy = okHttpClient.proxy;
      this.protocols = okHttpClient.protocols;
      //...
    }
    
    //...
    
    //配置完参数后,通过Builder的参数创建一个OkHttpClient
    public OkHttpClient build() {
        return new OkHttpClient(this);
    }
}

在创建OkHttpClient实例时,有两种创建方式。一种是new出来实例,一种是使用建造者模式来创建实例。如下:

OkHttpClient okHttpClient1 = new OkHttpClient();
OkHttpClient okHttpClient = new OkHttpClient.Builder()
        .build();

这两种的方式就是:直接new出一个实例会进行OkHttpClient中属性的默认设置,而建造者模式可以对单独的属性进行设置。

  • 说完了OkHttp类,下面就直接说请求流程的源码。

异步请求流程

异步请求的代码示例

先上一段异步请求的代码:

OkHttpClient okHttpClient = new OkHttpClient.Builder()
                .build();
Request request = new Request.Builder()
        .url("https://www.baidu.com")
        .build();
okHttpClient.newCall(request).enqueue(new okhttp3.Callback() {
            @Override
            public void onFailure(@NotNull okhttp3.Call call, @NotNull IOException e) {

            }

            @Override
            public void onResponse(@NotNull okhttp3.Call call, @NotNull okhttp3.Response response) throws IOException {

            }
        });
  • 看上面的代码,我们对异步请求的步骤进行分析。

RealCall.enqueue

首先enqueue是call接口中的一个方法,所以newCall方法中,会创建一个Call的实现类RealCall,然后执行realCall中的enqueue方法,我们先看一下这个realCall中的enqueue方法:

override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

callStart()

这里面有两行主要代码,第一行是callStart方法:

private fun callStart() {
    this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
    eventListener.callStart(this)
  }

在callStart方法中,第一行是跟踪程序的错误,在出错的时候去做错误分析。第二行是一个回调功能,eventListener是一个监听器,会监听各种事件。所以callStart这个方法就是会监听各种事件并给回调一个反馈,让我们解决。

dispatcher.enqueue()

第二行就是client.dispatcher.enqueue(AsyncCall(responseCallback))方法,再说这个之前,我们要先了解什么是dispatcher?

下面是Dispatcher类的部分定义:

public final class Dispatcher {
  private int maxRequests = 64;
  //每个主机的最大请求数,如果超过这个数,那么新的请求就会被放入到readyAsyncCalls队列中
  private int maxRequestsPerHost = 5;
  //是Dispatcher中请求数量为0时的回调,这儿的请求包含同步请求和异步请求,该参数默认为null。 
  private @Nullable Runnable idleCallback;

  /** Executes calls. Created lazily. */
  //任务队列线程池
  private @Nullable ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */
  //待执行异步任务队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  //运行中的异步任务队列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  //运行中同步任务队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }
}

  • Dispatcher是用来做线程调度的,它的内部用的是ExecutorService进行线程管理,并且Dispatcher中有maxRequests和maxRequestsPerHost,maxRequests是最大承受请求同时进行的数量,如果已经达到这个请求数量,则下一个请求就要排队等候,maxRequestsPerHost是某个主机同时最多能有几个请求。

接下来了解了Dispatcher,我们就要正式来看dispatcher.enqueue(AsyncCall(responseCallback))做了些什么:

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

这个enqueue方法当中主要有两行操作,readyAsyncCalls.add(call)和promoteAndExecute()

其中readyAsyncCalls.add(call),是把异步请求放进readyAsyncCalls这个双端队列中,这个队列中放的是准备执行但是还没执行的请求,然后会调用下面这个promoteAndExecute()方法去执行这请求。

promoteAndExecute()

promoteAndExecute()方法的源码如下:

  private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    //第一部分:选出符合条件的call请求
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }
	//第二部分:遍历请求执行
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }
  • 这个方法中的代码分成两个for循环:

第一个for循环,把符合条件的异步请求任务从readyAsyncCalls转移到runningAsyncCalls队列和添加到executableCalls列表中。然后进行第二个for循环,就是遍历executableCalls列表,从executableCalls列表中获取AsyncCall对象,并且调用它的executeOn()方法,并在里面传进去一个Dispatcher的executeService对象

asyncCall.executeOn(executorService)

下面我们看asyncCall.executeOn(executorService)方法中做了什么:

    fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        //......
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

这里面重要的一句代码就是executorService.execute(this),这一行代码是异步请求的关键。我们刚才说了executorService是Dispatcher调度器内部真正进行管理的东西,也就是说,在这行代码开始,就已经进入了另一个线程。这个方法中传入的是this,也就是AsyncCall,意味着在这个AsyncCall中实现了Runnable接口,我们看一下他自己的run函数干了什么,也就知道了okhttp异步请求的后台线程干了什么:

AsyncCall.run()

AsyncCall的run方法:

    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          //获取响应  
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          //回调    
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            //回调  
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            //......
          }
          throw t
        } finally {
          client.dispatcher.finished(this)
        }
      }
    }
  }

在这个run方法中,调用了getResponseWithInterceptorChain()得到响应Response,然后我们调用responseCallback.onResponse(this@RealCall, response)或者responseCallback.onFailure(this@RealCall, e)回调到主线程,完成请求进行相应的操作。

client.dispatcher.finished(this)

最后还要执行asyncCall.executeOn方法中finally的client.dispatcher.finished(this)方法:

private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
        //尝试移除队列中的同步请求任务
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }
	//紧接着调用promoteAndExecute()方法进行异步任务的调度,如果没有异步任务要进行,promoteAndExecute()返回false
    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
      idleCallback.run()
    }
  }

finished()方法中首先尝试从runningSyncCalls队列把刚才通过 executed()入队的同步任务RealCall移除,如果移除失败,就抛出异常,如果移除成功,就紧接着调用promoteAndExecute()方法进行其他响应异步任务的调度。

小结

到这里okhttp的异步请求就分析完成了,在这里总结一下。我们首先是调用call.enqueue(callback)去进行发起异步请求,由于enqueue是call接口中的方法,所以我们实际执行的是call的实现类RealCall的enqueue方法,这个比同步请求多了一个callback参数,然后RealCall中执行了callStart方法来进行对网络请求事件的监听和回调,并把callback参数包装成一个AsyncCall传入dispatcher的enqueue方法;在dispatcher的enqueue中,先把这个请求保存到readyAsyncCalls队列中,成为一个准备运行但是并未运行的任务,然后调用promoteAndExecute方法进行异步任务的调度;在promoteAndExecute方法中,两层for循环,第一层for循环寻找出符合要求的异步请求任务并放到runningAsyncCalls队列和添加到executableCalls列表中,第二个for循环遍历executableCalls列表,并对每个任务执行asyncCall.executeOn(executorService)方法,这个方法传入了实质进行线程调度的executorService对象,所以在这个executedOn中会调用executorService.execute(this)方法,来把线程切换到后台线程;由于传的参数是this,所以在AsyncCall中就实现了Runnable,也就是重写了run方法,所以调用的也就是当前AsyncCall中的run方法来进行后台请求任务;在这个run中,也是调用了getResponseWithInterceptorChain()方法来进行网络请求,返回服务器的响应,然后回调到主线程的onResponse或者onFailure。这样就完成了一次okhttp的异步请求。

同步请求流程

同步请求的代码示例

先上一段同步请求的代码:

OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
Request request = new Request.Builder()
                .url("https://www.baidu.com/")
                .build();
try {
    //接收到回复Response
    Response response = okHttpclient.newCall(request).execute();
    Log.d(TAG, response.body().string());
} catch (IOException e) {
    e.printStackTrace();
}

同步请求流程分以下几个步骤:

  • 利用newCall方法创建一个RealCall的实例
okhttp3.Response response = okHttpClient.newCall(request).execute();

 override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

RealCal.execute()

  • Call是一个接口,RealCall是Call的实现类,我们走进RealCall的内部,看一下execute()的实现:
/* RealCall中的execute()方法 */
override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }

在RealCall的execute()方法中,我们一一说明一下每个方法都是干嘛的:

Callstart()方法

private fun callStart() {
    this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
    eventListener.callStart(this)
  }

在callStart方法中,第一行是跟踪程序的错误,在出错的时候去做错误分析。第二行是一个回调功能,eventListener是一个监听器,会监听各种事件。所以callStart这个方法就是会监听各种事件并给回调一个反馈,让我们解决。

client.dispatcher.executed(this)方法

/** Used by [Call.execute] to signal it is in-flight. */
  @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }

我们看到,对于一个同步请求,dispatcher的executed方法并没有做什么工作,只不过就是简单的把同步请求任务放入runningSyncCalls队列中,等待请求。

getResponseWithInterceptorChain()方法

这个方法是进行一个正式的网络请求的入口:

  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.  
    //添加默认拦截器
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

这个方法和异步请求一样,最终会返回一个Response,也就是网络请求的响应,这个方法中会把用户自定义的拦截器和okhttp中默认的拦截器统一放在一个集合当中,然后后续进行这些拦截器的处理。拦截器具体怎么进行处理请求工作,这些我们一会会单独说。总之这个方法会返回网络请求的响应

最后还要执行RealCal.execute()中的finally中的client.dispatcher.finished(this)方法,这个方法在异步请求中我们已经说过了,这里就不做过多的解释了。

小结

到这里Okhttp的同步请求流程就介绍完了,这里总结一下:当我们调用call.execute方法之后,会发起一个同步网络请求,call是一个接口,它的实现类是RealCall类,所以我们执行的是RealCall的execute方法,在realCall中的execute方法中会先执行callStart()来对所有的时间进行一个监听回调,然后会调用Dispatcher类的executed方法,这个方法会将同步请求任务放在runningSyncCall队列中,然后再执行getResponseWithInterceptorChain()处理同步请求,在这个方法中会进行拦截器的一系列操作进行网络请求最后返回服务器的响应Response返回给用户,最后在调用Dispatcher的finish方法,把事先放入队列中的请求任务移除。这就完成了一个同步请求任务。

  • 说完了异步请求和同步请求的流程,下面附上一张图:

万字长文带你拥抱OkHttp源码

Okhttp请求的拦截器链以及连接实现的源码解析

在上面我们说的okhtp的请求流程中,最后都是调用了一个叫getResponseWithInterceptorChain()的方法,来返回服务器的响应Response,那么这个方法到底干了什么呢?

getResponseWithInterceptorChain()

我们先看一下getResponseWithInterceptorChain()的源码:

  @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    //添加拦截器
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    //创建chain    
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    //调用chain.proceed()开始请求责任链条的运作  
    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

代码中出现的Interceptor都是一个一个的拦截器,这个方法实际上就是通过几个拦截器组成了一个责任链条,依次经过用户自定义普通拦截器重试拦截器桥接拦截器缓存拦截器连接拦截器用户自定义网络拦截器访问服务器拦截器等拦截处理过程,来获取到一个响应并交给用户。

从上面的代码可以总结出,getResponseWithInterceptorChain()干了三件事:

  1. 把所有配置好的interceptor方法放到interceptors集合中组成一个链条。

    • 在这个方法中有7个拦截器,分别如下:
      • client.interceptors:用户自己定义的应用拦截器。
      • RetryAndFollowUpInterceptor(client):负责网络请求重试重定向的拦截器。
      • BridgeInterceptor(client.cookieJar()):负责转换请求&响应报文的拦截器。
      • CacheInterceptor(client.internalCache()):负责检测缓存和添加缓存的拦截器。
      • ConnectInterceptor(client):负责建立&管理连接的拦截器。
      • client.networkInterceptors:用户自定义的网络拦截器
      • CallServerInterceptor(forWebSocket):负责发起请求获取响应的拦截器。
  2. 创建第一个chain对象(负责把每个拦截器连接起来)。

    • 这个Chain是Interceptor的一个接口,它的实现类是RealInterceptorChain。里面有六个参数:

      • class RealInterceptorChain(
          internal val call: RealCall,
          private val interceptors: List<Interceptor>,		//拦截器集合
          private val index: Int,							//拦截器集合中的索引
          internal val exchange: Exchange?,					//Exchange对象
          internal val request: Request,					//请求
          internal val connectTimeoutMillis: Int,
          internal val readTimeoutMillis: Int,
          internal val writeTimeoutMillis: Int
        ) : Interceptor.Chain 
        
  3. 调用chain的proceed(Request)处理请求,让链条转起来。

    • 调用proceed方法:

      • @Throws(IOException::class)
        override fun proceed(request: Request): Response {
          check(index < interceptors.size)
          
          calls++
          
          //......
          
          //主要代码在这两行
          val next = copy(index = index + 1, request = request)
          val interceptor = interceptors[index]
          
          //......
          
          check(response.body != null) { "interceptor $interceptor returned a response with no body" }
          
          return response
        }
        

在这里面,会再创建一个index+1的Chain对象next,也就是这个新Chain指的是拦截器集合中的下一个拦截器,创建完Chain之后,再通过index获取到当前的Interceptor。然后调用interceptor.intercept(next)方法,每个拦截器都是实现了interceptor接口,调用这个方法,就会处理相应的拦截器中的逻辑,处理逻辑之后每个拦截器中还会有chain.proceed方法,来执行传入的Chain的next对象,然后继续获取下一个拦截器,新建下一个Chain对象…就这样如此循环重复下去,这样就把每个拦截器通过一个个Chain连接起来,形成一条链,把Request沿着链传递下去,直到请求被处理,然后返回Response响应,该响应也是沿着链传递上去。

万字长文带你拥抱OkHttp源码

  • 说完了什么是责任链,我们就要说一下责任链条中的每个拦截器的工作了:

一个拦截器的功能

在分开说每个拦截器的功能之前,我先大概说一下拦截器都做什么。

拦截器中大概有三个工作:前置工作,中置工作和后置工作。

  • 前置工作:就是完成本拦截器需要处理的一些事情。
  • 中置工作:向下传接力棒,让下一个拦截器工作。
  • 后置工作:服务器已经响应Response,处理Response时本连接器做的一些工作。

注意:有些拦截器,不完全有以上三个工作,也就是说并不是所有的拦截器都是这个结构。

client.interceptors用户自定义普通拦截器

这是请求责任链中第一个用户自定义的拦截器,按照开发者的要求,在所有其他Interceptor处理之前,可以在这里进⾏最早的预处理⼯作,以及在收到Response之后,做最后的善后⼯作。如果你有统⼀的header要添加,可以在这⾥设置。

RetryAndFollowUpInterceptor重试拦截器

  • 如果我们最开始没有自定义一个拦截器,那么默认执行的第一个拦截器就是RetryAndFollowUpInterceptor拦截器,这个拦截器作为默认的第一个interceptor,它主要做的工作就是做一个网络连接之前的准备,然后进行proceed执行链条的下一个拦截器,再interceptor,在执行下一个拦截器时,可能会产生一些错误,比如说端口错误,客户端/服务器错误,本拦截器会对这些错误异常进行相应的操作。最后返回请求的响应码。

经过上面的介绍,我们能够认识到,RetryAndFollowUpInterceptor这个拦截器有完善的前置、中置、后置工作,它们分别是:

  • 前置工作:做一个连接之前的准备。
  • 中置工作:调用proceed去执行下一个interceptor,并根据执行的结果进行相应的操作。
  • 后置工作:根据请求返回的响应码,决定是继续循环进行请求还是直接return。

对于这三个工作,在下面RetryAndFollowUpInterceptor的源码中我们会具体的说:

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    var request = chain.request
    val call = realChain.call
    var followUpCount = 0
    var priorResponse: Response? = null
    var newExchangeFinder = true
    var recoveredFailures = listOf<IOException>()
    while (true) {
      //前置工作  
      call.enterNetworkInterceptorExchange(request, newExchangeFinder)
      var response: Response
      var closeActiveExchange = true
      try {
        if (call.isCanceled()) {
          throw IOException("Canceled")
        }
		
        //中置工作  
        try {
          response = realChain.proceed(request)
          newExchangeFinder = true
        } catch (e: RouteException) {
          // The attempt to connect via a route failed. The request will not have been sent.
          if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
            throw e.firstConnectException.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e.firstConnectException
          }
          newExchangeFinder = false
          continue
        } catch (e: IOException) {
          // An attempt to communicate with a server failed. The request may have been sent.
          if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
            throw e.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e
          }
          newExchangeFinder = false
          continue
        }

        if (priorResponse != null) {
          response = response.newBuilder()
              .priorResponse(priorResponse.newBuilder()
                  .body(null)
                  .build())
              .build()
        }

        //后置工作
        val exchange = call.interceptorScopedExchange
        val followUp = followUpRequest(response, exchange)

        if (followUp == null) {
          if (exchange != null && exchange.isDuplex) {
            call.timeoutEarlyExit()
          }
          closeActiveExchange = false
          return response
        }

        val followUpBody = followUp.body
        if (followUpBody != null && followUpBody.isOneShot()) {
          closeActiveExchange = false
          return response
        }

        response.body?.closeQuietly()

        if (++followUpCount > MAX_FOLLOW_UPS) {
          throw ProtocolException("Too many follow-up requests: $followUpCount")
        }

        request = followUp
        priorResponse = response
      } finally {
        call.exitNetworkInterceptorExchange(closeActiveExchange)
      }
    }
  }

针对上面的源码,我们一点一点说:

前置工作

call.enterNetworkInterceptorExchange(request, newExchangeFinder)

  • 首先说前置工作,前置工作只有一行代码,就是call.enterNetworkInterceptorExchange(request, newExchangeFinder),我们点进去看一下源码:
  fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
    check(interceptorScopedExchange == null)

    //......

    if (newExchangeFinder) {
      this.exchangeFinder = ExchangeFinder(
          connectionPool,
          createAddress(request.url),
          this,
          eventListener
      )
    }
  }

这个代码中,最重要的一部分就是if语句里面的ExchangeFinder,finder是寻找搜索者的意思,而Exchange则是数据的一次发送和接收,也就是一次数据交换,所以ExchangeFinder就是一个数据交换搜索器,说白了就是找到一个连接,找到一个可用的TCP/SSL连接,以及连接需要的各种参数。这就是做一个连接之前需要做的准备,详细的这个对象我们后面会有详细的解答。

中置工作

proceed()

try {
          response = realChain.proceed(request)
          newExchangeFinder = true
        }catch (e: RouteException) {
          // The attempt to connect via a route failed. The request will not have been sent.
          if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
            throw e.firstConnectException.withSuppressed(recoveredFailures)
          } else {
            recoveredFailures += e.firstConnectException
          }
}
  • 说完了前置工作,下面就是中置工作了,中置工作是一个try/catch语句,在这里面先进行了一个proceed方法,去传递让下一个拦截器执行逻辑,在这个传递的过程中,很可能会出现一些网络连接的错误,比如连接超时、服务器要求你做重定向等一系列问题。在catch当中,就是我们处理这些错误的逻辑,我们在catch当中还会看到一个recover方法:

recover()

 if (!recover(......)){
     ......
 } 

这个recover方法的源码这里就不放了,说一下具体做了什么吧,在这个recover方法中,会根据传进去的参数来判断当前发生这个错误是否可以被系统修改,是否是一个致命错误。

后置工作

  • 如果没有上述的异常,我们会根据response中的响应码,来判断是否进行重定向循环请求或者return
  • 下面的代码:
val exchange = call.interceptorScopedExchange
val followUp = followUpRequest(response, exchange)

这个followUp,就是决定我们是否要重定向,这个followUpRequest方法会根据响应码的CODE,301、302等等,就会返回一个request,执行下一次循环执行这个新请求。如果哦不需要重定向,则返回一个空的request,就会直接return response结束循环。

到这里RetryAndFollowUpInterceptor这个拦截器就介绍完了。

BridgeInterceptor桥拦截器

BridgeInterceptor拦截器,顾名思义又叫做桥拦截器,他做的工作就是为客户端发的Request和服务器返回的Response的Header做着一些处理,添加或移除一些东西,比如添加/移除Content-length,Content-Encoding等等…

经过上面的介绍我们发现这个拦截器的工作比较简单,那么它的前置、中置、后置工作是什么呢?

  • 前置工作:对客户端发的Request的Header,进行一些添加处理。
  • 中置工作:传接力棒,让下一个拦截器interceptor执行逻辑。
  • 后置工作:对服务器返回的Response的Header,进行一些移除处理。

BridgeInterceptor拦截器的源码如下:

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val userRequest = chain.request()
    val requestBuilder = userRequest.newBuilder()
	
    //前置工作    
    val body = userRequest.body
    if (body != null) {
      val contentType = body.contentType()
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString())
      }

      val contentLength = body.contentLength()
      if (contentLength != -1L) {
        requestBuilder.header("Content-Length", contentLength.toString())
        requestBuilder.removeHeader("Transfer-Encoding")
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked")
        requestBuilder.removeHeader("Content-Length")
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", userRequest.url.toHostHeader())
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive")
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    var transparentGzip = false
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true
      requestBuilder.header("Accept-Encoding", "gzip")
    }

    val cookies = cookieJar.loadForRequest(userRequest.url)
    if (cookies.isNotEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies))
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", userAgent)
    }
	
    //中置工作  
    val networkResponse = chain.proceed(requestBuilder.build())

    cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

    val responseBuilder = networkResponse.newBuilder()
        .request(userRequest)
	
    //后置工作    
    if (transparentGzip &&
        "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
        networkResponse.promisesBody()) {
      val responseBody = networkResponse.body
      if (responseBody != null) {
        val gzipSource = GzipSource(responseBody.source())
        val strippedHeaders = networkResponse.headers.newBuilder()
            .removeAll("Content-Encoding")
            .removeAll("Content-Length")
            .build()
        responseBuilder.headers(strippedHeaders)
        val contentType = networkResponse.header("Content-Type")
        responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
      }
    }

    return responseBuilder.build()
  }

从源码中,我们可以看到,前置工作会添加一些Content-Type、Content-Length、Transfer-Encoding等等。中置工作就是简单的proceed传棒。后置工作就是移除Content-Encoding、Content-Length等等…

以上就是BridgeInterceptor拦截器的工作。

CacheInterceptor缓存拦截器

这个CacheInterceptor拦截器,看名字我们就知道它和缓存有关系。他的主要工作就是在进行请求Request之前,去Cache中找有没有需要的缓存,如果有并且正是自己这次请求想要的,我们就直接应用这个缓存。如果没有可用的缓存,就继续请求,执行proceed方法传棒。当请求完成,接收得到响应的数据时,在看一下需不需要把这个数据内容存下来,以便下次请求时应用。

CacheInterceptor的前置、中置、后置工作:

  • 前置工作:去Cache中寻找有无可用缓存。
  • 中置工作:传接力棒,令下一个拦截器执行逻辑。
  • 后置工作:把请求得到的数据缓存。

下面我们看一下CacheInterceptor的源码:

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val call = chain.call()
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()

    //前置工作    
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse

    cache?.trackResponse(strategy)
    val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE

    if (cacheCandidate != null && cacheResponse == null) {
      cacheCandidate.body?.closeQuietly()
    }

    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build().also {
            listener.satisfactionFailure(call, it)
          }
    }

    if (networkRequest == null) {
      return cacheResponse!!.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build().also {
            listener.cacheHit(call, it)
          }
    }

    if (cacheResponse != null) {
      listener.cacheConditionalHit(call, cacheResponse)
    } else if (cache != null) {
      listener.cacheMiss(call)
    }

    //中置工作  
    var networkResponse: Response? = null
    try {
      networkResponse = chain.proceed(networkRequest)
    } finally {
      if (networkResponse == null && cacheCandidate != null) {
        cacheCandidate.body?.closeQuietly()
      }
    }

    //后置工作
    if (cacheResponse != null) {
      if (networkResponse?.code == HTTP_NOT_MODIFIED) {
        val response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers, networkResponse.headers))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()

        networkResponse.body!!.close()

        cache!!.trackConditionalCacheHit()
        cache.update(cacheResponse, response)
        return response.also {
          listener.cacheHit(call, it)
        }
      } else {
        cacheResponse.body?.closeQuietly()
      }
    }

    val response = networkResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()

    if (cache != null) {
      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
        val cacheRequest = cache.put(response)
        return cacheWritingResponse(cacheRequest, response).also {
          if (cacheResponse != null) {
            listener.cacheMiss(call)
          }
        }
      }

      if (HttpMethod.invalidatesCache(networkRequest.method)) {
        try {
          cache.remove(networkRequest)
        } catch (_: IOException) {
        }
      }
    }

    return response
  }

CacheStrategy.Factory(now, chain.request(), cacheCandidate)

这个源码中,我们就说一下CacheStrategy.Factory(now, chain.request(), cacheCandidate)方法,这个方法是前置工作的主要部分,其中放了一些关于缓存的参数,Header的参数等等。

  • 然后这个缓存拦截器我们目前就了解这些,因为这个涉及到Okhttp的缓存策略,是一个单独分出去的东西,所以我们以后再说明Okhttp缓存策略的时候,会详细的说一下这里,请持续关注我博客的更新。

ConnectInterceptor连接拦截器

注:这个拦截器特别关键,了解了这个拦截器interceptor,我们才会对Okhttp的网络请求了解。这个 拦截器的功能就是在客户端和服务端之间建立一个连接。这个拦截器只有前置工作和中置工作,因为他是建立一个连接,没什么后置工作可做。

  • 前置工作:去找有没有现成可用的连接,没有则创建一个。
  • 中置工作:传递接力,让下一个interceptor执行操作。

我们先看一下ConnectInterceptor拦截器的源码:

object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    //前置工作    
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    //中置工作    
    return connectedChain.proceed(realChain.request)
  }
}

虽然只有短短的几行代码,但是里面包含的工作很多。

它的前置工作只有一行,也就是realChain.call.initExchange(chain)方法:

realChain.call.initExchange(chain)

  internal fun initExchange(chain: RealInterceptorChain): Exchange {
    synchronized(this) {
      //......
    }

    val exchangeFinder = this.exchangeFinder!!
    val codec = exchangeFinder.find(client, chain)
    val result = Exchange(this, eventListener, exchangeFinder, codec)
    this.interceptorScopedExchange = result
    this.exchange = result
    synchronized(this) {
      //......
    }

    if (canceled) throw IOException("Canceled")
    return result
  }

从方法名不难看出这是要初始化一个Exchange,这个方法中主要的两行代码如下:

val codec = exchangeFinder.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)

第一句返回一个codec,codec就是一个编码解码器。我们发消息接收消息的响应报文,我们是按照某种格式(http1/http2)去写请求都相应。然后我们用获得的这个codec作为参数去获取一个Exchange。那么如何获取的这个codec的,我们看一下exchangeFinder.find(client, chain)这个方法。

exchangeFinder.find(client, chain)

fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
      //第一步操作找到健康连接  
      val resultConnection = findHealthyConnection(
          	connectTimeout = chain.connectTimeoutMillis,
          	readTimeout = chain.readTimeoutMillis,
          	writeTimeout = chain.writeTimeoutMillis,
          	pingIntervalMillis = client.pingIntervalMillis,
          	connectionRetryEnabled = client.retryOnConnectionFailure,
          	doExtensiveHealthChecks = chain.request.method != "GET"
      )
      //第二步根据健康连接创建codec并返回    
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure(e.lastConnectException)
      throw e
    } catch (e: IOException) {
      trackFailure(e)
      throw RouteException(e)
    }
  }

这里面主要有两个操作,一个是resultConnection = findHealthyConnection找到一个健康连接,第二步就是用这个连接去执行newCodec方法创建一个编码解码器codec出来。

我们继续往下走,看一下findHealthyConnection方法

findHealthyConnection(…)

  @Throws(IOException::class)
  private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {
      //第一部分  
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )
		
      //第二部分    
      if (candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate
      }
        
      //第三部分
      candidate.noNewExchanges()
      ......
    }
  }

上面这个代码大概分为三部分,这个方法叫做findHealthyConnection意为找到一个健康连接,然后**第一部分就是findConnection()方法,找到一个连接;第二部分是判断这个连接是否健康,如果健康就返回这个健康连接,如果不是健康的就一直执行这个while循环,直到找到健康连接。**我们现在看一下这两个部分调用的方法原理:

findConnection()

  @Throws(IOException::class)
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
      
    //如果call被取消,则抛出异常  
    if (call.isCanceled()) throw IOException("Canceled")

    //第一步:先去看call里面有没有一个连接,有没有一个已经建立完的连接
    val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
    if (callConnection != null) {
      var toClose: Socket? = null
      synchronized(callConnection) {
        if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
          toClose = call.releaseConnectionNoEvents()
        }
      }
      if (call.connection != null) {
        check(toClose == null)
        return callConnection
      }
      toClose?.closeQuietly()
      eventListener.connectionReleased(call, callConnection)
    }
    refusedStreamCount = 0
    connectionShutdownCount = 0
    otherFailureCount = 0

    //第二步:第一次尝试去连接池拿连接    
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
      val result = call.connection!!
      eventListener.connectionAcquired(call, result)
      return result
    }

    //找到请求的Routes  
    val routes: List<Route>?
    val route: Route
    if (nextRouteToTry != null) {
      routes = null
      route = nextRouteToTry!!
      nextRouteToTry = null
    } else if (routeSelection != null && routeSelection!!.hasNext()) {
      // Use a route from an existing route selection.
      routes = null
      route = routeSelection!!.next()
    } else {
      // Compute a new route selection. This is a blocking operation!
      var localRouteSelector = routeSelector
      if (localRouteSelector == null) {
        localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
        this.routeSelector = localRouteSelector
      }
      val localRouteSelection = localRouteSelector.next()
      routeSelection = localRouteSelection
      routes = localRouteSelection.routes

      if (call.isCanceled()) throw IOException("Canceled")

	  //第三步:第二次去连接池中找连接	 
      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
        val result = call.connection!!
        eventListener.connectionAcquired(call, result)
        return result
      }

      route = localRouteSelection.next()
    }

    //第三步:自己创建一个连接
    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(newConnection.route())

    //第四步:去连接池找有没有相同的连接
    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
      val result = call.connection!!
      nextRouteToTry = route
      newConnection.socket().closeQuietly()
      eventListener.connectionAcquired(call, result)
      return result
    }

    //第五步:将连接放到连接池中  
    synchronized(newConnection) {
      connectionPool.put(newConnection)
      call.acquireConnectionNoEvents(newConnection)
    }

    eventListener.connectionAcquired(call, newConnection)
    return newConnection
  }
  • 第一步:findConnection()方法中第一步:去看一下当前call中有没有现存的自己需要的连接
  • 第二步:第一次去连接池拿连接,connectionPool.callAcquirePooledConnection(address, call, null, false)
  fun callAcquirePooledConnection(
    address: Address,
    call: RealCall,
    routes: List<Route>?,
    requireMultiplexed: Boolean
  ): Boolean {
    for (connection in connections) {
      synchronized(connection) {
        if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
        if (!connection.isEligible(address, routes)) return@synchronized
        call.acquireConnectionNoEvents(connection)
        return true
      }
    }
    return false
  }

这里面有一层循环,会遍历连接池中的所有连接,里面有两个if判断

第一个if判断是:

if (requireMultiplexed && !connection.isMultiplexed) 

这一步判断我们找的和遍历出来的连接是否是可多路复用的,由于第一次我们传进来的requireMultiplexed参数是false,所以这个if永远返回false,意味着这次的遍历,我们不会考虑可多路复用的连接。

第二个if判断是:

if (!connection.isEligible(address, routes))

这个if判断的是**isEligible,意为判断这个连接是否可用,如果可用就向下执行call.acquireConnectionNoEvents(connection)**方法,把这个连接设置为当前call的连接,不是则continue重复for循环。那么isEligible如何判断连接是否可用呢?我们看一下它的源码:

  • isEligible()
  internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
    assertThreadHoldsLock()

    //当前连接所承受的最大请求数量是否超额
    if (calls.size >= allocationLimit || noNewExchanges) return false

    //比对一些配置协议等等
    if (!this.route.address.equalsNonHost(address)) return false

    //比对主机名
    if (address.url.host == this.route().address.url.host) {
      return true // This connection is a perfect match.
    }

    // 1. This connection must be HTTP/2.
    if (http2Connection == null) return false

    //IP和代理是否一样
    if (routes == null || !routeMatchesAny(routes)) return false

    //Verifier是否一样
    if (address.hostnameVerifier !== OkHostnameVerifier) return false
    
    //证书是否一致    
    if (!supportsUrl(address.url)) return false

    //Pinner符合
    try {
      address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates)
    } catch (_: SSLPeerUnverifiedException) {
      return false
    }

    return true // The caller's address can be carried by this connection.
  }

可见在这个方法中,有许多要求,比如说是否达到了连接的请求数量限制、 当前请求和连接所作的请求是否是同一个(比较IP地址和TCP端口)、加密条件和支持TLS版本、代理配置(equalsNonHost方法中)。对于http2而言,由于虚拟主机的可能存在,单IP地址可以支持多域名,所以还要判断IP和代理是否一致、证书是否一致等等…

以上就是第一次去连接池里获取连接。如果我们最后拿到了就把这个connection赋值给result并return即可。如果没拿到我们要进行下一次拿的尝试。

  • 第三步:再一次去连接池中拿连接,connectionPool.callAcquirePooledConnection(address, call, routes, false)

可见这一次拿连接我们多填了一个Routes参数,Routes参数中包含了Address(Address可以通过主机名抽取TCP端口)、IP地址和代理,传Routes主要是判断可不可以做连接复用(http2的特性,一个连接可以执行多个请求,前提IP地址、TCP端口、代理必须一致),我们在这里看一下刚刚isEligible中的部分代码:

// 1. This connection must be HTTP/2.
    if (http2Connection == null) return false

//IP和代理是否一样
if (routes == null || !routeMatchesAny(routes)) return false

我们看到,如果是http2的连接,才会进行路由的判断,**也就是说如果我们不传路由进来,我们永远无法获取http2的连接,**也就是说我们第一次去连接池找,只找http1连接,第二次因为添加了一个Routes参数,我们不仅找http1的可用连接,还找http2的可用的复用连接。

如果第二次取连接取成功了,还是直接赋值给result返回。

  • 第四步:自己创建一个连接,newConnection()

如果两次都没拿到,我们就要自己创建一个连接了,代码如下:

call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }
  • 第五步:再次访问连接池,去找有没有与刚建立的连接相同的连接,connectionPool.callAcquirePooledConnection(address, call, routes, true)

自己创建之后,在我们返回连接之前,我们还要再尝试一次去连接池里取连接,这次我们调用方法传的参数是这样的:connectionPool.callAcquirePooledConnection(address, call, routes, true):

在这个方法中我们多传了一个true,这个参数对应的是callAcquirePooledConnection中的requireMultiplexed参数,我们看一下以下这一行代码:

if (requireMultiplexed && !connection.isMultiplexed)
    
//isMultiplexed方法源码是
  internal val isMultiplexed: Boolean
    get() = http2Connection != null

也就是说只找一个可多路复用的连接(只拿http2的多路复用的连接)。我们为什么要这样创建完再去拿一次呢?

  • 因为有一个极端情况,也就是说我有同时多个请求,访问的是一个IP地址、同一个TCP端口、同一个代理,本来连接池中没有符合要求的连接,所以这三个请求都建立了http2连接,其实建立一个http2连接就够了,所以我们建立完之后要再取连接池中找一次,如果有一个连接恰好在之前建立,我们就舍弃当前这个连接,用连接池里的这个,这样才能做到节省资源,如果没有,我们会把建立的这个连接放到连接池中以便于之后继续用。具体的代码如下:
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
      val result = call.connection!! 
      nextRouteToTry = route
      //舍弃刚刚建立的连接    
      newConnection.socket().closeQuietly()
      eventListener.connectionAcquired(call, result)
      return result
    }

    synchronized(newConnection) {
      //将连接放进连接池  
      connectionPool.put(newConnection)
      call.acquireConnectionNoEvents(newConnection)
    }

findConnection()小结

  • 我们对findConnection()方法做一个小结,寻找连接可以分为5个步骤:第一步:看当前有没有可以用的连接,也就是之前用过,现在还能用的连接,如果没有向下执行。第二步:去连接池中招http1的连接(不支持复用),如果没有向下执行。第三步:去连接池找http1和http2可复用链接,因为传了一个Routes而扩大了一个搜索范围,如果没有向下执行。第四步:两个取连接都没取到,我们自己创建一个连接。第五步:为了避免资源浪费,我们再去连接池找一次,看有没有http2可复用连接,先于自己创建的可用链接,有则废弃刚刚创建的这个连接。

isHealthy()

以上就是findConnection()方法,但是不要忘了findConnection()方法是findHealthyConnection()方法里面的,刚说findHealthyConnection()方法中第一步是找一个连接,第二步是判断连接是否健康:

// findHealthyConnection()中的代码
 if (candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate
      }

//isHealthy源码
  fun isHealthy(doExtensiveChecks: Boolean): Boolean {
    assertThreadDoesntHoldLock()

    val nowNs = System.nanoTime()

    val rawSocket = this.rawSocket!!
    val socket = this.socket!!
    val source = this.source!!
    if (rawSocket.isClosed || socket.isClosed || socket.isInputShutdown ||
            socket.isOutputShutdown) {
      return false
    }

    val http2Connection = this.http2Connection
    if (http2Connection != null) {
      return http2Connection.isHealthy(nowNs)
    }

    val idleDurationNs = synchronized(this) { nowNs - idleAtNs }
    if (idleDurationNs >= IDLE_CONNECTION_HEALTHY_NS && doExtensiveChecks) {
      return socket.isHealthy(source)
    }

    return true
  }

由上面源码可知,判断健康的标准分几个要求:判断socket关闭没,HTTP2健康不,心跳验证是否正常等等…健康则使用,不健康则再次找连接重用。

最后在findHealthyConnection()方法中return一个健康的连接,就这样我们的findHealthyConnection()方法也结束了,我们回到了find方法中的第二步:

return resultConnection.newCodec(client, chain)

也就是根据健康连接创造一个codec,也就是编码解码器。然后又回到了initExchange()方法中,根据获得的codec创建一个Exchange对象,将codec放到Exchange中进行交互。并返回了一个exchange对象。

/*initExchange()方法中*/
val result = Exchange(this, eventListener, exchangeFinder, codec)
    
return result

这样我们这个拦截器就成功在前置工作建立了一个TCP连接(Http)或者说是TSL连接(Https)。中置工作就是一行proceed方法传棒。

client.networkInterceptors用户自定义网络拦截器

这个拦截器也是为开发者提供的,这是在整个链条中开发者所能插入自定义拦截器的最后一个地方,他与之前的那个为开发者准备的拦截器位置不同,意味着这个拦截器可以看到每个请求和响应的数据(包含重定向以及重试的中间请求和响应),并且看到的是完整的原始数据,这个拦截器是让开发者做一些网络调试用的。

callServerInterceptor访问服务器拦截器

这个拦截器interceptor是Okhttp请求链条中的最后一个拦截器,刚刚我们在连接拦截器ConnectInterceptor中已经创建了连接exchange,并传入到了当前这个callServerInterceptor拦截器,所以这最后一个拦截器的工作就是发送网络请求以及读取服务器的响应。这个拦截器只有前置工作和后置工作。

  • 前置工作:进行实质的网络请求的I/O操作,往Socket中写入请求数据。
  • 后置工作:获取响应并读取。

我们根据它的源码来讲解具体的流程:

class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    val sentRequestMillis = System.currentTimeMillis()

    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    var sendRequestException: IOException? = null
    try {
      //向服务器发送 request header  
      exchange.writeRequestHeaders(request)

      if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
        //POST请求会发送两个包,先发送请求头,获得相应为100后再发送请求体。
        if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
          exchange.flushRequest()
          responseBuilder = exchange.readResponseHeaders(expectContinue = true)
          exchange.responseHeadersStart()
          invokeStartEvent = false
        }
          
        //通过OKIO写入Body  
        if (responseBuilder == null) {
          if (requestBody.isDuplex()) {
            exchange.flushRequest()
            val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
            requestBody.writeTo(bufferedRequestBody)
          } else {
            val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
            requestBody.writeTo(bufferedRequestBody)
            bufferedRequestBody.close()
          }
        } else {
          exchange.noRequestBody()
          if (!exchange.connection.isMultiplexed) {
            exchange.noNewExchangesOnConnection()
          }
        }
      } else {
        exchange.noRequestBody()
      }

      if (requestBody == null || !requestBody.isDuplex()) {
        exchange.finishRequest()
      }
    } catch (e: IOException) {
      //......
      sendRequestException = e
    }

    try {
      if (responseBuilder == null) {
          
        //先构造一个 Response 对象,读取响应的header,
        responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
        if (invokeStartEvent) {
          exchange.responseHeadersStart()
          invokeStartEvent = false
        }
      }
      var response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      var code = response.code
      if (code == 100) {
       //......
      }

      exchange.responseHeadersEnd(response)

      //构造Response的body,并且通过Exchange的openResponseBody(Response)方法读取响应的body,然后通过响应的body继续构造Response
      response = if (forWebSocket && code == 101) {
        response.newBuilder()
            .body(EMPTY_RESPONSE)
            .build()
      } else {
        response.newBuilder()
            .body(exchange.openResponseBody(response))
            .build()
      }
      //......
  }

我们先看发送请求的过程:

  1. 通过Exchange的writeRequestHeaders(request)方法写入请求的header
  2. 如果请求的body不为空,通过okio写入请求的body。

读取响应的过程:

  1. 先构造一个Response对象
  2. 通过Exchange的readResponseHeaders(boolean)方法读取响应的header
  3. 通过Exchange的openResponseBody(Response)方法读取响应的body

最后就是将结果一次按照拦截器链返回,直到返回到我们我们调用Okhttp的地方。

总结

下面列出几个关于OkHttp的知识流程图
Okhttp源码框架的图:

万字长文带你拥抱OkHttp源码

拦截器的连接图:

万字长文带你拥抱OkHttp源码

万字长文带你拥抱OkHttp源码

上一篇:[Java]-HTTP客户端工具OkHttp简介


下一篇:写给互联网大厂员工的真心话,成功定级腾讯T3-2