【OkHttp】OkHttp 源码分析 ( 同步 / 异步 Request 请求执行原理分析 )(二)

二、OkHttp 异步 Request 请求源码分析


异步 Request 请求涉及到线程调度 , 比较复杂 ;


OKHttpClient 调用 newCall 获取 RealCall , 然后调用 RealCall 的 enqueue 方法进行异步 Get/Post 请求 , 在该方法中最终调用 OKHttpClient 对象中的 Dispatcher dispatcher 线程调度器 的 enqueue 方法 , 进行异步请求 ;



1、Dispatcher 调度器 enqueue 方法分析


在 Dispatcher 的 enqueue 方法中 , 调用了 findExistingCallWithHost 方法获取 AsyncCall , 然后在方法最后调用了 promoteAndExecute 进行后续执行异步任务操作 ;


/**
 * Policy on when async requests are executed.
 *
 * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
 * own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
 * of calls concurrently.
 */
public final class Dispatcher {
  void enqueue(AsyncCall call) {
    synchronized (this) {
      readyAsyncCalls.add(call);
      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.get().forWebSocket) {
        AsyncCall existingCall = findExistingCallWithHost(call.host());
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
      }
    }
    promoteAndExecute();
  }
 
  @Nullable private AsyncCall findExistingCallWithHost(String host) {
    for (AsyncCall existingCall : runningAsyncCalls) {
      if (existingCall.host().equals(host)) return existingCall;
    }
    for (AsyncCall existingCall : readyAsyncCalls) {
      if (existingCall.host().equals(host)) return existingCall;
    }
    return null;
  }
}


AsyncCall 继承了 NamedRunnable , NamedRunnable 实现了 Runnable 接口 , AsyncCall 本质是 Runnable ;

final class AsyncCall extends NamedRunnable
public abstract class NamedRunnable implements Runnable



2、Dispatcher 调度器 promoteAndExecute 方法分析


分析 promoteAndExecute 方法 : 将符合条件的调用从 readyAsyncCalls 提升为 runningAsyncCalls , 并且在线程池中调用它们 ; 这些操作必须同步调用 , 因为执行这些调用需要调用用户代码 ;


最终的异步请求执行调用的是 AsyncCall 的 executeOn 方法 ;


   

AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());


Dispatcher | promoteAndExecute 方法源码 :


 

/**
   * Promotes eligible calls from {@link #readyAsyncCalls} to {@link #runningAsyncCalls} and runs
   * them on the executor service. Must not be called with synchronization because executing calls
   * can call into user code.
   *
   * @return true if the dispatcher is currently running calls.
   */
  private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));
    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();
        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
        i.remove();
        asyncCall.callsPerHost().incrementAndGet();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }
    return isRunning;
  }




3、AsyncCall 的 executeOn 方法分析


AsyncCall 的 executeOn 方法中 , 主要使用了 传入的 ExecutorService executorService 线程池 , 执行异步请求任务 ;


RealCall $ AsyncCall | executeOn 方法代码 :


final class RealCall implements Call {
  final class AsyncCall extends NamedRunnable {
    /**
     * Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }
  }
}






三、OkHttp 请求时序图参考


【OkHttp】OkHttp 源码分析 ( 同步 / 异步 Request 请求执行原理分析 )(二)




上一篇:公众平台商户接入(微信支付)功能申请教程


下一篇:5.【kafka运维】生产者消费组压力测试运维(5)