协程的构建器
launch和async构建器都用来启动新协程
launch,返回一个job并且不附带任何结果值
async,返回一个Deferred,Deferred也是一个job,可以使用.await()在一个延期的值上得到它的最终结果
//等待一个作业:join与await
private fun runBlocking1(){
//runBlocking可以把主线程变成一个协程
//job1和job2是runBlocking的子协程
//runBlocking会等待job1和job2这两个子协程执行完毕,会阻塞主线程(阻塞:按钮按下不会立马弹起job1和job2执行完了才会弹起)
runBlocking {
val job1 = launch {
delay(2000)
Log.v("zx", "job1 to finish")
}
val job2 = async {
delay(2000)
Log.v("zx", "job2 to finish")
"job2 value"
}
//await可以得到返回值
val job2Result = job2.await()
Log.v("zx", "job2的返回值:$job2Result")
}
需求:等待job1执行完毕以后再执行job2和job3
如果通过launch来启动的话,用join函数
如果通过async来启动的话,用await函数
//join和await都是挂起函数,不会阻塞主线程
//如果通过launch来启动的话,用join函数
runBlocking {
val job1 = launch {
delay(2000)
Log.v("zx", "job1 to finish")
}
//这个函数会等待job1执行完后才会执行后面的
job1.join()
val job2 = launch {
delay(100)
Log.v("zx", "job2 to finish")
}
val job3 = launch {
delay(100)
Log.v("zx", "job3 to finish")
}
}
//如果通过async来启动的话,用await函数
runBlocking {
val job1 = async {
delay(2000)
Log.v("zx", "job1 to finish2")
}
//这个函数会等待job1执行完后才会执行后面的
job1.await()
val job2 = async {
delay(100)
Log.v("zx", "job2 to finish2")
}
val job3 = async {
delay(100)
Log.v("zx", "job3 to finish2")
}
}
}
需求:前面2个任务相加的结果给第三个任务(async结构化并发)
//runBlocking 在主线程中,子协程会继承父协程的上下文
//runBlocking是Dispatchers.Main中启动的,doOne和doTwo也会使用父协程的调度器Dispatchers.Main中启动
private fun runBlocking2() {
//前面2个任务相加的结果给第三个任务(async结构化并发)
runBlocking {
val time = measureTimeMillis {
//同步的
val one = doOne()
val two = doTwo()
Log.v("zx", "数据${one + two}")
}
Log.v("zx", "time = $time")
}
runBlocking {
val time = measureTimeMillis {
//异步的
val one = async { doOne() }
val two = async { doTwo() }
Log.v("zx", "数据${one.await() + two.await()}")
//下面这种写法是错误的
//val one2 = async { doOne() }.await()
//val two2 = async { doTwo() }.await()
//Log.v("zx","数据${one2+two2}")
}
Log.v("zx", "asynctime = $time")
}
}
private suspend fun doOne():Int{
delay(1000)
return 1
}
private suspend fun doTwo():Int{
delay(1000)
return 2
}
协程的四种启动模式
CoroutineStart.DEFAULT: 协程创建后立即开始调度,调度前如果协程被取消,则执行取消
CoroutineStart.ATOMIC: 协程创建后立即开始调度,协程执行到第一个挂起点之前不响应取消
CoroutineStart.LAZY: 协程被需要时,包括主动调用协程的start,join,await等函数时才会开始调度,如果调度前被取消,则协程进入异常结束状态
CoroutineStart.UNDISPATCHED: 协程创建后立即在当前函数栈中执行,直到遇到第一个真正挂起的点
private fun runBlocking3(){
//runBlocking会等待所有子协程全部执行完
runBlocking {
val job1 = launch(start = CoroutineStart.DEFAULT) {
delay(3000)
Log.v("zx","finished")
}
delay(1000)
//CoroutineStart.DEFAULT则会被取消
job1.cancel()
val job11 = launch(start = CoroutineStart.ATOMIC) {
//delay就是第一个挂起函数,delay这里就是第一个挂起点,
// 如果没执行到第一个挂起点之前取消,ATOMIC是不响应取消的
delay(3000)
Log.v("zx","finished")
}
delay(1000)
job11.cancel()
val job2 = async(start = CoroutineStart.LAZY) {
20
}
delay(2000)
//调度前被取消,那么进入异常状态
job2.cancel()
//如果是launch就用join启动,如果是async就用start或await启动
Log.v("zx","job2 ${job2.await()}")
//如何实现使用Dispatchers.IO,你的协程仍然在主线程里面?
//答:使用CoroutineStart.UNDISPATCHED,因为当前函数runBlocking在主线程
//DISPATCHED是转发,UNDISPATCHED的意思是不转发(在主线程创建的协程,就在主线程执行)
//UNDISPATCHED是立即执行,而其他的是立即调度,立即调度不代表立即执行
//立即在当前函数栈中执行,当前函数栈就是在主线程中
val job3 = async(context = Dispatchers.IO, start = CoroutineStart.UNDISPATCHED) {
Log.v("zx","当前${Thread.currentThread().name}")
}
}
}
协程的作用域构建器
coroutineScope和runBlocking
区别是
runBlocking是常规函数,而coroutineScope是挂起函数,他们都会等待子协程执行结束
runBlocking会阻塞当前线程来等待
coroutineScope只是挂起,会释放底层线程用于其他用途
coroutineScope:一个协程失败了,所有其他兄弟协程也会被取消
supervisorScope:一个协程失败了,不会影响其他兄弟协程
coroutineScope:一个协程失败了,所有其他兄弟协程也会被取消
private fun runBlocking4(){
//结构化并发,CoroutineScope(作用域构建器)
runBlocking{
//协程作用域,coroutineScope一定要等待job1和job2这两个子协程执行完毕,
//coroutineScope继承的父协程的协程作用域
coroutineScope {
val job1 = launch {
delay(500)
Log.v("zx", "job1 to finish")
}
val job2 = async {
delay(100)
Log.v("zx", "job2 to finish")
"job2 value"
throw NullPointerException()
}
}
}
}
supervisorScope:一个协程失败了,不会影响其他兄弟协程
private fun runBlocking4(){
//结构化并发,CoroutineScope(作用域构建器)
runBlocking{
//协程作用域,coroutineScope一定要等待job1和job2这两个子协程执行完毕,
//coroutineScope继承的父协程的协程作用域
supervisorScope {
val job1 = launch {
delay(500)
Log.v("zx", "job1 to finish")
}
val job2 = async {
delay(100)
Log.v("zx", "job2 to finish")
"job2 value"
throw NullPointerException()
}
}
}
}
Job对象
- 每个创建的协程(通过launch或async)会返回一个job实例,该实例是协程的唯一标识,并负责管理协程的生命周期
- 一个任务可以包含一系列状态:新创建(New),活跃(Active),完成中(completing),已完成(completed),取消中(Canceling),已取消(Cancelled),虽然我们无法直接访问这些状态,但是我们可以访问job的属性,isActive,isCanceled和isCompleted
job的生命周期
如果协程处于活跃状态,协程运行出错或者调用job.cancel()都会将当前任务置为取消中(isActive = false isCanceled = true),当所有子协程都完成后,协程会进入已取消状态(isCanceled = true),此时isCompleted = true
协程的取消
- 取消作用域会取消它的子协程
- 被取消的子协程并不会影响其他兄弟协程
- 协程通过抛出CancellationException来处理取消操作
- 所有kotlinx.coroutines中的挂起函数(withcontext,delay等)都是可取消的
runBlocking {
//CoroutineScope自己构建一个协程作用域,不继承runBlocking父协程的上下文
val scope = CoroutineScope(Dispatchers.Default)
val job1 = scope.launch {
try {
delay(1000)
Log.v("zx", "job1")
} catch (e: Exception) {
e.printStackTrace()
}
}
val job2 = scope.launch {
delay(1000)
Log.v("zx", "job2")
}
delay(100)
//这里取消作用域,那么子协程就会被取消
//被取消的子协程并不会影响其他兄弟协程,所以job2打印出来了
//job1.cancel()
//自定义取消异常
job1.cancel(CancellationException("我取消了"))
//这里会先打印,runBlocking不会等待CoroutineScope里面的子协程执行完毕
Log.v("zx", "runBlocking")
}
打印:
com.z.zjetpack V/zx: runBlocking
com.z.zjetpack W/System.err: java.util.concurrent.CancellationException: 我取消了
com.z.zjetpack V/zx: job2
CPU密集型任务取消
isActive是一个可以使用在CoroutineScope的拓展属性,检查job是否处于活跃状态
ensureActive():如果job处于非活跃状态,这个方法会立即抛出异常
yield函数会检查所在协程状态,如果已经取消则抛出CancellationException予以响应。它还会尝试让出线程执行权,给其他协程提供执行机会。(如果此任务特别抢占系统资源,那么可以使用yield)
如下是不包含挂起函数的密集型任务
runBlocking {
val startTime = System.currentTimeMillis()
val job1 = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) {
//每隔0.5秒打印一次
if (System.currentTimeMillis() > nextPrintTime) {
Log.v("zx", "i = ${i++}")
nextPrintTime += 500
}
}
}
Log.v("zx", "等待取消")
delay(1000)
//因为不存在suspend关键字的挂起函数,所以无法取消
//job1.cancel()
//job1.join()
//等同于上方2个方法,为什么要用join,join是等待的意思,执行cancel()方法后,不会立马取消而是进入cancelling,
//即取消中,所以join方法是等待取消中变为取消完成。
job1.cancelAndJoin()
Log.v("zx", "取消中")
}
打印:
com.z.zjetpack V/zx: 等待取消
com.z.zjetpack V/zx: i = 0
com.z.zjetpack V/zx: i = 1
com.z.zjetpack V/zx: i = 2
com.z.zjetpack V/zx: i = 3
com.z.zjetpack V/zx: i = 4
com.z.zjetpack V/zx: 已取消
可以发现,我们调用了cancelAndJoin去执行取消,最终的结果是并没有取消,那么这种密集型任务怎么取消呢?
while (i < 5 && isActive)
while (i < 5) {
ensureActive()
...
while (i < 5) {
yield()
...
打印:
com.z.zjetpack V/zx: 等待取消
com.z.zjetpack V/zx: i = 0
com.z.zjetpack V/zx: i = 1
com.z.zjetpack V/zx: i = 2
com.z.zjetpack V/zx: 已取消
上面3种方式都可以取消。
协程取消的副作用
- 在finally种释放资源
- use 函数:该函数只能被实现了Closeable的对象使用,程序结束时会自动调用close方法,适合文件对象。
因为协程取消了就会抛出异常,那么下面的代码就不会执行了,下面的代码有可能要释放资源,那么下面的代码不执行了,也就不会释放资源了,比如IO操作等,那么怎么处理呢?
答:在finally种释放资源,不管取不取消,finally代码块都会执行
runBlocking {
val job1= launch {
try {
repeat(10) {
Log.v("zx","sleep")
delay(1000)
}
}finally {
//不管取不取消这里都会执行
//取消后会抛出异常,不影响我释放资源
Log.v("zx","释放资源")
}
}
delay(2000)
job1.cancel()
}
use函数,比如我们需要读取txt文件
普通写法:
private fun read(){
val input = assets.open("1.txt")
val br = BufferedReader(InputStreamReader(input))
with(br) {
var line: String?
try {
while (true) {
line = readLine() ?: break
Log.v("zx","数据$line")
}
} finally {
close()
}
}
}
use写法
private fun readUse(){
val input = assets.open("1.txt")
val br = BufferedReader(InputStreamReader(input))
with(br) {
var line: String?
use {
while (true) {
line = readLine() ?: break
Log.v("zx","数据$line")
}
}
}
}
不能取消的任务
- 处于取消中的协程不能够挂起,当协程被取消后想要调挂起函数,需要放在withContext(NonCancellable) 中,这样会挂起运行中的代码并保持取消中状态,直到任务处理完成。
例子:
runBlocking {
val job1 = launch {
try {
repeat(10) {
Log.v("zx", "sleep")
delay(1000)
}
} finally {
Log.v("zx", "开始sleep")
delay(1000)
Log.v("zx", "结束sleep")
}
}
delay(2000)
job1.cancel()
}
打印:
com.z.zjetpack V/zx: sleep
com.z.zjetpack V/zx: sleep
com.z.zjetpack V/zx: 开始sleep
可以发现结束sleep永远不会打印出来,那怎么办呢?
使用 withContext(NonCancellable)
runBlocking {
val job1 = launch {
try {
repeat(10) {
Log.v("zx", "sleep")
delay(1000)
}
} finally {
//如果想要协程的取消不影响这里调用挂起函数,那么需要用到 withContext(NonCancellable)
// 长驻任务也可以用这个
withContext(NonCancellable) {
Log.v("zx", "开始sleep")
delay(1000)
Log.v("zx", "结束sleep")
}
}
}
delay(2000)
job1.cancel()
}
打印:
com.z.zjetpack V/zx: sleep
com.z.zjetpack V/zx: sleep
com.z.zjetpack V/zx: 开始sleep
com.z.zjetpack V/zx: 结束sleep
超时任务
- 很多请求取消协程的理由是它有可能超时
- withTimeoutOrNull通过返回null来进行超时操作,从而替代抛出一个异常
例子
runBlocking {
//需要在1秒内处理完
withTimeout(1000) {
repeat(10) {
Log.v("zx", "sleep")
delay(500)
}
}
}
如上,如果在一秒内没处理完,那么就会抛出异常 kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
那么如果网络请求在1秒内没返回,我们不想抛出异常,只想返回个默认值怎么办呢?
那么如果我们不想抛出异常,只想返回个null值的情况,该怎么做呢?
答:使用withTimeoutOrNull
runBlocking {
//1秒内处理完,如果在1秒内没处理完,那么就返回null来代替抛出异常
val result = withTimeoutOrNull(1000) {
repeat(10) {
Log.v("zx", "sleep")
delay(500)
}
"完成"
} ?: "默认数据"
Log.v("zx", "结果:$result")
}
打印:
com.z.zjetpack V/zx: sleep
com.z.zjetpack V/zx: sleep
com.z.zjetpack V/zx: 结果:默认数据
如上,如果在1秒内完成了,那么结果为 完成,如果没做完会返回结果null,为null即显示默认数据
完