RxJS
13.1.1 什么是 RxJS ?
RxJS 是一个用于处理异步编程的 JavaScript 库,目标是使编写异步和基于回调的代码更容易。
13.1.2 为什么要学习 RxJS ?
就像 Angular 深度集成 TypeScript 一样,Angular 也深度集成了 RxJS。
服务、表单、事件、全局状态管理、异步请求 …
13.1.3 快速入门
-
可观察对象 ( Observable ) :类比 Promise 对象,内部可以用于执行异步代码,通过调用内部提供的方法将异步代码执行的结果传递到可观察对象外部。
-
观察者 ( Observer ):类比 then 方法中的回调函数,用于接收可观察对象中传递出来数据。
-
订阅 ( subscribe ):类比 then 方法,通过订阅将可观察对象和观察者连接起来,当可观察对象发出数据时,订阅者可以接收到数据。
import { Observable } from "rxjs"
const observable = new Observable(function (observer) {
setTimeout(function () {
observer.next({
name: "Tom"
})
}, 2000)
})
const observer = {
next: function (value) {
console.log(value)
}
}
observable.subscribe(observer)
13.2 可观察对象
13.2.1 Observable
-
在 Observable 对象内部可以多次调用 next 方法向外发送数据。
const observable = new Observable(function (observer) { let index = 0 setInterval(function () { observer.next(index++) }, 1000) }) const observer = { next: function (value) { console.log(value) } } observable.subscribe(observer)
-
当所有数据发送完成以后,可以调用 complete 方法终止数据发送。
const observable = new Observable(function (observer) { let index = 0 let timer = setInterval(function () { observer.next(index++) if (index === 3) { observer.complete() clearInterval(timer) } }, 1000) }) const observer = { next: function (value) { console.log(value) }, complete: function () { console.log("数据发送完成") } } observable.subscribe(observer)
-
当 Observable 内部逻辑发生错误时,可以调用 error 方法将失败信息发送给订阅者,Observable 终止。
import { Observable } from "rxjs" const observable = new Observable(function (observer) { let index = 0 let timer = setInterval(function () { observer.next(index++) if (index === 3) { observer.error("发生错误") clearInterval(timer) } }, 1000) }) const observer = { next: function (value) { console.log(value) }, error: function (error) { console.log(error) } } observable.subscribe(observer)
- 可观察对象是惰性的,只有被订阅后才会执行
const observable = new Observable(function () { console.log("Hello RxJS") }) // observable.subscribe()
-
可观察对象可以有 n 多订阅者,每次被订阅时都会得到执行
const observable = new Observable(function () { console.log("Hello RxJS") }) observable.subscribe() observable.subscribe() observable.subscribe() observable.subscribe() observable.subscribe()
-
取消订阅
import { interval } from "rxjs" const obs = interval(1000); const subscription = obs.subscribe(console.log); setTimeout(function () { subscription.unsubscribe() }, 2000)
13.2.2 Subject
-
用于创建空的可观察对象,在订阅后不会立即执行,next 方法可以在可观察对象外部调用
import { Subject } from "rxjs" const demoSubject = new Subject() demoSubject.subscribe({next: function (value) {console.log(value)}}) demoSubject.subscribe({next: function (value) {console.log(value)}}) setTimeout(function () { demoSubject.next("hahaha") }, 3000)
13.2.3 BehaviorSubject
拥有 Subject 全部功能,但是在创建 Obervable 对象时可以传入默认值,观察者订阅后可以直接拿到默认值。
import { BehaviorSubject } from "rxjs"
const demoBehavior = new BehaviorSubject("默认值")
demoBehavior.subscribe({next: function (value) {console.log(value)}})
demoBehavior.next("Hello")
13.2.3 ReplaySubject
功能类似 Subject,但有新订阅者时两者处理方式不同,Subject 不会广播历史结果,而 ReplaySubject 会广播所有历史结果。
import { ReplaySubject } from "rxjs"
const rSubject = new ReplaySubject()
rSubject.subscribe(value => {
console.log(value)
})
rSubject.next("Hello 1")
rSubject.next("Hello 2")
setTimeout(function () {
rSubject.subscribe({next: function (value) {console.log(value)}})
}, 3000)
13.3 辅助方法
13.3.1 range
range(start, length),调用方法后返回 observable 对象,被订阅后会发出指定范围的数值。
import { range } from "rxjs"
range(0, 5).subscribe(n => console.log(n))
// 0
// 1
// 2
// 3
// 4
方法内部并不是一次发出 length 个数值,而是发送了 length 次,每次发送一个数值,就是说内部调用了 length 次 next 方法。
13.3.2 of
将参数列表作为数据流返回。
of("a", "b", [], {}, true, 20).subscribe(v => console.log(v))
13.3.3 from
将 Array,Promise, Iterator 转换为 observable 对象。
from(["a", "b", "c"]).subscribe(v => console.log(v))
// a
// b
// c
import { from } from "rxjs"
function p() {
return new Promise(function (resolve) {
resolve([100, 200])
})
}
from(p()).subscribe(v => console.log(v))
// [100, 200]
13.3.4 interval、timer
Interval:每隔一段时间发出一个数值,数值递增
import { interval } from "rxjs"
interval(1000).subscribe(n => console.log(n))
**timer **:间隔时间过去以后发出数值,行为终止,或间隔时间发出数值后,继续按第二个参数的时间间隔继续发出值
import { timer } from "rxjs"
timer(2000).subscribe(n => console.log(n))
timer(0, 1000).subscribe(n => console.log(n))
13.3.5 Concat
合并数据流,先让第一个数据流发出值,结束后再让第二个数据流发出值,进行整体合并。
import { concat, range } from "rxjs"
concat(range(1, 5), range(6, 5)).subscribe(console.log)
13.3.7 combineLatest
将两个 Obserable 中最新发出的数据流进行组合成新的数据流,以数组的形式发出。和当前最新的进行组合。
import { combineLatest, timer } from "rxjs"
const firstTimer = timer(0, 1000) // emit 0, 1, 2... after every second, starting from now
const secondTimer = timer(500, 1000) // emit 0, 1, 2... after every second, starting 0,5s from now
combineLatest(firstTimer, secondTimer).subscribe(console.log)
// [0, 0] after 0.5s
// [1, 0] after 1s
// [1, 1] after 1.5s
// [2, 1] after 2s
13.3.8 zip
将多个 Observable 中的数据流进行组合。和将来最新的进行组合。
import { zip, of } from "rxjs"
import { map } from "rxjs/operators"
let age = of(27, 25, 29)
let name = of("Foo", "Bar", "Beer")
let isDev = of(true, true, false)
zip(name, age, isDev)
.pipe(map(([name, age, isDev]) => ({ name, age, isDev })))
.subscribe(console.log)
// { name: 'Foo', age: 27, isDev: true }
// { name: 'Bar', age: 25, isDev: true }
// { name: 'Beer', age: 29, isDev: false }
13.3.9 forkJoin
forkJoin 是 Rx 版本的 Promise.all(),即表示等到所有的 Observable 都完成后,才一次性返回值。
import axios from "axios"
import { forkJoin, from } from "rxjs"
axios.interceptors.response.use(response => response.data)
forkJoin({
goods: from(axios.get("http://localhost:3005/goods")),
category: from(axios.get("http://localhost:3005/category"))
}).subscribe(console.log)
13.3.10 throwError
返回可观察对象并向订阅者抛出错误。
import { throwError } from "rxjs"
throwError("发生了未知错误").subscribe({ error: console.log })
13.3.11 retry
如果 Observable 对象抛出错误,则该辅助方法会重新订阅 Observable 以获取数据流,参数为重新订阅次数。
import { interval, of, throwError } from "rxjs"
import { mergeMap, retry } from "rxjs/operators"
interval(1000)
.pipe(
mergeMap(val => {
if (val > 2) {
return throwError("Error!")
}
return of(val)
}),
retry(2)
)
.subscribe({
next: console.log,
error: console.log
})
13.3.12 race
接收并同时执行多个可观察对象,只将最快发出的数据流传递给订阅者。
import { race, timer } from "rxjs"
import { mapTo } from "rxjs/operators"
const obs1 = timer(1000).pipe(mapTo("fast one"))
const obs2 = timer(3000).pipe(mapTo("medium one"))
const obs3 = timer(5000).pipe(mapTo("slow one"))
race(obs3, obs1, obs2).subscribe(console.log)
13.3.13 fromEvent
将事件转换为 Observable。
import { fromEvent } from "rxjs"
const btn = document.getElementById("btn")
// 可以将 Observer 简写成一个函数,表示 next
fromEvent(btn, "click").subscribe(e => console.log(e))
13.4 操作符
- 数据流:从可观察对象内部输出的数据就是数据流,可观察对象内部可以向外部源源不断的输出数据。
- 操作符:用于操作数据流,可以对象数据流进行转换,过滤等等操作。
13.4.1 map、mapTo
**map:**对数据流进行转换,基于原有值进行转换。
import { interval } from "rxjs"
import { map } from "rxjs/operators"
interval(1000)
.pipe(map(n => n * 2))
.subscribe(n => console.log(n))
**mapTo:**对数据流进行转换,不关心原有值,可以直接传入要转换后的值。
import { interval } from "rxjs"
import { mapTo } from "rxjs/operators"
interval(1000)
.pipe(mapTo({ msg: "接收到了数据流" }))
.subscribe(msg => console.log(msg))
13.4.2 filter
对数据流进行过滤。
import { range } from "rxjs"
import { filter } from "rxjs/operators"
range(1, 10)
.pipe(filter(n => n % 2 === 0))
.subscribe(even => console.log(even))
13.4.3 pluck
获取数据流对象中的属性值。
import { interval } from "rxjs"
import { pluck, mapTo } from "rxjs/operators"
interval(1000)
.pipe(
mapTo({ name: "张三", a: { b: "c" } }),
pluck("a", "b")
)
.subscribe(n => console.log(n))
13.4.4 first
获取数据流中的第一个值或者查找数据流中第一个符合条件的值,类似数组中的 find 方法。获取到值以后终止行为。
import { interval } from "rxjs"
import { first } from "rxjs/operators"
interval(1000)
.pipe(first())
.subscribe(n => console.log(n))
interval(1000)
.pipe(first(n => n === 3))
.subscribe(n => console.log(n))
13.4.5 startWith
创建一个新的 observable 对象并将参数值发送出去,然后再发送源 observable 对象发出的值。
在异步编程中提供默认值的时候非常有用。
import { interval } from "rxjs"
import { map, startWith } from "rxjs/operators"
interval(1000)