OkHttp源码分析:分发器任务调配,拦截器责任链设计,连接池socket复用

目录

一,分发器和拦截器

二,分发器处理异步请求

1.分发器处理入口

2.分发器工作流程

3.分发器中的线程池设计

三,分发器处理同步请求

四,拦截器处理请求

1.责任链设计模式

 2.拦截器工作原理

3.OkHttp五大拦截器


一,分发器和拦截器

        OkHttp在内部维护了这几个重要对象:分发器dispatcher,连接池connectionPool,拦截器Interceptor;

//拦截器
@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher

//连接池
@get:JvmName("connectionPool") val connectionPool: ConnectionPool = builder.connectionPool
  
//拦截器
@get:JvmName("interceptors") val interceptors: List<Interceptor> =
      builder.interceptors.toImmutableList()

@get:JvmName("networkInterceptors") val networkInterceptors: List<Interceptor> =
      builder.networkInterceptors.toImmutableList()

他们的作用分别为:

  • 分发器Dispatcher:调配请求任务,内部维护队列线程池  
  • 拦截器:处理请求与响应,完成请求过程
  • 连接池:管理socket连接与连接复用

        从OkHttp的请求处理流程来看: 拦截器负责完成网络请求过程,同步和异步请求必须经过分发器调配后才会发给拦截器进行网络请求;

二,分发器处理异步请求

1.分发器处理入口

private void visitInternet() {
    //1.创建HttpClient对象
    OkHttpClient okHttpClient = new OkHttpClient();
    //2.获取request对象
    Request.Builder builder = new Request.Builder()
            .url("https://www.bilibili.com/");
    Request request = builder.build();
    //3.获取call对象
    Call call = okHttpClient.newCall(request);
    //4.执行网络操作
    try {
        Response response = call.execute();
        String result = response.body().string();
        showResultOnUiThread(result);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

        从OkHttp处理流程来看,每次发送请求前我们需要调用 newCall() 方法获取call对象,这里的Call是一个接口,newCall返回的是Call接口的实现类RealCall;

  /** Prepares the [request] to be executed at some point in the future. */
  override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

call对象只能使用一次 

        发起异步请求需要调用call对象的 enqueue() 方法,enqueue方法首先会将call对象中的executed字段置为true,代表这个call对象已经使用过,第二次就无法使用,想要再次使用的话需要调用call对象的 clone() 方法;

        callStart方法执行后表示请求开始,之后便会执行分发器的enqueue方法处理异步请求,这里传入的对象AsyncCall是Runnable接口的实现类,可以理解为是我们要处理的异步任务;

  override fun enqueue(responseCallback: Callback) {
    //call对象只能使用一次
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart() //请求开始
    
    //分发器处理异步请求
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

2.分发器工作流程

分发器中维护了三个队列:

  • readyAsyncCalls:等待中异步请求队列
  • runningAsyncCalls:执行中异步请求队列
  • runningSyncCalls:执行中同步请求队列

        分发器dispatcher的enqueue方法执行后,异步请求AsyncCall默认先放到readAsyncCalls中,如果是非websocket连接,则检查一下runningAsyncCalls和readAsyncCalls中是否有相同域名host的请求,如果有则复用之前的域名的计数器existingCall

        计数器之后用于判断同一主机(域名)请求连接数

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

检查完之后调用promoteAndExecute()方法,在这个方法中会检查两件事:

  • 进行中异步请求数是否 ≥ 64(runningAsyncCalls队列的size是否 ≥ 64),
  • 对同一域名(主机)的请求callsPerHost是否大于5;

若条件符合,将异步任务加入到runningAsyncCalls中

检查完可执行请求并更新状态后,将请求提交到线程池中执行

private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

          //检查可执行请求
        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    //提交到线程池中执行
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService //线程池)
    }

    return isRunning
  }

将AsyncCall提交到线程池后,AsyncCall对象的run方法便会被执行;

在run方法中,从拦截器中获取了服务器的响应,完成请求后调用dispatcher的finish方法,结束本次异步请求;

override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
            //拦截器完成请求,返回响应
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
            //调用finish方法,结束本次异步请求
          client.dispatcher.finished(this)
        }
      }
    }

