Kotlin协程概览 (一)

协程基础

协程是一个可挂起的计算实例。和线程的概念相似,协程可以让一段代码块与另一端代码块并发执行。协程不绑定到一个特定线程上,有可能一个协程在一个线程中挂起,然后在另一个线程中恢复

协程可以被想象成轻量级的线程,但是一些重要的区别决定了协程在日常使用中与线程的不同。

以下是你的第一个协程代码:

import kotlinx.coroutines.*;

fun main() = runBlocking {
    launch {
        delay(1000L)
        println("World!")
    }
    println("Hello")
}

launch是一个协程构建器,它启动一个新的协程与剩余代码独立的并发执行。

delay是一个特殊的挂起函数(suspending function),它使协程挂起一段特定的时间。挂起一个协程并不阻塞其底层线程,同时允许其他协程使用该底层线程执行它们的代码

runBlocking连接非协程世界里的main方法和runBlocking{ ... }大括号中的代码。

runBlocking中写的代码都在CoroutineScope中,可以使用其中定义的所有方法,比如launch。这个设计模式是结合了Kotlin的lambda表达式整出的花活,最近在Jetpack Compose中经常看到,emmm,我还不太懂。

Kotlin协程概览 (一)

注意,launch并不是一个全局方法,只有在CoroutineScope中才能使用,这也是为什么使用了runBlocking包裹。

runBlocking方法还意味着,运行它的线程将暂时阻塞,直到runBlocking花括号中的代码全部执行完毕。

提取方法

可以将launch中的逻辑提取到一个方法中。

fun main() {
    runBlocking {
        launch { doWork() }
        println("Hello")
    }
}

suspend fun doWork() {
    delay(1000L)
    println("World!")
}

这里使用了suspend关键字,也可以不使用,suspend关键字的含义是该方法可以被挂起。一般只有在方法中有耗时操作时才加suspend关键字。suspend函数只能在协程中或另一个suspend函数中被调用,也就是说普通函数无法调用suspend函数,delay这种函数我们都没法在普通函数中调用。

coroutineScoperunBlocking

除了使用runBlocking创建一个协程作用域之外,还可以使用coroutineScope构建自己的协程作用域。

suspend fun doWork() = coroutineScope { 
    launch { println("Hello!") }
}

它俩看起来没区别,实际上还是有区别的。

它们都创建一个协程作用域,且等待内部协程和所有子协程执行完成,但是runBlocking是阻塞它正在运行的线程的,而coroutineScope没这回事。

再次强调,runBlocking只是连接非协程世界和协程世界的一座桥,应该只在main或测试中使用它,也正是因为这个原因,所以它是一个非suspend方法,以确保它能从非协程世界过渡到协程世界,而coroutineScope则是一个suspend方法,它只能在你当前已经在协程作用域时构建。

Scope构建器和并发

你可以在一个CoroutineScope中通过多个launch操作并发的启动多个协程

fun main() = runBlocking {
    doWork()
    println("Done")
}

suspend fun doWork() = coroutineScope {
    launch {
        delay(2000L)
        println("World 2")
    }

    launch {
        delay(1000L)
        println("World 1")
    }

    println("Hello")
}

结果

Hello
World 1
World 2
Done

Job

launch返回一个代表该协程的job,可以使用该job对象进行一些操作,比如调用job.join()等待协程完成。

fun main() = runBlocking {
    doWork()
}

suspend fun doWork() = coroutineScope {
    val job1 = launch {
        delay(2000L)
        println("World 2")
    }

    val job2 = launch {
        delay(1000L)
        println("World 1")
    }

    println("Hello")

    job1.join()
    job2.join()

    println("Done")
}

结果

Hello
World 1
World 2
Done

协程很轻量

你可以轻松的启动10万个协程,而使用线程的话,这将是个很难以做到的事。

fun main() = runBlocking {
    repeat(100000) { num -> 
        launch {
            delay(5000L)
            print(num)
        }
    }
}

取消与超时

可以通过job对象取消运行中协程

fun main() = runBlocking {
    val job = launch {
        repeat(5000) { num ->
            delay(1000L)
            print(num)
        }
    }

    delay(5000L)
    job.cancel()
    job.join()
    println("Done")
}

结果

0123Done

job.cancelAndJoin()可以进一步简化上面的代码,将canceljoin操作合并为一步。

取消是协作的

当使用线程时,线程的取消是协作的,但刚刚仿佛我们并没做什么协作,协程就直接取消了。其实协程的取消也是协作的,所有kotlinx.coroutines中的suspend方法(像delay这种方法),它会检测协程是否已经被取消,如果取消了就抛出CancellationException停止协程的执行。

