目录
一、协程实现线程切换
切换到指定的线程是一个很容易实现的功能,难点不是这个,了解即可
https://zhuanlan.zhihu.com/p/386757845
- 向CoroutineContext添加Dispatcher,指定运行的协程
- 在启动时将suspend block创建成Continuation,并调用intercepted生成DispatchedContinuation
- DispatchedContinuation就是对原有协程的装饰,在这里调用Dispatcher完成线程切换任务后,resume被装饰的协程,就会执行协程体内的代码了
CoroutineDispater官方提供了四种实现:Dispatchers.Main,Dispatchers.IO,Dispatchers.Default,Dispatchers.Unconfined
我们一起简单看下Dispatchers.Main的实现
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
//...
override fun dispatch(context: CoroutineContext, block: Runnable) {
// 利用主线程的 Handler 执行任务
handler.post(block)
}
}
可以看到,其实就是用handler切换到了主线程
handler.post(block)
如果用Dispatcers.IO也是一样的,只不过换成线程池切换了
如上所示,其实就是一个装饰模式
1.调用CoroutinDispatcher.dispatch方法切换线程
2.切换完成后调用DispatchedTask.run方法,执行真正的协程体
二.CPS
suspend原理
suspend就是CPS+状态机
CPS:做到省去了回调,避免死亡嵌套,用resumwith替代callback
状态机:实现在不同的线程中,顺序的执行,用阻塞写法写非阻塞代码(suspend中的各个suspend函数虽然都是运作在不同线程,但是会转换成状态机,根据label一步一步有序的执行)
当程序执行到requestDataAsync(挂起函数)内部时, 通过async启动了另外一个新的子协程去拉取数据, 启动这个新的子协程后, 当前的父协程就挂起了, 此时requestDataAsync还没有返回.
子协程一直在后台跑, 过了一段时间, 子协程把数据拉回来之后, 会恢复它的父协程, 父协程继续执行, requestDataAsync就把数据返回了.
这里涉及到一种机制俗称CPS(Continuation-Passing-Style)。每一个suspend修饰的方法或者lambda表达式都会在代码调用的时候为其额外添加Continuation类型的参数。
CPS:将以下
GET("/v2/news")
suspend fun newsGet(@QueryMap params: Map<String, String>): NewsResponse
转为
@GET("/v2/news")
fun newsGet(@QueryMap params: Map<String, String>, c: Continuation<NewsResponse>): Any?
经过转换之后,原有的返回类型NewsResponse被添加到新增的Continutation参数中,同时返回了Any?类型。
1.返回值Any:
当suspend函数被协程挂起时,它会返回一个特殊的标识COROUTINE_SUSPENDED,而它本质就是一个Any;当协程不挂起进行执行时,它将返回执行的结果或者引发的异常。这样为了让这两种情况的返回都支持,所以使用了Kotlin独有的Any?类型。
2.传参Continuation
Continutation就是与协程创建的时候一起被创建的。suspend修饰的函数都会获取上层的Continutation,并将其作为参数传递给自己。
GlobalScope.launch {
}
launch时创建
Continutation的源码
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
1.resumeWith
resumeWith替换传统的callback
唤醒挂起的协程。前面已经说过协程在执行的过程中,为了防止阻塞使用了挂起的特性,一旦协程内部的逻辑执行完毕之后,就是通过该方法来唤起协程。让它在之前挂起的位置继续执行下去。
2.CoroutineContext
协程的上下文,它包含用户定义的一些数据集合,这些数据与协程密切相关。它类似于map集合,可以通过key来获取不同类型的数据
协程的本质就是CPS+状态机
1.CPS转换
suspend 的本质,就是 CallBack。(将挂起函数 由可读性较高的普通的同步的写法 转化为CallBack,避免死亡嵌套)
挂起函数在 CPS 转换过程中,函数签名的变化:
CPS 转换过程中,函数的类型发生了变化:suspend ()->String 变成了 (Continuation)-> Any?
其中Continuation的部分就是
Continuation 就是接下来要运行的代码,剩余未执行的代码。(把接下来要执行的代码全部放到嵌套里面)
CPS 转换,就是将原本的同步挂起函数转换成CallBack 异步代码的过程。这个转换是编译器在背后做的,
我们程序员对此无感知,提高可读性。
三、状态机
状态机=when+Continuation(这是便于自己理解的很不规范的一种理解方式)
when(不同的suspended函数,划分开执行顺序)
Continuation
- 保存必要的数据(将上面函数的执行结果作为continuation的成员变量存起来,方便带到下面的挂起函数使用)
- 递归调用 loginUser 函数来恢复执行(保存每个挂起函数的执行结果方便后面使用,记录当前执行到第几个挂起函数label)
- 以上两个简而言之就是保存协程中栈帧的数据,切换线程的时候把栈帧数据带过去,切回来时把数据带回来
原代码
fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
// Label 0 -> 第一次执行
val user = userRemoteDataSource.logUserIn(userId, password)
// Label 1 -> 从 userRemoteDataSource 恢复
val userDb = userLocalDataSource.logUserIn(user)
// Label 2 -> 从 userLocalDataSource 恢复
completion.resume(userDb)
}
使用when来实现不同状态
fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
when(label) {
// Label 0 -> 第一次执行
userRemoteDataSource.logUserIn(userId, password)
}
// Label 1 -> 从 userRemoteDataSource 恢复
userLocalDataSource.logUserIn(user)
}
// Label 2 -> 从 userLocalDataSource 恢复
completion.resume(userDb)
}
else -> throw IllegalStateException(...)
}
}
加上了Continuation后
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
class LoginUserStateMachine(
// completion 参数是调用了 loginUser 的函数的回调
completion: Continuation<Any?>
): CoroutineImpl(completion) {
// 要在整个挂起函数中存储的对象
var user: User? = null
var userDb: UserDb? = null
// 所有 CoroutineImpls 都包含的通用对象
var result: Any? = null
var label: Int = 0
// 这个函数再一次调用了 loginUser 来切换
// 状态机 (标签会已经处于下一个状态)
// result 将会是前一个状态的计算结果
override fun invokeSuspend(result: Any?) {
this.result = result
loginUser(null, null, this)
}
}
val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)
when(continuation.label) {
0 -> {
// 错误检查
throwOnFailure(continuation.result)
// 下次 continuation 被调用时, 它应当直接去到状态 1
continuation.label = 1
// Continuation 对象被传入 logUserIn 函数,从而可以在结束时恢复
// 当前状态机的执行
userRemoteDataSource.logUserIn(userId!!, password!!, continuation)
}
1 -> {
// 检查错误
throwOnFailure(continuation.result)
// 获得前一个状态的结果
continuation.user = continuation.result as User
// 下次这 continuation 被调用时, 它应当直接去到状态 2
continuation.label = 2
// Continuation 对象被传入 logUserIn 方法,从而可以在结束时恢复
// 当前状态机的执行
userLocalDataSource.logUserIn(continuation.user, continuation)
}
2 -> {
// 错误检查
throwOnFailure(continuation.result)
// 获取前一个状态的结果
continuation.userDb = continuation.result as UserDb
// 恢复调用了当前函数的执行
continuation.cont.resume(continuation.userDb)
}
else -> throw IllegalStateException(...)
}
}