retrofit对协程支持的源码分析

从 2.6.0 版本开始,Retrofit 支持了 Kotlin 的 suspend 函数。

 

 

Retrofit 的代码是怎么识别一个方法是 suspend 函数的呢?

// Sample suspend function; the article uses it to show what the declaration looks like from Java.
// Neither key nor age is read inside the body.
private suspend fun testSuspend(key: String, age: Int) {
    withContext(Dispatchers.Default) {
        delay(10000)
        1 // last expression of the withContext block; the function declares no return type
    }
}

 

 

对应的java代码为:

 

// Java view of the Kotlin declaration: the same (key, age) parameters followed by an extra
// Continuation parameter, with Object as the return type.
private static final Object testSuspend(String key, int age, Continuation $completion)

可以看到,testSuspend 的参数列表末尾增加了一个类型为 kotlin.coroutines.Continuation 的参数。

 

而retrofit就是根据这个特点来判断的,具体在retrofit2.RequestFactory.Builder#build:

 

// Build one ParameterHandler per entry of parameterAnnotationsArray.
int parameterCount = parameterAnnotationsArray.length;
parameterHandlers = new ParameterHandler<?>[parameterCount];
for (int p = 0, lastParameter = parameterCount - 1; p < parameterCount; p++) {
  parameterHandlers[p] =
      parseParameter(p, parameterTypes[p], parameterAnnotationsArray[p], p == lastParameter); // the final argument tells parseParameter whether this is the last parameter
}

 

 

retrofit2.RequestFactory.Builder#parseParameter

 

/**
 * Resolves the handler for a single method parameter.
 *
 * @param p index of the parameter, used when reporting errors
 * @param parameterType generic type of the parameter
 * @param annotations annotations declared on the parameter, may be null
 * @param allowContinuation whether an un-annotated parameter of type Continuation is accepted;
 *     the caller passes true only for the last parameter
 * @return the handler produced from the annotations, or null when the parameter is recognised as
 *     the Continuation of a Kotlin suspend function
 * @throws the error built by parameterError when no handler was produced and the parameter is
 *     not an accepted Continuation
 */
private @Nullable ParameterHandler<?> parseParameter(
    int p, Type parameterType, @Nullable Annotation[] annotations, boolean allowContinuation) {
  ParameterHandler<?> result = null;
  if (annotations != null) {
    // Annotation parsing is elided in this excerpt; it assigns the resulting handler to result.
  }

  if (result == null) {
    if (allowContinuation) {
      try {
        if (Utils.getRawType(parameterType) == Continuation.class) { // a trailing Continuation parameter means this method is a Kotlin suspend function
          isKotlinSuspendFunction = true;
          return null;
        }
      } catch (NoClassDefFoundError ignored) {
        // NOTE(review): presumably Continuation is missing from the classpath (no Kotlin
        // runtime) — confirm. Execution falls through to the error below.
      }
    }
    throw parameterError(method, p, "No Retrofit annotation found.");
  }

  return result;
}

 

 

如何支持 Kotlin 协程呢?

 

retrofit2.HttpServiceMethod#parseAnnotations

 

/**
 * Builds the HttpServiceMethod for an interface method.
 *
 * <p>For a Kotlin suspend function the response type is read from the type argument of the last
 * parameter (the Continuation), and the call adapter is looked up for {@code Call<responseType>}
 * instead of the method's declared return type. The result is one of three implementations:
 * CallAdapted for regular methods, SuspendForResponse when the suspend function returns
 * {@code Response<T>}, and SuspendForBody otherwise.
 *
 * @param retrofit the Retrofit instance supplying the call factory, adapters and converters
 * @param method the interface method being parsed
 * @param requestFactory the already-parsed request description for {@code method}
 * @return the service method that will adapt calls for {@code method}
 */