在完成一次请求后,runningAsyncCalls队列会空出位置

所以在finish方法中,会重新调用检查异步任务方法promoteAndExecute(),也就是在结束一次请求后,会去检查readyAsyncCalls队列中符合条件的异步任务,并去执行他们

idleCallback.run() 用于在所有请求完成后执行特定操作,操作内容自定义

internal fun finished(call: AsyncCall) {
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
  }

  /** Used by [Call.execute] to signal completion. */
  internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
  }

  private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }

    //重新调用promoteAndExecute,检查可执行异步请求
    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
        //用于在所有请求完成后执行特定操作,操作内容自定义
      idleCallback.run()
    }
  }

3.分发器中的线程池设计

分发器中的线程池:

  • 核心线程数:0
  • 最大线程数:Int.MAX_VALUE
  • 空闲时间:60s
  • 工作队列:SynchronousQueue()
@get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(
            0, //核心线程数
            Int.MAX_VALUE, //最大线程数
            60, //空闲时间
            TimeUnit.SECONDS, //空闲时间单位(秒)
            SynchronousQueue(), //工作队列
            threadFactory("$okHttpName Dispatcher", false)
        )
      }
      return executorServiceOrNull!!
    }

线程池工作原理:

  1. 工作中线程 < 核心线程数 创建新线程
  2. 工作中线程 > 核心线程数且工作队列未满,加入工作队列
  3. 工作队列已满,工作中线程数若 < 最大线程数, 创建新线程
  4. 工作队列已满,工作中线程数 > 最大线程数, 执行拒绝策略(默认为抛出异常,可自定义)

在okhttp的分发器中,线程池使用SynchronousQueue()作为工作队列,这种容器没有容量,也就无法添加任务,所以当工作中线程 > 核心线程数,会直接创建新线程

三,分发器处理同步请求

对于同步请求,分发器只记录请求(放入RunningSyncCalls中)

  override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }

//dispatcher.executed()
  @Synchronized internal fun executed(call: RealCall) {
       //分发器只记录同步请求
    runningSyncCalls.add(call)
  }

四,拦截器处理请求

1.责任链设计模式

OkHttp中的拦截器采用责任链设计模式:

        为避免请求发送者与多个请求处理者耦合在一起,于是将所有请求处理者通过前一对象记住下一对象的引用而形成一条链,当有请求发生时,请求只需沿着链传递,直到有对象处理它

模拟责任链设计模式:

我们定义一个Handler抽象类,并让他持有下一Handler对象的引用next,并创建Handler三个子类

abstract class Handler {

    protected var next : Handler? = null;

    fun setNext(next : Handler){
        this.next = next;
    }

    fun getNext() : Handler?{
        return next;
    }

    abstract fun handle(request : String);
}

class Handler1 : Handler() {

    override fun handle(request: String) {
        if("1".equals(request)){
            Log.i("TAG", "handle1处理")
        }else{
            if(getNext() != null){
                next?.handle(request);
            }else{
                Log.i("TAG", "没有下一个handler")
            }
        }
    }
}

class Handler2 : Handler() {
    override fun handle(request: String) {
        if("2".equals(request)){
            Log.i("TAG", "handle1处理")
        }else{
            if(getNext() != null){
                next?.handle(request);
            }else{
                Log.i("TAG", "没有下一个handler")
            }
        }
    }
}

class Handler3 : Handler() {
    override fun handle(request: String) {
        if("3".equals(request)){
            Log.i("TAG", "handle1处理")
        }else{
            if(getNext() != null){
                next?.handle(request);
            }else{
                Log.i("TAG", "没有下一个handler")
            }
        }
    }
}

        我们让handler1拥有2的引用,2拥有3的引用,这样当我们调用1的handle("3")时,request对象就会一直沿着责任链执行,直到遇到能处理他的对象(handler3)

