RxJava学习笔记

基础用法

        // Container for disposables; the subscription created below is added to it at the end.
        compositeDisposable = new CompositeDisposable();
        List<String> list = new ArrayList<>();
        list.add("hello");
        list.add("world");
        list.add("ni");
        list.add("hao");

        // No scheduler is specified here; the recorded log shows every callback on the main thread.
        Disposable disposable = Observable.fromIterable(list)
                .subscribeWith(new DisposableObserver<String>() {
                    /** Called once per list element. */
                    @Override
                    public void onNext(@NonNull String s) {
                        LogUtilsKt.logcat("onNext:"+s);
                    }

                    /** Called if the stream terminates with an error. */
                    @Override
                    public void onError(@NonNull Throwable e) {
                        LogUtilsKt.logcat("onError");
                    }

                    /** Called after the last element has been delivered. */
                    @Override
                    public void onComplete() {
                        LogUtilsKt.logcat("onComplete");
                    }
                });

        compositeDisposable.add(disposable);

日志输出如下。从日志中可以看到, 进程号和线程号相同(21851-21851), 说明是在主线程中运行的

2021-05-31 15:04:17.985 21851-21851/com.plbear.lxc E/imlog: onNext:hello
2021-05-31 15:04:17.985 21851-21851/com.plbear.lxc E/imlog: onNext:world
2021-05-31 15:04:17.985 21851-21851/com.plbear.lxc E/imlog: onNext:ni
2021-05-31 15:04:17.985 21851-21851/com.plbear.lxc E/imlog: onNext:hao
2021-05-31 15:04:17.985 21851-21851/com.plbear.lxc E/imlog: onComplete

改到子线程中

        Disposable disposable = Observable.fromIterable(list)
                .observeOn(Schedulers.io())
                .subscribeWith(new DisposableObserver<String>() {
                    @Override
                    public void onNext(@NonNull String s) {
                        LogUtilsKt.logcat("onNext:"+s);
                    }

                    @Override
                    public void one rror(@NonNull Throwable e) {
                        LogUtilsKt.logcat("onError");
                    }

                    @Override
                    public void onComplete() {
                        LogUtilsKt.logcat("onComplete");
                    }
                });

输出

2021-05-31 15:16:58.204 26646-30647/com.plbear.lxc E/imlog: onNext:hello
2021-05-31 15:16:58.204 26646-30647/com.plbear.lxc E/imlog: onNext:world
2021-05-31 15:16:58.204 26646-30647/com.plbear.lxc E/imlog: onNext:ni
2021-05-31 15:16:58.204 26646-30647/com.plbear.lxc E/imlog: onNext:hao
2021-05-31 15:16:58.204 26646-30647/com.plbear.lxc E/imlog: onComplete

map

        Disposable disposable = Observable.fromIterable(list)
                .observeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        LogUtilsKt.logcat("map:"+s);
                        return s + "map after";
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableObserver<String>() {
                    @Override
                    public void onNext(@NonNull String s) {
                        LogUtilsKt.logcat("onNext:"+s);
                    }

                    @Override
                    public void one rror(@NonNull Throwable e) {
                        LogUtilsKt.logcat("onError");
                    }

                    @Override
                    public void onComplete() {
                        LogUtilsKt.logcat("onComplete");
                    }
                });

我们上面通过map对结果进行了转换, 并修改运行的线程, 看下输出

2021-05-31 15:21:06.277 31109-2644/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:21:06.277 31109-2644/com.plbear.lxc E/imlog: map:world
2021-05-31 15:21:06.277 31109-2644/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:21:06.277 31109-2644/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:21:06.311 31109-31109/com.plbear.lxc E/imlog: onNext:hellomap after
2021-05-31 15:21:06.311 31109-31109/com.plbear.lxc E/imlog: onNext:worldmap after
2021-05-31 15:21:06.311 31109-31109/com.plbear.lxc E/imlog: onNext:nimap after
2021-05-31 15:21:06.311 31109-31109/com.plbear.lxc E/imlog: onNext:haomap after
2021-05-31 15:21:06.311 31109-31109/com.plbear.lxc E/imlog: onComplete

