Rxjs - 常用操作符

本文总结Rxjs中的常用Operators

Pipe

pipe()的参数可以放任意的operators, operator在pipe中依次执行

map, mapTo

map 对源 observable 的每个值应用投射函数。

mapTo将每个发出值映射成常量。

 1 import { from } from 'rxjs';
 2 import { map, mapTo } from 'rxjs/operators';
 3 
 4 // 发出 (1,2,3,4,5)
 5 const source = from([1, 2, 3, 4, 5]);
 6 // 每个数字加10
 7 const example = source.pipe(map(val => val + 10));
 8 // 输出: 11,12,13,14,15
 9 const subscribe = example.subscribe(val => console.log(val));
10 
11 // 发出每个页面点击
12 const source = fromEvent(document, 'click');
13 // 将所有发出值映射成同一个值
14 const example = source.pipe(mapTo('GOODBYE WORLD!'));
15 // 输出: (click)'GOODBYE WORLD!'...
16 const subscribe = example.subscribe(val => console.log(val));

take, skip

 如果想基于某个逻辑或另一个 observable 来取任意数量的值,你可以 takeUntil 或 takeWhile

 take 与 skip 是相反的,它接收起始的N个值,而 skip 会跳过起始的N个值。

1 import { interval } from 'rxjs';
2 import { take } from 'rxjs/operators';
3 
4 // 每1秒发出值
5 const interval$ = interval(1000);
6 // 取前5个发出的值
7 const example = interval$.pipe(take(5));
8 // 输出: 0,1,2,3,4
9 const subscribe = example.subscribe(val => console.log(val));
1 import { skip } from 'rxjs/operators';
2 
3 // 每1秒发出值
4 const source = interval(1000);
5 // 跳过前5个发出值
6 const example = source.pipe(skip(5));
7 // 输出: 5...6...7...8........
8 const subscribe = example.subscribe(val => console.log(val));

 

of

按顺序发出任意数量的值

1 import { of } from 'rxjs';
2 // 依次发出提供的任意数量的值
3 const source = of(1, 2, 3, 4, 5);
4 // 输出: 1,2,3,4,5
5 const subscribe = source.subscribe(val => console.log(val));

 

from, fromEvent, fromPromise

from: 将数组、promise 或迭代器转换成 observable 

fromEvent: 从event生成observable

fromPromise:从promise生成observable

示例 1: 数组转换而来的 observable
// RxJS v6+
import { from } from 'rxjs';

// 将数组作为值的序列发出
const arraySource = from([1, 2, 3, 4, 5]);
// 输出: 1,2,3,4,5
const subscribe = arraySource.subscribe(val => console.log(val));
示例 2: promise 转换而来的 observable
// RxJS v6+
import { from } from 'rxjs';

// 发出 promise 的结果
const promiseSource = from(new Promise(resolve => resolve('Hello World!')));
// 输出: 'Hello World'
const subscribe = promiseSource.subscribe(val => console.log(val));
示例 3: 集合转换而来的 observable
// RxJS v6+
import { from } from 'rxjs';

// 使用 js 的集合
const map = new Map();
map.set(1, 'Hi');
map.set(2, 'Bye');

const mapSource = from(map);
// 输出: [1, 'Hi'], [2, 'Bye']
const subscribe = mapSource.subscribe(val => console.log(val));
示例 4: 字符串转换而来的 observable
// RxJS v6+
import { from } from 'rxjs';

// 将字符串作为字符序列发出
const source = from('Hello World');
// 输出: 'H','e','l','l','o',' ','W','o','r','l','d'
const subscribe = source.subscribe(val => console.log(val));

 

tap

透明地执行操作或副作用,比如打印日志。

 1 import { of } from 'rxjs';
 2 import { tap, map } from 'rxjs/operators';
 3 
 4 const source = of(1, 2, 3, 4, 5);
 5 // 使用 tap 透明地打印 source 中的值
 6 const example = source.pipe(
 7   tap(val => console.log(`BEFORE MAP: ${val}`)),
 8   map(val => val + 10),
 9   tap(val => console.log(`AFTER MAP: ${val}`))
10 );
11 
12 // 'tap' 并不转换值
13 // 输出: 11...12...13...14...15
14 const subscribe = example.subscribe(val => console.log(val));

 

delay

delay: 根据给定时间延迟发出值。

 1 import { of, merge } from 'rxjs';
 2 import { mapTo, delay } from 'rxjs/operators';
 3 
 4 // 发出一项
 5 const example = of(null);
 6 // 每延迟一次输出便增加1秒延迟时间
 7 const message = merge(
 8   example.pipe(mapTo('Hello')),
 9   example.pipe(
10     mapTo('World!'),
11     delay(1000)
12   ),
13   example.pipe(
14     mapTo('Goodbye'),
15     delay(2000)
16   ),
17   example.pipe(
18     mapTo('World!'),
19     delay(3000)
20   )
21 );
22 // 输出: 'Hello'...'World!'...'Goodbye'...'World!'
23 const subscribe = message.subscribe(val => console.log(val));

 

throttle, debounce

throttle:以某个时间间隔为阈值,在 durationSelector 完成前将抑制新值的发出

debounce:根据一个选择器函数,舍弃掉在两次输出之间小于指定时间的发出值。

这两个Operators可用于节流

 1 import { interval } from 'rxjs';
 2 import { throttle, debounce } from 'rxjs/operators';
 3 
 4 // 每1秒发出值
 5 const source = interval(1000);
 6 // 节流2秒后才发出最新值
 7 const example = source.pipe(throttle(val => interval(2000)));
 8 // 输出: 0...3...6...9
 9 const subscribe = example.subscribe(val => console.log(val));
