Spring 响应式编程 随记 -- C2 Spring 响应式编程基本概念 (二)

Spring 响应式编程 随记 -- C2 Spring 响应式编程基本概念 (二)
【 好书分享:《Spring 响应式编程》-- 京东】

2.2 使用 RxJava 响应式框架的实践

RxJava 库 是 Reactive Extensions 的 Java虚拟机实现,近似于观察者模式,迭代器模式,函数式编程的组合。

2.2.1 响应式流 = 观察者 + 迭代器

通过事件分离生产者和消费者。

迭代器模式:不希望生产者在消费者出现之前生产数据的场景。

public interface Iterator<T> {
    T next();
    boolean hasNext();
}

// 结合观察者
public interface RxObserver<T> {
    void onNext(T next); // 通知新值
    void onComplete(); // 通知结束
    void one rror(Exception e); // 通知出错
}

RxObserver 类似于前面介绍的观察者模式中的 Observer。

对于订阅的内容和订阅者,我们可以定义 Observale 和 Subcriber

  • Observable : 类似于观察者模式中的Subject,可观察的事件源,他会发出元素,并且有流转化方法和流初始化工厂方法。
  • Subscriber :抽象类,用于实现 Observer 观察者的接口,并且消费元素,实现类的基础

同时,我们定义 Subcription 来控制 Observable 和 Subscriber 之间的运行时关系。Subscription 可以检查订阅状态,并且在必要的时候取消订阅。

生产者和消费者之间的Subcription契约如下:

---------| — onNext —> | -----------
| Observable | – onComplete -> | Observer |
--------- | — one rror —> |------------

根据RxJava中的规则,Observable 事件源可以发送0-N个元素,通过声明成功和引发错误来指示结束。

所以 Observable 事件源会对相关联的 Subscriber 多次调用 onNext,最后调用 onComplete 或 one rror。

2.2.2 生产和消费流数据

把 Observable 视为一个事件生成器,在订阅时会给订阅者传播事件。

Observable<String> observable = Obervable.create(
	new Observable.OnSubcribe<String>(){
    	@Override
        public void call(Subcriber<? super String> sub){
        	sub.onNext("Hello, reactive world!");
            sub.onCompleted();
        }
    }
);

// 当订阅者出现时立刻会触发 call()
// lambda写法
Observable<String> observable = Obervable.create(
    sub -> {
        sub.onNext("Hello, reactive world!");
        sub.onCompleted();
    }
);

Observable 是可重用的,每个订阅者在订阅之后就会立刻收到这个消息事件。RxJava 1.2.7开始,Observable的创建因为不安全被弃用,因为它可能生成太多元素导致订阅者负载过多。即这种方法不支持背压。

对应的订阅者代码如下:

Subcriber<String> subscriber = new Subcriber<String>(){
	@Override
    public void onNext(String s){
    	System.out.println("receive: " + s);
    }

    @Override
    public void onCompleted(){
        System.out.println("Done.");
    }

    @Override
    public void one rror(Throwable e){
        System.err.println(e);
    }
}

所以现在订阅者 Subcriber 和 消息源 Observable 可以一起工作了。Subcriber 必须要实现Observer观察者的方法,定义 onNext 来响应新事件,定义 onCompleted 来响应流完成,定义 one rror 来响应错误。

最后只要在 Observable 添加订阅关系即可完成这个响应式demo程序。

observable.subscribe(subcriber);
// output:
// receive: Hello, reactive world!
// Done.

// 更简化的lambda写法
Observable<String> observable = Obervable.create(
    sub -> {
        sub.onNext("Hello, reactive world!");
        sub.onCompleted();
    }
);

observable.subscribe(
    s -> System.out.println("receive: " + s),
    () -> System.out.println("Done"),
    System.err::println;

);
// output:
// receive: Hello, reactive world!
// Done.

Rxjava 库对于创建消息源 Observable 实例很灵活。

Observable.just("1","2","3","4");
Observable.from(new String[]{"A", "B", "C", "D"});
Observable.from(Collections.emptyList());

Observale<String> msg1 = Observable.fromCallable(()->"hello-");
Future<String> future = Executors.newCachedThreadPool().submit(() -> "world");
Observale<String> msg2 = Observable.from(future); 

//组合多个流 处理顺序按照参数顺序
Observale<String> msg = Observable.concat(msg1, msg2, Observable.just("."));
msg.forEach(System.out::print);
// output: hello-world.

虽然有onError信号来处理出错,看起来不用去为异常定义处理程序,但是在发生错误的时候,默认的 Subscriber 实现仍然会抛出一个 rx.exceptions.OnErrorNotImplementedException

上一篇:java中final关键字


下一篇:String为什么是被final修饰的?String的创建过程