flatMap

 Disposable disposable = Observable.fromIterable(list)
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        LogUtilsKt.logcat("map:"+s);
                        return s + "map after";
                    }
                })
                .observeOn(Schedulers.io())
                .flatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull String s) throws Exception {
                        return Observable.just(s + " flatMap");
                    }
                })
                .subscribeWith(new DisposableObserver<String>() {
                    @Override
                    public void onNext(@NonNull String s) {
                        LogUtilsKt.logcat("onNext:"+s);
                    }

                    @Override
                    public void one rror(@NonNull Throwable e) {
                        LogUtilsKt.logcat("onError");
                    }

                    @Override
                    public void onComplete() {
                        LogUtilsKt.logcat("onComplete");
                    }
                });

看输出

2021-05-31 15:27:38.678 3165-3165/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:27:38.679 3165-3165/com.plbear.lxc E/imlog: map:world
2021-05-31 15:27:38.679 3165-3165/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:27:38.679 3165-3165/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:27:38.679 3165-6960/com.plbear.lxc E/imlog: onNext:hellomap after flatMap
2021-05-31 15:27:38.679 3165-6960/com.plbear.lxc E/imlog: onNext:worldmap after flatMap
2021-05-31 15:27:38.679 3165-6960/com.plbear.lxc E/imlog: onNext:nimap after flatMap
2021-05-31 15:27:38.679 3165-6960/com.plbear.lxc E/imlog: onNext:haomap after flatMap
2021-05-31 15:27:38.680 3165-6960/com.plbear.lxc E/imlog: onComplete

 

repeat

重复发射多次

        Disposable disposable = Observable.fromIterable(list)
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        LogUtilsKt.logcat("map:"+s);
                        return s + " -map ";
                    }
                })
                .observeOn(Schedulers.io())
                .flatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull String s) throws Exception {
                        return Observable.just(s + "-flatMap");
                    }
                })
                .repeat(2)
                .subscribeWith(new DisposableObserver<String>() {
                    @Override
                    public void onNext(@NonNull String s) {
                        LogUtilsKt.logcat("onNext:"+s);
                    }

                    @Override
                    public void one rror(@NonNull Throwable e) {
                        LogUtilsKt.logcat("onError");
                    }

                    @Override
                    public void onComplete() {
                        LogUtilsKt.logcat("onComplete");
                    }
                });

2021-05-31 15:36:28.582 5157-5157/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:36:28.582 5157-5157/com.plbear.lxc E/imlog: map:world
2021-05-31 15:36:28.582 5157-5157/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:36:28.582 5157-5157/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:36:28.583 5157-10063/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:36:28.583 5157-10063/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:36:28.583 5157-10063/com.plbear.lxc E/imlog: onNext:ni -map -flatMap
2021-05-31 15:36:28.583 5157-10063/com.plbear.lxc E/imlog: onNext:hao -map -flatMap
2021-05-31 15:36:28.591 5157-5157/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:36:28.591 5157-5157/com.plbear.lxc E/imlog: map:world
2021-05-31 15:36:28.591 5157-5157/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:36:28.591 5157-5157/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:36:28.591 5157-10064/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:36:28.591 5157-10064/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:36:28.591 5157-10064/com.plbear.lxc E/imlog: onNext:ni -map -flatMap
2021-05-31 15:36:28.591 5157-10064/com.plbear.lxc E/imlog: onNext:hao -map -flatMap
2021-05-31 15:36:28.591 5157-10064/com.plbear.lxc E/imlog: onComplete

