本文总结Rxjs中的常用Operators
Pipe
pipe()的参数可以放任意的operators, operator在pipe中依次执行
map, mapTo
map 对源 observable 的每个值应用投射函数。
mapTo将每个发出值映射成常量。
1 import { from } from 'rxjs'; 2 import { map, mapTo } from 'rxjs/operators'; 3 4 // 发出 (1,2,3,4,5) 5 const source = from([1, 2, 3, 4, 5]); 6 // 每个数字加10 7 const example = source.pipe(map(val => val + 10)); 8 // 输出: 11,12,13,14,15 9 const subscribe = example.subscribe(val => console.log(val)); 10 11 // 发出每个页面点击 12 const source = fromEvent(document, 'click'); 13 // 将所有发出值映射成同一个值 14 const example = source.pipe(mapTo('GOODBYE WORLD!')); 15 // 输出: (click)'GOODBYE WORLD!'... 16 const subscribe = example.subscribe(val => console.log(val));
take, skip
如果想基于某个逻辑或另一个 observable 来取任意数量的值,你可以 takeUntil 或 takeWhile!
take
与 skip
是相反的,它接收起始的N个值,而 skip
会跳过起始的N个值。
1 import { interval } from 'rxjs'; 2 import { take } from 'rxjs/operators'; 3 4 // 每1秒发出值 5 const interval$ = interval(1000); 6 // 取前5个发出的值 7 const example = interval$.pipe(take(5)); 8 // 输出: 0,1,2,3,4 9 const subscribe = example.subscribe(val => console.log(val));
1 import { skip } from 'rxjs/operators'; 2 3 // 每1秒发出值 4 const source = interval(1000); 5 // 跳过前5个发出值 6 const example = source.pipe(skip(5)); 7 // 输出: 5...6...7...8........ 8 const subscribe = example.subscribe(val => console.log(val));
of
按顺序发出任意数量的值
1 import { of } from 'rxjs'; 2 // 依次发出提供的任意数量的值 3 const source = of(1, 2, 3, 4, 5); 4 // 输出: 1,2,3,4,5 5 const subscribe = source.subscribe(val => console.log(val));
from, fromEvent, fromPromise
from: 将数组、promise 或迭代器转换成 observable
fromEvent: 从event生成observable
fromPromise:从promise生成observable
示例 1: 数组转换而来的 observable
// RxJS v6+
import { from } from 'rxjs';
// 将数组作为值的序列发出
const arraySource = from([1, 2, 3, 4, 5]);
// 输出: 1,2,3,4,5
const subscribe = arraySource.subscribe(val => console.log(val));
示例 2: promise 转换而来的 observable
// RxJS v6+
import { from } from 'rxjs';
// 发出 promise 的结果
const promiseSource = from(new Promise(resolve => resolve('Hello World!')));
// 输出: 'Hello World'
const subscribe = promiseSource.subscribe(val => console.log(val));
示例 3: 集合转换而来的 observable
// RxJS v6+
import { from } from 'rxjs';
// 使用 js 的集合
const map = new Map();
map.set(1, 'Hi');
map.set(2, 'Bye');
const mapSource = from(map);
// 输出: [1, 'Hi'], [2, 'Bye']
const subscribe = mapSource.subscribe(val => console.log(val));
示例 4: 字符串转换而来的 observable
// RxJS v6+
import { from } from 'rxjs';
// 将字符串作为字符序列发出
const source = from('Hello World');
// 输出: 'H','e','l','l','o',' ','W','o','r','l','d'
const subscribe = source.subscribe(val => console.log(val));
tap
透明地执行操作或副作用,比如打印日志。
1 import { of } from 'rxjs'; 2 import { tap, map } from 'rxjs/operators'; 3 4 const source = of(1, 2, 3, 4, 5); 5 // 使用 tap 透明地打印 source 中的值 6 const example = source.pipe( 7 tap(val => console.log(`BEFORE MAP: ${val}`)), 8 map(val => val + 10), 9 tap(val => console.log(`AFTER MAP: ${val}`)) 10 ); 11 12 // 'tap' 并不转换值 13 // 输出: 11...12...13...14...15 14 const subscribe = example.subscribe(val => console.log(val));
delay
delay: 根据给定时间延迟发出值。
1 import { of, merge } from 'rxjs'; 2 import { mapTo, delay } from 'rxjs/operators'; 3 4 // 发出一项 5 const example = of(null); 6 // 每延迟一次输出便增加1秒延迟时间 7 const message = merge( 8 example.pipe(mapTo('Hello')), 9 example.pipe( 10 mapTo('World!'), 11 delay(1000) 12 ), 13 example.pipe( 14 mapTo('Goodbye'), 15 delay(2000) 16 ), 17 example.pipe( 18 mapTo('World!'), 19 delay(3000) 20 ) 21 ); 22 // 输出: 'Hello'...'World!'...'Goodbye'...'World!' 23 const subscribe = message.subscribe(val => console.log(val));
throttle, debounce
throttle:以某个时间间隔为阈值,在 durationSelector
完成前将抑制新值的发出
debounce:根据一个选择器函数,舍弃掉在两次输出之间小于指定时间的发出值。
这两个Operators可用于节流
1 import { interval } from 'rxjs'; 2 import { throttle, debounce } from 'rxjs/operators'; 3 4 // 每1秒发出值 5 const source = interval(1000); 6 // 节流2秒后才发出最新值 7 const example = source.pipe(throttle(val => interval(2000))); 8 // 输出: 0...3...6...9 9 const subscribe = example.subscribe(val => console.log(val)); 10 11 12 // 发出四个字符串 13 const example 2= of('WAIT', 'ONE', 'SECOND', 'Last will display'); 14 // 只有在最后一次发送后再经过一秒钟,才会发出值,并抛弃在此之前的所有其他值 15 const debouncedExample = example2.pipe(debounce(() => timer(1000))); 16 // 在这个示例中,所有的值都将被忽略,除了最后一个 17 // 输出: 'Last will display' 18 const subscribe = debouncedExample.subscribe(val => console.log(val));
concat, concatMap
observable 按照顺序,前一个 observable 完成了再订阅下一个 observable 并发出值。
1 import { delay, concat } from 'rxjs/operators'; 2 import { of } from 'rxjs'; 3 4 // 发出 1,2,3 5 const sourceOne = of(1, 2, 3); 6 // 发出 4,5,6 7 const sourceTwo = of(4, 5, 6); 8 9 // 延迟3秒,然后发出 10 const sourceThree = sourceOne.pipe(delay(3000)); 11 // sourceTwo 要等待 sourceOne 完成才能订阅 12 const example = sourceThree.pipe(concat(sourceTwo)); 13 // 输出: 1,2,3,4,5,6 14 const subscribe = example.subscribe(val => 15 console.log('Example: Delayed source one:', val) 16 );
merge, mergeMap
将多个 observables 转换成单个 observable 。
1 import { mapTo } from 'rxjs/operators'; 2 import { interval, merge } from 'rxjs'; 3 4 // 每2.5秒发出值 5 const first = interval(2500); 6 // 每2秒发出值 7 const second = interval(2000); 8 // 每1.5秒发出值 9 const third = interval(1500); 10 // 每1秒发出值 11 const fourth = interval(1000); 12 13 // 从一个 observable 中发出输出值 14 const example = merge( 15 first.pipe(mapTo('FIRST!')), 16 second.pipe(mapTo('SECOND!')), 17 third.pipe(mapTo('THIRD')), 18 fourth.pipe(mapTo('FOURTH')) 19 ); 20 // 输出: "FOURTH", "THIRD", "SECOND!", "FOURTH", "FIRST!", "THIRD", "FOURTH" 21 const subscribe = example.subscribe(val => console.log(val));
concatMap, mergeMap, switchMap, exhaustMap
concatMap: 组合多个 Observables 来创建一个 Observable ,该 Observable 的值根据每个输入 Observable 的最新值计算得出的。
mergeMap: 将每个源值投射成 Observable ,该 Observable 会合并到输出 Observable 中。
注意 concatMap
和 mergeMap
之间的区别。 因为 concatMap
之前前一个内部 observable 完成后才会订阅下一个, source 中延迟 2000ms 值会先发出。 对比的话, mergeMap
会立即订阅所有内部 observables, 延迟少的 observable (1000ms) 会先发出值,然后才是 2000ms 的 observable 。
1 import { of } from 'rxjs'; 2 import { concatMap, delay, mergeMap } from 'rxjs/operators'; 3 4 // 发出延迟值 5 const source = of(2000, 1000); 6 // 将内部 observable 映射成 source,当前一个完成时发出结果并订阅下一个 7 const example = source.pipe( 8 concatMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val))) 9 ); 10 // 输出: With concatMap: Delayed by: 2000ms, With concatMap: Delayed by: 1000ms 11 const subscribe = example.subscribe(val => 12 console.log(`With concatMap: ${val}`) 13 ); 14 15 // 展示 concatMap 和 mergeMap 之间的区别 16 const mergeMapExample = source 17 .pipe( 18 // 只是为了确保 meregeMap 的日志晚于 concatMap 示例 19 delay(5000), 20 mergeMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val))) 21 ) 22 .subscribe(val => console.log(`With mergeMap: ${val}`));
switchMap: 将每个源值投射成 Observable,该 Observable 会合并到输出 Observable 中, 并且只发出最新投射的 Observable 中的值。
exhaustMap: 映射成内部 observable,忽略其他值直到该 observable 完成
1 import { interval } from 'rxjs'; 2 import { exhaustMap, tap, take } from 'rxjs/operators'; 3 4 const firstInterval = interval(1000).pipe(take(10)); 5 const secondInterval = interval(1000).pipe(take(2)); 6 7 const exhaustSub = firstInterval 8 .pipe( 9 exhaustMap(f => { 10 console.log(`Emission Corrected of first interval: ${f}`); 11 return secondInterval; 12 }) 13 ) 14 /* 15 当我们订阅第一个 interval 时,它开始发出值(从0开始)。 16 这个值会映射成第二个 interval,然后它开始发出值(从0开始)。 17 当第二个 interval 出于激活状态时,第一个 interval 的值会被忽略。 18 我们可以看到 firstInterval 发出的数字为3,6,等等... 19 20 输出: 21 Emission of first interval: 0 22 0 23 1 24 Emission of first interval: 3 25 0 26 1 27 Emission of first interval: 6 28 0 29 1 30 Emission of first interval: 9 31 0 32 1 33 */ 34 .subscribe(s => console.log(s));