初识RxJava(七)功能类 操作符

前言

之前的几天写了 1,2,3,4,5,6 篇关于 RxJava 的操作符的基本用法的演示,琢磨琢磨了一下,感觉不太全,再补充点功能类的操作符,也就是常用的,但是不太在意的操作符。

正文

1、subscribe 操作符

1)、作用

订阅,连接观察者 & 被观察者,组成订阅关系

2)、代码
   /**
     * subscribe 操作符
     */
    @SuppressLint("CheckResult")
    private void subscribeMethod() {
        Observable.just(123, 234, 455, 677)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {

                        logDUtils("accept:" + integer);
                    }
                });
    }

这里就不贴 效果了,有了 subscribe 操作符,才可以让 被观察者与观察者联系到一起。

2、线程调度类 subscribeOn 和 observeOn

1)、作用

subscribeOn :被观察者 的运行线程
observeOn : 观察者 的运行线程

两个操作符 内的参数:

Schedulers.immediate ( ) : 默认线程--------------> 当前线程 不指定是什么具体线程;
AndroidSchedulers.mainThread ( ) :主线程
Schedulers.newThread ( ) :常规新工作线程 非主线程
Schedulers.io ( ) :i o 操作符线程 -----------------> 多用于网络请求、读写文件等 io 密集型操作
Schedulers.computation( ) : cpu 计算线程

建议查看笔者仰慕的大佬 Season_zlc 的 给初学者的RxJava2.0教程(二) 这篇文章

2)、代码
/**
     * 线程调度类 操作符以及参数
     */
    @SuppressLint("CheckResult")
    private void threadMethod() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                logDUtils("create  thread 1:" + Thread.currentThread().getName());
                emitter.onNext("a");
                logDUtils("create  thread 2:" + Thread.currentThread().getName());
                emitter.onNext("b");
            }
            //先将被观察者 设置运行与 工作线程
        }).subscribeOn(Schedulers.io())
                //将 被观察者 运行在 主线程
                .subscribeOn(AndroidSchedulers.mainThread())
                //将 观察者 首先运行在 工作线程
                .observeOn(Schedulers.newThread())
                //将 观察者转换到 主线程
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        logDUtils("accept  thread:" + Thread.currentThread().getName() + "  打印数据: " + s);
                    }
                });
    }
3)、效果

初识RxJava(七)功能类  操作符

注意:

从效果内 可以看出 被观察者 对象 只能 设置 1 次,而 观察者所在线程 可以进行多次改变。

3、delay 操作符

1)、作用

将被观察者延迟一段时间再发送事件

2)、代码演示

    /**
     * delay 操作符
     */
    private void delayMethod() {
        Observable.just(1, 2, 3, 4, 5).delay(2, TimeUnit.SECONDS)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        logDUtils("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        logDUtils("onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        logDUtils("onError" + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        logDUtils("onComplete");
                    }
                });
    }
3)、效果

初识RxJava(七)功能类  操作符

其他 重载方法

// 参数:时间     时间单位
public final Observable<T> delay(long delay, TimeUnit unit) 

// 参数:  时间    时间单位    线程调度器
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler)

// 参数: 时间    时间单位  错误延迟参数
public final Observable<T> delay(long delay, TimeUnit unit, boolean delayError) 

// 参数:时间  时间单位   线程调度器     错误延迟参数
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler, boolean delayError)

4、do 类 操作符

doOnEach:当被观察者每发送 1 次数据事件就会调用 1 次;
doOnNext:执行 onNext 事件前调用;
doAfterNext: 执行onNext事件后调用;
doOnComplete:被观察者正常发送事件完毕后调用;
doOnError: 被观察者发送错误事件时调用;
doOnSubscribe: 观察者订阅时调用;
doAfterTerminate:被观察者发送事件完 毕后调用,不管是正常终止还是非正常终止;
doFinally:最后执行
doOnUnsubscribe(): 取消订阅执行

5、onErrorReturn 操作符

1)、作用

当发生错误时, 发送一个约定好的事件, 保证正常停止。

2)、代码
 /**
     * onErrorReturn  操作符
     */
    private void onErrorReturnMethod() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                emitter.onNext("a");
                emitter.onError(new Throwable());
                emitter.onNext("b");
            }
        }).onErrorReturn(new Function<Throwable, String>() {
            @Override
            public String apply(Throwable throwable) throws Exception {
                logDUtils("发送错误");
                return "c";
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                logDUtils("onSubscribe");
            }

            @Override
            public void onNext(String s) {
                logDUtils("onNext:" + s);
            }

            @Override
            public void onError(Throwable e) {
                logDUtils("onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                logDUtils("onComplete:");
            }
        });
    }
3)、效果

初识RxJava(七)功能类  操作符

当发送 onError 事件之后,后面的 b 并没有被发送,而是直接走 onErrorReturn 操作符发送 c ,结束程序。

6、onErrorResumeNext 操作符

作用

当发生错误的时候 发送 一个被观察者。 记住 与上面时有区别的哦!

7、onExceptionResumeNext 操作符

作用

当发生 异常 的时候 发送一个被观察者。记住这个与上面也不一样哈!

8、 retry 操作符

作用

当发生 错误 或者 异常时,重新让被观察者发送 一 次数据
重载方法 有 5个

public final Observable<T> retry() 

// 参数 = 重试次数
public final Observable<T> retry(long times)

 // 参数 = 判断逻辑
public final Observable<T> retry(Predicate<? super Throwable> predicate) 

// 参数 =  判断逻辑(传入当前重试次数 & 异常错误信息)
public final Observable<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate)

// 参数 = 设置重试次数 & 判断逻辑
public final Observable<T> retry(long times, Predicate<? super Throwable> predicate)

9、retryUntil 操作符

作用

出现错误后,判断是否需要重新发送数据
若需要,则持续进行重试操作 类似于 public final Observable<T> retry(Predicate<? super Throwable> predicate) 方法

10、retryWhen 操作符

作用

遇到错误时,将发生的错误传递给一个新的被观察者,并决定是否需要重新订阅 原 被观察者 并且发送 原 事件;

11、repeat 操作符

作用

无参 :表示 无条件重复 发送被观察 的事件
有参:表示 重复次数

12、repeatWhen 操作符

有条件的 进行 重复发送

好了,马上双十一, 筒子们,准备好购物车,一起来剁手吧,额,还是再添两本书吧。

上一篇:【开发环境】PyCharm 打开现有 Python 工程 ( 配置 Python 编译器版本 )(三)


下一篇:SpringFramework核心技术五:面向切面编程(AOP)和代理机制