repeatWhen

        Disposable disposable = Observable.fromIterable(list)
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        LogUtilsKt.logcat("map:"+s);
                        return s + " -map ";
                    }
                })
                .observeOn(Schedulers.io())
                .flatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull String s) throws Exception {
                        return Observable.just(s + "-flatMap");
                    }
                })
                .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
                        return objectObservable.delay(2, TimeUnit.SECONDS);
                    }
                })
                .subscribeWith(new DisposableObserver<String>() {
                    @Override
                    public void onNext(@NonNull String s) {
                        LogUtilsKt.logcat("onNext:"+s);
                    }

                    @Override
                    public void one rror(@NonNull Throwable e) {
                        LogUtilsKt.logcat("onError");
                    }

                    @Override
                    public void onComplete() {
                        LogUtilsKt.logcat("onComplete");
                    }
                });

2021-05-31 15:41:09.082 8172-8172/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:41:09.083 8172-8172/com.plbear.lxc E/imlog: map:world
2021-05-31 15:41:09.083 8172-8172/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:41:09.083 8172-11269/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:41:09.083 8172-8172/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:41:09.083 8172-11269/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:41:09.083 8172-11269/com.plbear.lxc E/imlog: onNext:ni -map -flatMap
2021-05-31 15:41:09.084 8172-11269/com.plbear.lxc E/imlog: onNext:hao -map -flatMap
2021-05-31 15:41:11.086 8172-8172/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:41:11.087 8172-8172/com.plbear.lxc E/imlog: map:world
2021-05-31 15:41:11.087 8172-8172/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:41:11.087 8172-8172/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:41:11.087 8172-11269/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:41:11.088 8172-11269/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:41:11.088 8172-11269/com.plbear.lxc E/imlog: onNext:ni -map -flatMap
2021-05-31 15:41:11.088 8172-11269/com.plbear.lxc E/imlog: onNext:hao -map -flatMap

从上面日志可以看到, repeatWhen会不断地重复执行(这里每次间隔2秒).

delay

延迟发射

        Disposable disposable = Observable.fromIterable(list)
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        LogUtilsKt.logcat("map:"+s);
                        return s + " -map ";
                    }
                })
                .observeOn(Schedulers.io())
                .delay(1,TimeUnit.SECONDS)
                .flatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull String s) throws Exception {
                        return Observable.just(s + "-flatMap");
                    }
                })
                .subscribeWith(new DisposableObserver<String>() {
                    @Override
                    public void onNext(@NonNull String s) {
                        LogUtilsKt.logcat("onNext:"+s);
                    }

                    @Override
                    public void one rror(@NonNull Throwable e) {
                        LogUtilsKt.logcat("onError");
                    }

                    @Override
                    public void onComplete() {
                        LogUtilsKt.logcat("onComplete");
                    }
                });

2021-05-31 15:46:41.644 8178-8178/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:46:41.644 8178-8178/com.plbear.lxc E/imlog: map:world
2021-05-31 15:46:41.644 8178-8178/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:46:41.644 8178-8178/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:46:42.645 8178-13359/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:46:42.645 8178-13359/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:46:42.645 8178-13359/com.plbear.lxc E/imlog: onNext:ni -map -flatMap
2021-05-31 15:46:42.645 8178-13359/com.plbear.lxc E/imlog: onNext:hao -map -flatMap
2021-05-31 15:46:42.645 8178-13359/com.plbear.lxc E/imlog: onComplete

retry

当发生错误后重试, 可以设置重试的次数

        Disposable disposable = Observable.fromIterable(list)
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        LogUtilsKt.logcat("map:" + s);
                        return s + " -map ";
                    }
                })
                .observeOn(Schedulers.io())
                .delay(1, TimeUnit.SECONDS)
                .flatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull String s) throws Exception {
                        if (s.contains("ni")) {
                            throw new RuntimeException("ni exception");
                        }
                        return Observable.just(s + "-flatMap");
                    }
                })
                .retry(2)
                .subscribeWith(new DisposableObserver<String>() {
                    @Override
                    public void onNext(@NonNull String s) {
                        LogUtilsKt.logcat("onNext:" + s);
                    }

                    @Override
                    public void one rror(@NonNull Throwable e) {
                        LogUtilsKt.logcat("onError");
                    }

                    @Override
                    public void onComplete() {
                        LogUtilsKt.logcat("onComplete");
                    }
                });

