RxJava in Complex Scenarios: Thread Scheduling with Schedulers

Reference: https://blog.piasy.com/2016/10/14/Complex-RxJava-2-scheduler/

This note uses zip as an example to study how Schedulers decide which thread each step of a chain runs on.

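Requirements:

* The code inside create runs on an io thread.
* The zip combining function runs on the main thread.
* The map operation runs on an io thread.
* The subscriber code runs on the main thread.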

    // RxJava 1.x and RxAndroid imports used by this snippet
    import rx.Observable;
    import rx.Subscriber;
    import rx.android.schedulers.AndroidSchedulers;
    import rx.functions.Action1;
    import rx.functions.Func1;
    import rx.functions.Func2;
    import rx.schedulers.Schedulers;

    Observable<Integer> odd = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            System.out.println("1. create odd @ " + Thread.currentThread().getName());
            subscriber.onNext(1);
            subscriber.onCompleted();
        }
    });

    Observable<Integer> even = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            System.out.println("2. create even @ " + Thread.currentThread().getName());
            subscriber.onNext(2);
            subscriber.onCompleted();
        }
    });

    Observable.zip(
            // Decides that the zip function runs on the main thread.
            // Caution: zip calls its function on whichever thread delivers the
            // last value of a pair. even still emits on the io thread here, so
            // adding observeOn(AndroidSchedulers.mainThread()) to even as well
            // is probably needed to make this deterministic.
            odd.observeOn(AndroidSchedulers.mainThread()),
            even,
            new Func2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) {
                    System.out.println("3. zip @ " + Thread.currentThread().getName());
                    return integer + integer2;
                }
            })
            // Decides that the code inside create runs on an io thread
            .subscribeOn(Schedulers.io())
            // Decides that the map operation runs on an io thread
            .observeOn(Schedulers.io())
            .map(new Func1<Integer, String>() {
                @Override
                public String call(Integer integer) {
                    System.out.println("4. map @ " + Thread.currentThread().getName());
                    return "" + integer;
                }
            })
            // Decides that the subscriber code runs on the main thread
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<String>() {
                @Override
                public void call(String str) {
                    System.out.println("5. subscribe @ " + Thread.currentThread().getName());
                }
            });
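
The thread hops in the chain above can also be tried without an Android device. What follows is only a rough analogy built on the JDK's CompletableFuture, not RxJava itself: the class name SchedulerHopSketch, the helper namedSingleThread, and the thread names "io-sketch" and "main-sketch" are all hypothetical stand-ins for Schedulers.io() and AndroidSchedulers.mainThread(). Each stage names the executor it runs on, much as each observeOn names the scheduler for the operators below it.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SchedulerHopSketch {

    // Hypothetical stand-in for a scheduler: one daemon thread with a fixed name.
    static ExecutorService namedSingleThread(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    /** Runs the four-stage pipeline and returns a map of stage name to thread name. */
    static Map<String, String> run() {
        ExecutorService io = namedSingleThread("io-sketch");     // plays the role of Schedulers.io()
        ExecutorService main = namedSingleThread("main-sketch"); // plays the role of the main thread
        Map<String, String> seen = new ConcurrentHashMap<>();
        try {
            // "create" for both sources runs on the io executor.
            CompletableFuture<Integer> odd = CompletableFuture.supplyAsync(() -> {
                seen.put("create odd", Thread.currentThread().getName());
                return 1;
            }, io);
            CompletableFuture<Integer> even = CompletableFuture.supplyAsync(() -> {
                seen.put("create even", Thread.currentThread().getName());
                return 2;
            }, io);

            odd.thenCombineAsync(even, (a, b) -> {
                    // "zip" is pinned to the main executor.
                    seen.put("zip", Thread.currentThread().getName());
                    return a + b;
                }, main)
                .thenApplyAsync(sum -> {
                    // "map" hops back to the io executor.
                    seen.put("map", Thread.currentThread().getName());
                    return "" + sum;
                }, io)
                .thenAcceptAsync(str -> {
                    // "subscribe" ends on the main executor.
                    seen.put("subscribe", Thread.currentThread().getName());
                    seen.put("result", str);
                }, main)
                .join(); // wait for the whole chain before shutting the executors down
        } finally {
            io.shutdown();
            main.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        run().forEach((stage, thread) -> System.out.println(stage + " @ " + thread));
    }
}
```

One difference worth keeping in mind: thenCombineAsync always runs the combining function on the executor it is given, whereas in the RxJava chain the thread that zip runs on depends on the threads its sources emit on.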

