RxJS简介
Reactive Extensions for JavaScript
RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。
RxJS核心概念
- Observable (可观察对象):表示一个概念,这个概念是一个可调用的未来值或事件的集合。
- Observer (观察者):一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
- Subscription (订阅):表示 Observable 的执行,主要用于取消 Observable 的执行。
- Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 map、filter、concat、flatMap 等这样的操作符来处理集合。
- Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
- Schedulers(调度器):用来控制并发并且是*集权的调度员,允许我们在发生计算时进行协调,例如 setTimeout 或 requestAnimationFrame 或其他。
Distinct(过滤操作符)
用来选择某个键的值以检查是否是不同的。
返回 Observable,它发出由源 Observable 所发出的所有与之前的项都不相同的项。
如果提供了 keySelector 函数,那么它会将源 Observable 的每个值都投射成一个新的值,这个值会用来检查是否与先前投射的值相等。如果没有提供 keySelector 函数,它会直接使用源 Observable 的每个值来检查是否与先前的值相等。
在支持 Set 的 JavaScript 运行时中,此操作符会使用 Set 来提升不同值检查的性能。
在其他运行时中,此操作符会使用 Set 的最小化实现,此实现在底层依赖于 Array 和 indexOf,因为要检查更多的值来进行区分,所以性能会降低。
即使是在新浏览器中,长时间运行的 distinct 操作也可能会导致内存泄露。为了在某种场景下来缓解这个问题,可以提供一个可选的 flushes 参数, 这样内部的 Set 可以被“清空”,基本上清除了它的所有值。
示例代码
import { of, Subscription } from 'rxjs'; import { distinct } from 'rxjs/operators'; import { UntilDestroy } from '@ngneat/until-destroy'; interface Person { age: number, name: string } @UntilDestroy({ arrayName: 'subscriptionSet' }) export class TestDistinct { private subscriptionSet: Subscription[] = []; constructor() { this.subscriptionSet.push(of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1) .pipe( distinct(), ) .subscribe(x => console.log(x))); // result: // 1, 2, 3, 4 this.subscriptionSet.push(of<Person>( { age: 4, name: 'Foo'}, { age: 7, name: 'Bar'}, { age: 5, name: 'Foo'}, ).pipe( distinct((p: Person) => p.name), ) .subscribe(x => console.log(x))); // result: // { age: 4, name: 'Foo' } // { age: 7, name: 'Bar' } } }
distinctUntilChanged (过滤操作符)
用来检验当前项与源中的前一项是否相同。
返回 Observable,它发出源 Observable 发出的所有与前一项不相同的项。
如果提供了 compare 函数,那么每一项都会调用它来检验是否应该发出这个值。
如果没有提供 compare 函数,默认使用相等检查。
示例代码
示例中InputDebounceDirective是为了解决两个问题
- 用户每输入一个字符就会触发搜索请求,浪费资源;
- 两次keyup 事件可能产生一样的value值;
例如用户输入了123,会触发搜索请求。用户再次输入了1234,又快速删除了4,还是会会触发和上一次相同的搜索请求);
ts代码
import { Directive, ElementRef, Output, EventEmitter } from '@angular/core'; import { fromEvent, Subscription } from 'rxjs'; import { debounceTime, distinctUntilChanged } from 'rxjs/operators'; import { UntilDestroy } from '@ngneat/until-destroy'; import * as _ from 'lodash'; @UntilDestroy({ arrayName: 'subscriptionSet' }) @Directive({ selector: '[inputDebounce]' }) export class InputDebounceDirective { private subscriptionSet: Subscription[] = []; constructor(el: ElementRef) { this.subscriptionSet.push(fromEvent(el.nativeElement, 'input') .pipe(debounceTime(500), distinctUntilChanged()) .subscribe(this.getInputValue)); } getInputValue = (value: string) => { this.inputDebounceChange.emit(value); }; }
Html代码
<input type="text" inputDebounce [(ngModel)]="searchText" [ngModelOptions]="{ standalone: true }" (inputDebounceChange)="handleSearchText()" [placeholder]="'Search'">
Filter (过滤操作符)
通过只发送源 Observable 的中满足指定 predicate 函数的项来进行过滤。
评估源 Observable 所发出的每个值的函数。如果它返回 true,就发出值,如果是 false 则不会传给输出 Observable 。index 参数是自订阅开始后发送序列的索引,是从 0 开始的。
代码示例
示例中表单只有通过验证才输出内容。
Html代码
<form [formGroup]="userForm"> <div class="form-group"> <label>Name *</label> <input name="testName" formControlName="user.name"/> <label>Age *</label> <input name="testAge" formControlName="user.age"/> <label>Gender *</label> <input name="testGender" formControlName="user.gender"/> </div> </form>
Ts代码
this.userForm = new FormGroup({ 'user.name': new FormControl(this.user.name, [Validators.required]), 'user.age': new FormControl(this.user.age, [Validators.required]), 'user.gender': new FormControl(this.user.gender, [Validators.required]), }); this.userForm.valueChanges .pipe( filter(() => this. userForm.valid) ) .subscribe(res => console.log(res));