进阶:协程原理

目录

一、协程实现线程切换

二.CPS

三、状态机


一、协程实现线程切换

切换到指定的线程是一个很容易实现的功能,难点不是这个,了解即可

https://zhuanlan.zhihu.com/p/386757845

  1. 向CoroutineContext添加Dispatcher,指定运行的协程
  2. 在启动时将suspend block创建成Continuation,并调用intercepted生成DispatchedContinuation
  3. DispatchedContinuation就是对原有协程的装饰,在这里调用Dispatcher完成线程切换任务后,resume被装饰的协程,就会执行协程体内的代码了

CoroutineDispater官方提供了四种实现:Dispatchers.Main,Dispatchers.IO,Dispatchers.Default,Dispatchers.Unconfined

我们一起简单看下Dispatchers.Main的实现

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)

    //...

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // 利用主线程的 Handler 执行任务
        handler.post(block)
    }
}

可以看到,其实就是用handler切换到了主线程

handler.post(block)

如果用Dispatcers.IO也是一样的,只不过换成线程池切换了

进阶:协程原理

如上所示,其实就是一个装饰模式

1.调用CoroutinDispatcher.dispatch方法切换线程

2.切换完成后调用DispatchedTask.run方法,执行真正的协程体

 

二.CPS

     

suspend原理

suspend就是CPS+状态机

CPS:做到省去了回调,避免死亡嵌套,用resumwith替代callback

状态机:实现在不同的线程中,顺序的执行,用阻塞写法写非阻塞代码(suspend中的各个suspend函数虽然都是运作在不同线程,但是会转换成状态机,根据label一步一步有序的执行)

当程序执行到requestDataAsync(挂起函数)内部时, 通过async启动了另外一个新的子协程去拉取数据, 启动这个新的子协程后, 当前的父协程就挂起了, 此时requestDataAsync还没有返回.

子协程一直在后台跑, 过了一段时间, 子协程把数据拉回来之后, 会恢复它的父协程, 父协程继续执行, requestDataAsync就把数据返回了.

这里涉及到一种机制俗称CPS(Continuation-Passing-Style)。每一个suspend修饰的方法或者lambda表达式都会在代码调用的时候为其额外添加Continuation类型的参数。

CPS:将以下

GET("/v2/news")
suspend fun newsGet(@QueryMap params: Map<String, String>): NewsResponse

转为

@GET("/v2/news")
fun newsGet(@QueryMap params: Map<String, String>, c: Continuation<NewsResponse>): Any?

经过转换之后,原有的返回类型NewsResponse被添加到新增的Continutation参数中,同时返回了Any?类型。

1.返回值Any:

当suspend函数被协程挂起时,它会返回一个特殊的标识COROUTINE_SUSPENDED,而它本质就是一个Any;当协程不挂起进行执行时,它将返回执行的结果或者引发的异常。这样为了让这两种情况的返回都支持,所以使用了Kotlin独有的Any?类型。

2.传参Continuation

Continutation就是与协程创建的时候一起被创建的。suspend修饰的函数都会获取上层的Continutation,并将其作为参数传递给自己。

GlobalScope.launch { 
}

launch时创建

Continutation的源码

public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)
}

1.resumeWith

resumeWith替换传统的callback

唤醒挂起的协程。前面已经说过协程在执行的过程中,为了防止阻塞使用了挂起的特性,一旦协程内部的逻辑执行完毕之后,就是通过该方法来唤起协程。让它在之前挂起的位置继续执行下去。

2.CoroutineContext

协程的上下文,它包含用户定义的一些数据集合,这些数据与协程密切相关。它类似于map集合,可以通过key来获取不同类型的数据

协程的本质就是CPS+状态机

1.CPS转换

suspend 的本质,就是 CallBack。(将挂起函数 由可读性较高的普通的同步的写法 转化为CallBack,避免死亡嵌套)

挂起函数在 CPS 转换过程中,函数签名的变化:

进阶:协程原理

