OKHttp源码学习同步请求和异步请求(二)

OKHttp get

 private void doGet(String method, String s) throws IOException {
String url = urlAddress + method + "?sex=" + s;
Request request = new Request.Builder().url(url).get().build();
Response respone = okHttpClient.newCall(request).execute();
if (respone.isSuccessful()) {
Log.d("test", respone.body().string());
} else {
Log.d("test", "get failed");
} }

在get请求,用到了 Request Response okHttpClient,分别学习一下这三个类

Request:用于构建一个HTTP请求,使用了建造这模式.如果它们的{@link #body}为null或者它本身是不可变的,那么这个类的实例是不可变的。

Response:用于构建一个HTTP响应。 这个类的实例不是不可变的:响应体是一次性的值,可能只消耗一次然后关闭。 所有其他属性都是不可变的。 <p>这个类实现{@link Closeable}。 关闭它只是关闭其响应主体。

okHttpClient:{@linkplain Call calls}的工厂,可用于发送HTTP请求并读取其响应。

我们创建一个OKHttpClient时,完成了如下初始化的工作:

  public OkHttpClient() {
this(new Builder());
}
 OkHttpClient(Builder builder) {
this.dispatcher = builder.dispatcher;
this.proxy = builder.proxy;
this.protocols = builder.protocols;
this.connectionSpecs = builder.connectionSpecs;
this.interceptors = Util.immutableList(builder.interceptors);
this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
this.eventListenerFactory = builder.eventListenerFactory;
this.proxySelector = builder.proxySelector;
this.cookieJar = builder.cookieJar;
this.cache = builder.cache;
this.internalCache = builder.internalCache;
this.socketFactory = builder.socketFactory; boolean isTLS = false;
for (ConnectionSpec spec : connectionSpecs) {
isTLS = isTLS || spec.isTls();
} if (builder.sslSocketFactory != null || !isTLS) {
this.sslSocketFactory = builder.sslSocketFactory;
this.certificateChainCleaner = builder.certificateChainCleaner;
} else {
X509TrustManager trustManager = systemDefaultTrustManager();
this.sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
} if (sslSocketFactory != null) {
Platform.get().configureSslSocketFactory(sslSocketFactory);
} this.hostnameVerifier = builder.hostnameVerifier;
this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
certificateChainCleaner);
this.proxyAuthenticator = builder.proxyAuthenticator;
this.authenticator = builder.authenticator;
this.connectionPool = builder.connectionPool;
this.dns = builder.dns;
this.followSslRedirects = builder.followSslRedirects;
this.followRedirects = builder.followRedirects;
this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
this.connectTimeout = builder.connectTimeout;
this.readTimeout = builder.readTimeout;
this.writeTimeout = builder.writeTimeout;
this.pingInterval = builder.pingInterval; if (interceptors.contains(null)) {
throw new IllegalStateException("Null interceptor: " + interceptors);
}
if (networkInterceptors.contains(null)) {
throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
}
}

new Builder()的时候做了什么呢?

  public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}

所以到这里明白了,new OKHttpClient()时初始化的参数其实是从Builder中获取到的。当然这些都是默认值,我们也可以重新设置一些值比如

okHttpClient.Builder() .readTimeout(30, TimeUnit.SECONDS) .build();

接下来再回到doGet方法的第四行看  OkHttpClient.newCall()返回一个RealCall对象并调用RealCall.enqueue()方法,最后会进入Dispacher.enqueue()方法中,这里会将RealCall对象放入线程池中调度执行。

  /**
* Prepares the {@code request} to be executed at some point in the future.
*/
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
  //RealCall 类中
1 static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
//RealCall
  @Override public Response execute() throws IOException {
     synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}

第9行

client.dispatcher().executed(this);
Dispatcher是什么呢? dispatcher是new Builder的时候new的一个Dispatcher 对象
接下来接着看
Dispatcher:何时执行异步请求的策略。
//Dispatcher类中
1 /** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}

到这里只是将人物添加到runningSyncCalls这个队列当中,那什么时候会执行这个任务呢?回到RealCall的execute方法,

dispatcher的executed方法运行结束之后会接着第10行  Response result = getResponseWithInterceptorChain();方法获取响应,最后调用Dispatcher的finished方法
  /** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
 private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
} if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}

在这个finish方法中,runningSyncCalls队列做为参数calls被传入。

首先是从队列中移除请求,如果不能移除,则抛出异常;

然后调用runningCallsCount统计目前还在运行的请求,最后,如果正在运行的请求数为0表示Dispatcher中没有可运行的请求了,进入Idle状态,此时如果idleCallback不为null,则调用其run方法。下面是runningCallsCount()方法的实现:

 public synchronized int runningCallsCount() {
return runningAsyncCalls.size() + runningSyncCalls.size();
}

至此,同步请求的执行流程分析完成。接下来学习异步请求的流程。

  private void doPost(String method, String s) {
FormBody formBody = new FormBody.Builder().add("sex", s).build();
RequestBody body = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), "{\"sex\",\""+s+"\"}");
Request request = new Request.Builder().url(urlAddress + method).post(body).build();
okHttpClient.newCall(request).enqueue(new Callback() {
@Override
public void onResponse(Call arg0, Response arg1) throws IOException {
Log.d("test", arg1.body().string());
}
@Override
public void onFailure(Call arg0, IOException arg1) {
Log.d("test", "post failed");
}
});
}

异步请求会调用qnqueue方法:

 @Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
在这里有一个新的类AsyncCall,AsyncCall是RealCall的一个内部类并且继承NamedRunnable,NamedRunnable实现了Runnable接口,并且在run方法中调用了需要子类复写的execute

接下来看dispatcher的enqueue的方法:

 synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
这里先判断正在运行的异步请求的数量 如果小于maxRequests  并且与该请求相同的主机数量小于maxRequestsPerHost,也就是说符合放入runningAsyncCalls队列的要求,那么放入队列,然后将AsyncCall交给线程池;如果不符合,那么就放入到readyAsyncCalls队列中。

 当线程池执行AsyncCall任务时,它的execute方法会被调用,下面看AsyncCall的execute方法
 @Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}

在execute方法中,首先是调用getResponseWithInterceptorChain()方法获取响应,然后获取成功后,就调用回调的onReponse方法,如果失败,就调用回调的onFailure方法。最后,调用Dispatcher的finished方法。

然后看dispatcher的finished方法:

 /** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
 private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
} if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}

与同步调用不同的是最后一个参数是true所以会执行promoteCalls方法。

 private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

//遍历等待队列
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
//如果当前请求的主机处理的请求数量小于最大数量就将该请求从等待队列移除并添加到runningAsyncCalls队列中,然后交给线程池
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
} if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
 至此,异步请求的过程学习完毕,不管是同步请求还是异步请求,最终都会调用getResponseWithInterceptorChain()方法进行具体的网络请求,接下来学习一下具体的网络请求  getResponseWithInterceptorChain()
有用的集合类
Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

参考一篇比较好的讲解Dispatcher的文章

https://www.cnblogs.com/laughingQing/p/7296486.html

 
上一篇:TFS 强制撤销别人签出的代码


下一篇:XCTest各种断言