Kotlin Coroutine(协程): 三、了解协程

@

目录


前言

上一篇, 我们已经讲述了协程的基本用法, 这篇将从协程上下文, 启动模式, 异常处理角度来了解协程的用法

一、协程上下文

我们先看一下 启动协程构建函数; launch, async等 它们参数都差不多

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {

第一个参数: CoroutineContext 就是协程上下文.
第二个参数: CoroutineStart 时协程的启动模式, 我们后面再说
第三个参数: 就是协程的执行代码块.


CoroutineContext: 是一个接口, 它可以包含 调度器, 拦截协程执行, 局部变量等.
里面有一个操作符重载函数:

public operator fun plus(context: CoroutineContext): CoroutineContext = ...省略...

所以,才能看到 两个上下文元素相加; 例如: SupervisorJob() + Dispatchers.Main
没错, 这就是 MainScope() 定义的上下文;

//kotlin.coroutines.CoroutineContext
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

当然, 我们也可以看见 协程作用域 + 上下文

//kotlinx.coroutines.CoroutineScope
public operator fun CoroutineScope.plus(context: CoroutineContext): CoroutineScope =
    ContextScope(coroutineContext + context)

不管怎么加, 反正都是合并协程上下文中的内容.

1.调度器

上一篇已经介绍过了, 我们再次贴这几种调度器的区别:

调度器 意义
不指定 它从启动了它的 CoroutineScope 中承袭了上下文
Dispatchers.Main 用于Android. 在UI线程中执行
Dispatchers.IO 子线程, 适合执行磁盘或网络 I/O操作
Dispatchers.Default 子线程,适合 执行 cpu 密集型的工作
Dispatchers.Unconfined 从当前线程直接执行, 直到第一个挂起点

2.给协程起名

还记得线程别名吗? 没错 它们差不多; 它也是协程上下文元素

CoroutineName("name"):

launch(CoroutineName("v1coroutine")){...}

但要获取附带协程别名的线程名, 还得加JVM参数: -Dkotlinx.coroutines.debug

3.局部变量

有时,能够将一些线程局部数据传递到协程与协程之间是很方便的。 它们不受任何特定线程的约束
使用 ThreadLocal 构建; 用 asContextElement(value = "launch") 转换为协程上下文并赋值.

val threadLocal = ThreadLocal<String?>() // 声明线程局部变量
runBlocking {
    threadLocal.set("main")
    letUsPrintln("start!! 变量值为:'${threadLocal.get()}';;")
    launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
        letUsPrintln("launch! 变量值为:'${threadLocal.get()}';;")
        delay(2000)
        launch{
            letUsPrintln("子协程! 变量值为:'${threadLocal.get()}';;")
        }
        letUsPrintln("launch! 变量值为:'${threadLocal.get()}';;")
    }
    launch {
        delay(1000)
        letUsPrintln("弟协程! 变量值为:'${threadLocal.get()}';;")
    }
    threadLocal.set(null)
    letUsPrintln("在末尾! 变量值为:'${threadLocal.get()}';;")
}

打印结果如下:

start!! 变量值为:'main';; Thread_name:main
launch! 变量值为:'launch';; Thread_name:DefaultDispatcher-worker-1
在末尾! 变量值为:'null';; Thread_name:main
弟协程! 变量值为:'null';; Thread_name:main
launch! 变量值为:'launch';; Thread_name:DefaultDispatcher-worker-1
子协程! 变量值为:'launch';; Thread_name:DefaultDispatcher-worker-1

注意:
当一个线程局部变量变化时,这个新值不会传播给协程调用者

当然还有:
拦截器(ContinuationInterceptor): 多用作线程切换, 有兴趣的小伙伴自行百度.
异常处理器(CoroutineExceptionHandler): 这个后面再说


二、启动模式 CoroutineStart

1.DEFAULT
默认模式, 立即执行; 虽说立即执行, 实际上是立即调度执行. 代码块是否接着执行 还得看线程的空闲状态啥的.

2.LAZY
延迟启动, 我们可以先把协程定义好. 在需要的时候调用 start()

下面我们用 async 为例:

suspend fun doSomethingUsefulOne(): Int {
    println("doSomethingUsefulOne")
    delay(1000L) // 假设我们在这里做了些有用的事
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    println("doSomethingUsefulTwo")
    delay(500L) // 假设我们在这里也做了一些有用的事
    return 29
}

runBlocking {
    val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
    val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
    delay(2000)	//挂起一下, 看看 LAZY 协程是否被启动
    println("终于要启动了")
    one.start() // 启动第一个
    two.start() // 启动第二个
    println("The answer is ${one.await() + two.await()}")
}

打印结果:

终于要启动了
doSomethingUsefulOne
doSomethingUsefulTwo
The answer is 42

可以看出, 即使 delay(2000); LAZY模式的协程, 仍没有启动. 调用 start() 后才会启动.

需要注意:
start() 或 await() 虽然都可以让 LAZY协程启动, 但上面的例子中, 只调用 await()的话, 两个async会变为顺序执行, 损失异步性质. 因此请使用 start() 来启动 LAZY协程

3.ATOMIC
跟 DEFAULT 差不多, 区别在于 开始运行之前无法取消

如果不是 LAZY模式, 从协程定义 到代码块执行还是很简短的. 这段时间内的取消与否 只能说也许在特殊业务中它才会被使用.


4.UNDISPATCHED
当前线程立即执行协程体,直到第一个挂起点.

怎么听起来这么耳熟呢? 没错 它跟 调度器:Dispatchers.Unconfined 效果类似. 实现方式是否一致不得而知.

三、异常处理

异常处理较为复杂, 注意点也比较多, 真正理解需要很多测试代码,或一定实战经验. 所以不能贯通理解也没有关系,我们只需要对它有一定了解, 做到大体心中有数即可.

子协程:

我们先来了解一下子协程的定义:
当一个协程被其它协程在 CoroutineScope 中启动的时候, 它将通过 CoroutineScope.coroutineContext 来承袭上下文,并且这个新协程的 Job 将会成为父协程作业的子作业。当一个父协程被取消的时候,所有它的子协程也会被递归的取消。
然而,当使用 GlobalScope 来启动一个协程时,则新协程的作业没有父作业。 因此它与这个启动的作用域无关且独立运作。一个父协程总是等待所有的子协程执行结束。父协程并不显式的跟踪所有子协程的启动,并且不必使用 Job.join 在最后的时候等待它们

简而言之:

  • 协程中启动的协程, 就是子协程. GlobalScope 除外; 新协程的Job, 也是子Job
  • 父协程取消时(主动取消或异常取消), 递归取消所有子协程, 及子子协程
  • 父协程会等待子协程全部执行完毕才会结束

当一个协程由于异常而运行失败时:

  1. 取消它自己的子级;
  2. 取消它自己;
  3. 将异常传播并传递给它的父级。

异常会到达层级的根部,而且当前 CoroutineScope 所启动的所有协程都会被取消。

1.异常测试

我们用几个例子来检测一下

runBlocking {
    launch {
        println("协程1-start")    //2
        delay(100)
        throw Exception("Failed coroutine") //4
    }
    launch {
        println("协程2-start")    //3
        delay(200)
        println("协程2-end")  //未打印
    }
    println("start")    //1
    delay(500)
    println("end")      //未打印
}

打印结果如下:

 start
 协程1-start
 协程2-start
 Exception in thread "main" java.lang.Exception: Failed coroutine ...

可以看出: 协程1异常. 协程2(兄弟协程)被取消. runBlocking(作用域)也被取消.


当 async 被用作根协程时,它的结果和异常会包装在 返回值 Deferred.await() 中;

runBlocking {
    //async 依赖用户来最终消费异常; 通过 await()
    val deferred = GlobalScope.async {
        letUsPrintln("协程1")
        throw Exception("Failed coroutine")
    }
    try {
        deferred.await()
    }catch (e: Exception){
        println("捕捉到了协程1异常")
    }
    letUsPrintln("end")
}

因此, try{..}catch {..} 需要包裹 await(); 而包裹 async{..} 是没有意义的.


然而 try{..}catch{..} 并不一定合适;

runBlocking {
    try {
        launch {
            letUsPrintln("协程1")
            throw Exception("Failed coroutine")
        }
    }catch (e: Exception){
        println("捕捉到了协程1异常")	//未打印
    }
    delay(100)
    letUsPrintln("end")	//未打印
}

打印结果:

协程1 Thread_name:main
Exception in thread "main" java.lang.Exception: Failed coroutine ...

未能捕获异常, runBlocking(父协程) 被终止; 我们尝试用真实环境,包裹根协程:

try {
    lifecycleScope.launch {
        letUsPrintln("111协程1")
        throw Exception("Failed coroutine")
    }
}catch (e: Exception){
    println(e.message)
}

好吧, 程序直接 crash; 想想也对, 协程块代码始终是要分发给线程去做. try catch 又不是包在代码块里面.

2.CoroutineExceptionHandler

异常处理器, 它是 CoroutineContext 的一个可选元素,它让您可以处理未捕获的异常。

我们先定义一个 handler

val handler = CoroutineExceptionHandler {
    context, exception -> println("Caught $exception")
}

然后:

runBlocking {
    val scope = CoroutineScope(Job())	//自定义一个作用域
    val job = scope.launch(handler) {
        letUsPrintln("one")
        throw Exception("Failed coroutine")
    }
    job.join()
    letUsPrintln("end")
}

打印结果如下:

one Thread_name:DefaultDispatcher-worker-1
Caught java.lang.Exception: Failed coroutine
end Thread_name:main

这里新建作用域的目的, 是防止 launch 作为 runBlocking 的子协程; 我们去掉自定义作用域:

runBlocking {
    val job = launch(handler) {
        letUsPrintln("one")
        throw Exception("Failed coroutine")
    }
    job.join()
    letUsPrintln("end")	//未打印
}

打印结果如下:

one Thread_name:main
Exception in thread "main" java.lang.Exception: Failed coroutine ...

没有捕获异常, crash了. 这是为什么呢?

可以向上取消的子协程(非supervisor) 会委托父协程处理它们的异常. 所以异常是交给父协程处理. 而CoroutineExceptionHandler只能处理未被处理的异常, 因此:

  • 把它加到 根协程 或作用域上. runBlocking,coroutineScope 中创建的协程不是根协程
  • 单向取消的子协程(例如: supervisorScope 下的一级子协程), 这样写: launch(handler), 可以捕获异常
  • 其他情况, 子协程即便带上Handler, 它也不生效

所以这样可以捕获异常:

lifecycleScope.launch(handler) {	//根协程 成功捕获异常
    letUsPrintln("111协程1")
    throw Exception("Failed coroutine")
}

这样无法捕获异常:

lifecycleScope.launch {
    letUsPrintln("111协程1")
    launch(handler) {	//不能捕获异常, 并引发 crash
        throw Exception("Failed coroutine")
    }
}

异常聚合:

当协程的多个子协程因异常而失败时, 一般规则是“取第一个异常”,因此将处理第一个异常。 在第一个异常之后发生的所有其他异常都作为被抑制的异常绑定至第一个异常。

runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}")
    }
    val job = GlobalScope.launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE) // 当另一个同级的协程因 IOException  失败时,它将被取消
            } finally {
                throw ArithmeticException() // 第二个异常
            }
        }
        launch {
            delay(100)
            throw IOException() // 首个异常
        }
        delay(Long.MAX_VALUE)
    }
    job.join()
}

打印结果只有一句, 如下所示:

CoroutineExceptionHandler got java.io.IOException with suppressed [java.lang.ArithmeticException]

结论:

CoroutineExceptionHandler : 以下称之为 Handler

  • async异常 依赖用户调用 deferred.await(); 因此 Handler 在 async 这类协程构造器中无效;
  • 当子协程的取消可以向上传递时(非supervisor类), Handler 只能加到 根协程 或作用域上, 子协程即便带上Handler, 它也不生效
  • CoroutineExceptionHandler 将等到所有子协程运行结束后再回调, 在收尾工作完成后.
  • 它只是获得异常信息. 抛出异常时, 协程将会递归终止, 并且无法通过 Handler 恢复.
  • Handler 并不能恢复异常, 如果想捕获异常, 并使协程继续执行, 则应当使用 try{..}catch{..}

如下所示, try{..}catch{..} 放到协程体内部, 捕获最初的异常本体:

launch {
    try {
    	// do something
    	throw ArithmeticException()	// 假定这里是可能抛异常的正常代码
        delay(Long.MAX_VALUE) // 当另一个同级的协程因 IOException  失败时,它将被取消
    } catch (e: ArithmeticException){
        // do something
    }
}

四、监督:

我们知道, 当子协程异常时, 会连带父协程取消,直至取消整个作用域. 有时我们并不想要这样, 例如 UI 作用域被取消, 导致其他正常的UI操作不能执行. 因此我们需要让异常只向后传递.

1.SupervisorJob

使用 SupervisorJob 时,子协程的运行失败不会影响到其他子协程。也不会传播异常给它的父级,它会让子协程自己处理异常。

runBlocking {
    val supervisor = SupervisorJob() //取消单向传递的 job
    with(CoroutineScope(coroutineContext + supervisor)) {
        launch {	//兄弟协程
            delay(100)
            println("第一个协程执行完毕")
        }
        launch {	//第二个协程抛出异常;
            throw AssertionError("The second child is cancelled")
        }
        delay(300)
        println("作用域被取消没?")
    }
    println("全部执行完毕")
}

打印结果如下:

Exception in thread "main" java.lang.AssertionError: The first child is cancelled ...
第一个协程执行完毕
作用域被取消没?
全部执行完毕

可以看出, 异常打印后. 兄弟协程 及 作用域都没有被取消; 我们去掉 supervisor 再运行, 发现作用域协程被取消了. 可见是 SupervisorJob() 起了作用.