CPS 转换过程中,函数的类型发生了变化:suspend ()->String 变成了 (Continuation)-> Any?

其中Continuation的部分就是

进阶:协程原理

Continuation 就是接下来要运行的代码,剩余未执行的代码。(把接下来要执行的代码全部放到嵌套里面)

CPS 转换,就是将原本的同步挂起函数转换成CallBack 异步代码的过程。这个转换是编译器在背后做的,

我们程序员对此无感知,提高可读性。

进阶:协程原理

三、状态机

状态机=when+Continuation(这是便于自己理解的很不规范的一种理解方式)

when(不同的suspended函数,划分开执行顺序)

Continuation

  1. 保存必要的数据(将上面函数的执行结果作为continuation的成员变量存起来,方便带到下面的挂起函数使用)
  2. 递归调用 loginUser 函数来恢复执行(保存每个挂起函数的执行结果方便后面使用,记录当前执行到第几个挂起函数label)
    1. 以上两个简而言之就是保存协程中栈帧的数据,切换线程的时候把栈帧数据带过去,切回来时把数据带回来

原代码

fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
  // Label 0 -> 第一次执行
  val user = userRemoteDataSource.logUserIn(userId, password)
  // Label 1 -> 从 userRemoteDataSource 恢复
  val userDb = userLocalDataSource.logUserIn(user)
  // Label 2 -> 从 userLocalDataSource 恢复
  completion.resume(userDb)
}

使用when来实现不同状态

fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
  when(label) {
              // Label 0 -> 第一次执行
        userRemoteDataSource.logUserIn(userId, password)
    }
              // Label 1 -> 从 userRemoteDataSource 恢复
        userLocalDataSource.logUserIn(user)
    }
              // Label 2 -> 从 userLocalDataSource 恢复
        completion.resume(userDb)
    }
    else -> throw IllegalStateException(...)
  }
}

加上了Continuation后

/* Copyright 2019 Google LLC.  
   SPDX-License-Identifier: Apache-2.0 */
fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {

    class LoginUserStateMachine(
        // completion 参数是调用了 loginUser 的函数的回调
        completion: Continuation<Any?>
    ): CoroutineImpl(completion) {
        // 要在整个挂起函数中存储的对象
        var user: User? = null
        var userDb: UserDb? = null
        // 所有 CoroutineImpls 都包含的通用对象
        var result: Any? = null
        var label: Int = 0
        // 这个函数再一次调用了 loginUser 来切换
        // 状态机 (标签会已经处于下一个状态) 
        // result 将会是前一个状态的计算结果
        override fun invokeSuspend(result: Any?) {
            this.result = result
            loginUser(null, null, this)
        }
    }

    val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)

    when(continuation.label) {
        0 -> {
            // 错误检查
            throwOnFailure(continuation.result)
            // 下次 continuation 被调用时, 它应当直接去到状态 1
            continuation.label = 1
            // Continuation 对象被传入 logUserIn 函数,从而可以在结束时恢复 
            // 当前状态机的执行
            userRemoteDataSource.logUserIn(userId!!, password!!, continuation)
        }
        1 -> {
            // 检查错误
            throwOnFailure(continuation.result)
            // 获得前一个状态的结果
            continuation.user = continuation.result as User
            // 下次这 continuation 被调用时, 它应当直接去到状态 2
            continuation.label = 2
            // Continuation 对象被传入 logUserIn 方法,从而可以在结束时恢复 
            // 当前状态机的执行
            userLocalDataSource.logUserIn(continuation.user, continuation)
        }
        2 -> {
            // 错误检查
            throwOnFailure(continuation.result)
            // 获取前一个状态的结果
            continuation.userDb = continuation.result as UserDb
            // 恢复调用了当前函数的执行
            continuation.cont.resume(continuation.userDb)
        }
        else -> throw IllegalStateException(...)
    }
}

上一篇:工作笔记——CPLD与MCU通过SPI通信


下一篇:UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd6 in position 305: invalid con