Kotlin 之 协程(二)启动取消协程

协程的构建器

launch和async构建器都用来启动新协程
launch,返回一个job并且不附带任何结果值
async,返回一个Deferred,Deferred也是一个job,可以使用.await()在一个延期的值上得到它的最终结果

    //等待一个作业:join与await
    private fun runBlocking1(){
        //runBlocking可以把主线程变成一个协程
        //job1和job2是runBlocking的子协程
        //runBlocking会等待job1和job2这两个子协程执行完毕,会阻塞主线程(阻塞:按钮按下不会立马弹起job1和job2执行完了才会弹起)
        runBlocking {

            val job1 = launch {
                delay(2000)
                Log.v("zx", "job1 to finish")
            }

            val job2 = async {
                delay(2000)
                Log.v("zx", "job2 to finish")
                "job2 value"
            }
            //await可以得到返回值
            val job2Result = job2.await()
            Log.v("zx", "job2的返回值:$job2Result")

        }

需求:等待job1执行完毕以后再执行job2和job3
如果通过launch来启动的话,用join函数
如果通过async来启动的话,用await函数

        
        //join和await都是挂起函数,不会阻塞主线程

        //如果通过launch来启动的话,用join函数
        runBlocking {

            val job1 = launch {
                delay(2000)
                Log.v("zx", "job1 to finish")
            }
            //这个函数会等待job1执行完后才会执行后面的
            job1.join()

            val job2 = launch {
                delay(100)
                Log.v("zx", "job2 to finish")
            }
            val job3 = launch {
                delay(100)
                Log.v("zx", "job3 to finish")
            }

        }
        //如果通过async来启动的话,用await函数
        runBlocking {

            val job1 = async {
                delay(2000)
                Log.v("zx", "job1 to finish2")
            }
            //这个函数会等待job1执行完后才会执行后面的
            job1.await()

            val job2 = async {
                delay(100)
                Log.v("zx", "job2 to finish2")
            }
            val job3 = async {
                delay(100)
                Log.v("zx", "job3 to finish2")
            }

        }
    }

需求:前面2个任务相加的结果给第三个任务(async结构化并发)

   //runBlocking 在主线程中,子协程会继承父协程的上下文
   //runBlocking是Dispatchers.Main中启动的,doOne和doTwo也会使用父协程的调度器Dispatchers.Main中启动
    private fun runBlocking2() {
        //前面2个任务相加的结果给第三个任务(async结构化并发)
        runBlocking {
            val time = measureTimeMillis {
                //同步的
                val one = doOne()
                val two = doTwo()
                Log.v("zx", "数据${one + two}")
            }
            Log.v("zx", "time = $time")
        }
        runBlocking {
            val time = measureTimeMillis {
                //异步的
                val one = async { doOne() }
                val two = async { doTwo() }
                Log.v("zx", "数据${one.await() + two.await()}")
                
                //下面这种写法是错误的
                //val one2 = async { doOne() }.await()
                //val two2 = async { doTwo() }.await()
                //Log.v("zx","数据${one2+two2}")
            }
            Log.v("zx", "asynctime = $time")
        }
    }


    private suspend fun doOne():Int{
        delay(1000)
        return 1
    }
    private suspend fun doTwo():Int{
        delay(1000)
        return 2
    }

协程的四种启动模式

