RxJava源码初探

一、前言

RxJava是用java实现的ReactiveX(Reactive Extensions)框架开源库。ReactiveX则是大名鼎鼎的响应式编程。而响应式编程和观察者模式紧紧的相关联。在看RxJava的源码中,分析起来会有点麻烦,所以才有了这篇文章,和对这个有兴趣的同学一起窥探一二。

二、观察者模式

2.1 基本原理

观察者模式是对象的行为模式,又叫发布-订阅(Publish/Subscribe)模式即让多个观察者对象同时监听某一个主题对象。这个主题对象在状态上发生变化时,会通知所有观察者对象,使它们能够自动更新自己。在JDK中已有对观察者模式的封装:Observable.java/Observer.java(请自行对照JDK源码看看,下面会提出2个问题):
观察者模式图。

2.2 jdk实现版本

在JDK的实现版本中,Observable是被观察者,有一个Vector数组来存放观察者,当被观察者事件改变时,调用notifyObservers来通知所有的观察者,每个观察者调用自己的update来做相应的更新操作。由于这个模式在JDK中实现的比较早(JDK1.0),有2个地方值得思考:
1、观察者数组使用Vector,为什么不使用List?是否有替代品?
由于观察者数组必须考虑到线程安全(比如在被观察者发出通知的那一刻,同时添加新的观察者,或者删除某个观察者等操作会让被观察者不知道到底通知哪些观察者,即会引发线程安全),所以JDK版本中在对vector进行操作的时候,都会加上synchronized且在通知观察者的时候反向遍历数组,以此来保证线程安全。
(1)为什么不用List?
这里可以看到通知观察者的时候需要反向遍历数组,来保证如果是发出通知时有新的观察者进来,新的观察者不会收到当前通知。如果使用List反向遍历会比使用数组更加复杂。
(2)是否有替代品?
这里的Vector操作满足(a)读大于写,(b)线程安全,(c)写时复制,所以可以用CopyOnWriteArrayList来代替,代替后实现代码会更加的简洁明了。

2、通知方法,通知每个观察者的时候没有使用try—catch,是否可以加上?
JDK observable通知方法notifyObservers代码实现:

for (int i = arrLocal.length-1; i>=0; i--)
    ((Observer)arrLocal[i]).update(this, arg);

初看到这个段代码时,会有疑惑这个循环中没有对update加try-catch,当某个观察者的更新操作抛异常会导致其他的观察者更新失败。
那么如果我们加上try-catch会发生什么呢,虽然能够保证每个观察者都能做更新操作,但是一旦某个观察者有异常,被被观察者给捕获了,而被观察者捕获后又不知道交给谁,怎么处理,会导致代码编写者以为所有的观察者都正常执行了,所以在实现Observer的update操作时需要观察者自己加上try-catch。

三、响应式编程

3.1 响应式编程的特点

上面有点跑了题。回归正传,有了观察者模式的实现做铺垫,对我们理解响应式编程原理会有很大的帮助。相同点,都是多个观察者观察一个被观察者的状态,观察者对状态变化做出自己的处理。不同点,从我目前看到源码的程度上来看,我觉得观察者模式和响应式编程的区别主要有以下几个:

  • 1 观察者模式是Observable-Observer,如果我们要在观察者做操作前,对数据做一些其他的处理怎么办?观察者模式无法解耦,只能在observer的update操作中来做。而响应式编程是Obsevable-Operator1-OperatorN-Observer,可以很好的解耦控制数据流操作。
  • 2 响应式编程通过链式调用,让用户在代码流程上能够更加清晰的掌控(即使是异步操作)。
  • 3 观察者模式无法处理观察者的异常,需要用户自己加try-catch结构。而响应式编程提供了另一种方案,用户不需要使用try-catch只需实现错误处理方法就可以做到。
  • 4 观察者模式若需要使用多线程,需要用户在observer中实现多线程操作,也就是将观察者和任务调度糅杂在一起。而响应式编程对观察者和任务调度解耦,可以通过Schedulers,来处理线程调度问题。
  • 5 观察者模式,观察者无法处理自己的调用超时问题,响应式编程则可以设定观察者的调度超时机制。
  • 6 响应式编程提供阻塞式(BlockingObservable)和非阻塞式Observable的调用

3.2 RxJava源码分析

上文讲了RxJava框架相比观察者模式的各种优点,现在我们来分析源码,由于RxJava的源码比较庞大,本篇博客只分析RxJava的基本原理,即区别中的第一点和第二点。对于其他的点会在另起blog来分析。

3.2.1 调用展现

好了,分析RxJava的基本原理前,我们先上一段测试代码(此处的Observable是RxJava中的,不是JDK中的):

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hello");
    }
}).map(new Func1<String, String>() {
    @Override
    public String call(String s) {
        return s + " word";
    }
}).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("completed");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println("error");
    }
    @Override
    public void onNext(String s) {
        System.out.println("rx " + s);
    }
});