static <ResponseT, ReturnT> HttpServiceMethod<ResponseT, ReturnT> parseAnnotations(
    Retrofit retrofit, Method method, RequestFactory requestFactory) {
  boolean isKotlinSuspendFunction = requestFactory.isKotlinSuspendFunction;
  boolean continuationWantsResponse = false;
  boolean continuationBodyNullable = false;

  Annotation[] annotations = method.getAnnotations();
  Type adapterType;
  if (isKotlinSuspendFunction) { // 1. Kotlin suspend function
    Type[] parameterTypes = method.getGenericParameterTypes();
    Type responseType =
        Utils.getParameterLowerBound(
            0, (ParameterizedType) parameterTypes[parameterTypes.length - 1]);
    if (getRawType(responseType) == Response.class && responseType instanceof ParameterizedType) { // 2. the interface method returns Response<XXX>
      // Unwrap the actual body type from Response<T>.
      responseType = Utils.getParameterUpperBound(0, (ParameterizedType) responseType);
      continuationWantsResponse = true;
    } else {
      // TODO figure out if type is nullable or not
      // Metadata metadata = method.getDeclaringClass().getAnnotation(Metadata.class)
      // Find the entry for method
      // Determine if return type is nullable or not
    }

    adapterType = new Utils.ParameterizedTypeImpl(null, Call.class, responseType);
    annotations = SkipCallbackExecutorImpl.ensurePresent(annotations);
  } else {
    adapterType = method.getGenericReturnType();
  }

  CallAdapter<ResponseT, ReturnT> callAdapter =
      createCallAdapter(retrofit, method, adapterType, annotations);
  Type responseType = callAdapter.responseType();
  if (responseType == okhttp3.Response.class) {
    throw methodError(
        method,
        "'"
            + getRawType(responseType).getName()
            + "' is not a valid response body type. Did you mean ResponseBody?");
  }
  if (responseType == Response.class) {
    throw methodError(method, "Response must include generic type (e.g., Response<String>)");
  }
  // TODO support Unit for Kotlin?
  if (requestFactory.httpMethod.equals("HEAD") && !Void.class.equals(responseType)) {
    throw methodError(method, "HEAD method must use Void as response type.");
  }

  Converter<ResponseBody, ResponseT> responseConverter =
      createResponseConverter(retrofit, method, responseType);

  okhttp3.Call.Factory callFactory = retrofit.callFactory;
  if (!isKotlinSuspendFunction) {
    return new CallAdapted<>(requestFactory, callFactory, responseConverter, callAdapter);
  } else if (continuationWantsResponse) { // 2. the interface method returns Response<XXX>
    //noinspection unchecked Kotlin compiler guarantees ReturnT to be Object.
    return (HttpServiceMethod<ResponseT, ReturnT>)
        new SuspendForResponse<>(
            requestFactory,
            callFactory,
            responseConverter,
            (CallAdapter<ResponseT, Call<ResponseT>>) callAdapter);
  } else { // 3. the interface method does not return Response<XXX>
    //noinspection unchecked Kotlin compiler guarantees ReturnT to be Object.
    return (HttpServiceMethod<ResponseT, ReturnT>)
        new SuspendForBody<>(
            requestFactory,
            callFactory,
            responseConverter,
            (CallAdapter<ResponseT, Call<ResponseT>>) callAdapter,
            continuationBodyNullable);
  }
}

这里看一下SuspendForBody

 

/**
 * Service method for suspend functions whose result is the response body itself. It hands the
 * call and the caller's Continuation to the KotlinExtensions await helpers.
 */
static final class SuspendForBody<ResponseT> extends HttpServiceMethod<ResponseT, Object> {
  private final CallAdapter<ResponseT, Call<ResponseT>> callAdapter;
  // Selects awaitNullable over await in adapt().
  private final boolean isNullable;