10 
11 
12 // 发出四个字符串
13 const example 2= of('WAIT', 'ONE', 'SECOND', 'Last will display');
14 // 只有在最后一次发送后再经过一秒钟,才会发出值,并抛弃在此之前的所有其他值
15 const debouncedExample = example2.pipe(debounce(() => timer(1000)));
16 // 在这个示例中,所有的值都将被忽略,除了最后一个
17 // 输出: 'Last will display'
18 const subscribe = debouncedExample.subscribe(val => console.log(val));

 

concat, concatMap

observable 按照顺序,前一个 observable 完成了再订阅下一个 observable 并发出值。

 1 import { delay, concat } from 'rxjs/operators';
 2 import { of } from 'rxjs';
 3 
 4 // 发出 1,2,3
 5 const sourceOne = of(1, 2, 3);
 6 // 发出 4,5,6
 7 const sourceTwo = of(4, 5, 6);
 8 
 9 // 延迟3秒,然后发出
10 const sourceThree = sourceOne.pipe(delay(3000));
11 // sourceTwo 要等待 sourceOne 完成才能订阅
12 const example = sourceThree.pipe(concat(sourceTwo));
13 // 输出: 1,2,3,4,5,6
14 const subscribe = example.subscribe(val =>
15   console.log('Example: Delayed source one:', val)
16 );

 

merge, mergeMap

将多个 observables 转换成单个 observable 。

 1 import { mapTo } from 'rxjs/operators';
 2 import { interval, merge } from 'rxjs';
 3 
 4 // 每2.5秒发出值
 5 const first = interval(2500);
 6 // 每2秒发出值
 7 const second = interval(2000);
 8 // 每1.5秒发出值
 9 const third = interval(1500);
10 // 每1秒发出值
11 const fourth = interval(1000);
12 
13 // 从一个 observable 中发出输出值
14 const example = merge(
15   first.pipe(mapTo('FIRST!')),
16   second.pipe(mapTo('SECOND!')),
17   third.pipe(mapTo('THIRD')),
18   fourth.pipe(mapTo('FOURTH'))
19 );
20 // 输出: "FOURTH", "THIRD", "SECOND!", "FOURTH", "FIRST!", "THIRD", "FOURTH"
21 const subscribe = example.subscribe(val => console.log(val));

 

concatMap, mergeMap, switchMap, exhaustMap

concatMap: 组合多个 Observables 来创建一个 Observable ,该 Observable 的值根据每个输入 Observable 的最新值计算得出的。

Rxjs - 常用操作符

 

mergeMap: 将每个源值投射成 Observable ,该 Observable 会合并到输出 Observable 中。

Rxjs - 常用操作符 

注意 concatMap 和 mergeMap 之间的区别。 因为 concatMap 之前前一个内部 observable 完成后才会订阅下一个, source 中延迟 2000ms 值会先发出。 对比的话, mergeMap 会立即订阅所有内部 observables, 延迟少的 observable (1000ms) 会先发出值,然后才是 2000ms 的 observable 。

 1 import { of } from 'rxjs';
 2 import { concatMap, delay, mergeMap } from 'rxjs/operators';
 3 
 4 // 发出延迟值
 5 const source = of(2000, 1000);
 6 // 将内部 observable 映射成 source,当前一个完成时发出结果并订阅下一个
 7 const example = source.pipe(
 8   concatMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
 9 );
10 // 输出: With concatMap: Delayed by: 2000ms, With concatMap: Delayed by: 1000ms
11 const subscribe = example.subscribe(val =>
12   console.log(`With concatMap: ${val}`)
13 );
14 
15 // 展示 concatMap 和 mergeMap 之间的区别
16 const mergeMapExample = source
17   .pipe(
18     // 只是为了确保 meregeMap 的日志晚于 concatMap 示例
19     delay(5000),
20     mergeMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
21   )
22   .subscribe(val => console.log(`With mergeMap: ${val}`));

switchMap: 将每个源值投射成 Observable,该 Observable 会合并到输出 Observable 中, 并且只发出最新投射的 Observable 中的值。

Rxjs - 常用操作符

 

exhaustMap: 映射成内部 observable,忽略其他值直到该 observable 完成

 

 

Rxjs - 常用操作符

 1 import { interval } from 'rxjs';
 2 import { exhaustMap, tap, take } from 'rxjs/operators';
 3 
 4 const firstInterval = interval(1000).pipe(take(10));
 5 const secondInterval = interval(1000).pipe(take(2));
 6 
 7 const exhaustSub = firstInterval
 8   .pipe(
 9     exhaustMap(f => {
10       console.log(`Emission Corrected of first interval: ${f}`);
11       return secondInterval;
12     })
13   )
14   /*
15     当我们订阅第一个 interval 时,它开始发出值(从0开始)。
16     这个值会映射成第二个 interval,然后它开始发出值(从0开始)。
17     当第二个 interval 出于激活状态时,第一个 interval 的值会被忽略。
18     我们可以看到 firstInterval 发出的数字为3,6,等等...
19 
20     输出:
21     Emission of first interval: 0
22     0
23     1
24     Emission of first interval: 3
25     0
26     1
27     Emission of first interval: 6
28     0
29     1
30     Emission of first interval: 9
31     0
32     1
33 */
34   .subscribe(s => console.log(s));

 

上一篇:【Worktile】小知识点记录-Rxjs


下一篇:Rxjs debounce 操作符在 SAP Spartacus 函数节流中的一个实际使用例子