CoroutineStart.DEFAULT: 协程创建后立即开始调度,调度前如果协程被取消,则执行取消
CoroutineStart.ATOMIC: 协程创建后立即开始调度,协程执行到第一个挂起点之前不响应取消
CoroutineStart.LAZY: 协程被需要时,包括主动调用协程的start,join,await等函数时才会开始调度,如果调度前被取消,则协程进入异常结束状态
CoroutineStart.UNDISPATCHED: 协程创建后立即在当前函数栈中执行,直到遇到第一个真正挂起的点

    private fun runBlocking3(){
        //runBlocking会等待所有子协程全部执行完
        runBlocking {
            val job1 = launch(start = CoroutineStart.DEFAULT) {
                delay(3000)
                Log.v("zx","finished")
            }
            delay(1000)
            //CoroutineStart.DEFAULT则会被取消
            job1.cancel()

            val job11 = launch(start = CoroutineStart.ATOMIC) {
                //delay就是第一个挂起函数,delay这里就是第一个挂起点,
                // 如果没执行到第一个挂起点之前取消,ATOMIC是不响应取消的
                delay(3000)
                Log.v("zx","finished")
            }
            delay(1000)
            job11.cancel()


            val job2 = async(start = CoroutineStart.LAZY) {
                20
            }
            delay(2000)
            //调度前被取消,那么进入异常状态
            job2.cancel()
            //如果是launch就用join启动,如果是async就用start或await启动
            Log.v("zx","job2 ${job2.await()}")

            //如何实现使用Dispatchers.IO,你的协程仍然在主线程里面?
            //答:使用CoroutineStart.UNDISPATCHED,因为当前函数runBlocking在主线程

            //DISPATCHED是转发,UNDISPATCHED的意思是不转发(在主线程创建的协程,就在主线程执行)
            //UNDISPATCHED是立即执行,而其他的是立即调度,立即调度不代表立即执行
            //立即在当前函数栈中执行,当前函数栈就是在主线程中
            val job3 = async(context = Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) {
                Log.v("zx","当前${Thread.currentThread().name}")
            }
        }

    }

协程的作用域构建器

coroutineScope和runBlocking

区别是
runBlocking是常规函数,而coroutineScope是挂起函数,他们都会等待子协程执行结束
runBlocking会阻塞当前线程来等待
coroutineScope只是挂起,会释放底层线程用于其他用途

coroutineScope:一个协程失败了,所有其他兄弟协程也会被取消
supervisorScope:一个协程失败了,不会影响其他兄弟协程

coroutineScope:一个协程失败了,所有其他兄弟协程也会被取消

    private fun runBlocking4(){
        //结构化并发,CoroutineScope(作用域构建器)

        runBlocking{
            //协程作用域,coroutineScope一定要等待job1和job2这两个子协程执行完毕,
            //coroutineScope继承的父协程的协程作用域
            coroutineScope {
                val job1 = launch {
                    delay(500)
                    Log.v("zx", "job1 to finish")
                }

                val job2 = async {
                    delay(100)
                    Log.v("zx", "job2 to finish")
                    "job2 value"
                    throw NullPointerException()
                }
            }
        }
    }

supervisorScope:一个协程失败了,不会影响其他兄弟协程

    private fun runBlocking4(){
        //结构化并发,CoroutineScope(作用域构建器)

        runBlocking{
            //协程作用域,coroutineScope一定要等待job1和job2这两个子协程执行完毕,
            //coroutineScope继承的父协程的协程作用域
            supervisorScope {
                val job1 = launch {
                    delay(500)
                    Log.v("zx", "job1 to finish")
                }

                val job2 = async {
                    delay(100)
                    Log.v("zx", "job2 to finish")
                    "job2 value"
                    throw NullPointerException()
                }
            }
        }
    }

Job对象

  • 每个创建的协程(通过launch或async)会返回一个job实例,该实例是协程的唯一标识,并负责管理协程的生命周期
  • 一个任务可以包含一系列状态:新创建(New),活跃(Active),完成中(completing),已完成(completed),取消中(Canceling),已取消(Cancelled),虽然我们无法直接访问这些状态,但是我们可以访问job的属性,isActive,isCanceled和isCompleted

job的生命周期

如果协程处于活跃状态,协程运行出错或者调用job.cancel()都会将当前任务置为取消中(isActive = false isCanceled = true),当所有子协程都完成后,协程会进入已取消状态(isCanceled = true),此时isCompleted = true

