RxJava源码解析

原文链接:https://blog.csdn.net/sted_zxz/article/details/82317400

转自:https://blog.csdn.net/sted_zxz/article/details/82317400

本文基于RxJava2.2.1版本分析。

简介

官方介绍:

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences. 
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

RxJava是一个处理异步操作的库、基于事件流的库。

其核心的东西不外乎两个, Observable(被观察者)和 Observer(观察者)。当观察者订阅了被观察者之后,被观察者可以发出一系列的事件(例如网络请求、复杂计算、数据库操作、文件读取等),事件执行结束后将结果交给观察者处理。

create

create()是创建Observable对象的方法之一,还有其他诸多的创建方法,这里主要了解下Observable的创建流程:

        // 创建被观察者
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onComplete();
            }
        });

进入create():

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        // requireNonNull()很多地方会用到,顾名思义就是用来判空的
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    ```


进入onAssembly():
```java
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            // 这里hook
            return apply(f, source);
        }
        return source;
    }

onAssembly()主要是用来hook的,默认使用中都没有hook,返回的就是入参ObservableCreate。我们来看一下ObservableCreate:

    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

这里我们看到ObservableCreate将最初我们创建的ObservableOnSubscribe包装了一层,用成员变量source记住。

至此,创建流程结束,我们得到了Observable< T >对象,其实就是ObservableCreate< T >.

subscribe

承接上文的示例代码:

        // 创建观察者
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "subscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "" + value);
            }

            @Override
            public void one rror(Throwable e) {
                Log.d(TAG, "error");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "complete");
            }
        };
        // 观察者和被观察者建立连接
        observable.subscribe(observer);  

进入subscribe():

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call one rror because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

这里的关键代码只有一句subscribeActual(observer);,被观察者被订阅时真正被执行的方法:

    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
         // 这里首先调用了observer的onSubscribe()
        observer.onSubscribe(parent);

        try {
            // 然后调用了source的subscribe()
            // 这里的sourec就是之前保存的ObservableCreate
            // subscribe()就是我们自己一开始实现的方法
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

CreateEmitter是啥,它是一个事件的发射器。当我们在ObservableCreate的subscribe()中调用CreateEmitter的onNext()、onError()或者onComplete()方法时,它会去调用observer的onNext()、onError()或者onComplete()方法。

从CreateEmitter parent = new CreateEmitter(observer);一句中,大概可以猜到,CreateEmitter持有了observer,确实如此:

 @Override
        public void onNext(T t) {
            if (t == null) {
                one rror(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void one rror(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

我们可以看到CreateEmitter的onNext()等方法被调用时,其中会调用对应的observer的onNext()等方法。

以上是最简单的流程,那在其中加入一个操作符会怎么样呢?我们来看一下map是做什么的。

map

map起到类型转换的作用,例子如下,源头observable发送的是String类型的数字,利用map转换成int型,最终在终点observer接受到的也是int类型数据:

       Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("1");
                e.onComplete();
            }
        }).map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return Integer.parseInt(s);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext() called with: value = [" + value + "]");
            }

            @Override
            public void one rror(Throwable e) {
                Log.d(TAG, "onError() called with: e = [" + e + "]");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete() called");
            }
        });

来看它的源码:

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

抛开onAssembly(),好像只是创建了一个ObservableMap并且返回:

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        // 将传入的observable记住
        super(source);
        // 将变换的方法记住
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

这里和之前看到的ObservableCreate如出一辙,将上一个observable包装了起来。

然后当subscribeActual()执行的时候,去调用source的subscribe(),这里的source指的是上游的observable。并且创建了一个新的observer,MapObserver将下游的observer包装了一层。

如果多次map()的话,我们可以将函数调用的流程分为三步:

首先自上而下将最初的observable包装了一层又一层,不断创建新的observable并且返回,最下游得到一个层层包装的observable。
当observable.subscribe(observer);之后,就反过来从下游不断回溯,调用上游observable的subscribe(),而subscribe()的入参是一个observer,这个observer是将下游传递过来的observer包装过后的新的observer,最终最上游得到一个层层包装之后的observer。
当回溯到最上游的时候source是一个ObservableCreate,调用它的subscribe()时,会调用对应CreateEmitter的方法,CreateEmitter进而调用对应的observer的方法,它持有的这个observer是一个层层包装之后的observer,然后又开始自上而下层层拆包,不断调用下游observer的onNext()等方法。
假设当前的observer是MapObserver,在它的onNext()被调用时会调用它的变换方法,然后继续调用下游observer的onNext(),如下:

       public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                // 调用变换方法mapper.apply(t)
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            // 调用下游observer的onNext()
            downstream.onNext(v);
        }

最终调用最后一个observer的onNext()。

subscribeOn

用来指定subscribe()发生的线程的:

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

这里返回了一个ObservableSubscribeOn:

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

这里做法其实和以上的提到过的ObservableCreate和ObservableMap中的是一样的,包装了上游的observable,并且额外记录了subscribe()的线程调度器。

当subscribeActual()被执行的时候,会切换到线程调度器所指定的线程。

那么如果subscribeOn(Schedulers.xxx())切换线程N次,总是以第一次为准,或者说离源observable最近的那次为准,并且对其上面的代码生效。

observeOn

指定onNext()等方法执行的线程:

    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

还是似曾相识的场景,创建了一个ObservableObserveOn:

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

自然少不了对原来observable的包装,同时记录了线程调度器。

当从上游push消息过来的时候,我们会看到ObservableObserveOn会切换到所记录的线程调度器指定的线程:

        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

以上是RxJava2最基本方法的一些源码分析。

链式调用原理

我们可以将函数调用的流程分为三步:

首先自上而下将最初的observable包装了一层又一层,不断创建新的observable并且返回,最下游得到一个层层包装的observable。
当observable.subscribe(observer);之后,就反过来从下游不断回溯,调用上游observable的subscribe(),而subscribe()的入参是一个observer,这个observer是将下游传递过来的observer包装过后的新的observer,最终最上游得到一个层层包装之后的observer。
当回溯到最上游的时候source是一个ObservableCreate,调用它的subscribe()时,会调用对应CreateEmitter的方法,CreateEmitter进而调用对应的observer的方法,它持有的这个observer是一个层层包装之后的observer,然后又开始自上而下层层拆包,不断调用下游observer的onNext()等方法。

上一篇:RxJava2详细攻略(上)


下一篇:一文带你全面了解RxJava