rxjs笔记部分随笔,权威地址:https://rxjs-cn.github.io/learn-rxjs-operators/operators/
Import {subject}from’rxjs’
Subject 数据的订阅与分发,结合报刊的发布与订阅进行功能的模拟,subject即是observeable对象也是observer对象,subject对于后期没有数据更新时所添加的订阅者是不怎么友好的,因为不跟新数据时订阅者就不在收到返回的数值
const interval$ = interval(1000).pipe(take(10));
const subject = new Subject();
const observerA = {
next: value => console.log('Observer A get value: ' + value),
error: error => console.log('Observer A error: ' + error),
complete: () => console.log('Observer A complete!'),
};
const observerB = {
next: value => console.log('Observer B get value: ' + value),
error: error => console.log('Observer B error: ' + error),
complete: () => console.log('Observer B complete!'),
};
subject.subscribe(observerA); // 添加观察者A
interval$.subscribe(subject); // 订阅interval$对象
setTimeout(() => {
subject.subscribe(observerB); // 添加观察者B
}, 1000);
Import{BehaviorSubject}from’rxjs’;
behaviorSubject 是subject的变种,最大的区别就是 behaviorSubject是用于保存最新的数值,而不是单纯的发送事件,会将最后一次发送的值作为当前值保存在内部属性中。
const subject = new BehaviorSubject(0); //BehaviorSubject小括号0代表的是状态
const observerA = {
next: value => console.log('Observer A get value: ' + value),
error: error => console.log('Observer A error: ' + error),
complete: () => console.log('Observer A complete!'),
};
const observerB = {
next: value => console.log('Observer B get value: ' + value),
error: error => console.log('Observer B error: ' + error),
complete: () => console.log('Observer B complete!'),
};
subject.subscribe(observerA); // 添加观察者A
// interval$.subscribe(subject); // 订阅interval$对象
subject.next(1);
subject.next(2);
subject.next(3);
setTimeout(() => {
subject.subscribe(observerB); // 添加观察者B
}, 1000);
Import {ReplaySubject}from’rxjs’;
ReplaySubject 用于重复发送最近几次的值给订阅者
const subject = new ReplaySubject(2); //ReplaySubject后的2为最后两次发送的数值
const observerA = {
next: value => console.log('Observer A get value: ' + value),
error: error => console.log('Observer A error: ' + error),
complete: () => console.log('Observer A complete!'),
};
const observerB = {
next: value => console.log('Observer B get value: ' + value),
error: error => console.log('Observer B error: ' + error),
complete: () => console.log('Observer B complete!'),
};
subject.subscribe(observerA); // 添加观察者A
// interval$.subscribe(subject); // 订阅interval$对象
subject.next(1);
subject.next(2);
subject.next(3);
setTimeout(() => {
subject.subscribe(observerB); // 添加观察者B
}, 1000);
Import{AsyncSubject}from’rxjs’;
AsyncSubject他会在subject完成后才返回一个值
const subject = new AsyncSubject();
const observerA = {
next: value => console.log('Observer A get value: ' + value),
error: error => console.log('Observer A error: ' + error),
complete: () => console.log('Observer A complete!'),
};
const observerB = {
next: value => console.log('Observer B get value: ' + value),
error: error => console.log('Observer B error: ' + error),
complete: () => console.log('Observer B complete!'),
};
subject.subscribe(observerA); // 添加观察者A
// interval$.subscribe(subject); // 订阅interval$对象
subject.next(1);
subject.next(2);
subject.next(3);
subject.complete();
setTimeout(() => {
subject.subscribe(observerB); // 添加观察者B
}, 1000);
Import {Observable}from ‘rxjs’
Create 创建在订阅函数中发出 'wocao'和 'woqu'的 observable 。
const obs = Observable.create(observer => {
observer.next('wocao');
observer.next('woqu');
});
const data = obs.subscribe(e => {
console.log(e);
});
创建定时发送数据 并在12秒后取消订阅数据
const obs = Observable.create(observer => {
let num = 0;
const interve = setInterval(() => {
if (num % 2 === 0) {
observer.next('这个是' + num);
}
num++;
}, 1000);
// return clearInterval(interve);
});
const data = obs.subscribe(e => {
console.log(e);
});
setTimeout(() => {
data.unsubscribe();
}, 12000);
Import {empty} from’rxjs’
Empty 立即完成observable
const subscribe = empty().subscribe({
next: () => console.log('Next'),
complete: () => console.log('Complete!'),
});
Import {from} from’rxjs’
From 将数组、promise 或迭代器转换成 observable
const map = new Map();
map.set(1, 'wocao');
map.set(2, '擦从年');
map.set('color', 2);
//Map对象
const map = from([1,2,3,'wocao'])
//Map数组
const data = from(
new Promise((resolve, reject) => {
resolve('测试');
}),
);
//promise
data.subscribe(res => {
console.log(res);
});
Import {fromEvent} from’rxjs’
fromEvent 将事件转换为Observable 序列
const source = fromEvent(document, 'click');
const data = source.pipe(map(event => `Event time: ${event.timeStamp}`));
data.subscribe(res => {
console.log(res);
});
Import {fromPromise}from’rxjs/boservable/fromPromise’
fromPromise 创建由 promise 转换而来的 observable,并发出 promise 的结果
import { of } from 'rxjs/observable/of';
import { fromPromise } from 'rxjs/observable/fromPromise';
import { mergeMap, catchError } from 'rxjs/operators';
// 基于输入来决定是 resolve 还是 reject 的示例 promise
const myPromise = willReject => {
return new Promise((resolve, reject) => {
if (willReject) {
reject('Rejected!');
}
resolve('Resolved!');
});
};
// 先发出 true,然后是 false
const source = of(true, false);
const example = source.pipe(
mergeMap(val =>
fromPromise(myPromise(val)).pipe(
// 捕获并优雅地处理 reject 的结果
catchError(error => of(`Error: ${error}`))
)
)
);
// 输出: 'Error: Rejected!', 'Resolved!'
const subscribe = example.subscribe(val => console.log(val));
Import {interval,timer} from’rxjs’
Imterval 基于给定时间间隔发出数字序列
Timer 基于给定时间单次发送数字序列
const source = interval(1000);
source.subscribe(e => {
console.log(e);
});
const timers = timer(2000);
timers.subscribe(e => {
console.log(e);
});
Import {of} from ‘rxjs’
Of 依次发出提供的任意数量的值
const sourse = of(1, 2, 3, 4, '测试1', '测试2', '测试3', 5);
sourse.subscribe(e => {
console.log(e);
});
其发送的内容共包括对象数组和函数
const sourse = of({ name: '丛草' }, [1, 2, 3], function get() {
return 'lalala';
});
sourse.subscribe(e => {
console.log(e);
});
Import {range} from’rxjs’
range依次发出给定区间内的数字。
const source = range(2, 10);
source.subscribe(e => {
console.log(e);
});
Import {throwErrow}from’rxjs’
Import{catchError}from’rxjs/operators’
catchError 优雅地处理 observable 序列中的错误
const source = throwError('this is error');
const data = source.pipe(catchError(val => of(`I get:${val}`)));
data.subscribe(res => {
console.log(res);
});
Import {retry}from’rxjs/operators’;
Retry 如果发生错误,以指定次数重试observable序列
const source = interval(1000);
const example = source.pipe(
mergeMap(val => {
if (val > 6) {
return throwError('this is errow');
} else {
return of(val);
}
}),
retry(2),
);
example.subscribe({
next: val => console.log(val),
error: val => console.log(`${val}: Retried 2 times then quit!`),
});
import{retryWhen,delayWhen} from ‘rxjs/operators’
retryWhen 当发生错误时,基于自定义的标准来重试 observable 序列。
const source = interval(1000);
const example = source.pipe(
map(val => {
if (val > 3) {
throw val;
} else {
return val;
}
}),
retryWhen(errors =>
errors.pipe(
tap(val => console.log(`Value ${val} was too high!`)),
// 5秒后重启
delayWhen(val => timer(val * 1000)),
),
),
);
example.subscribe({
next: val => console.log(val),
error: val => console.log(`${val}: Retried 2 times then quit!`),
});
import{combineAll} from’rxjs/operators’; (combineAll中文 结合所有)
combineAll 收集内部完成的observables
const source = interval(1000).pipe(take(1));
const example = source.pipe(
map(val =>
interval(1000).pipe(
map(i => `Result (${val}): ${i}`),
take(5),
),
),
);
const combined = example.pipe(combineAll());
combined.subscribe(val => console.log(val));
Import{combineLatest}from’rxjs’ combineLatest (中文 聚合)
combineLatest 当任意 observable 发出值时,发出每个 observable 的最新值
const one = timer(1000, 3000);
const tow = timer(2000, 4000);
const three = timer(3000, 6000);
const data = combineLatest(one, tow, three);
data.subscribe(val => {
const [a, b, c] = val;
console.log(a + 'aaaaa' + b + 'bbbbb' + c);
});
用combineLatest 第三个参数 收集输出的结果
const combinedProject = combineLatest(
timerOne,
timerTwo,
timerThree,
(one, two, three) => {
return `Timer One (Proj) Latest: ${one},
Timer Two (Proj) Latest: ${two},
Timer Three (Proj) Latest: ${three}`;
}
);
使用聚合功能 组合两个按钮组
<div>
<button id='red'>Red</button>
<button id='black'>Black</button>
</div>
<div>Red: <span id="redTotal"></span> </div>
<div>Black: <span id="blackTotal"></span> </div>
<div>Total: <span id="total"></span> </div>
const setHtml = id => val => (document.getElementById(id).innerHTML = val);
const addOneClick$ = id =>
fromEvent(document.getElementById(id), 'click').pipe(
mapTo(2),
startWith(0),
scan((acc, curr) => acc + curr),
tap(setHtml(`${id}Total`)),
);
combineLatest(addOneClick$('red'), addOneClick$('black'))
.pipe(map(([val1, val2]) => val1 + val2))
.subscribe(setHtml('total'));
Import{concat} from’rxjs/operators’;
Concat 按照顺序,前一个 observable 完成了再订阅下一个 observable 并发出值
const source = of(1, 2, 3, 4, 5, 6, 6, 7, 7, 8, 8);
const source2 = from([12, 'soclok', 'ksdj', { name: 'goujieba', age: 111 }]);
const data = source.pipe(concat(source2)).subscribe(val => console.log(val));
静态:
Import {concat}from’rxjs’
const source = of(1, 2, 3, 4, 5, 6, 6, 7, 7, 8, 8);
const source2 = from([12, 'soclok', 'ksdj', { name: 'goujieba', age: 111 }]);
const data = concat(source, source2);
data.subscribe(val => console.log(val));
Import{concatAll}from’rxjs/operators’;
concatAll 收集 observables,当前一个完成时订阅下一个。
const source = interval(2000);
const data = source.pipe(
map(val => of(val + 10)),
concatAll(),
);
data.subscribe(val => {
console.log(val);
});
CancatAll可连接promise 或者 内部延问题
const obs1 = interval(1000).pipe(take(5));
const obs2 = interval(500).pipe(take(2));
const obs3 = interval(2000).pipe(take(1));
const source = of(obs1, obs2, obs3);
const example = source.pipe(concatAll());
example.subscribe(val => {
console.log(val);
});
Import {forkJoin} from’rxjs’
forkJoin等待序列中所有的observable完成后 以数组返回结果 若其中有未完成的observable将没有返回结果
const source = val => new Promise(resolve => setTimeout(i => resolve(val), 8000));
const example = forkJoin(
// 立即发出 'Hello'
of('Hello'),
// 1秒后发出 'World'
of('World').pipe(delay(1000)),
// 1秒后发出0
interval(1000).pipe(take(1)),
// 以1秒的时间间隔发出0和1
interval(1000).pipe(take(2)),
// 5秒后解析 'Promise Resolved' 的 promise
source('RESULT'),
);
// 输出: ["Hello", "World", 0, 1, "Promise Resolved: RESULT"]
const subscribe = example.subscribe(val => console.log(val));
Import {merge} from’rxjs’; 静态方法
Merge 将多个observables转换为observable,返回的结果顺序是随机的;
const one = interval(4000);
const two = interval(2000);
const three = interval(2500);
const four = interval(1500);
const source = merge(
one.pipe(mapTo('1111')).pipe(take(1)),
two.pipe(mapTo('22222')).pipe(take(1)),
three.pipe(mapTo('33333')).pipe(take(1)),
four.pipe(mapTo('44444')).pipe(take(1)),
);
source.subscribe(val => {
console.log(val);
});
Import {merge} from’rxjs/operators’; 实例方法
const one = interval(4000).pipe(
mapTo('111'),
take(1),
);
const two = interval(2000).pipe(
mapTo('222'),
take(1),
);
const source = one.pipe(merge(two));
source.subscribe(val => {
console.log(val);
});
Import {mergeAll}from’rxjs/operators’;
mergeAll 收集并订阅所有的ovservables; 并发所有返回的结果
const mypromise = val => new Promise(resolve => setTimeout(() => resolve(val), 2000));
const source = of(1, 2, 3, 4, 54);
source.pipe(
delay(1000),
map(val => mypromise(val)),
mergeAll(),
);
source.subscribe(val => {
console.log(val);
});
Import {pairwise}from’rxjs/operators’;
Pairwish 用数组返回前一个及当前值;
interval(1000)
.pipe(
pairwise(),
take(5),
)
.subscribe(console.log);
Import{race}from’rxjs’;
Race 比速度,首先返回先完成的数值,忽略之后的数据
race(interval(2000), interval(1000), interval(3500), of(2, 3, 4, 'dfsd')).subscribe(val => console.log(val));
import { startWith } from 'rxjs/operators';
startWith 给定第一个数值
const source = interval(1000);
const example = source.pipe(startWith(-3, -2, -1));
const subscribe = example.subscribe(val => console.log(val));
import { withLatestFrom} from 'rxjs/operators';
withLatestFrom 给定另一个值observeable
const source = interval(1500);
const info = interval(4500);
source
.pipe(
withLatestFrom(info),
map(([one, two]) => {
return `first:${one},sconed:${two}`;
}),
)
.subscribe(val => console.log(val));
Import {zip}from’rxjs’
Zip 订阅所有的ovserveables 待发出后统一以数组返回结果
const sourceOne = of('Hello');
const sourceTwo = of('World!');
const sourceThree = of('Goodbye');
const sourceFour = of('World!');
const example = zip(
sourceOne,
sourceTwo.pipe(delay(1000)),
sourceThree.pipe(delay(2000)),
sourceFour.pipe(delay(3000))
);
const subscribe = example.subscribe(val => console.log(val));
Import{defaultIfEmpty}from’rxjs/operators’;
defaultIfEmpty 当返回的observeables为空时,返回默认的数据
const exampleOne = of().pipe(defaultIfEmpty('Observable.of() Empty!'));
const subscribe = exampleOne.subscribe(val => console.log(val));
Import{every}from’rxjs/operators’;
Every 断言所有返回的值,如果通过则返回true ,否则返回false;
const source = from([444, 333, 111, 222, 555, 666]);
source.pipe(every(val => val > 100)).subscribe(val => {
console.log(val);
});
Import {debounce}from’rxjs’;
Import{debounceTime}from’rxjs/operators’;
Debounce 输出给定时间范围的数值
const source = interval(1000);
source.pipe(debounce(val => timer(val * 200))).subscribe(val => console.log(val));
DebounceTime 输出一定时间返回内的数据,可用于放反跳;多次触发不必要的事件
ngAfterViewInit() {
const btn = document.getElementById('btn');
fromEvent(btn, 'click')
.pipe(
debounceTime(100),
mergeMap(val => this.http.post('/admin/transactions/list', fiterObj(this.form.value))),
)
.subscribe(i => console.log(i));
}
debouneceTime与switchMap同样可作为防止多余数据请求的方法,而switchMap可以取消在一定范围内的请求,debouneceTime(debounece)则只会在一定范围内生效一条数据请求
Import{distinctUntilChanged}from’rxjs/operators’;
distinctUntilChanged 检测栈数据是否重复,数据发生变化才返回,堆中数据不变;
const source = from([1, 2, 3, 3, 4, 4, 5, 5, '11112k,3', { name: 'ppp' }, { name: 'ppp' }, { name: 'aaa' }]);
source.pipe(distinctUntilChanged()).subscribe(val => console.log(val));
Import {filter}from’rxjs/operators’;
Filter过滤各类数据返回需要数值
const source = from([1, 2, 3, 3, 4, 4, 5, 5, '11112k,3', { name: 'ppp' }, { name: 'ppp' }, { name: 'aaa' }]);
source.pipe(filter(val => val > 2)).subscribe(val => console.log(val));
Import {first}from’rxjs’;
First 没有参数是则返回第一项,有参数则返回通过断言的第一项
const source = from([1, 2, 3, 4, 5]);
// 没有参数则发出第一个值
const example = source.pipe(first());
// 输出: "First value: 1"
const subscribe = example.subscribe(val => console.log(`First value: ${val}`));
First(num =>num ===5)
const source = from([1, 2, 3, 4, 5]);
// 发出通过测试的第一项
const example = source.pipe(first(num => num === 5));
// 输出: "First to pass test: 5"
const subscribe = example.subscribe(val =>
console.log(`First to pass test: ${val}`)
);
Import{ignoreElements}from’rxjs/operators’;
ignoreElements 忽略所有的next(),只返回error()和complete()
const source = interval(100);
// 略所有值,只发出 complete
const example = source.pipe(
take(5),
ignoreElements()
);
// 输出: "COMPLETE!"
const subscribe = example.subscribe(
val => console.log(`NEXT: ${val}`),
val => console.log(`ERROR: ${val}`),
() => console.log('COMPLETE!')
);
Import{last}from’rxjs/operators’;
last没有参数则返回最后一个值,具体参照first()
const source = from([1, 2, 3, 4, 5]);
// 没有参数则发出最后一个值
const example = source.pipe(last());
// 输出: "Last value: 5"
const subscribe = example.subscribe(val => console.log(`Last value: ${val}`));
Import{sample}from’rxjs/operators’;
sample从内部发出源从取样;
const source = interval(1000);
// 每2秒对源 observable 最新发出的值进行取样
const example = source.pipe(sample(interval(2000)));
// 输出: 2..4..6..8..
const subscribe = example.subscribe(val => console.log(val));
Import{single}from’rxjs/operators’;
Single 发出通过表达式的一项内容,且返回的结果只有一项,没什么卵用
const source = from([1, 2, 3, 4, 5]);
const example = source.pipe(single(val => val === 3));
example.subscribe(val => console.log(val));
Import{skip}from’rxjs/operators’;
skip跳过参数内的数据
const source = interval(1000);
const example = source.pipe(skip(5));
// 输出: 5...6...7...8........
const subscribe = example.subscribe(val => console.log(val));
Import{skipUntil}from’rxjs/operators’;
skipUntil 跳过源obeservable中的值,知道直到的observable发出值;
const source = interval(1000);
const example = source.pipe(skipUntil(timer(6000)));
// 输出: 5...6...7...8........
const subscribe = example.subscribe(val => console.log(val));
Import{skipWhile}from’rxjs/operators’;
skipWhile 与skipUntil相反 ,跳过源obeservable的值,直到内部的断言为false时
const source = interval(1000);
const example = source.pipe(skipWhile(val => val < 5));
// 输出: 5...6...7...8........
const subscribe = example.subscribe(val => console.log(val));
Import{take}from’rxjs/operators’;
Take 从源obeservable中取指定参数的值
const interval$ = interval(1000);
const example = interval$.pipe(take(5));
// 输出: 0,1,2,3,4
const subscribe = example.subscribe(val => console.log(val));
Import{takeUntil}from’rxjs/operators’;
takeUntil 从源obeservable发出值,直到内部obeservable发出值为止
const source = interval(1000);
const timer$ = timer(5000);
const example = source.pipe(takeUntil(timer$));
// 输出: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));
Import{takeWhile}from’rxjs/operators’;
takeWhile 发出值,直到内部obeservable发出值为止
const source = of(1, 2, 3, 4, 5);
const example = source.pipe(takeWhile(val => val <= 4));
// 输出: 1,2,3,4
const subscribe = example.subscribe(val => console.log(val));
Import{throttle}from’rxjs/operatolrs’;
Throttle 以某个时间为阈值,抑制源obeservable发出值
const source = interval(1000);
const example = source.pipe(throttle(val => interval(2000)));
// 输出: 0...3...6...9
const subscribe = example.subscribe(val => console.log(val));
Import{throttleTime}from’rxjs/operators’;
torottleTime为throttle的简写
const example = source.pipe(throttleTime(5000));
TransFrommation 转化
Import{buffer}from’rxjs/operators’;
Buffer 收集源obeservable发出的值直到提供的obeserveable发出才将数值以数组形式发出
const myInterval = interval(1000);
const bufferBy = fromEvent(document, 'click');
const myBufferedInterval = myInterval.pipe(buffer(bufferBy));
// 例如 输出: [1,2,3] ... [4,5,6,7,8]
const subscribe = myBufferedInterval.subscribe(val => console.log(' Buffered Values:', val));
Import {bufferCount}from’rxjs/operators’;
bufferCount收集发出的值,直到收集完提供数量的值才以数组发出
const myInterval = interval(1000);
const myBufferedInterval = myInterval.pipe(bufferCount(3));
myBufferedInterval.subscribe(val => console.log(' Buffered Values:', val));
Import{bufferTime}from’rxjs/operators’;
bufferTime收集指定时间内的值,直到指定时间内才以数组发出
const myInterval = interval(1000);
const myBufferedInterval = myInterval.pipe(bufferTime(3000));
myBufferedInterval.subscribe(val => console.log(' Buffered Values:', val));
Import{bufferToggle}from’rxjs/operators’;
bufferToggle 开启开关捕获observable的数值,关闭开关将数值以数组发出
const sourceInterval = interval(1000);
// 5秒后开启第一个缓冲区,然后每5秒钟开启新的缓冲区
const startInterval = interval(5000);
// 3秒后发出值以关闭相应的缓冲区
const closingInterval = val => {
console.log(`Value ${val} emitted, starting buffer! Closing in 3s!`);
return interval(3000);
};
// 每5秒会开启一个新的缓冲区以收集发出的值,3秒后发出缓冲的值
const bufferToggleInterval = sourceInterval.pipe(bufferToggle(startInterval, closingInterval));
// 输出到控制台
// 输出: Emitted Buffer: [4,5,6]...[9,10,11]
const subscribe = bufferToggleInterval.subscribe(val => console.log('Emitted Buffer:', val));
Import {bufferWhen}from’rxjs/operators’;
bufferWhen 缓冲指定时间内的值,关闭缓冲以数组发出数值
const oneSecondInterval = interval(1000);
// 返回的 observable 每5秒发出值
const fiveSecondInterval = () => interval(5000);
// 每5秒发出缓冲的值
const bufferWhenExample = oneSecondInterval.pipe(bufferWhen(fiveSecondInterval));
// 输出值
// 输出: [0,1,2,3]...[4,5,6,7,8]
const subscribe = bufferWhenExample.subscribe(val => console.log('Emitted Buffer: ', val));