协程的取消

  • 取消作用域会取消它的子协程
  • 被取消的子协程并不会影响其他兄弟协程
  • 协程通过抛出CancellationException来处理取消操作
  • 所有kotlinx.coroutines中的挂起函数(withcontext,delay等)都是可取消的
               runBlocking {
            //CoroutineScope自己构建一个协程作用域,不继承runBlocking父协程的上下文
            val scope = CoroutineScope(Dispatchers.Default)
            val job1 = scope.launch {
                try {
                    delay(1000)
                    Log.v("zx", "job1")
                } catch (e: Exception) {
                    e.printStackTrace()
                }


            }
            val job2 = scope.launch {
                delay(1000)
                Log.v("zx", "job2")

            }

            delay(100)
            //这里取消作用域,那么子协程就会被取消
            //被取消的子协程并不会影响其他兄弟协程,所以job2打印出来了
            //job1.cancel()
            //自定义取消异常
            job1.cancel(CancellationException("我取消了"))
            //这里会先打印,runBlocking不会等待CoroutineScope里面的子协程执行完毕
            Log.v("zx", "runBlocking")
        }
        
打印:
com.z.zjetpack V/zx: runBlocking
com.z.zjetpack W/System.err: java.util.concurrent.CancellationException: 我取消了
com.z.zjetpack V/zx: job2

CPU密集型任务取消

isActive是一个可以使用在CoroutineScope的拓展属性,检查job是否处于活跃状态
ensureActive():如果job处于非活跃状态,这个方法会立即抛出异常
yield函数会检查所在协程状态,如果已经取消则抛出CancellationException予以响应。它还会尝试让出线程执行权,给其他协程提供执行机会。(如果此任务特别抢占系统资源,那么可以使用yield)

如下是不包含挂起函数的密集型任务
             runBlocking {
            val startTime = System.currentTimeMillis()
            val job1 = launch(Dispatchers.Default) {
                var nextPrintTime = startTime
                var i = 0
                while (i < 5) {
                    //每隔0.5秒打印一次
                    if (System.currentTimeMillis() > nextPrintTime) {
                        Log.v("zx", "i = ${i++}")
                        nextPrintTime += 500
                    }

                }
            }
           
            Log.v("zx", "等待取消")
            delay(1000)
            //因为不存在suspend关键字的挂起函数,所以无法取消
            //job1.cancel()
            //job1.join()
            //等同于上方2个方法,为什么要用join,join是等待的意思,执行cancel()方法后,不会立马取消而是进入cancelling,
            //即取消中,所以join方法是等待取消中变为取消完成。
            job1.cancelAndJoin()
            Log.v("zx", "取消中")
        }

打印:
com.z.zjetpack V/zx: 等待取消
com.z.zjetpack V/zx: i = 0
com.z.zjetpack V/zx: i = 1
com.z.zjetpack V/zx: i = 2
com.z.zjetpack V/zx: i = 3
com.z.zjetpack V/zx: i = 4
com.z.zjetpack V/zx: 已取消

可以发现,我们调用了cancelAndJoin去执行取消,最终的结果是并没有取消,那么这种密集型任务怎么取消呢?

