作为github上star数极高的响应式编程java扩展类库,rxjava是啥就不多说了,网上能查到一堆介绍,下面是一些学习记录:
前提依赖:
compile 'io.reactivex.rxjava2:rxjava:2.1.9'
一、Observable
1.1 hello world
rxjava中的核心思路是“生产者-消费者”模型,生产者的java类通常用xxxEmitter命名,字面意思:发射器,可以想象为一个机关枪,一直biu biu biu的向外发射信息,另一端则是靶子(也就是消费者),在不停的接收。不过要注意的是:rxjava中,能接收子弹的靶子,可以同时有多个。
Observable<String> observable = Observable.create(emitter -> { emitter.onNext("a"); emitter.onNext("b"); emitter.onNext("c"); emitter.onComplete(); }); Observer observer1 = new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("subscribe=>"); } @Override public void onNext(@NonNull String s) { System.out.println(s + " "); } @Override public void onError(@NonNull Throwable e) { System.out.println(e.getMessage()); } @Override public void onComplete() { System.out.println("complete"); } }; observable.subscribe(observer1);
输出:
subscribe=> a b c complete
注:最后一行,也可以改成
observable.subscribe(observer1); observable.subscribe(observer1);
这样就相当于2个靶子在接子弹了。 上面这是最正统的写法,官方推荐使用链式编程写法:
Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("a"); emitter.onNext("b"); emitter.onNext("c"); emitter.onComplete(); }).subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("subscribe=>"); } @Override public void onNext(@NonNull String s) { System.out.println(s + " "); } @Override public void onError(@NonNull Throwable e) { System.out.println(e.getMessage()); } @Override public void onComplete() { System.out.println("complete"); } });
1.2 onComplete事件
emitter发送onComplete消息后,挨打的靶子(消费者),就不再继续处理了,不管后面emitter是否还继续发送。
Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("a"); emitter.onNext("b"); emitter.onNext("c"); emitter.onComplete(); //这里主动通知消费者complete System.out.println("complete后,emitter还继续发射..."); emitter.onNext("d"); }).subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("subscribe=>"); } @Override public void onNext(@NonNull String s) { System.out.println(s + " "); } @Override public void onError(@NonNull Throwable e) { System.out.println(e.getMessage()); } @Override public void onComplete() { System.out.println("complete"); } });
输出:
subscribe=> a b c complete complete后,emitter还继续发射...
注:onComplete之后,emitter再次发送的"d",消费者已经不再处理了。
1.3 onError事件
onError即可以在emitter(生产者)端报错,也可以在靶子(消费者)上报错,不管哪一端发生error,消费者就停止处理了。
Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("a"); emitter.onError(new Throwable("emitter报了个错!")); System.out.println("准备发送c"); emitter.onNext("c"); emitter.onComplete(); }).subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("subscribe=>"); } @Override public void onNext(@NonNull String s) { System.out.println(s + " "); } @Override public void onError(@NonNull Throwable e) { System.out.println(e.getMessage()); } @Override public void onComplete() { System.out.println("complete"); } });
输出:
subscribe=> a emitter报了个错! 准备发送c
下面模拟下消费者处理时,发生异常:
Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("a"); emitter.onNext("b"); emitter.onNext("c"); emitter.onComplete(); }).subscribe(new Observer<String>() { @Override public void onSubscribe(@NonNull Disposable d) { System.out.println("subscribe=>"); } @Override public void onNext(@NonNull String s) { if (s.equals("b")) { int a = 0; int b = 1; System.out.println((b / a)); } System.out.println(s + " "); } @Override public void onError(@NonNull Throwable e) { System.out.println("error:" + e.getMessage()); } @Override public void onComplete() { System.out.println("complete"); } });
输出:
subscribe=> a error:/ by zero
1.4 disposable
如果消费者主动dispose()后,相当于就解除了生产者-消费者的关系。
Observable.create((ObservableOnSubscribe<String>) emitter -> { emitter.onNext("a"); emitter.onNext("b"); System.out.println("准备发送c"); emitter.onNext("c"); emitter.onComplete(); }).subscribe(new Observer<String>() { Disposable disposable; @Override public void onSubscribe(@NonNull Disposable d) { disposable = d; System.out.println("subscribe=>"); } @Override public void onNext(@NonNull String s) { if (s.equals("b")) { disposable.dispose(); } System.out.println(s + " "); } @Override public void onError(@NonNull Throwable e) { System.out.println("error:" + e.getMessage()); } @Override public void onComplete() { System.out.println("complete"); } });
上面的代码,消费者在遇到b时,主动切断了与生产者的关联,emitter再发送的d,消费者就不处理了,输出:
subscribe=> a b 准备发送c
1.5 大道至简
如果消费者只关心onNext的处理部分,其它无所谓,上面这一堆代码,可以简化为一行:
Observable.fromArray("a", "b", "c").subscribe(c -> System.out.println(c + " "));
输出:
a b c
最后再来一个示例:把3个单词拼成一句话,而且每个单词处理成“首字母大写”的风格。
Observable.fromArray("I", "AM", "CHINESE") .map(c -> c.substring(0, 1).toUpperCase() + c.substring(1).toLowerCase()) .subscribe(c -> System.out.print(c + " "));
输出:
I Am Chinese
参考:
http://www.vogella.com/tutorials/RxJava/article.html
http://www.cnblogs.com/aademeng/articles/7462540.html
https://www.jianshu.com/c/299d0a51fdd4
出处:http://yjmyzz.cnblogs.com
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。