输出:rx hello world
这种连.的写法对看代码来说有点累,简化下:

(1)Observable<String> observable1 = Observable.create(new OnSubscribe());
(2)Observable<String> observable2 = observable.map(new Func1());
(3)Subscription sub = observable2.subscribe(new Subscriber());

代码一简化,我们就可以对上文说的3.1节中响应式编程的特点1:数据流操作和观察者处理解耦的原理有所了解了。
这里的解耦方式,其实就是在原被观察者进行数据流操作(map)后生成一个新的被观察者,观察者其实订阅的是数据流操作生成的被观察者。
在整个过程中,可以看到三个重要的类:

  • Observable:被观察者
  • OnSubscriber:Observable的成员变量,用来调用观察者处理方法
  • Subscriber:观察者

3.2.2 代码分析

RxJava具体怎么处理,分3步看:

3.2.2.1 生成被观察者Observable对象

第一行代码,(1)Observable observable1 = Observable.create(new OnSubscribe());
创建了一个被观察者,进入源码

public final static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(hook.onCreate(f));
}

/**
 * Invoked when Observable.subscribe is called.
 */
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
    // cover for generics insanity
}

即create为Observable的对象observable1创建了一个默认的钩子hook和一个Onsubscribe对象。

3.2.2.2 数据流处理

第二行代码,(2)Observable observable2 = observable.map(new Func1());
创建了一个新的被观察者2,进入源码:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

这一行代码很关键,也很绕,2个动作
(1)生成了一个OperatorMap对象。这个对象call方法,参数为观察者Subscriber对象;返回值是一个新的观察者Subscriber对象。新的Subscriber对象的onNext方法先调用operator操作(map的call方法)的call方法,拿Operator操作返回的数据,再调用外部观察者的onNext方法。很熟悉是吗?上文说到的ReactiveX编程可以实现Observable-Operator1-----OperatorN-Observer,在这段代码中得到了体现:在调用Observer的OnNext方法前,会先调用OperatorN的call方法对数据处理,处理完成后的数据值,作为参数传入Observer的OnNext方法。
我们需要注意的是,OperatorMap的call方法返回的是一个新的观察者,新的观察者通过OnNext方法将老的观察者给链接起来的。那么新的观察者什么时候订阅被观察者的呢?这个就是接下来的第(2)个动作:lift

public final class OperatorMap<T, R> implements Operator<R, T> {
    private final Func1<? super T, ? extends R> transformer;
    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }
    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {
            @Override
            public void onCompleted() {
                o.onCompleted();
            }
            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }
            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, this, t);
                }
            }
        };
    }

}

(2)调用了observable1的lift方法,这个方法用来干什么呢?

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> o) {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            st.onStart();
            onSubscribe.call(st); 
        });
}

源码略大我简化了一下,其实就是原Observable(observable1)的lift方法创建了一个新的Observable对象(observable2),这个新的对象的OnSubscribe成员变量的call方法将map-OpreatorMap生成的观察者传入到了最开始的Observable的OnSubscirbe中进行处理。

过程再整理一下:

  • 1、 Create操作:创建一个Observable对象observable1,对象中的OnSubscribe用来响应Subscriber
  • 2、 OperatorMap操作生成新的观察者,新的观察者通过OnNext方法,先调用map的call方法处理数据,再调用1中的观察者Subscriber的OnNext方法。
  • 3、lift 操作生成新的被观察者observable2,observable2中的call方法通过observable1的成员变量OnSubscribe来调用2中生成的新的观察者的call方法。

接下来就是最后一步了,什么时候调用observable2的call方法呢?这个就是下面的介绍,被观察者订阅观察者,在订阅的时候进行调用。

3.2.2.3 观察者订阅

第三行代码,Subscription sub = observable2.subscribe(new Subscriber());
订阅subscribe源码核心代码:

hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber);

其实这个就是调用observable2的OnSubscribe的call方法,参数是传入的订阅者。
说白了观察者订阅其实就是用被观察者的subscribe方法订阅观察者,且调用被观察者的OnSubscribe方法来调用观察者的处理方法onNext。
这里可能就会有疑问:为什么是被观察者来订阅观察者,而不是观察者来订阅被观察者?
这种做法可以使整个过程使用连点(.)来完成从数据处理到订阅者响应的所有流程,代码上会更加的清晰。

好了,是时候祭出大招了,映照下面这个图,你会更加的清晰。
RxJava源码初探

如果,你觉得还不过瘾,还有的没有了解清楚。建议看代码。。。,下一篇博客会介绍RxJava使用Scheduler多线程源码分析

相关参考文档:

上一篇:异常分析初探


下一篇:【Java学习笔记之十九】super在Java继承中的用法小结