fun main() = runBlocking {
    val job = launch(Dispatchers.Default) {
        var i = 0;
        try {
            while ( i < 10 ) {
                delay(1000L)
                print(i++)
            }
        } catch (e: CancellationException) {
            println("Job is cancelled")
        }
    }

    delay(5000L)
    job.cancelAndJoin()
    println("Done")
}

结果

01234Job is cancelled
Done

所以说当我们没写try-catch时,是delay()抛出的异常帮我们隐式结束了协程。

特别注意:这个try-catch最好不要写在循环之中,很多情况下它会造成问题。

思考下,这个job在被取消时会发生什么

val job = launch(Dispatchers.Default) {
    var i = 0;
    while ( i < 10 ) {
        try {
            delay(1000L)
            print(i++)
        } catch (e: CancellationException) {
            println("Job is cancelled")
        }
    }
}

会发生死循环,因为delay检测到协程已经被取消,抛出异常,异常被catch正常捕获后,就不会退出循环了,继续执行循环,然后下一次循环时delay依然检测到协程已被取消,继续抛出异常,异常又被正常捕获,继续恢复循环......

如果没有使用能够自动检测协程取消的suspend方法,可以使用isActive来检测协程是否已经被取消

fun main() = runBlocking {
    val job = launch(Dispatchers.Default) {
        var i = 0;
        while ( isActive && i < 10000000) {
            println(i++)
        }
    }

    delay(1000L)
    job.cancelAndJoin()
    println("Done")
}

协程明显是执行过程中就退出了

Kotlin协程概览 (一)

运行不能取消的代码块

由于suspend方法都会检测协程是否已经取消了,取消后就不能再调用,这时如果业务逻辑还限定我们还必须调用,我们就很难受了。

fun main() = runBlocking {
    val job = launch {
        try {
            repeat(5000) { num ->
                delay(1000L)
                print(num)
            }
        } finally {
            delay(1000)
            // 在协程被取消的情况下不会成功输出
            println("OKAY. RELEASE THEM!!")
        }
    }

    delay(5000L)
    job.cancelAndJoin()
    println("Done")
}

这里,finally中希望挂起一秒再进行资源释放操作,但很不巧,因为协程已被挂起delay会继续抛出异常,下面的println永远不会被输出。

可以使用withContext(NonCancellable)包裹,它创建一个不可被取消的CoroutineScope

finally {
    withContext(NonCancellable) {
        delay(1000)
        println("OKAY. RELEASE THEM!!")
    }
}

超时

虽然使用上面我们一直使用的joinAndCancel也能做到超时放弃任务的效果,但是写起来还是很不舒服。

withTimeout等待一个超时时间,时间到了之后抛出TimeoutCancellationException异常

fun main() = runBlocking {
    withTimeout(5000){
        repeat(5000) { num ->
            delay(1000L)
            print(num)
        }
    }
    println("Done")
}

结果

0123Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 5000 ms

TimeoutCancellationExceptionCancellationException的子类,但(我猜)只有CancellationException被认为是协程正常结束的原因,所以这个异常没有被隐式捕获,而是抛出了。

另一个就是withTimeoutOrNull,它不会抛出异常,它在超时时只是返回null。

如下的代码,当没有超时正常完成时,result接收到Done,而当超时时,withTimeoutOrNull会返回null给它。

fun main() = runBlocking {
    val result = withTimeoutOrNull(5000){
        repeat(5000) { num ->
            delay(1000L)
            print(num)
        }
        "Done"
    }
    println(result)
}

结果

0123null

异步超时和资源

withTimeout中的超时事件相对于其大括号中的代码是异步执行的,也就是说超时可能在任何时机被触发,甚至是withTimeout返回前。

如下的代码使用一个计数器计算有多少资源被拿走了,预期情况下,该代码应该返回0,但是由于存在部分Resource()创建完毕还未实际返回给resource时60ms的超时时间已经到了,所以并没有被正常关闭。

var acquired = 0

class Resource {
    init { acquired++ } // 拿走一个资源对象
    fun close() { acquired-- } // 释放该资源对象
}

fun main() {
    runBlocking {
        repeat(100_000) { // Launch 100K coroutines
            launch {
                val resource = withTimeout(60) { // Timeout of 60 ms
                    delay(50) // Delay for 50 ms
                    Resource() // 拿走一个资源对象并返回给resource
                }
                resource.close() // 释放该资源对象
            }
        }
    }
    // 当`runBlocking`中所有协程执行完执行
    println(acquired) // 打印出有多少资源被拿走
}

注意,这里的递增和递减操作是完全安全的,官方文档说具体会在“协程上下文”一节中说明。

