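II. OkHttp Asynchronous Request Source Code Analysis
Asynchronous requests involve thread scheduling, so the code path is relatively complex.
OkHttpClient.newCall returns a RealCall, and calling enqueue on that RealCall issues an asynchronous GET/POST request. That method ultimately delegates to the enqueue method of the Dispatcher held by the OkHttpClient (the Dispatcher dispatcher field), which schedules the request.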
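1. Analysis of the Dispatcher enqueue method
Dispatcher.enqueue first adds the AsyncCall to readyAsyncCalls. For calls that are not WebSocket calls, it then uses findExistingCallWithHost to look up an existing AsyncCall for the same host, so that the new call can share that call's per-host AtomicInteger counter. Finally, it calls promoteAndExecute to carry on with executing the asynchronous task.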
/**
 * Policy on when async requests are executed.
 *
 * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
 * own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
 * of calls concurrently.
 */
public final class Dispatcher {
  void enqueue(AsyncCall call) {
    synchronized (this) {
      readyAsyncCalls.add(call);

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.get().forWebSocket) {
        AsyncCall existingCall = findExistingCallWithHost(call.host());
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
      }
    }
    promoteAndExecute();
  }

  @Nullable private AsyncCall findExistingCallWithHost(String host) {
    for (AsyncCall existingCall : runningAsyncCalls) {
      if (existingCall.host().equals(host)) return existingCall;
    }
    for (AsyncCall existingCall : readyAsyncCalls) {
      if (existingCall.host().equals(host)) return existingCall;
    }
    return null;
  }
}
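The counter sharing described in the comment above can be illustrated with a small stand-alone model. This is a minimal sketch, not OkHttp's code: HostCounterSketch and QueuedCall are hypothetical names, hosts are plain strings, and the lookup runs before the call is added to the list (OkHttp adds the call first and then searches both queues).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of per-host counter sharing; names are illustrative, not OkHttp's API.
class HostCounterSketch {
    static final class QueuedCall {
        final String host;
        // Each call starts with its own counter, which may be replaced by a shared one.
        AtomicInteger callsPerHost = new AtomicInteger(0);

        QueuedCall(String host) {
            this.host = host;
        }

        void reuseCallsPerHostFrom(QueuedCall other) {
            this.callsPerHost = other.callsPerHost;
        }
    }

    private final List<QueuedCall> queued = new ArrayList<>();

    /** Registers a call and makes it share the counter of any earlier call to the same host. */
    synchronized QueuedCall enqueue(String host) {
        QueuedCall call = new QueuedCall(host);
        for (QueuedCall existing : queued) {
            if (existing.host.equals(host)) {
                call.reuseCallsPerHostFrom(existing);
                break;
            }
        }
        queued.add(call);
        return call;
    }
}
```

Because every call to one host ends up holding the same AtomicInteger, incrementing it through any of them is visible to all of them, which is what lets a per-host limit be checked without a separate map keyed by host.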
AsyncCall extends NamedRunnable, and NamedRunnable implements the Runnable interface, so an AsyncCall is essentially a Runnable:
final class AsyncCall extends NamedRunnable
public abstract class NamedRunnable implements Runnable
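To show why this inheritance chain lets a thread pool run an AsyncCall, here is a minimal sketch of a NamedRunnable-style base class. It is written from the general idea (rename the worker thread, run the task body, restore the name) and is an assumption about the shape of the class, not a copy of OkHttp's source. NamedRunnableSketch and NamedRunnableDemo are hypothetical names.

```java
// Simplified sketch modeled on the idea of NamedRunnable; not OkHttp's actual source.
abstract class NamedRunnableSketch implements Runnable {
    private final String name;

    NamedRunnableSketch(String name) {
        this.name = name;
    }

    @Override
    public final void run() {
        Thread current = Thread.currentThread();
        String oldName = current.getName();
        current.setName(name); // label the worker thread while the task runs
        try {
            execute();
        } finally {
            current.setName(oldName); // restore the original thread name
        }
    }

    /** Subclasses put the real work here, as AsyncCall does with the network request. */
    protected abstract void execute();
}

class NamedRunnableDemo {
    /** Runs a named task on the calling thread and returns the thread name the task saw. */
    static String observedThreadName(String taskName) {
        final String[] seen = new String[1];
        Runnable task = new NamedRunnableSketch(taskName) {
            @Override
            protected void execute() {
                seen[0] = Thread.currentThread().getName();
            }
        };
        task.run();
        return seen[0];
    }
}
```

Since the subclass is a Runnable, it can be passed to ExecutorService.execute directly, which is how executeOn uses the AsyncCall later in this section.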
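2. Analysis of the Dispatcher promoteAndExecute method
promoteAndExecute moves eligible calls from readyAsyncCalls to runningAsyncCalls and runs them on the executor service. It must not be called while the caller holds the dispatcher lock, because executing calls can call into user code. The queue bookkeeping happens inside a synchronized block, and the promoted calls are handed to the executor only after that block exits.
The asynchronous request is ultimately started by calling the executeOn method of each promoted AsyncCall: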
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
Dispatcher | promoteAndExecute method source:
/**
 * Promotes eligible calls from {@link #readyAsyncCalls} to {@link #runningAsyncCalls} and runs
 * them on the executor service. Must not be called with synchronization because executing calls
 * can call into user code.
 *
 * @return true if the dispatcher is currently running calls.
 */
private boolean promoteAndExecute() {
  assert (!Thread.holdsLock(this));

  List<AsyncCall> executableCalls = new ArrayList<>();
  boolean isRunning;
  synchronized (this) {
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall asyncCall = i.next();

      if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
      if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

      i.remove();
      asyncCall.callsPerHost().incrementAndGet();
      executableCalls.add(asyncCall);
      runningAsyncCalls.add(asyncCall);
    }
    isRunning = runningCallsCount() > 0;
  }

  for (int i = 0, size = executableCalls.size(); i < size; i++) {
    AsyncCall asyncCall = executableCalls.get(i);
    asyncCall.executeOn(executorService());
  }

  return isRunning;
}
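The two limits in the loop above (a global cap, which stops the scan, and a per-host cap, which skips only that call) are easier to follow with concrete numbers. The following is a hypothetical, stripped-down model in which each call is represented only by its host string. PromotionSketch and its methods are illustrative names, and the per-host count is kept in a map rather than in shared AtomicInteger instances.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the promotion step; each call is just a host string.
class PromotionSketch {
    private final int maxRequests;
    private final int maxRequestsPerHost;
    private final Deque<String> ready = new ArrayDeque<>();
    private final Deque<String> running = new ArrayDeque<>();
    private final Map<String, Integer> runningPerHost = new HashMap<>();

    PromotionSketch(int maxRequests, int maxRequestsPerHost) {
        this.maxRequests = maxRequests;
        this.maxRequestsPerHost = maxRequestsPerHost;
    }

    synchronized void enqueue(String host) {
        ready.add(host);
    }

    /** Moves eligible calls from ready to running and returns the ones promoted in this pass. */
    List<String> promote() {
        List<String> promoted = new ArrayList<>();
        synchronized (this) {
            for (Iterator<String> i = ready.iterator(); i.hasNext(); ) {
                String host = i.next();
                if (running.size() >= maxRequests) break; // global cap: stop scanning
                if (runningPerHost.getOrDefault(host, 0) >= maxRequestsPerHost) continue; // host cap: skip

                i.remove();
                runningPerHost.merge(host, 1, Integer::sum);
                running.add(host);
                promoted.add(host);
            }
        }
        // A real dispatcher would submit the promoted calls to an executor here, outside the lock.
        return promoted;
    }

    /** Marks one running call to the given host as finished. */
    synchronized void finished(String host) {
        if (running.remove(host)) {
            runningPerHost.merge(host, -1, Integer::sum);
        }
    }

    synchronized int readyCount() {
        return ready.size();
    }

    synchronized int runningCount() {
        return running.size();
    }
}
```

With maxRequests = 3 and maxRequestsPerHost = 2, enqueuing hosts a, a, a, b, b and calling promote() promotes a, a, b: the third a is skipped by the host cap, and the second b is stopped by the global cap. After finished("a"), the next promote() picks up the waiting a.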
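3. Analysis of the AsyncCall executeOn method
The executeOn method of AsyncCall submits the call to the ExecutorService executorService thread pool passed in, which then runs the asynchronous request task. If the executor rejects the task, the method reports the failure through responseCallback.onFailure and tells the dispatcher that the call has finished.
RealCall$AsyncCall | executeOn method source: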
final class RealCall implements Call {
  final class AsyncCall extends NamedRunnable {
    /**
     * Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }
  }
}
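The rejection path in executeOn can be reproduced with only the JDK. The sketch below keeps the same try/catch/finally shape but replaces OkHttp's types with hypothetical stand-ins: FailureCallback plays the role of the response callback, and onFinished plays the role of the call to the dispatcher's finished method. It assumes a default ThreadPoolExecutor-based executor, which throws RejectedExecutionException once it has been shut down.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

// Stand-alone sketch of the executeOn pattern; the types here are not OkHttp's.
class ExecuteOnSketch {
    /** Hypothetical stand-in for the failure half of a response callback. */
    interface FailureCallback {
        void onFailure(IOException e);
    }

    /**
     * Tries to hand the task to the executor. Returns true if it was accepted. On rejection,
     * reports an InterruptedIOException to the callback and runs onFinished for cleanup.
     */
    static boolean executeOn(ExecutorService executor, Runnable task,
                             FailureCallback callback, Runnable onFinished) {
        boolean success = false;
        try {
            executor.execute(task);
            success = true;
        } catch (RejectedExecutionException e) {
            InterruptedIOException ioException = new InterruptedIOException("executor rejected");
            ioException.initCause(e);
            callback.onFailure(ioException);
        } finally {
            if (!success) {
                onFinished.run(); // the task never started, so release its slot
            }
        }
        return success;
    }
}
```

Passing an executor that has already had shutdown() called makes executeOn return false, delivers the wrapped exception to the callback, and runs onFinished exactly once. With a live executor, neither the callback nor onFinished is invoked by this method.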
III. OkHttp Request Sequence Diagram Reference