2021-05-31 15:52:32.740 14532-14532/com.plbear.lxc E/imlog: apple
2021-05-31 15:52:34.974 14532-14532/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:52:34.975 14532-14532/com.plbear.lxc E/imlog: map:world
2021-05-31 15:52:34.975 14532-14532/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:52:34.975 14532-14532/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:52:35.977 14532-15969/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:52:35.977 14532-15969/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:52:35.980 14532-14532/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:52:35.980 14532-14532/com.plbear.lxc E/imlog: map:world
2021-05-31 15:52:35.980 14532-14532/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:52:35.980 14532-14532/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:52:36.981 14532-15973/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:52:36.983 14532-15973/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:52:36.984 14532-14532/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:52:36.985 14532-14532/com.plbear.lxc E/imlog: map:world
2021-05-31 15:52:36.985 14532-14532/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:52:36.985 14532-14532/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:52:37.986 14532-15984/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:52:37.987 14532-15984/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:52:37.987 14532-15984/com.plbear.lxc E/imlog: onError
 

retryWhen

当遇到失败的时候,就一直重试

        Disposable disposable = Observable.fromIterable(list)
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        LogUtilsKt.logcat("map:" + s);
                        return s + " -map ";
                    }
                })
                .observeOn(Schedulers.io())
                .delay(1, TimeUnit.SECONDS)
                .flatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull String s) throws Exception {
                        if (s.contains("ni")) {
                            throw new RuntimeException("ni exception");
                        }
                        return Observable.just(s + "-flatMap");
                    }
                })
                .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
                        return throwableObservable.delay(1, TimeUnit.SECONDS);
                    }
                })
                .subscribeWith(new DisposableObserver<String>() {
                    @Override
                    public void onNext(@NonNull String s) {
                        LogUtilsKt.logcat("onNext:" + s);
                    }

                    @Override
                    public void one rror(@NonNull Throwable e) {
                        LogUtilsKt.logcat("onError");
                    }

                    @Override
                    public void onComplete() {
                        LogUtilsKt.logcat("onComplete");
                    }
                });

zip

合并多个请求

        Disposable disposable = Observable.zip(Observable.fromIterable(list), Observable.fromIterable(list2), new BiFunction<String, String, String>() {
            @NonNull
            @Override
            public String apply(@NonNull String s, @NonNull String s2) throws Exception {
                LogUtilsKt.logcat("apply:"+s+":"+s2);
                return s + s;
            }
        }).subscribeWith(new DisposableObserver<String>(){
            @Override
            public void onNext(@NonNull String s) {
                LogUtilsKt.logcat("onNext"+s);
            }

            @Override
            public void one rror(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

zipWith

        Disposable disposable = Observable.fromIterable(list)
                .zipWith(Observable.fromIterable(list2), new BiFunction<String, String, String>() {
                    @NonNull
                    @Override
                    public String apply(@NonNull String o, @NonNull String s) throws Exception {
                        LogUtilsKt.logcat("apply:" + s + ":" + s);
                        return o + ":" + s;
                    }
                })
                .subscribeWith(new DisposableObserver<String>() {
                    @Override
                    public void onNext(@NonNull String s) {
                        LogUtilsKt.logcat("onNext" + s);
                    }

                    @Override
                    public void one rror(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

输出 

2021-05-31 19:37:32.812 28110-28110/com.plbear.lxc E/imlog: apple
2021-05-31 19:37:34.166 28110-28110/com.plbear.lxc E/imlog: apply:nihao:nihao
2021-05-31 19:37:34.166 28110-28110/com.plbear.lxc E/imlog: onNexthello:nihao

 

 

 

 

 

上一篇:MySQL学习记录


下一篇:Wireshark中PIDs与网络包的关联