val handler1: Handler = Handler1()
val handler2: Handler = Handler2()
val handler3: Handler = Handler3()

handler1.setNext(handler2)
handler2.setNext(handler3)

handler1.handle("3")

 2.拦截器工作原理

拦截器的工作基本分为三步:

  1. 处理请求request
  2. 将请求传往下一拦截器,获取返回的请求response
  3. 处理响应response并返回

例如,我们自定义一个日志打印拦截器:

class LogInterceptor : Interceptor {
    override fun intercept(chain: Interceptor.Chain): Response {
        //1.处理请求
        val request = chain.request();

        val requestLog = StringBuilder().apply {
            append("Request:\n")
            append("URL: ${request.url}\n")
            append("Method: ${request.method}\n")
            append("Headers: ${request.headers}\n")
            request.body?.let {
                append("Body: ${it.toString()}\n")
            }
        }
        Log.d("OkHttp", requestLog.toString())

        //将请求传往下一拦截器,获取响应
        val response = chain.proceed(request)

        //处理响应并返回
        val responseLog = StringBuilder().apply {
            append("Response:\n")
            append("Code: ${response.code}\n")
            append("Headers: ${response.headers}\n")
            response.body?.let {
                append("Body: ${it.string()}\n")
            }
        }
        Log.d("OkHttp", responseLog.toString())

        return response;
    }
}

在chain的proceed方法中,程序会找到拦截器链中的下一拦截器并将请求传给他,获取返回的请求

  @Throws(IOException::class)
  override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++

    if (exchange != null) {
      check(exchange.finder.sameHostAndPort(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
      }
      check(calls == 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
      }
    }

    // 找到拦截器链中的下一拦截器
    val next = copy(index = index + 1, request = request)
    val interceptor = interceptors[index]

    //传递请求,获取响应
    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    if (exchange != null) {
      check(index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }

3.OkHttp五大拦截器

OkHttp中默认配置五个拦截器,分别为:

val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
    interceptors += client.networkInterceptors
}
nterceptors += CallServerInterceptor(forWebSocket)
  • 重试和重定向拦截器 RetryAndFollowUpInterceptor:重试拦截器在交出前(交给下一个拦截器),负责判断用户是否取消了请求。在获得了响应之后,会根据响应码判断是否需要重定向,如果满足所有条件就会重启执行所有拦截器
  • 桥接拦截器(处理请求头和响应头)BridgeInterceptor:在交出之前,负责将Http协议必备的请求头加入请求之中(如Host,Connection),并添加一些默认的行为(如RZIP压缩);获得响应后调用保存cookie接口并解析GZIP数据
  • 缓存拦截器 CacheInterceptor:交出之前读取并判断是否使用缓存;获取响应后判断是否缓存
  • 连接拦截器 ConnectInterceptor:交出之前,负责创建或找到一个连接,并获取socket流;获取响应后不进行额外处理
  • 网络请求拦截器(执行实际的网络请求)CallServerInterceptor:进行真正的与服务器通信,向服务器发送数据,解析读取的响应数据

OkHttp中添加拦截器有两种方式:addInterceptor()和 addNetworkInterceptor(),他们的主要区别如下:

  • 调用时机:Application拦截器在请求开始时调用,Network在网络连接建立后调用
  • 调用次数:Application只调用一次,Network可能调用多次(重定向)
  • 可见信息:Application只能看到最终请求/响应,Network能看到所有中间请求/响应
  • 缓存感知:Application无法感知缓存,Network可以感知缓存
  • 使用场景:Application一般用于业务处理(如:身份验证,日志记录,错误处理),Network一般用于网络层操作(如:网络监控,缓存处理,压缩处理)

OkHttp完整拦截器链如下:

上一篇:功能篇:springboot实现防盗链功能


下一篇:docker打包当前使用的某个容器为镜像,导出,导入