RxJava操作符系列三(下)

RxJava操作符系列三(下)

接上文

Take

Take操作符可以修改Observable的行为,只返回前面的N项数据,然后发射完成通知,忽略剩余的数据。


  1. Observable.range(1,8) 
  2.  
  3.           .take(4) 
  4.  
  5.           .subscribe(new Subscriber<Integer>() { 
  6.  
  7.         @Override 
  8.  
  9.         public void onNext(Integer item) { 
  10.  
  11.            Log.e(TAG, "Next: " + item); 
  12.  
  13.         } 
  14.  
  15.   
  16.  
  17.         @Override 
  18.  
  19.         public void onError(Throwable error) { 
  20.  
  21.             Log.e(TAG, "Error: " + error.getMessage()); 
  22.  
  23.         } 
  24.  
  25.   
  26.  
  27.         @Override 
  28.  
  29.         public void onCompleted() { 
  30.  
  31.            Log.e(TAG, "complete."); 
  32.  
  33.         } 
  34.  
  35.     });  

输出日志信息


  1. Next: 1 
  2.  
  3. Next: 2 
  4.  
  5. Next: 3 
  6.  
  7. Next: 4 
  8.  
  9. complete  

take和skip一样也有其它两个重载方法take(long time, TimeUnit unit),take(long time, TimeUnit unit, Scheduler scheduler),默认在computation调度器上执行。

take还有变体操作符TakeLast,takeLastBuffer具体执行效果可自行代码。

Debounce

该操作符指的是过了一段指定的时间还没发射数据时才发射一个数据,听着可能有点绕。你可以理解对源Observable间隔期产生的结果进行过滤,如果在这个规定的间隔期内没有别的结果产生,则将这个结果提交给订阅者,否则忽略该结果,原理有点像光学防抖

上代码


  1. Observable.range(1,8) 
  2.  
  3.           .take(4) 
  4.  
  5.           .subscribe(new Subscriber<Integer>() { 
  6.  
  7.         @Override 
  8.  
  9.         public void onNext(Integer item) { 
  10.  
  11.            Log.e(TAG, "Next: " + item); 
  12.  
  13.         } 
  14.  
  15.   
  16.  
  17.         @Override 
  18.  
  19.         public void onError(Throwable error) { 
  20.  
  21.             Log.e(TAG, "Error: " + error.getMessage()); 
  22.  
  23.         } 
  24.  
  25.   
  26.  
  27.         @Override 
  28.  
  29.         public void onCompleted() { 
  30.  
  31.            Log.e(TAG, "complete."); 
  32.  
  33.         } 
  34.  
  35.     });  

输出信息


  1. onNext: 4 
  2.  
  3. onNext: 5 
  4.  
  5. onNext: 6 
  6.  
  7. onNext: 7 
  8.  
  9. onNext: 8 
  10.  
  11. onNext: 9 
  12.  
  13. onCompleted:  

这个输出数据不一定一样,有可能从5开始。

Distinct

这个比较好理解,它就是过滤掉重复的数据,只允许还没有发射过的数据项通过。

示例代码


  1. Observable.just(0, 0, 6, 4, 2, 8, 2, 1, 9, 0) 
  2.  
  3.                 .distinct() 
  4.  
  5.                 .subscribe(new Subscriber<Integer>() { 
  6.  
  7.                     @Override 
  8.  
  9.                     public void onCompleted() { 
  10.  
  11.                         Log.e(TAG, "onCompleted:Distinct "); 
  12.  
  13.                     } 
  14.  
  15.   
  16.  
  17.                     @Override 
  18.  
  19.                     public void onError(Throwable e) { 
  20.  
  21.                         Log.e(TAG, "onError:Distinct "); 
  22.  
  23.                     } 
  24.  
  25.   
  26.  
  27.                     @Override 
  28.  
  29.                     public void onNext(Integer integer) { 
  30.  
  31.                         Log.e(TAG, "onNext:Distinct " + integer); 
  32.  
  33.                     } 
  34.  
  35.                 });  

输出日志信息


  1. onNext:Distinct 0 
  2.  
  3. onNext:Distinct 6 
  4.  
  5. onNext:Distinct 4 
  6.  
  7. onNext:Distinct 2 
  8.  
  9. onNext:Distinct 8 
  10.  
  11. onNext:Distinct 1 
  12.  
  13. onNext:Distinct 9 
  14.  
  15. onCompleted:Distinct  

ElementAt

该操作符获取原始Observable发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。给它传递一个基于0的索引值,它会发射原始Observable数据序列对应索引位置的值,如果你传递给elementAt的值为4,那么它会发射第5项的数据。如下示例代码


  1. Observable.just(0, 0, 6, 4, 2, 8, 2, 1, 9, 0) 
  2.  
  3.                 .elementAt(4) 
  4.  
  5.                 .subscribe(new Subscriber<Integer>() { 
  6.  
  7.                     @Override 
  8.  
  9.                     public void onCompleted() { 
  10.  
  11.                         Log.e(TAG, "onCompleted:ElementAt "); 
  12.  
  13.                     } 
  14.  
  15.   
  16.  
  17.                     @Override 
  18.  
  19.                     public void onError(Throwable e) { 
  20.  
  21.                         Log.e(TAG, "onError:ElementAt "); 
  22.  
  23.                     } 
  24.  
  25.   
  26.  
  27.                     @Override 
  28.  
  29.                     public void onNext(Integer integer) { 
  30.  
  31.                         Log.e(TAG, "onNext:ElementAt " + integer); 
  32.  
  33.                     } 
  34.  
  35.                 });  

输出日志信息


  1. onNext:ElementAt 2 
  2.  
  3. onCompleted:ElementAt  

IgnoreElements

操作符抑制原始Observable发射的所有数据,只允许它的终止通知(onError或onCompleted)通过,使用该操作符onNext()方法不会执行。


  1. Observable.just(1, 2, 3).ignoreElements().subscribe(new Subscriber() { 
  2.  
  3.            @Override 
  4.  
  5.            public void onCompleted() { 
  6.  
  7.                Log.e(TAG, "onCompleted"); 
  8.  
  9.            } 
  10.  
  11.  
  12.  
  13.            @Override 
  14.  
  15.            public void onError(Throwable e) { 
  16.  
  17.                Log.e(TAG, "onError"); 
  18.  
  19.            } 
  20.  
  21.  
  22.  
  23.            @Override 
  24.  
  25.            public void onNext(Integer integer) { 
  26.  
  27.                Log.e(TAG, "onNext"); 
  28.  
  29.            } 
  30.  
  31.        });  

执行后只会输出onCompleted。这个操作符效果就如同empty()方法创建一个空的Observable,只会执行onCompleted()方法,不同的是ignoreElements是对数据源的处理,而empty()是创建Observable。






本文作者:佚名
来源:51CTO
上一篇:Mysql 多主一从数据备份


下一篇:RxJava操作符系列三(上)