RxJava使用详解

RxJava是一种异步数据处理库,也是一种扩展的观察者模式。对于Android开发者来说,使用RxJava时也会搭配RxAndroid,它是RxJava针对Android平台的一个扩展,用于Android 开发,它提供了响应式扩展组件,使用RxAndroid的调度器可以解决Android多线程问题。

观察者模式

四大要素:Observable(被观察者),Observer (观察者),subscribe (订阅),事件。
观察者订阅被观察者,一旦被观察者发出事件,观察者就可以接收到。
RxJava使用详解
扩展的观察者模式
RxJava使用详解
当事件完成时会回调onComplete(),在完成过程中发生了异常会回调onError(),onError()和onComplete()只会回调一个。

引入依赖

    implementation 'io.reactivex.rxjava3:rxjava:3.1.3'
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
        //创建被观察者
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext("Hello Uncle Xing");
                emitter.onComplete();
            }
        });
        //创建观察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.i(tag, "onSubscribe");
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.i(tag, "onNext:" + s);
            }

            @Override
            public void one rror(@NonNull Throwable e) {
                Log.i(tag, "onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i(tag, "onComplete");
            }
        };
        //订阅事件
        observable.subscribe(observer);

操作符

创建Observable

create:用于创建Observable

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext("Hello Uncle Xing");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.i(tag, "onSubscribe");
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.i(tag, "onNext:" + s);
            }

            @Override
            public void one rror(@NonNull Throwable e) {
                Log.i(tag, "onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i(tag, "onComplete");
            }
        });

just:为你创建一个Observable并自动调用onNext发射数据,just中传递的参数将直接在Observer的onNext方法中接收到

        Observable.just("Uncle Xing").subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.i(tag, "onSubscribe");
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.i(tag, "onNext:" + s);
            }

            @Override
            public void one rror(@NonNull Throwable e) {
                Log.i(tag, "onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i(tag, "onComplete");
            }
        });

interval:创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。即按照固定1秒一次调用onNext()方法。

        Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }

            @Override
            public void onNext(@NonNull Long aLong) {
                Log.i(tag, "count:" + aLong); //这里是非主线程,会隔1s打印出0,1,2,3....
            }

            @Override
            public void one rror(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

timer:创建一个Observable,它在一个给定的延迟后发射一个值,即表示延迟1秒后,调用onNext()方法

        Observable.timer(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                Log.i(tag, "count:" + aLong);
            }

            @Override
            public void one rror(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

转换Observable

map:把原来的Observable对象转换成另一个Observable对象

        Observable.just(666).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Throwable {
                return integer.toString();
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Throwable {
                Log.i(tag, "map:" + s);
            }
        });

flatMap:对于数据的转换比map()更加彻底,如果发送的数据是集合,flatmap()重新生成一个Observable对象,并把数据转换成Observer想要的数据形式。它可以返回任何它想返回的Observable对象

        Observable.just(1, 2, 3, 4, 5).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Throwable {
                return Observable.just(integer.toString());
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Throwable {
                Log.i(tag, "accept:" + s);
            }
        });

buffer:定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值

        Observable.just(1, 2, 3, 4, 5, 6).buffer(3).subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Throwable {
                Log.i(tag, integers.toString());
            }
        });

Log会分两次打印,第一次打印 [1, 2, 3],第二次打印 [4, 5, 6]

过滤Observable

distinct:去掉重复数据

        Observable.just(1, 2, 3, 4, 2, 3).distinct().subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.i(tag, "distinct:" + integer);
            }
 
            @Override
            public void one rror(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

elementAt:取出指定位置的数据

        Observable.just(1, 2, 3, 4).elementAt(1).subscribe(new MaybeObserver<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onSuccess(@NonNull Integer integer) {
                Log.i(tag, "onSuccess:" + integer);
            }

            @Override
            public void one rror(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

filter:对数据进行指定规则的过滤

        Observable.just(1, 2, 3, 4).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Throwable {
                return integer > 1;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.i(tag, "filter:" + integer);
            }
        });

组合Observable

zip:通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项

        Observable<Integer> observable = Observable.just(10, 20, 30, 40);
        Observable<Integer> observable2 = Observable.just(1, 2, 3);
        Observable.zip(observable, observable2, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Throwable {
                return integer + integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.i(tag, "zip:" + integer);
            }
        });

注意:当其中一个Observable发送数据结束或异常,另外一个也停止发送,所以这里只会打印出11,22,33

merge:合并多个Observables的发射物

        Observable<Integer> observable = Observable.just(10, 20, 30, 40);
        Observable<Integer> observable2 = Observable.just(1, 2, 3);
        Observable.merge(observable, observable2).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.i(tag, "merge:" + integer);//会打印出10,20,30,1,2,3
            }
        });

startWith:在数据序列的开头插入一条指定的项

        Observable<Integer> observable = Observable.just(10, 20, 30);
        Observable<Integer> observable2 = Observable.just(1, 2, 3);
        observable.startWith(observable2).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Throwable {
                Log.i(tag, "startWith:" + integer);
            }
        });

错误处理

  1. onErrorReturn:让Observable遇到错误时发射一个特殊的项并且正常终止;
  2. onErrorResumeNext:让Observable在遇到错误时开始发射第二个Observable的数据序列;
  3. onExceptionResumeNext:让Observable在遇到错误时继续发射后面的数据项。

Schedulers(调度器):以一种及其简单的方式解决多线程问题的机制

  1. io():用于I/O操作;
  2. computation():计算工作默认的调度器;
  3. immediate():立即执行,允许立即在当前线程执行你指定的工作;
  4. newThread():为指定任务创建新线程;
  5. trampoline():顺序处理,按需处理队列,并运行队列的每一个任务。