修改上面launch代码块中的代码,加入try-catch

var resource: Resource? = null
try {
    withTimeout(60) {
        delay(50)
        resource = Resource()
    }
} finally {
    resource?.close()
}

也就是说,对于那些必须安全关闭的资源,不适合去依赖withTimeoutwithTimeoutOrNull的返回

组合挂起函数

默认顺序调用

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L)
    return 13;
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L)
    return 26;
}

我们有两个挂起函数,如果我们想要它俩的返回值,并计算它们返回值相加的结果,按照默认顺序就是这样:

fun main() = runBlocking {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()

        println("The answer is ${one + two}")
    }

    println("Total Time: ${time}")
}

想想就知道doSomethingUsefulOnedoSomethingUsefulTwo会被顺序调用,第二个会等待第一个完成再开始执行。

结果

The answer is 39
Total Time: 2009

但是很多时候我们需要两个函数异步执行,等两个都执行完毕,去计算结果,其实我们用返回Joblaunch和两个外部变量,或者用withTimeout这些都能完成,只不过哪个好像都不是用在这的东西。async可以解决这个问题。

val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }

println("The answer is ${one.await() + two.await()}")

async并不直接返回值,它返回一个Deferred对象(Job对象的子类)。像线程中的Future,这个对象可以代表一个未来值,它立即被async返回,然后异步的去调用块中的代码。返回的Deferred对象有一个await方法,调用它会产生阻塞,它等待对应的异步任务完成后才会返回异步任务的返回值。

长得很像JS中的asyncawait,但它们的行为可不一样嗷。

使用了async

The answer is 39
Total Time: 1018

惰性启动

可以让async惰性启动,即只有调用Deferredawaitstart方法时才启动。launch貌似也可以惰性启动。

fun main() = runBlocking {
    print(measureTimeMillis {
        val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }

        println("The answer is ${one.await() + two.await()}")
    })
}

请注意,由于await时,async中的代码才开始异步执行,并且await会阻塞直到这个代码执行完毕返回结果,所以这里相当于两个任务又被串行了。请仔细思考和之前代码的区别。之前由于async中代码是自动启动,所以两个任务在await前都被先行启动了,这次通过await手动启动,导致第二个任务必须在第一个任务的await返回后启动。

刚才也说了,惰性启动除了使用await,还可以使用来自父类的方法start。而start不会阻塞,所以我们可以这样

fun main() = runBlocking {
    print(measureTimeMillis {
        val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }

        one.start()
        two.start()

        println("The answer is ${one.await() + two.await()}")
    })
}

async风格函数

可以使用GlobalScope.asyncasync操作封装成一个普通函数,我目前也不知道这个GlobalScope是干啥的。

函数名中的Async后缀说明它是异步执行的,并且它返回一个Deferred对象。

fun doSomethingUsefulOneAsync() = GlobalScope.async {
    doSomethingUsefulOne();
}

fun doSomethingUsefulTwoAsync() = GlobalScope.async {
    doSomethingUsefulTwo()
}

然后便可以这样调用它

fun main() = runBlocking {
    print(measureTimeMillis {
        val one = doSomethingUsefulOneAsync()
        val two = doSomethingUsefulTwoAsync()

        println("The answer is ${one.await() + two.await()}")
    })
}

结果

The answer is 39
1031

注意!!!!!!!这种异步风格在Kotlin中不推荐使用!!!!!!!!!!!!请避免使用

使用async的结构化并发

我们将计算两个任务返回结果之和的逻辑提取出来,由于async {}CoroutineScope中的扩展,所以我们必须得把它写在coroutineScope内。

suspend fun calcuSum() = coroutineScope {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }

    one.await() + two.await()
}

然后

fun main() = runBlocking {
    println(measureTimeMillis {
        println(calcuSum())
    })
}

这样做的好处是,当calcuSum中的两个任务有任意一个抛出异常,失败,那么calcuSum中的所有等待中的协程都会被取消,异常会被传递到父协程中。

suspend fun calcuSum() = coroutineScope {
    val one = async {
        try {
            doSomethingUsefulOne()
        } finally {
            println("First child was cancelled")
        }
    }
    val two = async<Int> {
        println("Second child throws an exception")
        throw ArithmeticException()
    }

    one.await() + two.await()
}

fun main() = runBlocking {
    try {
        println(measureTimeMillis {
            println(calcuSum())
        })
    } catch (e: ArithmeticException) {
        println("Parent got an exception")
    }
}

结果

Second child throws an exception
First child was cancelled
Parent got an exception

未完...

上一篇:jsp的四大域对象


下一篇:蓝桥杯--递归母牛的故事