Rxjava原理解析
RxJava 的源码解析会从四个方面去分析,分别是 RxJava的创建过程、订阅过程、变换过程和线程切换过程。
Rxjava的创建过程
先看一段 RxJava 的基本使用方法,如下所示:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void one rror(Throwable e) {
}
@Override
public void onComplete() {
}
});
之后查看Observable的create方法做了什么:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
// //创建了一个ObservableCreate类,里面包装了我们传入的source参数
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
返回值是Observable,参数是ObservableOnSubscribe,参数的定义如下:
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
ObservableOnSubscribe是一个接口,里面就一个方法,也是我们实现的那个方法: subscribe(...)方法。
subscribe()方法的参数是ObservableEmitter接口,该接口实现如下:
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
@NonNull
ObservableEmitter<T> serialize();
@Experimental
boolean tryOnError(@NonNull Throwable t);
}
ObservableEmitter也是一个接口,继承自Emitter,里面定义了很多方法。
接下来看看RxJavaPlugins.onAssembly(...)方法的定义:
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
这是一个关于hook的方法,直接返回source,即传入的对象,相当于new ObservableCreate<T>(source).
Rxjava订阅过程
Rxjava的订阅从subscribe(...)方法开始:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// Hook相关的操作
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
///调用subscribeActual方法,然后入参是observer(被观察者)。这个方法是抽象方法,具体的实现是交给子类的
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
最终通过 subscribeActual(observer) 来实现功能,而这个方法是有具体的子类去实现的。先来看看Observable类的源码:
public abstract class Observable<T> implements ObservableSource<T> {
......
protected abstract void subscribeActual(Observer<? super T> observer);
......
}
通过Observable.create()来生成的被观察者,里面最终的生成的是 ObservableCreate这个类。
而Observable中的subscribeActual(observer)方法是在ObservableCreate类中生成的。
@Override
protected void subscribeActual(Observer<? super T> observer) {
// //这里将我们传入的被观察者进行了一层封装,里面实现了ObservableEmitter<T>, Disposable等接口,是一种装饰者模式
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// /调用被观察者的onSubscribe方法
observer.onSubscribe(parent);
try {
// //这里的source就是我们自己写的那个ObservableOnSubscribe,参数是封装后的观察者。
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
CreateEmitter实现了里面实现了ObservableEmitter接口,我们可以看看它的源码:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
one rror(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
// 调用的是观察者的onNext()方法
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
// //调用的是观察者的onComplete()方法
observer.onComplete();
} finally {
dispose();
}
}
}
......
}
其中,onNext(...)和onComplete()方法都在CreateEmitter类中实现,然后通过observer去调用至此,订阅流程分析完成。
Rxjava变换过程
Rxjava变换的过程通过map和flatMap操作符实现,比如Observable发送String类型的数字,通过map操作符转换成int类型,最终在Observer所接收到的也是int类型的数据。
变换的过程如下所示:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.w(TAG, String.valueOf(integer));
}
@Override
public void one rror(Throwable e) {
}
@Override
public void onComplete() {
}
});
接下来,我们看看map(...)函数的源码实现:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
可以看出,这里也是通过Hook的方式实现的,我们来看看ObservableMap类:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
// super(...)将上游的Observable保存起来 ,用于subscribeActual()中用。
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
......
}
可以看到ObservableMap类继承自AbstractObservableWithUpstream类,将上游的ObservableSource保存起来进行包装,所以它也算是装饰者模式的。
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
protected final ObservableSource<T> source;
/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
ObservableSource,代表了一个标准的无背压的源数据接口,可以被Observer订阅。
public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(@NonNull Observer<? super T> observer);
}
所有的Observable都实现了它,所以我们可以认为Observable和ObservableSource在本文中是相等的:
public abstract class Observable<T> implements ObservableSource<T> { ...... }
所以,得到的ObservableMap对象就是将上游的Observable和变换函数类Function保存起来。然后我们来看看Function:
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
然后,我们回到ObservableMap类的subscribeActual,这里才是订阅的核心方法:
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
MapObserver也是装饰者模式,对下游的Observer进行修饰。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
// 保存Fuction的变量
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
// 将上游传过来的T,利用Function转换成下游需要的U
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 变换后传递给下游Observer
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
源头Observer传递数据给下游的MapObserver,通过Function对数据进行变化操作,再调用内部保存的下游Observer的onNext(...)方法来发送数据给下游,一直向下发送到终点的Observer。
Rxjava线程调度
Rxjava线程切换的代码如下所示:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void one rror(Throwable e) {
}
@Override
public void onComplete() {
}
});
在子线程完成耗时的操作,然后在主线程进行UI方面的操作,整个线程调度就是为实现该功能。
subscribeOn
我们可以通过 subscribeOn方法来进行对上层流的线程处理:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
可以看到这里和map()操作符的源码非常的相似,看一下ObservableSubscribeOn类的源码:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
// 保存线程调度器
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
// map的源码中我们分析过,super()只是简单的保存ObservableSource
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
// 创建一个包装Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// 调用订阅者的onSubscribe方法,这里的线程还未进行切换
s.onSubscribe(parent);
//进行线程的切换处理
// 1.创造一个SubscribeTask的Runable方法
// 2.通过scheduler的scheduleDirect进行线程的切换
// 3.通过parent.setDisposable来进行Disposable的切换
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
......
}
ObservableSubscribeOn自身同样是个包装类,同样继承AbstractObservableWithUpstream(Observable子类)。再来看看SubscribeTask()方法的源码:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// source是我们上一层的被观察者,parent是包装之后的观察者.
// 所以会在相关的worker里面调用source的subscribe方法,
// 即上层的数据调用已经在woker里面了(如果是IoScheduler,那么这里就是在RxCachedThreadScheduler线程池调用了这个方法 )
source.subscribe(parent);
}
}
然后看一下这里面最重要的 scheduler.scheduleDirect() 这个方法
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
当调用 subscribeOn()方法的时候,会在创建的调度器中来执行被观察者的执行代码,从而实现了对上层的线程切换功能。
ObserverOn
observeOn()方法是属于Observable这个类的。我们跟踪进去这个方法去看看:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
继续跟踪observerOn()方法,代码如下所示:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
这里创建了一个 ObservableObserveOn对象,所以和之前基础里面将的一样,当调用 subscribe() 方法的时候,会先调用观察者的 onSubscribe() 方法,然后通过subscribe的层层处理,调用这个被观察者里面的 subscribeActual()方法。(订阅的时候)
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
// 根据传入的scheduler,创建Worker
Scheduler.Worker w = scheduler.createWorker();
// 将传入的observer进行包装,包装为ObserveOnObserver类。
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
当调用subscribe方法的onNext(),onComplete()方法,其实是调用的观察者的方法。我们现在看一下ObserveOnObserver的onNext和onComplete方法又是做了什么:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
// 将onNext的数据放入队列queue
queue.offer(t);
}
// 进行线程切换
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
// 调用了worker的方法,这里通过调用线程池,调用了自身的run方法
worker.schedule(this);
}
}
在subscribeActual(...)方法中Work的创建通过如下代码:
Scheduler.Worker w = scheduler.createWorker();
这里的Scheduler就是HandlerScheduler,我们来看看HandlerScheduler中的createWorker()方法:
@Override
public Worker createWorker() {
// 创建了一个HandlerWorker对象
return new HandlerWorker(handler);
}
可以看到,这里创建了HandlerWorker对象,跟踪到该类的schedule()方法:
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
// 留下钩子
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
// 通过Handler来发送延时消息操作来验证
Message message = Message.obtain(handler, scheduled);
message.obj = this;
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
最后,看看ScheduledRunnable类的源码:
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
IllegalStateException ie =
new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
RxJavaPlugins.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacks(this);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
其实,最后的delegate.run()方法就是调用了ObservableObserveOn的run()方法。
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
可以看到run方法中判断了outputFused的真假,然后分别调用了drainFused和drainNormal方法。这里的outputFused是与RxJava2中的背压处理相关暂时先不管,根据方法名也能知道正常调用会执行drainNormal方法,于是直接来看drainNormal方法。
void drainNormal() {
int missed = 1;
// 存放onNext传入的事件对象队列
final SimpleQueue<T> q = queue;
// 传入的观察者对象
final Observer<? super T> a = actual;
// 循环check事件是否完成或者发生错误
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
// 从队列中取出发送事件传入的对象
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
// 执行观察者对象的onNext方法
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
drainNormal方法中先通过checkTerminated方法校验发送事件是否完成或者发生异常,接着从队列中取出事件对象,再次判断是否完成或者发生错误和取出的对象是否为空,没有问题的话就会执行观察者的onNext方法。而发送完成和出现异常的方法则是在checkTerminated方法处理。
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (cancelled) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
if (delayError) {
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
在checkTerminated方法里根据delayError判断是否设置了超时的错误,接着再根据获得的错误e是否为空再决定调用的是观察者的onError()方法还是onComplete方法。至此observeOn切换线程的流程也梳理结束。
总结
观察者模式还是Rxjava的核心设计思想。除此之外,通过源码阅读还发现,无论在线程切换方面还是其它功能的操作符的实现,根本上来说都是在其原有的被观察者或观察者基础上包装成一个新的对象,功能逻辑由新对象中的方法来实现完成。