Github
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid
引入依赖
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
二.RxJava是什么
RxJava是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
RxJava本质上是一个实现异步操作的库
三.RxJava的优势
简洁,在程序逻辑变得越来越复杂时,可以保持程序的简洁。
四.API 介绍和原理
观察者模式
RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。
观察者设计模式:对象间的一种一对多的依赖关系,以便一个对象的状态发生变化时,所有依赖于它的对象都得到通知并自动刷新。
RxJava 的观察者模式
RxJava 有四个基本概念:Observable (被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。被观察者和观察者通过 subscribe() 方法实现订阅关系,从而被观察者可以在需要的时候发出事件来通知观察者。
RxJava 的事件回调方法 onNext()、onCompleted() 和 one rror()。
- onNext():RxJava把每个事件单独处理,当一个事件队列完成后,会调用onNext()方法。
- onCompleted(): 事件队列结束,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为完成标志。
- onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
注意:onCompleted() 和 one rror() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
基本实现
RxJava 的基本实现主要有三点:
- 创建 Observer(观察者)
Observer<String> observer = new Observer<String>()
{
@Override
public void onNext(String s) {
Log.i(TAG, "onNext方法执行了: " + s);
}
@Override
public void onCompleted() {
Log.i(TAG, "Completed方法执行了");
}
@Override
public void one rror(Throwable e) {
Log.i(TAG, "onError方法执行了!");
}
};
除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的
Subscriber<String> subscriber = new Subscriber<String>()
{
@Override
public void onCompleted()
{
Log.i(TAG, "Completed方法执行了");
}
@Override
public void one rror(Throwable e)
{
Log.i(TAG, "onError方法执行了!");
}
@Override
public void onNext(String s)
{
Log.i(TAG, "onNext方法执行了: " + s);
}
};
两者的使用方式是一样的,实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用,它们的区别主要有两点:
第一:onStart(),这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法。
第二:unsubscribe(),这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态,可以在onPause() onStop() 等方法中调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。
- 创建Observable(被观察者)
Observable observable = Observable.create(new Observable.OnSubscribe<String>()
{
@Override
public void call(Subscriber<? super String> subscriber)
{
subscriber.onNext("Hello");
subscriber.onNext("Rxjava");
subscriber.onNext("RxAndroid");
subscriber.onCompleted();
}
});
可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发。
create() 方法是 RxJava 最基本的创造事件序列的方法。 RxJava 还提供了一些快捷方法用来创建事件队列
just(T...): 将传入的参数依次发送出来。
Observable observable = Observable.just("Hello", "Rxjava", "RxAndroid");
from(T[]) / from(Iterable<? extends T>) : 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
String[] strings = {"Hello", "Rxjava", "RxAndroid"};
Observable observable = Observable.from(strings);
- Subscribe (订阅)
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
基本实现的完整代码:
//观察者Observer
Observer<String> observer = new Observer<String>()
{
@Override
public void onNext(String s) {
Log.i(TAG, "onNext方法执行了: " + s);
}
@Override
public void onCompleted() {
Log.i(TAG, "Completed方法执行了");
}
@Override
public void one rror(Throwable e) {
Log.i(TAG, "onError方法执行了!");
}
};
//被观察者
Observable observable = Observable.create(new Observable.OnSubscribe<String>()
{
@Override
public void call(Subscriber<? super String> subscriber)
{
subscriber.onNext("Hello");
subscriber.onNext("Rxjava");
subscriber.onNext("RxAndroid");
subscriber.onCompleted();
}
});
//被观察者订阅观察者
observable.subscribe(observer);
或者这样写
Subscriber<String> subscriber = new Subscriber<String>()
{
@Override
public void onCompleted()
{
Log.i(TAG, "Completed方法执行了");
}
@Override
public void one rror(Throwable e)
{
Log.i(TAG, "onError方法执行了!");
}
@Override
public void onNext(String s)
{
Log.i(TAG, "onNext方法执行了: " + s);
}
};
//被观察者
Observable observable = Observable.create(new Observable.OnSubscribe<String>()
{
@Override
public void call(Subscriber<? super String> subscriber)
{
subscriber.onNext("Hello");
subscriber.onNext("Rxjava");
subscriber.onNext("RxAndroid");
subscriber.onCompleted();
}
});
//被观察者订阅观察者
observable.subscribe(subscriber);
在或者这样写
//观察者Observer
Observer<String> observer = new Observer<String>()
{
@Override
public void onNext(String s) {
Log.i(TAG, "onNext方法执行了: " + s);
}
@Override
public void onCompleted() {
Log.i(TAG, "Completed方法执行了");
}
@Override
public void one rror(Throwable e) {
Log.i(TAG, "onError方法执行了!");
}
};
Observable observable = Observable.just("Hello","Rxjava","RxAndroid");
//String[] strings = {"Hello","Rxjava","RxAndroid"};
//Observable observable = Observable.from(strings);
//被观察者订阅观察者
observable.subscribe(observer);
执行结果:
01-16 13:35:02.734 12509-12509/com.zhoujian.rxjava I/MainActivity: onNext方法执行了: Hello
01-16 13:35:02.734 12509-12509/com.zhoujian.rxjava I/MainActivity: onNext方法执行了: Rxjava
01-16 13:35:02.734 12509-12509/com.zhoujian.rxjava I/MainActivity: onNext方法执行了: RxAndroid
01-16 13:35:02.734 12509-12509/com.zhoujian.rxjava I/MainActivity: Completed方法执行了
自定义回调
- 被观察者(Observable)
Observable observable = Observable.just("Hello","Rxjava","RxAndroid");
- 自定义回调的观察者
//自定义回调
Action1<String> nextAction = new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "自定义回调----onNext方法执行了: " + s);
}
};
Action1<Throwable> errorAction = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.i(TAG, "自定义回调----onError方法执行了!");
}
};
Action0 completeAction = new Action0() {
@Override
public void call() {
Log.i(TAG, "自定义回调----Completed方法执行了");
}
};
- 订阅(subscribe)
//observable.subscribe(nextAction);
//observable.subscribe(nextAction,errorAction);
observable.subscribe(nextAction,errorAction,completeAction);
注意: Action1 和Action0 是 RxJava 的接口,Action1表示有一个返回值,Action0表示没有返回值。
执行结果:
01-16 13:50:26.964 12509-12509/com.zhoujian.rxjava I/MainActivity: 自定义回调----onNext方法执行了: Hello
01-16 13:50:26.964 12509-12509/com.zhoujian.rxjava I/MainActivity: 自定义回调----onNext方法执行了: Rxjava
01-16 13:50:26.964 12509-12509/com.zhoujian.rxjava I/MainActivity: 自定义回调----onNext方法执行了: RxAndroid
01-16 13:50:26.964 12509-12509/com.zhoujian.rxjava I/MainActivity: 自定义回调----Completed方法执行了
举个例子a:打印数组中的元素
String[] names = {"周杰伦","周星驰","周润发"};
//利用自定义回调
Observable.from(names).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "姓名:=="+s);
}
});
执行结果:
01-16 13:55:42.937 12509-12509/com.zhoujian.rxjava I/MainActivity: 姓名:==周杰伦
01-16 13:55:42.937 12509-12509/com.zhoujian.rxjava I/MainActivity: 姓名:==周星驰
01-16 13:55:42.937 12509-12509/com.zhoujian.rxjava I/MainActivity: 姓名:==周润发
举个例子b:加载显示图片
final ImageView img = (ImageView) findViewById(R.id.img);
//被观察者
Observable observable = Observable.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getResources().getDrawable(R.mipmap.my);
subscriber.onNext(drawable);
subscriber.onCompleted();
}
});
//观察者
Observer<Drawable> observer= new Observer<Drawable>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted方法执行了");
}
@Override
public void one rror(Throwable e) {
Toast.makeText(MainActivity.this, e.toString(), Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(Drawable drawable) {
Log.d(TAG, "onNext方法执行了");
img.setImageDrawable(drawable);
Log.d(TAG, "图片加载完成了!");
}
};
//被观察者订阅观察者
observable.subscribe(observer);
执行结果:
01-16 14:09:06.752 10355-10355/com.zhoujian.rxjava D/MainActivity: onNext方法执行了
01-16 14:09:06.752 10355-10355/com.zhoujian.rxjava D/MainActivity: 图片加载完成了!
01-16 14:09:06.753 10355-10355/com.zhoujian.rxjava D/MainActivity: onCompleted方法执行了
Snip20170116_1.png
Scheduler(线程控制)
在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler。
- Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
- Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
- Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。
- Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算
- Android 有一个专用AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制。
subscribeOn(): 指定事件产生所在线程。
observeOn(): 指定事件消费所在的线程。
还是以加载显示图片的例子来说明
final ImageView img = (ImageView) findViewById(R.id.img);
Observable.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getResources().getDrawable(R.mipmap.my);
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io())//上面的call方法发生在io线程
. observeOn(AndroidSchedulers.mainThread())//下面显示图片发生在主线程
. subscribe(new Observer<Drawable>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted方法执行了");
}
@Override
public void one rror(Throwable e) {
Toast.makeText(MainActivity.this, e.toString(), Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(Drawable drawable) {
Log.d(TAG, "onNext方法执行了");
img.setImageDrawable(drawable);
Log.d(TAG, "图片加载完成了!");
}
});
说明:被观察者的代码运行在io线程中,观察者的代码运行在主线程中,符合Android的在子线程中获取数据,在主线程中显示数据,更新界面。
变换
变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
- 变换之map():一对一的转换
例子a:把String类型转换成Integer
//被观察者
Observable.just("100")
//变换----map
.map(new Func1<String, Integer>()
{
@Override
public Integer call(String s)
{
return Integer.valueOf(s);
}
})
//自定义观察者
.subscribe(new Action1<Integer>()
{
@Override
public void call(Integer integer)
{
Log.d(TAG, "String转换成Integer完成了,integer的值为:"+integer.intValue());
}
});
执行结果:
01-16 14:56:59.117 10355-10355/com.zhoujian.rxjava D/MainActivity: String转换成Integer完成了,integer的值为:100
Func1 和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。 Func1 和 Action 的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。
可以看到,map() 方法将参数中的 String 对象转换成一个 Integer 对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了 Integer。这种直接变换对象并返回的,是最常见的也最容易理解的变换。
例子b:把JavaBean对象转换成String类型输出
首先封装一个演员Actor的JavaBean
package com.zhoujian.rxjava.bean;
/**
* Created by zhoujian on 2016/12/29.
*/
public class Actor
{
private String name;
private String sex;
public Actor(String name, String sex)
{
this.name = name;
this.sex = sex;
}
public String getName()
{
return name;
}
public void setName(String name)
{
this.name = name;
}
public String getSex()
{
return sex;
}
public void setSex(String sex)
{
this.sex = sex;
}
@Override
public String toString()
{
return "Actor{" + "name='" + name + '\'' + ", sex='" + sex + '\'' + '}';
}
}
再封装一个电影Movie的JavaBean,一个电影中有多个演员
package com.zhoujian.rxjava.bean;
import java.util.List;
/**
* Created by zhoujian on 2016/12/29.
*/
public class Movie
{
private String name;
private int id;
private String data;
private List<Actor> mactorList;
public Movie(String data, int id, List<Actor> mactorList, String name)
{
this.data = data;
this.id = id;
this.mactorList = mactorList;
this.name = name;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public List<Actor> getMactorList() {
return mactorList;
}
public void setMactorList(List<Actor> mactorList) {
this.mactorList = mactorList;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Movie{" + "data='" + data + '\'' + ", name='" + name + '\'' +", id=" + id + ", mactorList=" + mactorList + '}';
}
}
准备数据
Actor actor1 = new Actor("周星驰", "男");
Actor actor2 = new Actor("张柏芝", "女");
ArrayList<Actor> movie1List = new ArrayList<Actor>();
movie1List.add(actor1);
movie1List.add(actor2);
Movie movie1 = new Movie("1998-10-14", 1, movie1List, "喜剧之王");
Actor actor3 = new Actor("罗志祥", "男");
Actor actor4 = new Actor("张雨绮", "女");
ArrayList<Actor> movie2List = new ArrayList<Actor>();
movie2List.add(actor3);
movie2List.add(actor4);
Movie movie2 = new Movie("2016-05-01", 2, movie2List, "美人鱼");
Movie[] movies = {movie1, movie2};
把JavaBean对象转换成String类型输出
//定义被观察者
Observable observable= Observable.from(movies).map(new Func1<Movie, String>() {
@Override
public String call(Movie movie) {
return movie.toString();
}
});
//定义观察者
Observer observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.i(TAG, "Completed方法执行了");
}
@Override
public void one rror(Throwable e) {
Log.i(TAG, "onError方法执行了");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext方法执行了");
Log.i(TAG,"s==="+s);
}
};
//被观察者订阅观察者
observable.subscribe(observer);
执行结果:
01-16 15:34:20.291 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法执行了
01-16 15:34:20.291 31780-31780/com.zhoujian.rxjava I/MainActivity: s===Movie{data='1998-10-14', name='喜剧之王', id=1, mactorList=[Actor{name='周星驰', sex='男'}, Actor{name='张柏芝', sex='女'}]}
01-16 15:34:20.291 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法执行了
01-16 15:34:20.291 31780-31780/com.zhoujian.rxjava I/MainActivity: s===Movie{data='2016-05-01', name='美人鱼', id=2, mactorList=[Actor{name='罗志祥', sex='男'}, Actor{name='张雨绮', sex='女'}]}
01-16 15:34:20.291 31780-31780/com.zhoujian.rxjava I/MainActivity: Completed方法执行了
- 变换之flatMap():一对多的转换
还是上述电影的例子,如果把一个电影转换成多个演员输出,这就要用到flatMap(),一对多的转换。
Observer<Actor> observer = new Observer<Actor>() {
@Override
public void onCompleted() {
Log.i(TAG, "Completed方法执行了");
}
@Override
public void one rror(Throwable e) {
Log.i(TAG, "onError方法执行了");
}
@Override
public void onNext(Actor actor) {
Log.i(TAG, "onNext方法执行了");
Log.i(TAG, "actor===" + actor.toString());
}
};
Observable.from(movies).flatMap(new Func1<Movie, Observable<Actor>>() {
@Override
public Observable<Actor> call(Movie movie) {
return Observable.from(movie.getMactorList());
}
}).subscribe(observer);
执行结果:
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法执行了
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: actor===Actor{name='周星驰', sex='男'}
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法执行了
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: actor===Actor{name='张柏芝', sex='女'}
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法执行了
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: actor===Actor{name='罗志祥', sex='男'}
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: onNext方法执行了
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: actor===Actor{name='张雨绮', sex='女'}
01-16 15:44:18.897 31780-31780/com.zhoujian.rxjava I/MainActivity: Completed方法执行了
从上面的代码可以看出, flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。
flatMap() 的原理:
第一,使用传入的事件对象创建一个 Observable 对象。
第二, 并不发送这个 Observable, 而是将它激活,于是它开始发送事件。
第三,每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。
变换的原理:lift()
这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。像一种代理机制,通过事件拦截和处理实现事件序列的变换。
compose对 Observable 整体的变换
除了 lift() 之外, Observable 还有一个变换方法叫做 compose(Transformer)。它和 lift() 的区别在于, lift() 是针对事件项和事件序列的,而 compose() 是针对 Observable 自身进行变换。
public class LiftTransformer implements Observable.Transformer<Integer, String> {
@Override
public Observable<String> call(Observable<Integer> observable) {
return observable
.lift1()
.lift2()
.lift3()
}
}
...
Transformer liftAll = new LiftTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
线程的*控制
还是原来的例子,String转Integer
Observable.just("100")//主线程中执行,由subscribeOn指定
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.map(new Func1<String, Integer>()//IO 线程,由 observeOn() 指定
{
@Override
public Integer call(String s)
{
return Integer.valueOf(s);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>()// Android 主线程,由 observeOn() 指定
{
@Override
public void call(Integer integer)
{
Log.d(TAG, "String转换成Integer完成了,integer的值为:"+integer.intValue());
}
});
作者:蓝枫zeke
链接:https://www.jianshu.com/p/658ff6cc9e9e
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。