  SuspendForBody(
      RequestFactory requestFactory,
      okhttp3.Call.Factory callFactory,
      Converter<ResponseBody, ResponseT> responseConverter,
      CallAdapter<ResponseT, Call<ResponseT>> callAdapter,
      boolean isNullable) {
    super(requestFactory, callFactory, responseConverter);
    this.callAdapter = callAdapter;
    this.isNullable = isNullable;
  }

  /**
   * Starts the call on behalf of a suspend function.
   *
   * @param call the call to execute
   * @param args the invocation arguments; the last element is cast to the Continuation
   * @return whatever the await helper returns, or the result of suspendAndThrow when the helper
   *     throws an Exception
   */
  @Override
  protected Object adapt(Call<ResponseT> call, Object[] args) {
    call = callAdapter.adapt(call);// 1. still the original call (author's note — confirm against the adapter in use)

    //noinspection unchecked Checked by reflection inside RequestFactory.
    Continuation<ResponseT> continuation = (Continuation<ResponseT>) args[args.length - 1];

    // Calls to OkHttp Call.enqueue() like those inside await and awaitNullable can sometimes
    // invoke the supplied callback with an exception before the invoking stack frame can return.
    // Coroutines will intercept the subsequent invocation of the Continuation and throw the
    // exception synchronously. A Java Proxy cannot throw checked exceptions without them being
    // declared on the interface method. To avoid the synchronous checked exception being wrapped
    // in an UndeclaredThrowableException, it is intercepted and supplied to a helper which will
    // force suspension to occur so that it can be instead delivered to the continuation to
    // bypass this restriction.
    try {
      return isNullable
          ? KotlinExtensions.awaitNullable(call, continuation)
          : KotlinExtensions.await(call, continuation); // 2. nullable bodies are not supported by default, so this branch is the one taken
    } catch (Exception e) {
      return KotlinExtensions.suspendAndThrow(e, continuation);
    }
  }
}

adapt 方法是在最后发起请求时调用的,具体可以看之前的 Retrofit 源码分析。

 

 

 

接着看retrofit2.KotlinExtensions#await

 

/**
 * Enqueues this call and suspends until it completes, returning the non-null response body.
 *
 * The coroutine resumes with the body when the response is successful and the body is non-null.
 * It resumes with a [KotlinNullPointerException] when a successful response has a null body,
 * with an [HttpException] when the response is not successful, and with the reported
 * [Throwable] when the call fails. Cancelling the coroutine cancels the call.
 */
suspend fun <T : Any> Call<T>.await(): T {
    return suspendCancellableCoroutine { continuation ->
        // Propagate coroutine cancellation to the call.
        continuation.invokeOnCancellation {
            cancel()
        }
        enqueue(object : Callback<T> {
            override fun onResponse(call: Call<T>, response: Response<T>) {
                if (response.isSuccessful) {
                    val body = response.body()
                    if (body == null) {
                        // T is declared non-null, so a missing body is reported as an error
                        // naming the interface method taken from the request's Invocation tag.
                        val invocation = call.request().tag(Invocation::class.java)!!
                        val method = invocation.method()
                        val e = KotlinNullPointerException(
                            "Response from " +  method.declaringClass.name + '.' + method.name +
                            " was null but response body type was declared as non-null"
                        )
                        continuation.resumeWithException(e)
                    } else {
                        continuation.resume(body)
                    }
                } else {
                    // Unsuccessful response: surface it as an HttpException.
                    continuation.resumeWithException(HttpException(response))
                }
            }

            override fun onFailure(call: Call<T>, t: Throwable) {
                continuation.resumeWithException(t)
            }
        })
    }
}

可以看到这里已经是 Kotlin 代码了。上边的 Java 代码调用此方法时传递的是两个参数:对于 Kotlin 来说,第一个参数就是扩展函数的接收者(receiver),第二个参数是 Kotlin 编译器生成的 continuation。

 

上一篇:zabbix-api系列6 监控项


下一篇:Python爬虫:如何爬取一个网站的源码?