RxJava

引言

本文将描述RxJava的设计原理,为了简化,本文并非完全参照RxJava的源码,也不讨论使用RxJava的作用,而从实现角度分析RxJava。本文不讨论RxJava的设计来源,具体请参考“函数式编程”的无副作用。

原理

RxJava使用简单示例

我们来看一个RxJava的一个简单使用示例:

        Observable.just(123)
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer i) {
                        return "" + i;
                    }})
                .doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String s) {
                        System.out.println("log:" + s + " Thread:" + Thread.currentThread().getName());
                    }})
                .filter(new Filter<String>() {
                    @Override
                    public boolean filter(String s) {
                        return s != null && s.length() > 0;
                    }})
                .subscribeOn(Schedules.ASYNC)
                .observeOn(Schedules.MAIN)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) {
                        System.out.println("result:" + s  + " Thread:" + Thread.currentThread().getName());
                    }});

运行得到结果:
I/System.out: log:123 Thread:Thread-2
I/System.out: result:123 Thread:main

分析

上述RxJava并非使用官方源库,而是本文自定义的RxJava,也能达到官网RxJava一样的效果。
RxJava并非在调用map、doNext、filter、subscribeOn、ObserveOn等操作符时,立即调用内部的方法,基于函数式编程无副作用理论,我们对其进行包一层Observable,在subscribe的使用,也并非立即去消费对应Observer的内容,也调用上一层Observable对应的Observer。如图所示:
RxJava

从上图可知,RxJava调用操作符时,并没有直接调用到其内部的方法。它每调用一次操作符就new了一个与之对应的Observable对象,调到最后开始subscribe的时候,就new了一个与之对应的Observer传参,最后调到最开始ObservableJust的时候,就开始进行onNext、onError、onComplete等操作,注意,现在在ObservableJust中最开始调用的是最外层的ObserveObserver的onNext,之后再层层往内调用,最后调用到我们传递的Observer。每次调用时,我们可以对其进行线程切换,如在ObservableSubscribeOn层subscribeOn(ASYN)时,就对后续的操作都放到了子线程中执行,再在ObservableObsereOn层中的onNext时,又可以将线程切换到Main线程。

源码

简单起见,我们简化一下上述:

        Observable.just(123)
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer i) {
                        return "" + i;
                    }})
                .subscribeOn(Schedules.ASYNC)
                .observeOn(Schedules.MAIN)
                .subscribe(new Observer<String>() {
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }

                    @Override
                    public void one rror(Throwable r) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onSubscribe() {
                        System.out.println("onSubscribe");
                    }
                });

针对上述案例,我们抽象出两个接口,1、Observer,2、ObservableSource。通过Observable开始分发事件。

public interface ObservableSource<T> {
    void subscribe(Observer<T> observer);
}

public interface Observer<T> {
    void onNext(T t);
    void onComplete();
    void one rror(Throwable r);
    void onSubscribe();
}

public abstract class Observable<T> implements ObservableSource<T> {
    public static <T> Observable<T> just(T item) {
        return new ObservableJust<>(item);
    }
    public <R> Observable<R> map(Function<T, R> function) {
        return  (Observable<R>) new ObservableMap<>(this, function);
    }
    public Observable<T> subscribeOn(Schedules schedules) {
        return new ObservableSubscribeOn<>(this, schedules);
    }
    public Observable<T> observeOn(Schedules schedules) {
        return new ObservableObserveOn<>(this, schedules);
    }
    public void subscribe(Consumer<T> consumer) {
        this.subscribe(new LambdaObserver<>(consumer, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.EMPTY_ACTION));
    }
}

从ObservableJust出发,本源码简化了RxJava官方的源码,如下:

public class ObservableJust<T> extends Observable<T> {
    private final T value;

    public ObservableJust(T value) {
        this.value = value;
    }

    @Override
    public void subscribe(Observer<T> observer) {    // 最后调用这里,才开始onNext等
        observer.onSubscribe();
        try {
            observer.onNext(value);
            observer.onComplete();
        } catch (Throwable r) {
            observer.onError(r);
        }
    }
}

之后就是map

// 主要用来保存srouce Observable,如ObservableJust.map之后,就new了一个ObservableMap,在该ObservableMap中保存了ObservableJust的引用,这就是装饰器模式,可以参考JVM的IOStream源码理解。
// 这样就能在sbscribe的时候,调用source.subscribe了,并进行功能增强,如线程切换等。
public abstract class AbstractObservableWithUpStream<T, R> extends Observable<T> {    是
    protected final ObservableSource<T> source;

    protected AbstractObservableWithUpStream(ObservableSource<T> source) {
        this.source = source;
    }
}

public class ObservableMap<T, R> extends AbstractObservableWithUpStream<T, R>{
    private final Function<T, R> function;
    public ObservableMap(ObservableSource<T> source, Function<T, R> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribe(Observer<T> observer) {
        source.subscribe(new MapObserver<T, R>((Observer<R>) observer, function));
    }

    private static class MapObserver<T, R> extends BasicObserver<T, R> {
        final Function<T,R> mapper;

        public MapObserver(Observer<R> actual, Function<T,R> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R r = mapper.apply(t);    // 这里调用了Function中的apply方法,以进行业务能力扩展
            actual.onNext(r);
        }
    }
}

之后subscribeOn

public class ObservableSubscribeOn<T> extends AbstractObservableWithUpStream<T, T> {
    private final Schedules schedules;


    protected ObservableSubscribeOn(ObservableSource<T> source, Schedules schedules) {
        super(source);
        this.schedules = schedules;
    }

    @Override
    public void subscribe(Observer<T> observer) {    // 在subscribe的时候进行线程切换
        if (schedules == Schedules.MAIN) {
            source.subscribe(new SubscribeOnObserver<>(observer));
        } else if(schedules == Schedules.ASYNC) {
            Schedules.executorService.submit(() ->
                    source.subscribe(new SubscribeOnObserver<>(observer)));
        }
    }

    private static class SubscribeOnObserver<T> extends BasicObserver<T, T> {
        public SubscribeOnObserver(Observer<T> actual) {
            super(actual);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
    }
}

onServeOn

public class ObservableObserveOn<T> extends AbstractObservableWithUpStream<T, T> {
    private final Schedules schedules;
    protected ObservableObserveOn(ObservableSource<T> source, Schedules schedules) {
        super(source);
        this.schedules = schedules;
    }

    @Override
    public void subscribe(Observer<T> observer) {
        source.subscribe(new ObserveOnObserver<T>(observer, schedules));
    }

    private static class ObserveOnObserver<T> extends BasicObserver<T, T> {
        private final Schedules schedules;
        public ObserveOnObserver(Observer<T> actual, Schedules schedules) {
            super(actual);
            this.schedules = schedules;
        }

        @Override
        public void onNext(T t) {    // 在onNext的时候切换线程
            if (schedules == Schedules.MAIN) {
                new Handler(Looper.getMainLooper()).post(() -> {
                    actual.onNext(t);
                });
            } else if (schedules == Schedules.ASYNC) {
                Schedules.executorService.submit(() -> actual.onNext(t));
            }
        }
    }
}

最后回调示例代码

RxJava

总结

本文通过图解,源码,以及调用示例进行RxJava分析,同时,我们也可以如何自定义操作符,继承Observable,之后构建对应的操作符的Observable类和Observer类。

上一篇:2022年全球市场饲料级硫酸锌总体规模、主要生产商、主要地区、产品和应用细分研究报告


下一篇:rxjs 的 observable 是什么?