协程基础
协程是一个可挂起的计算实例。和线程的概念相似,协程可以让一段代码块与另一端代码块并发执行。协程不绑定到一个特定线程上,有可能一个协程在一个线程中挂起,然后在另一个线程中恢复。
协程可以被想象成轻量级的线程,但是一些重要的区别决定了协程在日常使用中与线程的不同。
以下是你的第一个协程代码:
import kotlinx.coroutines.*;
fun main() = runBlocking {
launch {
delay(1000L)
println("World!")
}
println("Hello")
}
launch
是一个协程构建器,它启动一个新的协程与剩余代码独立的并发执行。
delay
是一个特殊的挂起函数
(suspending function),它使协程挂起一段特定的时间。挂起一个协程并不阻塞其底层线程,同时允许其他协程使用该底层线程执行它们的代码。
runBlocking
连接非协程世界里的main
方法和runBlocking{ ... }
大括号中的代码。
runBlocking
中写的代码都在CoroutineScope
中,可以使用其中定义的所有方法,比如launch
。这个设计模式是结合了Kotlin的lambda表达式整出的花活,最近在Jetpack Compose中经常看到,emmm,我还不太懂。
注意,launch
并不是一个全局方法,只有在CoroutineScope
中才能使用,这也是为什么使用了runBlocking
包裹。
runBlocking
方法还意味着,运行它的线程将暂时阻塞,直到runBlocking
花括号中的代码全部执行完毕。
提取方法
可以将launch
中的逻辑提取到一个方法中。
fun main() {
runBlocking {
launch { doWork() }
println("Hello")
}
}
suspend fun doWork() {
delay(1000L)
println("World!")
}
这里使用了suspend
关键字,也可以不使用,suspend
关键字的含义是该方法可以被挂起。一般只有在方法中有耗时操作时才加suspend
关键字。suspend
函数只能在协程中或另一个suspend
函数中被调用,也就是说普通函数无法调用suspend
函数,delay
这种函数我们都没法在普通函数中调用。
coroutineScope
和runBlocking
除了使用runBlocking
创建一个协程作用域之外,还可以使用coroutineScope
构建自己的协程作用域。
suspend fun doWork() = coroutineScope {
launch { println("Hello!") }
}
它俩看起来没区别,实际上还是有区别的。
它们都创建一个协程作用域,且等待内部协程和所有子协程执行完成,但是runBlocking
是阻塞它正在运行的线程的,而coroutineScope
没这回事。
再次强调,runBlocking
只是连接非协程世界和协程世界的一座桥,应该只在main
或测试中使用它,也正是因为这个原因,所以它是一个非suspend
方法,以确保它能从非协程世界过渡到协程世界,而coroutineScope
则是一个suspend
方法,它只能在你当前已经在协程作用域时构建。
Scope构建器和并发
你可以在一个CoroutineScope中通过多个launch
操作并发的启动多个协程
fun main() = runBlocking {
doWork()
println("Done")
}
suspend fun doWork() = coroutineScope {
launch {
delay(2000L)
println("World 2")
}
launch {
delay(1000L)
println("World 1")
}
println("Hello")
}
结果
Hello
World 1
World 2
Done
Job
launch
返回一个代表该协程的job
,可以使用该job
对象进行一些操作,比如调用job.join()
等待协程完成。
fun main() = runBlocking {
doWork()
}
suspend fun doWork() = coroutineScope {
val job1 = launch {
delay(2000L)
println("World 2")
}
val job2 = launch {
delay(1000L)
println("World 1")
}
println("Hello")
job1.join()
job2.join()
println("Done")
}
结果
Hello
World 1
World 2
Done
协程很轻量
你可以轻松的启动10万个协程,而使用线程的话,这将是个很难以做到的事。
fun main() = runBlocking {
repeat(100000) { num ->
launch {
delay(5000L)
print(num)
}
}
}
取消与超时
可以通过job
对象取消运行中协程
fun main() = runBlocking {
val job = launch {
repeat(5000) { num ->
delay(1000L)
print(num)
}
}
delay(5000L)
job.cancel()
job.join()
println("Done")
}
结果
0123Done
job.cancelAndJoin()
可以进一步简化上面的代码,将cancel
和join
操作合并为一步。
取消是协作的
当使用线程时,线程的取消是协作的,但刚刚仿佛我们并没做什么协作,协程就直接取消了。其实协程的取消也是协作的,所有kotlinx.coroutines
中的suspend
方法(像delay
这种方法),它会检测协程是否已经被取消,如果取消了就抛出CancellationException
停止协程的执行。
fun main() = runBlocking {
val job = launch(Dispatchers.Default) {
var i = 0;
try {
while ( i < 10 ) {
delay(1000L)
print(i++)
}
} catch (e: CancellationException) {
println("Job is cancelled")
}
}
delay(5000L)
job.cancelAndJoin()
println("Done")
}
结果
01234Job is cancelled
Done
所以说当我们没写try-catch
时,是delay()
抛出的异常帮我们隐式结束了协程。
特别注意:这个try-catch最好不要写在循环之中,很多情况下它会造成问题。
思考下,这个job
在被取消时会发生什么
val job = launch(Dispatchers.Default) {
var i = 0;
while ( i < 10 ) {
try {
delay(1000L)
print(i++)
} catch (e: CancellationException) {
println("Job is cancelled")
}
}
}
会发生死循环,因为delay
检测到协程已经被取消,抛出异常,异常被catch
正常捕获后,就不会退出循环了,继续执行循环,然后下一次循环时delay
依然检测到协程已被取消,继续抛出异常,异常又被正常捕获,继续恢复循环......
如果没有使用能够自动检测协程取消的suspend
方法,可以使用isActive来检测协程是否已经被取消。
fun main() = runBlocking {
val job = launch(Dispatchers.Default) {
var i = 0;
while ( isActive && i < 10000000) {
println(i++)
}
}
delay(1000L)
job.cancelAndJoin()
println("Done")
}
协程明显是执行过程中就退出了
运行不能取消的代码块
由于suspend
方法都会检测协程是否已经取消了,取消后就不能再调用,这时如果业务逻辑还限定我们还必须调用,我们就很难受了。
fun main() = runBlocking {
val job = launch {
try {
repeat(5000) { num ->
delay(1000L)
print(num)
}
} finally {
delay(1000)
// 在协程被取消的情况下不会成功输出
println("OKAY. RELEASE THEM!!")
}
}
delay(5000L)
job.cancelAndJoin()
println("Done")
}
这里,finally
中希望挂起一秒再进行资源释放操作,但很不巧,因为协程已被挂起delay
会继续抛出异常,下面的println
永远不会被输出。
可以使用withContext(NonCancellable)
包裹,它创建一个不可被取消的CoroutineScope
。
finally {
withContext(NonCancellable) {
delay(1000)
println("OKAY. RELEASE THEM!!")
}
}
超时
虽然使用上面我们一直使用的joinAndCancel
也能做到超时放弃任务的效果,但是写起来还是很不舒服。
withTimeout
等待一个超时时间,时间到了之后抛出TimeoutCancellationException
异常
fun main() = runBlocking {
withTimeout(5000){
repeat(5000) { num ->
delay(1000L)
print(num)
}
}
println("Done")
}
结果
0123Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 5000 ms
TimeoutCancellationException
是CancellationException
的子类,但(我猜)只有CancellationException
被认为是协程正常结束的原因,所以这个异常没有被隐式捕获,而是抛出了。
另一个就是withTimeoutOrNull
,它不会抛出异常,它在超时时只是返回null。
如下的代码,当没有超时正常完成时,result
接收到Done
,而当超时时,withTimeoutOrNull
会返回null
给它。
fun main() = runBlocking {
val result = withTimeoutOrNull(5000){
repeat(5000) { num ->
delay(1000L)
print(num)
}
"Done"
}
println(result)
}
结果
0123null
异步超时和资源
withTimeout
中的超时事件相对于其大括号中的代码是异步执行的,也就是说超时可能在任何时机被触发,甚至是withTimeout
返回前。
如下的代码使用一个计数器计算有多少资源被拿走了,预期情况下,该代码应该返回0,但是由于存在部分Resource()
创建完毕还未实际返回给resource
时60ms的超时时间已经到了,所以并没有被正常关闭。
var acquired = 0
class Resource {
init { acquired++ } // 拿走一个资源对象
fun close() { acquired-- } // 释放该资源对象
}
fun main() {
runBlocking {
repeat(100_000) { // Launch 100K coroutines
launch {
val resource = withTimeout(60) { // Timeout of 60 ms
delay(50) // Delay for 50 ms
Resource() // 拿走一个资源对象并返回给resource
}
resource.close() // 释放该资源对象
}
}
}
// 当`runBlocking`中所有协程执行完执行
println(acquired) // 打印出有多少资源被拿走
}
注意,这里的递增和递减操作是完全安全的,官方文档说具体会在“协程上下文”一节中说明。
修改上面launch
代码块中的代码,加入try-catch
var resource: Resource? = null
try {
withTimeout(60) {
delay(50)
resource = Resource()
}
} finally {
resource?.close()
}
也就是说,对于那些必须安全关闭的资源,不适合去依赖withTimeout
或withTimeoutOrNull
的返回
组合挂起函数
默认顺序调用
suspend fun doSomethingUsefulOne(): Int {
delay(1000L)
return 13;
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L)
return 26;
}
我们有两个挂起函数,如果我们想要它俩的返回值,并计算它们返回值相加的结果,按照默认顺序就是这样:
fun main() = runBlocking {
val time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${one + two}")
}
println("Total Time: ${time}")
}
想想就知道doSomethingUsefulOne
和doSomethingUsefulTwo
会被顺序调用,第二个会等待第一个完成再开始执行。
结果
The answer is 39
Total Time: 2009
但是很多时候我们需要两个函数异步执行,等两个都执行完毕,去计算结果,其实我们用返回Job
的launch
和两个外部变量,或者用withTimeout
这些都能完成,只不过哪个好像都不是用在这的东西。async
可以解决这个问题。
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
async
并不直接返回值,它返回一个Deferred
对象(Job对象的子类)。像线程中的Future
,这个对象可以代表一个未来值,它立即被async
返回,然后异步的去调用块中的代码。返回的Deferred
对象有一个await
方法,调用它会产生阻塞,它等待对应的异步任务完成后才会返回异步任务的返回值。
长得很像JS中的async
和await
,但它们的行为可不一样嗷。
使用了async
后
The answer is 39
Total Time: 1018
惰性启动
可以让async
惰性启动,即只有调用Deferred
的await
或start
方法时才启动。launch
貌似也可以惰性启动。
fun main() = runBlocking {
print(measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
})
}
请注意,由于await
时,async
中的代码才开始异步执行,并且await
会阻塞直到这个代码执行完毕返回结果,所以这里相当于两个任务又被串行了。请仔细思考和之前代码的区别。之前由于async
中代码是自动启动,所以两个任务在await
前都被先行启动了,这次通过await
手动启动,导致第二个任务必须在第一个任务的await
返回后启动。
刚才也说了,惰性启动除了使用await
,还可以使用来自父类的方法start
。而start
不会阻塞,所以我们可以这样
fun main() = runBlocking {
print(measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
one.start()
two.start()
println("The answer is ${one.await() + two.await()}")
})
}
async风格函数
可以使用GlobalScope.async
将async
操作封装成一个普通函数,我目前也不知道这个GlobalScope
是干啥的。
函数名中的Async
后缀说明它是异步执行的,并且它返回一个Deferred
对象。
fun doSomethingUsefulOneAsync() = GlobalScope.async {
doSomethingUsefulOne();
}
fun doSomethingUsefulTwoAsync() = GlobalScope.async {
doSomethingUsefulTwo()
}
然后便可以这样调用它
fun main() = runBlocking {
print(measureTimeMillis {
val one = doSomethingUsefulOneAsync()
val two = doSomethingUsefulTwoAsync()
println("The answer is ${one.await() + two.await()}")
})
}
结果
The answer is 39
1031
注意!!!!!!!这种异步风格在Kotlin中不推荐使用!!!!!!!!!!!!请避免使用
使用async的结构化并发
我们将计算两个任务返回结果之和的逻辑提取出来,由于async {}
是CoroutineScope
中的扩展,所以我们必须得把它写在coroutineScope
内。
suspend fun calcuSum() = coroutineScope {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
one.await() + two.await()
}
然后
fun main() = runBlocking {
println(measureTimeMillis {
println(calcuSum())
})
}
这样做的好处是,当calcuSum
中的两个任务有任意一个抛出异常,失败,那么calcuSum
中的所有等待中的协程都会被取消,异常会被传递到父协程中。
suspend fun calcuSum() = coroutineScope {
val one = async {
try {
doSomethingUsefulOne()
} finally {
println("First child was cancelled")
}
}
val two = async<Int> {
println("Second child throws an exception")
throw ArithmeticException()
}
one.await() + two.await()
}
fun main() = runBlocking {
try {
println(measureTimeMillis {
println(calcuSum())
})
} catch (e: ArithmeticException) {
println("Parent got an exception")
}
}
结果
Second child throws an exception
First child was cancelled
Parent got an exception
未完...