while (i < 5 && isActive) 
while (i < 5) {
  ensureActive()
  ...
while (i < 5) {
   yield()
   ...
打印:
com.z.zjetpack V/zx: 等待取消
com.z.zjetpack V/zx: i = 0
com.z.zjetpack V/zx: i = 1
com.z.zjetpack V/zx: i = 2
com.z.zjetpack V/zx: 已取消

上面3种方式都可以取消。

协程取消的副作用

  • 在finally种释放资源
  • use 函数:该函数只能被实现了Closeable的对象使用,程序结束时会自动调用close方法,适合文件对象。

因为协程取消了就会抛出异常,那么下面的代码就不会执行了,下面的代码有可能要释放资源,那么下面的代码不执行了,也就不会释放资源了,比如IO操作等,那么怎么处理呢?
答:在finally种释放资源,不管取不取消,finally代码块都会执行

        runBlocking {
           val job1=  launch {
               try {
                   repeat(10) {
                       Log.v("zx","sleep")
                       delay(1000)
                   }
               }finally {
                   //不管取不取消这里都会执行
                   //取消后会抛出异常,不影响我释放资源
                   Log.v("zx","释放资源")
               }

            }
            delay(2000)
            job1.cancel()
        }
use函数,比如我们需要读取txt文件

普通写法:

    private fun read(){
        val input = assets.open("1.txt")
        val br = BufferedReader(InputStreamReader(input))
        with(br) {
            var line: String?
            try {
                while (true) {
                    line = readLine() ?: break
                    Log.v("zx","数据$line")
                }
            } finally {
                close()
            }
        }
    }

use写法

    private fun readUse(){
        val input = assets.open("1.txt")
        val br = BufferedReader(InputStreamReader(input))
        with(br) {
            var line: String?
            use {
                while (true) {
                    line = readLine() ?: break
                    Log.v("zx","数据$line")
                }
            }
        }
    }

不能取消的任务

  • 处于取消中的协程不能够挂起,当协程被取消后想要调挂起函数,需要放在withContext(NonCancellable) 中,这样会挂起运行中的代码并保持取消中状态,直到任务处理完成。

例子:

        runBlocking {
            val job1 = launch {
                try {
                    repeat(10) {
                        Log.v("zx", "sleep")
                        delay(1000)
                    }
                } finally {
                    Log.v("zx", "开始sleep")
                    delay(1000)
                    Log.v("zx", "结束sleep")
                }


            }
            delay(2000)
            job1.cancel()
        }
打印:
com.z.zjetpack V/zx: sleep
com.z.zjetpack V/zx: sleep
com.z.zjetpack V/zx: 开始sleep

可以发现结束sleep永远不会打印出来,那怎么办呢?
使用 withContext(NonCancellable)

        runBlocking {
            val job1 = launch {
                try {
                    repeat(10) {
                        Log.v("zx", "sleep")
                        delay(1000)
                    }
                } finally {
                    //如果想要协程的取消不影响这里调用挂起函数,那么需要用到 withContext(NonCancellable) 
                        // 长驻任务也可以用这个
                    withContext(NonCancellable) {
                        Log.v("zx", "开始sleep")
                        delay(1000)
                        Log.v("zx", "结束sleep")
                    }

                }


            }
            delay(2000)
            job1.cancel()
        }
打印:
com.z.zjetpack V/zx: sleep
com.z.zjetpack V/zx: sleep
com.z.zjetpack V/zx: 开始sleep
com.z.zjetpack V/zx: 结束sleep

超时任务

  • 很多请求取消协程的理由是它有可能超时
  • withTimeoutOrNull通过返回null来进行超时操作,从而替代抛出一个异常
    例子
  runBlocking {
           //需要在1秒内处理完
            withTimeout(1000) {
                repeat(10) {
                    Log.v("zx", "sleep")
                    delay(500)
                }
            }
        }

如上,如果在一秒内没处理完,那么就会抛出异常 kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms

那么如果网络请求在1秒内没返回,我们不想抛出异常,只想返回个默认值怎么办呢?
那么如果我们不想抛出异常,只想返回个null值的情况,该怎么做呢?
答:使用withTimeoutOrNull

  runBlocking {
            //1秒内处理完,如果在1秒内没处理完,那么就返回null来代替抛出异常
            val result = withTimeoutOrNull(1000) {
                repeat(10) {
                    Log.v("zx", "sleep")
                    delay(500)
                }
                "完成"
            } ?: "默认数据"

            Log.v("zx", "结果:$result")
        }
打印:
com.z.zjetpack V/zx: sleep
com.z.zjetpack V/zx: sleep
com.z.zjetpack V/zx: 结果:默认数据

如上,如果在1秒内完成了,那么结果为 完成,如果没做完会返回结果null,为null即显示默认数据

上一篇:2020年Android面试笔试总结(Android精心整理篇),kotlin常量


下一篇:Android 高质量开发之崩溃优化,kotlin匿名内部类