基础用法
compositeDisposable = new CompositeDisposable();
List<String> list = new ArrayList<>();
list.add("hello");
list.add("world");
list.add("ni");
list.add("hao");
Disposable disposable = Observable.fromIterable(list)
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onNext(@NonNull String s) {
LogUtilsKt.logcat("onNext:"+s);
}
@Override
public void one rror(@NonNull Throwable e) {
LogUtilsKt.logcat("onError");
}
@Override
public void onComplete() {
LogUtilsKt.logcat("onComplete");
}
});
compositeDisposable.add(disposable);
日志输出如下, 从日志中可以看到, 是在主线程中运行的
2021-05-31 15:04:17.985 21851-21851/com.plbear.lxc E/imlog: onNext:hello
2021-05-31 15:04:17.985 21851-21851/com.plbear.lxc E/imlog: onNext:world
2021-05-31 15:04:17.985 21851-21851/com.plbear.lxc E/imlog: onNext:ni
2021-05-31 15:04:17.985 21851-21851/com.plbear.lxc E/imlog: onNext:hao
2021-05-31 15:04:17.985 21851-21851/com.plbear.lxc E/imlog: onComplete
改到子线程中
Disposable disposable = Observable.fromIterable(list)
.observeOn(Schedulers.io())
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onNext(@NonNull String s) {
LogUtilsKt.logcat("onNext:"+s);
}
@Override
public void one rror(@NonNull Throwable e) {
LogUtilsKt.logcat("onError");
}
@Override
public void onComplete() {
LogUtilsKt.logcat("onComplete");
}
});
输出
2021-05-31 15:16:58.204 26646-30647/com.plbear.lxc E/imlog: onNext:hello
2021-05-31 15:16:58.204 26646-30647/com.plbear.lxc E/imlog: onNext:world
2021-05-31 15:16:58.204 26646-30647/com.plbear.lxc E/imlog: onNext:ni
2021-05-31 15:16:58.204 26646-30647/com.plbear.lxc E/imlog: onNext:hao
2021-05-31 15:16:58.204 26646-30647/com.plbear.lxc E/imlog: onComplete
map
Disposable disposable = Observable.fromIterable(list)
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
LogUtilsKt.logcat("map:"+s);
return s + "map after";
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onNext(@NonNull String s) {
LogUtilsKt.logcat("onNext:"+s);
}
@Override
public void one rror(@NonNull Throwable e) {
LogUtilsKt.logcat("onError");
}
@Override
public void onComplete() {
LogUtilsKt.logcat("onComplete");
}
});
我们上面通过map对结果进行了转换, 并修改运行的线程, 看下输出
2021-05-31 15:21:06.277 31109-2644/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:21:06.277 31109-2644/com.plbear.lxc E/imlog: map:world
2021-05-31 15:21:06.277 31109-2644/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:21:06.277 31109-2644/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:21:06.311 31109-31109/com.plbear.lxc E/imlog: onNext:hellomap after
2021-05-31 15:21:06.311 31109-31109/com.plbear.lxc E/imlog: onNext:worldmap after
2021-05-31 15:21:06.311 31109-31109/com.plbear.lxc E/imlog: onNext:nimap after
2021-05-31 15:21:06.311 31109-31109/com.plbear.lxc E/imlog: onNext:haomap after
2021-05-31 15:21:06.311 31109-31109/com.plbear.lxc E/imlog: onComplete
flatMap
Disposable disposable = Observable.fromIterable(list)
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
LogUtilsKt.logcat("map:"+s);
return s + "map after";
}
})
.observeOn(Schedulers.io())
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull String s) throws Exception {
return Observable.just(s + " flatMap");
}
})
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onNext(@NonNull String s) {
LogUtilsKt.logcat("onNext:"+s);
}
@Override
public void one rror(@NonNull Throwable e) {
LogUtilsKt.logcat("onError");
}
@Override
public void onComplete() {
LogUtilsKt.logcat("onComplete");
}
});
看输出
2021-05-31 15:27:38.678 3165-3165/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:27:38.679 3165-3165/com.plbear.lxc E/imlog: map:world
2021-05-31 15:27:38.679 3165-3165/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:27:38.679 3165-3165/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:27:38.679 3165-6960/com.plbear.lxc E/imlog: onNext:hellomap after flatMap
2021-05-31 15:27:38.679 3165-6960/com.plbear.lxc E/imlog: onNext:worldmap after flatMap
2021-05-31 15:27:38.679 3165-6960/com.plbear.lxc E/imlog: onNext:nimap after flatMap
2021-05-31 15:27:38.679 3165-6960/com.plbear.lxc E/imlog: onNext:haomap after flatMap
2021-05-31 15:27:38.680 3165-6960/com.plbear.lxc E/imlog: onComplete
repeat
重复发射多次
Disposable disposable = Observable.fromIterable(list)
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
LogUtilsKt.logcat("map:"+s);
return s + " -map ";
}
})
.observeOn(Schedulers.io())
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull String s) throws Exception {
return Observable.just(s + "-flatMap");
}
})
.repeat(2)
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onNext(@NonNull String s) {
LogUtilsKt.logcat("onNext:"+s);
}
@Override
public void one rror(@NonNull Throwable e) {
LogUtilsKt.logcat("onError");
}
@Override
public void onComplete() {
LogUtilsKt.logcat("onComplete");
}
});
2021-05-31 15:36:28.582 5157-5157/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:36:28.582 5157-5157/com.plbear.lxc E/imlog: map:world
2021-05-31 15:36:28.582 5157-5157/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:36:28.582 5157-5157/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:36:28.583 5157-10063/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:36:28.583 5157-10063/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:36:28.583 5157-10063/com.plbear.lxc E/imlog: onNext:ni -map -flatMap
2021-05-31 15:36:28.583 5157-10063/com.plbear.lxc E/imlog: onNext:hao -map -flatMap
2021-05-31 15:36:28.591 5157-5157/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:36:28.591 5157-5157/com.plbear.lxc E/imlog: map:world
2021-05-31 15:36:28.591 5157-5157/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:36:28.591 5157-5157/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:36:28.591 5157-10064/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:36:28.591 5157-10064/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:36:28.591 5157-10064/com.plbear.lxc E/imlog: onNext:ni -map -flatMap
2021-05-31 15:36:28.591 5157-10064/com.plbear.lxc E/imlog: onNext:hao -map -flatMap
2021-05-31 15:36:28.591 5157-10064/com.plbear.lxc E/imlog: onComplete
repeatWhen
Disposable disposable = Observable.fromIterable(list)
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
LogUtilsKt.logcat("map:"+s);
return s + " -map ";
}
})
.observeOn(Schedulers.io())
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull String s) throws Exception {
return Observable.just(s + "-flatMap");
}
})
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
return objectObservable.delay(2, TimeUnit.SECONDS);
}
})
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onNext(@NonNull String s) {
LogUtilsKt.logcat("onNext:"+s);
}
@Override
public void one rror(@NonNull Throwable e) {
LogUtilsKt.logcat("onError");
}
@Override
public void onComplete() {
LogUtilsKt.logcat("onComplete");
}
});
2021-05-31 15:41:09.082 8172-8172/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:41:09.083 8172-8172/com.plbear.lxc E/imlog: map:world
2021-05-31 15:41:09.083 8172-8172/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:41:09.083 8172-11269/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:41:09.083 8172-8172/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:41:09.083 8172-11269/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:41:09.083 8172-11269/com.plbear.lxc E/imlog: onNext:ni -map -flatMap
2021-05-31 15:41:09.084 8172-11269/com.plbear.lxc E/imlog: onNext:hao -map -flatMap
2021-05-31 15:41:11.086 8172-8172/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:41:11.087 8172-8172/com.plbear.lxc E/imlog: map:world
2021-05-31 15:41:11.087 8172-8172/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:41:11.087 8172-8172/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:41:11.087 8172-11269/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:41:11.088 8172-11269/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:41:11.088 8172-11269/com.plbear.lxc E/imlog: onNext:ni -map -flatMap
2021-05-31 15:41:11.088 8172-11269/com.plbear.lxc E/imlog: onNext:hao -map -flatMap
从上面日志可以看到, repeatWhen会不断的重复执行.
delay
延迟发射
Disposable disposable = Observable.fromIterable(list)
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
LogUtilsKt.logcat("map:"+s);
return s + " -map ";
}
})
.observeOn(Schedulers.io())
.delay(1,TimeUnit.SECONDS)
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull String s) throws Exception {
return Observable.just(s + "-flatMap");
}
})
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onNext(@NonNull String s) {
LogUtilsKt.logcat("onNext:"+s);
}
@Override
public void one rror(@NonNull Throwable e) {
LogUtilsKt.logcat("onError");
}
@Override
public void onComplete() {
LogUtilsKt.logcat("onComplete");
}
});
2021-05-31 15:46:41.644 8178-8178/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:46:41.644 8178-8178/com.plbear.lxc E/imlog: map:world
2021-05-31 15:46:41.644 8178-8178/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:46:41.644 8178-8178/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:46:42.645 8178-13359/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:46:42.645 8178-13359/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:46:42.645 8178-13359/com.plbear.lxc E/imlog: onNext:ni -map -flatMap
2021-05-31 15:46:42.645 8178-13359/com.plbear.lxc E/imlog: onNext:hao -map -flatMap
2021-05-31 15:46:42.645 8178-13359/com.plbear.lxc E/imlog: onComplete
retry
当发生时候后重试, 可以设置重试的次数
Disposable disposable = Observable.fromIterable(list)
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
LogUtilsKt.logcat("map:" + s);
return s + " -map ";
}
})
.observeOn(Schedulers.io())
.delay(1, TimeUnit.SECONDS)
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull String s) throws Exception {
if (s.contains("ni")) {
throw new RuntimeException("ni exception");
}
return Observable.just(s + "-flatMap");
}
})
.retry(2)
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onNext(@NonNull String s) {
LogUtilsKt.logcat("onNext:" + s);
}
@Override
public void one rror(@NonNull Throwable e) {
LogUtilsKt.logcat("onError");
}
@Override
public void onComplete() {
LogUtilsKt.logcat("onComplete");
}
});
2021-05-31 15:52:32.740 14532-14532/com.plbear.lxc E/imlog: apple
2021-05-31 15:52:34.974 14532-14532/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:52:34.975 14532-14532/com.plbear.lxc E/imlog: map:world
2021-05-31 15:52:34.975 14532-14532/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:52:34.975 14532-14532/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:52:35.977 14532-15969/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:52:35.977 14532-15969/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:52:35.980 14532-14532/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:52:35.980 14532-14532/com.plbear.lxc E/imlog: map:world
2021-05-31 15:52:35.980 14532-14532/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:52:35.980 14532-14532/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:52:36.981 14532-15973/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:52:36.983 14532-15973/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:52:36.984 14532-14532/com.plbear.lxc E/imlog: map:hello
2021-05-31 15:52:36.985 14532-14532/com.plbear.lxc E/imlog: map:world
2021-05-31 15:52:36.985 14532-14532/com.plbear.lxc E/imlog: map:ni
2021-05-31 15:52:36.985 14532-14532/com.plbear.lxc E/imlog: map:hao
2021-05-31 15:52:37.986 14532-15984/com.plbear.lxc E/imlog: onNext:hello -map -flatMap
2021-05-31 15:52:37.987 14532-15984/com.plbear.lxc E/imlog: onNext:world -map -flatMap
2021-05-31 15:52:37.987 14532-15984/com.plbear.lxc E/imlog: one rror
retryWhen
当遇到失败的时候,就一直重试
Disposable disposable = Observable.fromIterable(list)
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
LogUtilsKt.logcat("map:" + s);
return s + " -map ";
}
})
.observeOn(Schedulers.io())
.delay(1, TimeUnit.SECONDS)
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull String s) throws Exception {
if (s.contains("ni")) {
throw new RuntimeException("ni exception");
}
return Observable.just(s + "-flatMap");
}
})
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.delay(1, TimeUnit.SECONDS);
}
})
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onNext(@NonNull String s) {
LogUtilsKt.logcat("onNext:" + s);
}
@Override
public void one rror(@NonNull Throwable e) {
LogUtilsKt.logcat("onError");
}
@Override
public void onComplete() {
LogUtilsKt.logcat("onComplete");
}
});
zip
合并多个请求
Disposable disposable = Observable.zip(Observable.fromIterable(list), Observable.fromIterable(list2), new BiFunction<String, String, String>() {
@NonNull
@Override
public String apply(@NonNull String s, @NonNull String s2) throws Exception {
LogUtilsKt.logcat("apply:"+s+":"+s2);
return s + s;
}
}).subscribeWith(new DisposableObserver<String>(){
@Override
public void onNext(@NonNull String s) {
LogUtilsKt.logcat("onNext"+s);
}
@Override
public void one rror(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
zipWith
Disposable disposable = Observable.fromIterable(list)
.zipWith(Observable.fromIterable(list2), new BiFunction<String, String, String>() {
@NonNull
@Override
public String apply(@NonNull String o, @NonNull String s) throws Exception {
LogUtilsKt.logcat("apply:" + s + ":" + s);
return o + ":" + s;
}
})
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onNext(@NonNull String s) {
LogUtilsKt.logcat("onNext" + s);
}
@Override
public void one rror(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
输出
2021-05-31 19:37:32.812 28110-28110/com.plbear.lxc E/imlog: apple
2021-05-31 19:37:34.166 28110-28110/com.plbear.lxc E/imlog: apply:nihao:nihao
2021-05-31 19:37:34.166 28110-28110/com.plbear.lxc E/imlog: onNexthello:nihao