前言:
昨天的两篇介绍了 RxJava 的创建类操作符和延时类操作符,今天笔者记录一下 转换类 操作符,不太监了 ,开始记笔记。对了具体不太清楚转换过程的 可以去拜读 这位大佬 Season_zlc 关于 map 等操作符的介绍 链接为:给初学者的RxJava2.0教程(三) 。
正文:
1、map 操作符
1)、作用
将观察者 发射的 数据 从发射类型 转换为 其它类型
多应用于数据类型转换
2)、代码
/**
* map 操作符
*/
private void mapMethod() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(111);
emitter.onNext(222);
emitter.onNext(333);
emitter.onNext(444);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "将 Integer 类型数据转换为 String 类型数据:"+integer;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
logDUtils("onSubscribe:");
}
@Override
public void onNext(String s) {
logDUtils("onNext:" + s);
}
@Override
public void onError(Throwable e) {
logDUtils("onError:"+e.getMessage());
}
@Override
public void onComplete() {
logDUtils("onComplete:");
}
});
}
3)、效果
被观察对象 在 map 操作符内 进行相应的转换 在观察者对象 得到 新的数据类型
2、FlatMap 操作符
1)、作用
将被观察者 发射的指定类型的一个数据 转换或拆分为 多个类型数据 发射给观察者的;
拆分之后的 数据 被重组为 一个新的被观察者 发射给 观察者;
新组建的被观察者 为无序。
2)、代码
/**
* FlatMap 操作符
*/
@SuppressLint("CheckResult")
private void flatMapMethod() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(111);
emitter.onNext(222);
emitter.onNext(333);
emitter.onNext(444);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("从被观察者接收到的数据" + integer);
}
//为了验证 无序性 添加了 100 毫秒 的延迟操作
return Observable.fromIterable(list).delay(100, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
logDUtils("accept:" + s);
}
});
}
3)、效果
3、concatMap 操作符
1)、作用
作用与 FlatMap 操作符相同 ,只是保证了有序性
2)、代码
/**
* ConcatMap 操作符
*/
@SuppressLint("CheckResult")
private void concatMapMethod() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(111);
emitter.onNext(222);
emitter.onNext(333);
emitter.onNext(444);
}
}).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("从被观察者接收到的数据" + integer);
}
//为了验证 有序性 添加了 100 毫秒 的延迟操作
return Observable.fromIterable(list).delay(100, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
logDUtils("accept:" + s);
}
});
}
3)、效果
4、buffer 操作符
1)、作用
从被观察者内 拿到指定长度 的数据 缓存到缓存区内 打包一起发射给 观察者,当发射数据不够指定长度 则不再进行缓存。
2)、代码
/**
* Buffer 操作符
*/
@SuppressLint("CheckResult")
private void bufferMethod() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(111);
emitter.onNext(111);
emitter.onNext(222);
emitter.onNext(222);
emitter.onNext(333);
emitter.onNext(333);
emitter.onNext(444);
emitter.onNext(444);
}
}).buffer(3).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
logDUtils("accept:" + integers);
}
});
}
3)、效果
buffer 操作符还有另一个常用的方法
//参数:缓存数量 步长:从第一个还是计算,指定数据数量之后 再次进行缓存 可以取样用
public final Observable<List<T>> buffer(int count, int skip)
代码
/**
* Buffer 操作符
*/
@SuppressLint("CheckResult")
private void bufferMethod() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(111);
emitter.onNext(111);
emitter.onNext(222);
emitter.onNext(222);
emitter.onNext(333);
emitter.onNext(333);
emitter.onNext(444);
emitter.onNext(444);
}
}).buffer(2,3).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
logDUtils("accept:" + integers);
}
});
}
效果