2.2 使用 RxJava 响应式框架的实践
RxJava 库 是 Reactive Extensions 的 Java虚拟机实现,近似于观察者模式,迭代器模式,函数式编程的组合。
2.2.1 响应式流 = 观察者 + 迭代器
通过事件分离生产者和消费者。
迭代器模式:不希望生产者在消费者出现之前生产数据的场景。
public interface Iterator<T> {
T next();
boolean hasNext();
}
// 结合观察者
public interface RxObserver<T> {
void onNext(T next); // 通知新值
void onComplete(); // 通知结束
void one rror(Exception e); // 通知出错
}
RxObserver 类似于前面介绍的观察者模式中的 Observer。
对于订阅的内容和订阅者,我们可以定义 Observale 和 Subcriber
- Observable : 类似于观察者模式中的Subject,可观察的事件源,他会发出元素,并且有流转化方法和流初始化工厂方法。
- Subscriber :抽象类,用于实现 Observer 观察者的接口,并且消费元素,实现类的基础
同时,我们定义 Subcription 来控制 Observable 和 Subscriber 之间的运行时关系。Subscription 可以检查订阅状态,并且在必要的时候取消订阅。
生产者和消费者之间的Subcription契约如下:
---------| — onNext —> | -----------
| Observable | – onComplete -> | Observer |
--------- | — one rror —> |------------
根据RxJava中的规则,Observable 事件源可以发送0-N个元素,通过声明成功和引发错误来指示结束。
所以 Observable 事件源会对相关联的 Subscriber 多次调用 onNext,最后调用 onComplete 或 one rror。
2.2.2 生产和消费流数据
把 Observable 视为一个事件生成器,在订阅时会给订阅者传播事件。
Observable<String> observable = Obervable.create(
new Observable.OnSubcribe<String>(){
@Override
public void call(Subcriber<? super String> sub){
sub.onNext("Hello, reactive world!");
sub.onCompleted();
}
}
);
// 当订阅者出现时立刻会触发 call()
// lambda写法
Observable<String> observable = Obervable.create(
sub -> {
sub.onNext("Hello, reactive world!");
sub.onCompleted();
}
);
Observable 是可重用的,每个订阅者在订阅之后就会立刻收到这个消息事件。RxJava 1.2.7开始,Observable的创建因为不安全被弃用,因为它可能生成太多元素导致订阅者负载过多。即这种方法不支持背压。
对应的订阅者代码如下:
Subcriber<String> subscriber = new Subcriber<String>(){
@Override
public void onNext(String s){
System.out.println("receive: " + s);
}
@Override
public void onCompleted(){
System.out.println("Done.");
}
@Override
public void one rror(Throwable e){
System.err.println(e);
}
}
所以现在订阅者 Subcriber 和 消息源 Observable 可以一起工作了。Subcriber 必须要实现Observer观察者的方法,定义 onNext 来响应新事件,定义 onCompleted 来响应流完成,定义 one rror 来响应错误。
最后只要在 Observable 添加订阅关系即可完成这个响应式demo程序。
observable.subscribe(subcriber);
// output:
// receive: Hello, reactive world!
// Done.
// 更简化的lambda写法
Observable<String> observable = Obervable.create(
sub -> {
sub.onNext("Hello, reactive world!");
sub.onCompleted();
}
);
observable.subscribe(
s -> System.out.println("receive: " + s),
() -> System.out.println("Done"),
System.err::println;
);
// output:
// receive: Hello, reactive world!
// Done.
Rxjava 库对于创建消息源 Observable 实例很灵活。
Observable.just("1","2","3","4");
Observable.from(new String[]{"A", "B", "C", "D"});
Observable.from(Collections.emptyList());
Observale<String> msg1 = Observable.fromCallable(()->"hello-");
Future<String> future = Executors.newCachedThreadPool().submit(() -> "world");
Observale<String> msg2 = Observable.from(future);
//组合多个流 处理顺序按照参数顺序
Observale<String> msg = Observable.concat(msg1, msg2, Observable.just("."));
msg.forEach(System.out::print);
// output: hello-world.
虽然有onError信号来处理出错,看起来不用去为异常定义处理程序,但是在发生错误的时候,默认的 Subscriber 实现仍然会抛出一个 rx.exceptions.OnErrorNotImplementedException