传统runnable接口实现
在java中,很多耗时的行为通过实现runnable接口,并且通过线程运行下这些耗时的任务,例如:
public class Task1 implements Runnable{
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("this is task1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
这里用线程沉睡两秒的方式模拟堵塞的过程。但是这样做就带来了一个问题,那就是如果有多个耗时的任务都实现的runnable接口,而且这几个任务之间的状态相互依赖。也就说下一个runnable要依赖上一runable的变量,这种情况也算是很常见。举个简单的例子来说:
public class Task1 implements Runnable{
@Override
public void run() {
try {
Thread.sleep(2000);
String ret = "ok";
System.out.println("this is task1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Task2 implements Runnable{
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("this is task2 ret:");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在Task1里面runalbe里面有个ret变量,如果想在Task2里面引用这个这个变量,这个问题该怎么解决?
传统解决方案
对于上述问题有很多解决方案,比如多线程之间的同步,生产者消费者模式等等,但我个人认为比较好的方案是回调和Rxjava两种模式。因为在java端的多线程调度,难免会有性能上面的影响。特别是面对成百上千的任务的相互调度。
- 回调
传统回调的做法是将实现一个接口,将Runnable后加入一个回调方法,就像下面这样:
class Task1 implements Runnable{
@Override
public void run() {
try {
Thread.sleep(2000);
String ret = "ok";
System.out.println("this is task1");
new Task2().doTest(ret);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Task2 implements Runnable,Task{
String ret;
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("this is task2");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void doTest(String ret) {
this.ret = ret;
run();
}
}
public interface Task {
void doTest(String ret);
}
通过这样的方案实现了变量的传递,但是问题也出现了,回调在面对多任务的时候容易出现回调地狱的情况。
- RxJava
这是一种响应式编程的典范,把runnable里面的任务看成是一个事件,一个事件完成后通知下个事件开始。
Single.fromObservable(observer -> {
new Task1().run();
}).map(mapper -> {
new Task2().doTest(mapper);
}).subscribe(onSuccess -> {
// do something
},error -> {
});
尽管RxJava已经充分的解耦,而且也相对比较简洁。但kotlin的协程可以说是解决这里这类问题的更优解
kotlin协程
通过前文的阐述kotlin的协程是解决任务上下相互关联的这一类问题,比如说上面的问题用协程来办,
GlobalScope.launch {
val ret = Task1().doTask()
Task2().doTask(ret)
}
这样的写法清晰,同时将异步转成同步的思想在里面。能够如此简洁明了的解决问题,对此我想一探其底层到底是如何实现的。在官方文档说,协程可以说是轻量级的线程,从某种角度上来说,确实是如此的,因为它可以将函数挂起,不堵塞线程,而且可以唤醒函数。在这些角度上来看确实如此。但实际上的情况与线程的实现完全不一样。为了探究协程到底是什么,不妨看看它的实现。
kotlin 协程实现
我们知道java里面的线程与操作系统里面线程是一对一的关系,但协程并非这样,分析协程协程底层源码,一般通过 GlobalScope.launch启动一个协程,其代码如下:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
这里的三个参数为.上下文,线程调度器,以及我们写的回调函数,在点击start函数,通过调试其调用其默认会走到
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
请注意这行代码
createCoroutineUnintercepted(receiver,completion).intercepted().resumeCancellableWith(Result.success(Unit))
这个方法表明,首先先创建一个协程拦截器,通过intercepted方法对其方法(协程)进行挂起,在通过resumeCancellableWith方法进行启动,发现resumeCancellableWith核心实现,其代码如下:
inline fun resumeCancellableWith(result: Result<T>) {
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatchedWith(result)
}
}
}
}
这个方法表明最终将执行dispatcher.dispatch(context, this),最后协程的调度将交给线程来完成。这就为什么协程在执行的时候不会堵塞当前线程的原因,因为其简单来说,就是在线程池里面取了一个线程里面执行堵塞函数。到这里我们应该知道协程到底是什么了, 用何老师的话来说,协程就是可以一起协作的例程,在kotlin中凡被suspend函数修饰的方法均可被称为协程。suspend 关键字到底做了些什么,kotlin源码
object Task {
suspend fun deal(){
Thread.sleep(500L)
println("sleep 500ms")
}
}
fun main() {
GlobalScope.launch {
Task.deal()
}
Thread.sleep(2000L)
}
通过jd-gui对TaskKt.class进行反编译成java,其suspend函数变成了
@Nullable
public final Object deal(@NotNull Continuation $completion) {
Thread.sleep(500L);
String str = "sleep 500ms";
boolean bool = false;
System.out.println(str);
return Unit.INSTANCE;
}
没有错,通过suspend修饰的函数,在编译过程会被注入一个Continuation类型的参数,打开Continuation的定义
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
这是个接口,context执行的上下文环境,resumeWith函数的行为则是继续执行当前这个函数,并将函数的结果返回。
而main函数在反编译的时候成了:
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object object = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
this.label = 1;
if (Task.INSTANCE.deal((Continuation<? super Unit>)this) == object)
return object;
Task.INSTANCE.deal((Continuation<? super Unit>)this);
return Unit.INSTANCE;
case 1:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
return Unit.INSTANCE;
}
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
@NotNull
public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<? super MainKt$main$1> $completion) {
return (Continuation<Unit>)new MainKt$main$1($completion);
}
@Nullable
public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation<?> p2) {
return ((MainKt$main$1)create(p1, p2)).invokeSuspend(Unit.INSTANCE);
}
这里比较有意思的是通过,lable的状态来标志协程有没有结束。改变lable状态,switch来进入被挂起的函数。
总结
通过以上的分析,协程的出现主要是用来解决耗时任务相互依赖的问题,如果多个耗时任务是平级的,而且没有关联,最好采用多线程的方式并发处理。同时协程尽管在一定程度上面和线程很相似,但在底层的调度上面还是采用的线程的方式进行调度。因此你可以开大量的GlobalScope.launch,因为这本质上就是线程的调度,而且肯定并且不会堵塞,同时在同一个coroutineContext里面的函数实际上是在同一个线程里面的,这就表明了协程为什么可以解决任务间相互的依赖