Rxjava 的使用
github地址:https://github.com/ReactiveX/RxJava
参考自:
https://mcxiaoke.gitbooks.io/rxdocs/content/
https://blog.csdn.net/weixin_36709064/article/details/82919785
https://www.jianshu.com/p/25682d620320
https://blog.csdn.net/jdsjlzx/article/details/54845517
rxjava :一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
了解 ReactiveX :
ReactiveX 是一个专注于异步编程与控制可观察数据(或者事件)流的API。它组合了观察者模式,迭代器模式和函数式编程的优秀思想,RxJava 是 ReactiveX 在 Java 上的开源的实现。Observable(观察者) 和 Subscriber(订阅者)是两个主要的类。
http://gank.io/post/560e15be2dca930e00da1083
目录
一、操作符
1、创建 被观察者 的操作符
Observable 通过静态方法创建被观察者
1、 Observable.create() : 最常用的操作符,用于创建一个具有发射事件能力的被观察者
EP: Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("Hello"); emitter.onNext("World"); emitter.onComplete(); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.e("TAG", s); } });
2、Observable.just();只是简单的原样发射数据(接收的是什么类型的数据,发射的就是什么类型的数据)
String[] arr = new String[]{"ss1", "ss2", "ss3"};
Observable.just(arr).subscribe(new Consumer<String[]>() {
@Override
public void accept(String[] s) throws Exception {
Log.e("TAG", "" + s.length);
}
});//--------------------------------------------------
Observable.just("1","2","3","4") .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { } });
3、Observable.from(): from系列可以将数组或Iterable的元素拿出来,做成多个事件进行发射
Observable.fromArray(arr);//fromArray里面放数组 (将数组元素一个个发射)
Observable.fromIterable(stringList)//里面放集合(将集合元素一个个发射)
//--注意:int常量类型的数组要转换成Interger对象,否则fromArray会将数组当作一个数组对象数据处理,ep:
//--它将int常量类型的数组当作一个数组对象发射了,没有进行拆分 int[] ints={1,2,3,4,5}; Observable.fromArray(ints) .subscribe(new Consumer<int[]>() { @Override public void accept(int[] ints) throws Exception { } })
4、Observable.fromCallable():该方法的参数是一个回调方法,将该回调方法的返回值装入事件进行发送
EP:
Observable.fromCallable(new Callable<ArrayList<String>>() { @Override public ArrayList<String> call() throws Exception { ArrayList<String> aa=new ArrayList<>(); for(int i=0;i<5;i++){ aa.add("这是发送的数据"); } return aa; } });
5、Observable.interval()按照固定时间间隔发送(无线循环)
ep: 每隔500毫秒执行一次 Observable.interval(500, TimeUnit.MILLISECONDS, Schedulers.trampoline()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { System.out.println("当前:"+aLong); } });
//--period--间隔时间,unit--时间单位,scheduler--线程调度器 interval(long period, TimeUnit unit, Scheduler scheduler) //--period--间隔时间,unit--时间单位 interval(long period, TimeUnit unit) //initialDelay--初始开始执行的时间,--period--间隔时间,unit--时间单位,scheduler--线程调度器 interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
6、Observable.intervalRange(); 按照固定时间间隔发送(循环有限次数)
EP: //从5开始循环,循环15次,从0秒开始执行,每次间隔500毫秒。 Observable.intervalRange(5,15,0,500,TimeUnit.MILLISECONDS,Schedulers.trampoline()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { System.out.println("当前:"+aLong); } });
//start--开始循环范围,count--循环次数,initialDelay--开始时间,period--间隔时间 intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
2、功能型的操作符(依赖于被观察者)
1、转换类操作符 (流操作符,作用于被观察者)
转化类操作符,可以将一个被观察者通过某种方法,转化为另外的被观察者! 变换的操作符有map,flatMap,concatMap,switchMap,buffer,groupBy。
1)Observable.map:map的作用是将每一个发射的事件,应用于一个函数,从而可以对事件做相应处理,事件数量不会变化.一对一的关系,map实质就是拦截这个被观察者对象,然后进行处理,转换成另一个被观察者对象,订阅者收到的对象就是转换后的对象
Observable.just("this is test") .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { String tt="这是map后的结果"+s; return tt; } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { } });
2)Observable.flatMap():flatMap 操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。(合并的数据不保证数据的顺序)
*注意: 转换类操作符都是借用function系列方法进行变换操作,flatmap 的不同点是function 函数的输出对象也是一个Observable.
ep:
ArrayList<Student> students=new ArrayList<>(); //--发射学生集合数据,得到每个学生 Observable.fromIterable(students) //--fromIterable 作用是拆分集合 //--- 将学生对象转换为课程对象 .flatMap(new Function<Student, ObservableSource<Course>>() { @Override public ObservableSource<Course> apply(Student student) throws Exception { //--发射每个学生的每门课程 return Observable.fromIterable(student.getCourses()); } }) .subscribe(new Consumer<Course>() { //--接收到的是所有学生的所有课程(平铺了数据) @Override public void accept(Course course) throws Exception { } });
//---.flatMap 的特点就是使用的Function系列的转换函数,里面的输出对象 是一个本身也发射数据的Observable
flatmap的使用特点:
1、他可以简单遍历二位数据,处理象数组或者是集合这类观察者对象(就是可以进行二次拆分数据)
https://www.jianshu.com/p/0524d7914429
2、可解决网络请求嵌套的问题 (可以进行多层次发射数据---因为flatmap的输出对象是一个本身也发射数据的Observable)
https://blog.csdn.net/jdsjlzx/article/details/51493552
3)Observable.concatMap();与上面的flatMap作用基本一样,与flatMap唯一不同的是concat能保证Observer接收到Observable集合发送事件的顺序。
4)Observable.buffer();将发射数据分成若干段,然后把每一段发射数据组成一个新的Observable,然后发射这个新的observable
EP: 按照每段2位数,发射数据 Observable.just(1,2,3,4,5) .buffer(2) .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> integers) throws Exception { for (Integer integer : integers) { System.out.println("buffer:"+String.valueOf(integer)); } System.out.println("============================"); } }) 输出结果: buffer:1 buffer:2 buffer:3 ============================ buffer:4 buffer:5 ============================ //------------------------------------------------------------------- 按照每段3位数,发射数据,新开始的数据跳过前面2位数 Observable.just(1,2,3,4,5) .buffer(3,2) .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> integers) throws Exception { for (Integer integer : integers) { System.out.println("buffer:"+String.valueOf(integer)); } System.out.println("============================"); } }) ; 输出结果:
buffer:1
buffer:2
buffer:3
============================
buffer:3
buffer:4
buffer:5
============================
buffer:5
============================
2、合并类操作符(流操作符)
合并类操作符的作用就是将多个Observable 合并 做为自己的Observable ,发射事件。
包含:concatMap(),concat(), merge(), mergeArray (),concateArray(),reduce(),collect(),startWith(),zip(),count()。
1)Observable.combineLatest:会合并 被合并的每一个Observable 的最新值
EP:
Observable ob1= Observable.just("1","2","3","4","5"); Observable ob2= Observable.just("6","7","8","9","10"); Observable.combineLatest(ob1, ob2, new BiFunction<String, String, String>() { @Override public String apply(String s, String s2) throws Exception { System.out.println("当前的值:"+s+"----"+s2); return s+s2; } }) .subscribe(new Consumer<String>() { @Override public void accept(String ss) throws Exception { } }) ;
输出结果:
当前的值:5----6
当前的值:5----7
当前的值:5----8
当前的值:5----9
当前的值:5----10
2)Observable.zip():使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。如果多个Observable发射的数据量不一样,则以最少的Observable为标准进行压合!
Observable<Integer> observable1=Observable.just(1,2,3,4); Observable<Integer> observable2=Observable.just(1,2); Observable<String> observable3=Observable.just("s1", "s2", "s3"); Observable.zip(observable1, observable2, observable3, new Function3<Integer, Integer, String, String>() { @Override public String apply(Integer integer, Integer integer2, String s) throws Exception { return integer.intValue()+"---" + integer2.intValue() +"----"+ s; } }).subscribe(s -> System.out.println(s));
输出结果:
1---1----s1
2---2----s2
3、Observable.merge | Observable.mergeWith:将两个Observable合并成一个Observable,发射数据,数据没有进行任何变化
EP: Observable<Integer> observable1=Observable.just(1,2,3,4); Observable<Integer> observable2=Observable.just(9,10); Observable.merge(observable1,observable2) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println(integer.intValue()); } }); observable1.mergeWith(observable2).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println(integer.intValue()); } });
输出结果:上面的merge()和mergeWith()输出结果一样
1
2
3
4
9
10
4、Observable.startWith | Observable.startWithArray():在Observable开始发射他们的数据之前,startWith()通过传递一个参数来先发射一个数据序列,startWithArray通过传递一个数组先发射数组里面的数据。最后发射Observable的数据。
EP: Observable.just("10","11") .startWithArray("1","2") .startWith("5") .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } });
输出结果:
5
1
2
10
11
3、过滤类操作符(流操作符)
过滤类操作符,按照某个条件过滤掉不符合该条件的的发射事件,下游就会接收不到被过滤的事件。
1)Observable.distinct() : 过滤掉重复的事件
EP: Observable.just(1,1,2,1,3,2) .distinct() .subscribe(integer -> System.out.println(integer.intValue()));
输出结果:
1
2
3
2)Observable.filter():设置过滤条件,过滤掉不符合条件的。
EP: Observable.just(1,1,2,1,3,2) .filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer.intValue()==2; } }) .subscribe(integer -> System.out.println(integer.intValue()));
输出结果:
2
2
3)Observable.skip():跳过前n项数据,或者是跳过开始的某一段时间的数据
EP:
1、跳过前3项的数据
Observable.just(1,2,3,4,5,6,7) .skip(3) .subscribe(integer -> System.out.println(integer.intValue()));
输出结果:
4
5
6
72、跳过开始某一段时间的数据
Observable.interval(500,TimeUnit.MILLISECONDS,Schedulers.trampoline()) //--每500毫秒执行一次,无线循环 .skip(3,TimeUnit.SECONDS) //跳过前3s的数据 .subscribe(aLong -> System.out.println(aLong));
输出结果:
5
6
7
8
9....
4)Observable.take() / takeUntil() / takeWhile() / takelast():
Observable.take():take方法传递一个long型参数,表示执行的次数--也就是保留前n项数据(功能与skip相反)
EP: Observable.interval(500,TimeUnit.MILLISECONDS,Schedulers.trampoline()) .take(5) .subscribe(aLong -> System.out.println(aLong));
结果:
0
1
2
3
4
Observable.takeUntil():当发返回值为true的时候终止 (包含临界条件)
Observable.interval(500,TimeUnit.MILLISECONDS,Schedulers.trampoline()) .takeUntil(new Predicate<Long>() { @Override public boolean test(Long aLong) throws Exception { return aLong==3; } }) .subscribe(aLong -> System.out.println(aLong));
结果:
0
1
2
3
Observable.takeWhile():当 返回值是false 的时候终止(并且不包含临界条件的item )
Observable.interval(500,TimeUnit.MILLISECONDS,Schedulers.trampoline()) .takeWhile(new Predicate<Long>() { @Override public boolean test(Long aLong) throws Exception { return aLong!=3; } }) .subscribe(aLong -> System.out.println(aLong));
结果:
0
1
2
Observable.takelast():传递的参数是long型,发射最后n项数据(与take()相反)
Observable.just("1","2","3","4","5","6","7") .takeLast(3) .subscribe(aLong -> System.out.println(aLong));
结果:
5
6
7
二、线程控制Scheduler
Scheduler :
Scheduler :RxJava 通过线程调度器来指定每一段代码应该运行在什么样的线程,利用 subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。
创建类操作符 ----事件的产生,由subscribeOn来控制运行在哪种线程里。subscribeOn不论出于何种位置,都只会执行一次。
非创建类操作符---事件的消费 ,由observeOn来控制 运行在哪种线程里。可以通过多次设定observeOn,来指定不同的代码运行在不同的线程里。
observeOn 切换线程原理:observeOn 切换的是observeOn之后执行的代码,如果后面没有observeOn进行再次切换的话,后面的所有代码都运行在首个observeOn切换的线程里。
subscribeOn 主要控制事件发射运行的线程(默认是在主线程),subscribeOn 之所以只起一次作用,因为它作用于事件发射(被观察者)。--发射事件后不可能再次发射这次事件,所以只有一次起作用
EP:
Observable.just(1, 2, 3, 4) .subscribeOn(Schedulers.io()) //指定创建类操作符(just)运行线程 .observeOn(Schedulers.newThread()) //指定map(mapOperator)运行线程 .map(mapOperator) .observeOn(Schedulers.io())//指定map(mapOperator2)运行线程 .map(mapOperator2) .observeOn(AndroidSchedulers.mainThread) //指定subscribe(subscriber)运行线程 .subscribe(subscriber);
内置的Scheduler
RxJava 已经内置了几个 Scheduler
,它们已经适合大多数的使用场景:
已经内置的调度器的种类
下表展示了RxJava中可用的调度器种类:
调度器类型 | 效果 |
---|---|
Schedulers.computation( ) | 用于计算的 Scheduler 。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如xml,json文件的解析,Bitmap图片的压缩取样等。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。 |
Schedulers.from(executor) | 使用指定的Executor作为调度器 |
Schedulers.immediate( ) | 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler 。(rxjava2 已废弃) |
Schedulers.io( ) | 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 启用新线程,并在新线程执行操作 |
Schedulers.trampoline( ) | 在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停,等插入进来的任务执行完之后,再将未完成的任务接着执行。(等价于Schedulers.immediate( )) |
默认调度器
在RxJava中,某些Observable操作符的变体允许你设置用于操作执行的调度器,其它的则不在任何特定的调度器上执行,或者在一个指定的默认调度器上执行。下面的表格个列出了一些操作符的默认调度器:
操作符 | 调度器 |
---|---|
buffer(timespan) | computation |
buffer(timespan, count) | computation |
buffer(timespan, timeshift) | computation |
debounce(timeout, unit) | computation |
delay(delay, unit) | computation |
delaySubscription(delay, unit) | computation |
interval | computation |
repeat | trampoline |
replay(time, unit) | computation |
replay(buffersize, time, unit) | computation |
replay(selector, time, unit) | computation |
replay(selector, buffersize, time, unit) | computation |
retry | trampoline |
sample(period, unit) | computation |
skip(time, unit) | computation |
skipLast(time, unit) | computation |
take(time, unit) | computation |
takeLast(time, unit) | computation |
takeLast(count, time, unit) | computation |
takeLastBuffer(time, unit) | computation |
takeLastBuffer(count, time, unit) | computation |
throttleFirst | computation |
throttleLast | computation |
throttleWithTimeout | computation |
timeInterval | immediate |
timeout(timeoutSelector) | immediate |
timeout(firstTimeoutSelector, timeoutSelector) | immediate |
timeout(timeoutSelector, other) | immediate |
timeout(timeout, timeUnit) | computation |
timeout(firstTimeoutSelector, timeoutSelector, other) | immediate |
timeout(timeout, timeUnit, other) | computation |
timer | computation |
timestamp | immediate |
window(timespan) | computation |
window(timespan, count) | computation |
window(timespan, timeshift) | computation |
三、重要的方法和接口
1、重要的接口
1)ObservableOnSubscribe:发射器回调接口
public interface ObservableOnSubscribe<T>{
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
:ObservableOnSubscribe接口只有一个方法,这个方法提供了发射数据的功能,做为Observable.create() 传递的参数
2)ObservableEmitter:数据发射器接口
ObservableEmitter 是数据发射器接口:
ObservableEmitter 继承自Emitter接口,Emitter是一个发射器接口,是具体实现发射数据的接口,
例如:Emitter中的onNext()方法就是发射数据用的。
public interface ObservableEmitter<T> extends Emitter<T> { void setDisposable(@Nullable Disposable d); void setCancellable(@Nullable Cancellable c); boolean isDisposed(); @NonNull ObservableEmitter<T> serialize(); boolean tryOnError(@NonNull Throwable t);
}
3)Observer:订阅者接口
Observer是订阅者接口:
Observer 是用于接收被观察者的数据
public interface Observer<T> {void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void one rror(@NonNull Throwable e);void onComplete();
}
4)Consumer:消费者接口(功能类似于订阅者接口)
Consumer是消费者接口
Consumer用于接收被观察者的数据。
public interface Consumer<T> { void accept(T t) throws Exception; }
5)Disposable:代表整个观测组序列事件
Disposable 代表一个可支配的资源
Disposable接口是代表一个可支配的支援,当被观察者被订阅,执行 .subscribe()方法,该方法返回的就是一个Disposable,即就是一个可支配的支援。
public interface Disposable { void dispose(); boolean isDisposed(); }
6)Function系列的接口:对象转换接口
Function用于变换对象,使用于map,flatmap这种需要对象转换的情况。
例如 function接口:
public interface Function<T, R> { //--T--输入对象,R--输出对象 R apply(@NonNull T t) throws Exception; }
function系列的接口按照输入对象的个数排序为:
IntFunction<T> ---这个没有输出对象 Function<T, R> BiFunction<T1, T2, R> Function3<T1, T2, T3, R> Function4<T1, T2, T3, T4, R> Function5<T1, T2, T3, T4, T5, R> Function6<T1, T2, T3, T4, T5, T6, R> Function7<T1, T2, T3, T4, T5, T6, T7, R> Function8<T1, T2, T3, T4, T5, T6, T7, T8, R> Function9<T1, T2, T3, T4, T5, T6, T7, T8, T9, R>
;
4、具体使用注意事项:
1、内存溢出问题 :在使用rxjava的时候,如果没有及时解除订阅,在退出activity的时候,异步线程还在执行。
对activity还存在引用,此时activity不能被回收,因此出现内存溢出问题。
rxjava执行异步任务时,通常需要在Activity/Fragment销毁时,及时关闭异步任务,否则就会有内存泄漏。
方式一: 使用CompositeDisposable 管理Disposable对象,在Activity/Fragment销毁时 调用CompositeDisposable.clear()关闭所有异步任务。
compositeDisposable.add(disposable);
@Override public void onDestroy() { iView=null; if(compositeDisposable!=null){ compositeDisposable.clear(); compositeDisposable=null; } }
方式二:RxLifecycle 取消订阅 (了解 lifecycle)