2.supervisorScope

对于作用域的并发,可以用 supervisorScope 来替代 coroutineScope 来实现相同的目的。它的直接子协程 将不会传播异常给它的父级.

runBlocking {
    supervisorScope {
        launch {	//兄弟协程
            delay(100)
            println("第一个协程执行完毕")
        }
        launch {	//第二个协程抛出异常;
            throw AssertionError("The second child is cancelled")
        }
        delay(300)
        println("作用域被取消没?")
    }
    println("全部执行完毕")
}

打印结果跟使用 with(CoroutineScope(coroutineContext + supervisor)) 时完全一致;


越级子协程

子子协程会不会将异常向上传递呢?

runBlocking {
	val scope = CoroutineScope(SupervisorJob())
    scope.launch {	//兄弟协程
        delay(100)
        println("第一个协程执行完毕")
    }
    scope.launch {	//协程二
        launch {    //第二个协程 的子协程 抛出异常;
            throw AssertionError("The second child is cancelled")
        }
        delay(200)
        println("第二个协程执行完毕?")	//未打印
    }
    delay(300)
    println("全部执行完毕")
}

打印结果如下:

Exception in thread "main" java.lang.AssertionError: The first child is cancelled ...
第一个协程执行完毕
全部执行完毕

可见, 第二个协程的完毕信息 未打印; 协程二 被取消; 这是因为监督只能作用一层, 它的直接子协程不会向上传递取消. 但子协程的内部还是普通的双向传递模式;

小结:

  • supervisorScope 会创建一个子作用域 (使用一个 SupervisorJob 作为父级); 以SupervisorJob 为父级的协程, 不会将取消操作向上级传递.
  • SupervisorJob 只有作为 supervisorScope 或 CoroutineScope(SupervisorJob()) 的一部分时,才会按照上面的描述工作。

SupervisorJob() 的使用,一定是配合作用域(CoroutineScope) 的创建; 但当它作为参数传入一个协程的 Builder 时 会怎么样?:

runBlocking {
    val handler = CoroutineExceptionHandler { _, exception -> println("Caught $exception")}
    val jobBase = SupervisorJob()
    launch(jobBase) {	//与异常协程同一父 job;
        delay(50)
        println("协程1 执行完毕")
    }
    launch {	//新建 Job 承袭 父Job
        delay(60)
        println("协程2 执行完毕")
    }
    launch {	//新建 Job 承袭 父Job
        delay(70)
        println("协程3 执行完毕")
    }
    launch(jobBase+handler) {	//新建 Job 承袭 jobBase
        throw AssertionError("The first child is cancelled")
    }
    delay(100)
    println("全部执行完毕")
}

打印结果如下:

Caught java.lang.AssertionError: The first child is cancelled
协程1 执行完毕
协程2 执行完毕
协程3 执行完毕
全部执行完毕

这种方式, 实际上是替换了本该从父协程中承袭的Job;
可见 同父Job的 协程1 并没有被取消; 我们换成 Job 试试; 只需要更换一句代码:

val jobBase = Job()

结果如下:

Caught java.lang.AssertionError: The first child is cancelled
协程2 执行完毕
协程3 执行完毕
全部执行完毕

可见 同父Job的 协程1 被取消; 协程2和协程3正常执行;

注意: 这种直接将Job传入协程Builder 的方式, 会破坏原本协程继承 Job的模式;

总结

CoroutineContext 协程上下文;

  • 调度器: 四种调度器, 可以指定协程的执行方式, 或执行线程
  • 还有协程别名, 局部变量, 拦截器, 异常处理器等

CoroutineStart 启动模式

  • 四种启动模式, 延迟启动等

异常处理:

  • CoroutineExceptionHandler: 处理未被处理的异常
  • 监督: 一般配合创建作用域 CoroutineScope(SupervisorJob()); 或使用 supervisorScope;

注意点:

  • 当一个协程由于异常而运行失败时, 会取消所有子协程, 取消自己, 再传播给父级, 直到取消整个作用域,
  • 异常处理器只能处理 未被处理的异常, 在双向取消的子协程中不起作用. 在 async 类协程中不起作用
  • 监督: 会在作用域内 使用一个SupervisorJob作为父级. 只能生效一层. 因为子协程会新建自己的Job, 子子协程继承的是 Job, 而不是 SupervisorJob
  • 当 async 不是根协程时, 异常仍然会通过 Job 向上传递, 导致作用域取消, crash等; runBlocking, coroutineScope 的代码块中创建的协程, 并不是根协程
上一篇:C++ boost非对称协程代码演示


下一篇:Skynet基于Actor模式的开源框架