AndroidSchedulers:RxAndroid提供在Android平台的调度器,指定观察者在主线程。

SubscribeOn用于每个Observable对象,ObserveOn用于每个Observer对象

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                emitter.onNext(100);
                emitter.onComplete();
                Log.i(tag, "subscribe thread:" + Thread.currentThread().getName());//打印subscribe thread:RxNewThreadScheduler-1
            }
        }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer integer) {
                        Log.i(tag, "onNext thread:" + Thread.currentThread().getName());//打印onNext thread:main
                    }

                    @Override
                    public void one rror(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

管理RxJava的生命周期

在使用RxJava的时候,如果没有及时解除订阅,在退出Activity的时候,异步线程还在执行,对Activity的引用还在,此时就会产生内存泄露问题。

一般的做法是订阅成功后,拿到Disposable对象,在Activity或Fragment销毁时,调用Disposable对象的dispose()方法,将异步任务中断,也就是中断RxJava的管道。

        Disposable disposable = Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Throwable {
                Log.i(tag, "accept:" + aLong);
            }
        });
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
        }

但是,这种做法的开发效率是不高的,试想一下,如果开启了多个异步任务,就需要在Activity或Fragment销毁时中断多个异步任务。这里推荐使用RxLifecycle,传送门:https://github.com/trello/RxLifecycle

引入依赖

    implementation 'com.trello.rxlifecycle4:rxlifecycle:4.0.2'
    implementation 'com.trello.rxlifecycle4:rxlifecycle-components:4.0.2'

让你的Activity继承RxAppCompatActivity,Fragment继承RxFragment,其余类似,然后使用bindUntilEvent或者bindToLifecycle

        Observable.interval(1000, TimeUnit.MILLISECONDS)
                .compose(bindUntilEvent(ActivityEvent.DESTROY)) //当前Activity执行到onDestroy时,Observable取消订阅
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Throwable {
                        Log.i(tag, "accept:" + aLong);
                    }
                });
        Observable.interval(1000, TimeUnit.MILLISECONDS)
                .compose(bindToLifecycle())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Throwable {
                        Log.i(tag, "accept:" + aLong);
                    }
                });

使用bindToLifecycle:
如果Observable在onCreate执行,那么当执行到onDestroy时取消订阅。
如果Observable在onStart执行,那么当执行到onStop时取消订阅。
如果Observable在onResume执行,那么当执行到onPause时取消订阅。

背压

上下游在不同的线程中,通过Observable发射数据流时,如果上游发射数据速度快于下游接收处理数据的速度,这样对于那些没来得及处理的数据就会造成积压,这些数据会存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压问题。

订阅关系

  1. 同步订阅:观察者和被观察者在同一个线程工作。这样被观察者每发送一个事件,必须等到观察者接收和处理后,才能继续发送下一个事件。
  2. 异步订阅:观察者和被观察者不在同一线程工作。这样被观察者不需要等待观察者接收和处理后才能继续发送下一个事件,而是不断发送到缓冲区,直到事件完毕,此后观察者会从缓冲区取出事件。

Flowable

Flowable是背压策略实现的承载者

Subscriber这种订阅方式在第二次请求数据时就不会执行了,原因就是onCompleted后自动取消了订阅,Observer则不是

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull FlowableEmitter<Integer> emitter) throws Throwable {
                for (int i = 0; i < 10; i++) {
                    emitter.onNext(i);
                }
                emitter.onComplete();

            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.i(tag, "onSubscribe");
                        //作用:决定观察者能够接收多少个事件,设置5说明观察者能够接收5个事件,多出的事件存放在缓存区,官方推荐使用Long.MAX_VALUE
                        s.request(5);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.i(tag, "onNext:" + integer);
                    }

                    @Override
                    public void one rror(Throwable t) {
                        Log.i(tag, "onError:" + t);
                    }

                    @Override
                    public void onComplete() {
                        Log.i(tag, "onComplete");
                    }
                });

背压策略模式

当缓存区满了,被观察者仍然继续发送下一个事件时,该如何处理的策略方式?

背压模式有如下几种类型:

  1. BackpressureStrategy.ERROR:直接抛出异常MissingBackpressureException
  2. BackpressureStrategy.MISSING:友好提示,缓存区满了
  3. BackpressureStrategy.DROP:超过缓存区大小(128)的事件丢弃
  4. BackpressureStrategy.BUFFER:将缓存区大小设置成无限大,这种可以接收超过原先缓存区大小(128)的事件数量
  5. BackpressureStrategy.LATEST:只保存最后的事件,超过缓存区大小(128)的事件丢弃,比如发送了150个事件,缓存区里会保存129个事件(第1-128 + 第150事件)

RxJava + Retrofit完成网络请求

public interface MyService {
    @GET("gallery/{imageType}/response")
    Observable<List<String>> getImages(@Path("imageType") String imageType);
}
        Retrofit retrofit = new Retrofit.Builder()
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .baseUrl(BASE_URL)
                .build();
        MyService service = retrofit.create(MyService.class);

        service.getImages("banner")
                .compose(bindToLifecycle())
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<List<String>>() {
                    @Override
                    public void accept(List<String> strings) throws Throwable {
                        //todo
                    }
                });
上一篇:全球及中国汽车扬声器粘合剂行业市场发展状况与竞争格局分析报告2022-2028年


下一篇:Spring 响应式编程 随记 -- C2 Spring 响应式编程基本概念 (三)