rxjs 学习实践笔记

rxjs笔记部分随笔,权威地址:https://rxjs-cn.github.io/learn-rxjs-operators/operators/

 

Import {subject}from’rxjs’

Subject 数据的订阅与分发,结合报刊的发布与订阅进行功能的模拟,subject即是observeable对象也是observer对象,subject对于后期没有数据更新时所添加的订阅者是不怎么友好的,因为不跟新数据时订阅者就不在收到返回的数值

    const interval$ = interval(1000).pipe(take(10));

    const subject = new Subject();

 

    const observerA = {

      next: value => console.log('Observer A get value: ' + value),

      error: error => console.log('Observer A error: ' + error),

      complete: () => console.log('Observer A complete!'),

    };

 

    const observerB = {

      next: value => console.log('Observer B get value: ' + value),

      error: error => console.log('Observer B error: ' + error),

      complete: () => console.log('Observer B complete!'),

    };

 

    subject.subscribe(observerA); // 添加观察者A

    interval$.subscribe(subject); // 订阅interval$对象

    setTimeout(() => {

      subject.subscribe(observerB); // 添加观察者B

    }, 1000);

 

Import{BehaviorSubject}from’rxjs’;

behaviorSubject 是subject的变种,最大的区别就是 behaviorSubject是用于保存最新的数值,而不是单纯的发送事件,会将最后一次发送的值作为当前值保存在内部属性中。

    const subject = new BehaviorSubject(0);  //BehaviorSubject小括号0代表的是状态

    const observerA = {

      next: value => console.log('Observer A get value: ' + value),

      error: error => console.log('Observer A error: ' + error),

      complete: () => console.log('Observer A complete!'),

    };

 

    const observerB = {

      next: value => console.log('Observer B get value: ' + value),

      error: error => console.log('Observer B error: ' + error),

      complete: () => console.log('Observer B complete!'),

    };

 

    subject.subscribe(observerA); // 添加观察者A

    // interval$.subscribe(subject); // 订阅interval$对象

    subject.next(1);

    subject.next(2);

    subject.next(3);

    setTimeout(() => {

      subject.subscribe(observerB); // 添加观察者B

    }, 1000);

 

Import {ReplaySubject}from’rxjs’;

ReplaySubject 用于重复发送最近几次的值给订阅者

    const subject = new ReplaySubject(2); //ReplaySubject后的2为最后两次发送的数值

    const observerA = {

      next: value => console.log('Observer A get value: ' + value),

      error: error => console.log('Observer A error: ' + error),

      complete: () => console.log('Observer A complete!'),

    };

 

    const observerB = {

      next: value => console.log('Observer B get value: ' + value),

      error: error => console.log('Observer B error: ' + error),

      complete: () => console.log('Observer B complete!'),

    };

 

    subject.subscribe(observerA); // 添加观察者A

    // interval$.subscribe(subject); // 订阅interval$对象

    subject.next(1);

    subject.next(2);

    subject.next(3);

    setTimeout(() => {

      subject.subscribe(observerB); // 添加观察者B

    }, 1000);

 

Import{AsyncSubject}from’rxjs’;

AsyncSubject他会在subject完成后才返回一个值

    const subject = new AsyncSubject();

    const observerA = {

      next: value => console.log('Observer A get value: ' + value),

      error: error => console.log('Observer A error: ' + error),

      complete: () => console.log('Observer A complete!'),

    };

 

    const observerB = {

      next: value => console.log('Observer B get value: ' + value),

      error: error => console.log('Observer B error: ' + error),

      complete: () => console.log('Observer B complete!'),

    };

 

    subject.subscribe(observerA); // 添加观察者A

    // interval$.subscribe(subject); // 订阅interval$对象

    subject.next(1);

    subject.next(2);

    subject.next(3);

    subject.complete();

    setTimeout(() => {

      subject.subscribe(observerB); // 添加观察者B

    }, 1000);

 

 

 

 

 

Import {Observable}from ‘rxjs’

Create 创建在订阅函数中发出 'wocao'和 'woqu'的 observable 。

const obs = Observable.create(observer => {

      observer.next('wocao');

      observer.next('woqu');

    });

    const data = obs.subscribe(e => {

      console.log(e);

    });

创建定时发送数据 并在12秒后取消订阅数据

const obs = Observable.create(observer => {

      let num = 0;

      const interve = setInterval(() => {

        if (num % 2 === 0) {

          observer.next('这个是' + num);

        }

        num++;

      }, 1000);

      // return clearInterval(interve);

    });

    const data = obs.subscribe(e => {

      console.log(e);

    });

    setTimeout(() => {

      data.unsubscribe();

    }, 12000);

 

Import {empty} from’rxjs’

Empty 立即完成observable

 const subscribe = empty().subscribe({

      next: () => console.log('Next'),

      complete: () => console.log('Complete!'),

    });

 

 

Import {from} from’rxjs’

From 将数组、promise 或迭代器转换成 observable

const map = new Map();

    map.set(1, 'wocao');

    map.set(2, '擦从年');

    map.set('color', 2);

//Map对象

const map = from([1,2,3,'wocao'])

//Map数组

const data = from(

      new Promise((resolve, reject) => {

        resolve('测试');

      }),

);

//promise

 

    data.subscribe(res => {

      console.log(res);

    });

 

Import {fromEvent} from’rxjs’

fromEvent 将事件转换为Observable 序列

 

const source = fromEvent(document, 'click');

    const data = source.pipe(map(event => `Event time: ${event.timeStamp}`));

    data.subscribe(res => {

      console.log(res);

    });

 

Import {fromPromise}from’rxjs/boservable/fromPromise’

fromPromise 创建由 promise 转换而来的 observable,并发出 promise 的结果

import { of } from 'rxjs/observable/of';

import { fromPromise } from 'rxjs/observable/fromPromise';

import { mergeMap, catchError } from 'rxjs/operators';

 

// 基于输入来决定是 resolve 还是 reject 的示例 promise

const myPromise = willReject => {

  return new Promise((resolve, reject) => {

    if (willReject) {

      reject('Rejected!');

    }

    resolve('Resolved!');

  });

};

// 先发出 true,然后是 false

const source = of(true, false);

const example = source.pipe(

  mergeMap(val =>

    fromPromise(myPromise(val)).pipe(

      // 捕获并优雅地处理 reject 的结果

      catchError(error => of(`Error: ${error}`))

    )

  )

);

// 输出: 'Error: Rejected!', 'Resolved!'

const subscribe = example.subscribe(val => console.log(val));

 

Import {interval,timer} from’rxjs’

Imterval  基于给定时间间隔发出数字序列

Timer  基于给定时间单次发送数字序列

const source = interval(1000);

     source.subscribe(e => {

      console.log(e);

    });

    const timers = timer(2000);

    timers.subscribe(e => {

      console.log(e);

    });

 

Import {of} from ‘rxjs’

Of 依次发出提供的任意数量的值

const sourse = of(1, 2, 3, 4, '测试1', '测试2', '测试3', 5);

    sourse.subscribe(e => {

      console.log(e);

    });

其发送的内容共包括对象数组和函数

const sourse = of({ name: '丛草' }, [1, 2, 3], function get() {

      return 'lalala';

    });

    sourse.subscribe(e => {

      console.log(e);

    });

 

Import {range} from’rxjs’

range依次发出给定区间内的数字。

    const source = range(2, 10);

    source.subscribe(e => {

      console.log(e);

    });

 

Import {throwErrow}from’rxjs’

Import{catchError}from’rxjs/operators’

catchError 优雅地处理 observable 序列中的错误

    const source = throwError('this is error');

    const data = source.pipe(catchError(val => of(`I get:${val}`)));

    data.subscribe(res => {

      console.log(res);

    });

 

Import {retry}from’rxjs/operators’;

Retry 如果发生错误,以指定次数重试observable序列

    const source = interval(1000);

    const example = source.pipe(

      mergeMap(val => {

        if (val > 6) {

          return throwError('this is errow');

        } else {

          return of(val);

        }

      }),

      retry(2),

    );

    example.subscribe({

      next: val => console.log(val),

      error: val => console.log(`${val}: Retried 2 times then quit!`),

    });

 

 

import{retryWhen,delayWhen} from ‘rxjs/operators’

retryWhen 当发生错误时,基于自定义的标准来重试 observable 序列。

const source = interval(1000);

    const example = source.pipe(

      map(val => {

        if (val > 3) {

          throw val;

        } else {

          return val;

        }

      }),

      retryWhen(errors =>

        errors.pipe(

          tap(val => console.log(`Value ${val} was too high!`)),

          // 5秒后重启

          delayWhen(val => timer(val * 1000)),

        ),

      ),

    );

    example.subscribe({

      next: val => console.log(val),

      error: val => console.log(`${val}: Retried 2 times then quit!`),

    });

 

import{combineAll} from’rxjs/operators’; (combineAll中文 结合所有)

combineAll 收集内部完成的observables

   const source = interval(1000).pipe(take(1));

    const example = source.pipe(

      map(val =>

        interval(1000).pipe(

          map(i => `Result (${val}): ${i}`),

          take(5),

        ),

      ),

    );

    const combined = example.pipe(combineAll());

    combined.subscribe(val => console.log(val));

 

Import{combineLatest}from’rxjs’   combineLatest (中文 聚合)

combineLatest 当任意 observable 发出值时,发出每个 observable 的最新值

    const one = timer(1000, 3000);

    const tow = timer(2000, 4000);

    const three = timer(3000, 6000);

    const data = combineLatest(one, tow, three);

    data.subscribe(val => {

      const [a, b, c] = val;

      console.log(a + 'aaaaa' + b + 'bbbbb' + c);

    });

 

用combineLatest 第三个参数 收集输出的结果

const combinedProject = combineLatest(

  timerOne,

  timerTwo,

  timerThree,

  (one, two, three) => {

    return `Timer One (Proj) Latest: ${one}, 

              Timer Two (Proj) Latest: ${two}, 

              Timer Three (Proj) Latest: ${three}`;

  }

);

使用聚合功能 组合两个按钮组

<div>

  <button id='red'>Red</button>

  <button id='black'>Black</button>

</div>

<div>Red: <span id="redTotal"></span> </div>

<div>Black: <span id="blackTotal"></span> </div>

<div>Total: <span id="total"></span> </div>

 

const setHtml = id => val => (document.getElementById(id).innerHTML = val);

    const addOneClick$ = id =>

      fromEvent(document.getElementById(id), 'click').pipe(

        mapTo(2),

        startWith(0),

        scan((acc, curr) => acc + curr),

        tap(setHtml(`${id}Total`)),

      );

    combineLatest(addOneClick$('red'), addOneClick$('black'))

      .pipe(map(([val1, val2]) => val1 + val2))

      .subscribe(setHtml('total'));

 

Import{concat} from’rxjs/operators’;

Concat 按照顺序,前一个 observable 完成了再订阅下一个 observable 并发出值

    const source = of(1, 2, 3, 4, 5, 6, 6, 7, 7, 8, 8);

    const source2 = from([12, 'soclok', 'ksdj', { name: 'goujieba', age: 111 }]);

    const data = source.pipe(concat(source2)).subscribe(val => console.log(val));

静态:

Import {concat}from’rxjs’

    const source = of(1, 2, 3, 4, 5, 6, 6, 7, 7, 8, 8);

    const source2 = from([12, 'soclok', 'ksdj', { name: 'goujieba', age: 111 }]);

    const data = concat(source, source2);

    data.subscribe(val => console.log(val));

 

Import{concatAll}from’rxjs/operators’;

concatAll 收集 observables,当前一个完成时订阅下一个。

const source = interval(2000);

    const data = source.pipe(

      map(val => of(val + 10)),

      concatAll(),

    );

    data.subscribe(val => {

      console.log(val);

    });

CancatAll可连接promise 或者 内部延问题

const obs1 = interval(1000).pipe(take(5));

    const obs2 = interval(500).pipe(take(2));

    const obs3 = interval(2000).pipe(take(1));

    const source = of(obs1, obs2, obs3);

    const example = source.pipe(concatAll());

    example.subscribe(val => {

      console.log(val);

    });

 

Import {forkJoin} from’rxjs’

forkJoin等待序列中所有的observable完成后 以数组返回结果  若其中有未完成的observable将没有返回结果

const source = val => new Promise(resolve => setTimeout(i => resolve(val), 8000));

    const example = forkJoin(

      // 立即发出 'Hello'

      of('Hello'),

      // 1秒后发出 'World'

      of('World').pipe(delay(1000)),

      // 1秒后发出0

      interval(1000).pipe(take(1)),

      // 以1秒的时间间隔发出0和1

      interval(1000).pipe(take(2)),

      // 5秒后解析 'Promise Resolved' 的 promise

      source('RESULT'),

    );

    // 输出: ["Hello", "World", 0, 1, "Promise Resolved: RESULT"]

    const subscribe = example.subscribe(val => console.log(val));

 

Import {merge} from’rxjs’; 静态方法

Merge 将多个observables转换为observable,返回的结果顺序是随机的;

    const one = interval(4000);

    const two = interval(2000);

    const three = interval(2500);

    const four = interval(1500);

    const source = merge(

      one.pipe(mapTo('1111')).pipe(take(1)),

      two.pipe(mapTo('22222')).pipe(take(1)),

      three.pipe(mapTo('33333')).pipe(take(1)),

      four.pipe(mapTo('44444')).pipe(take(1)),

    );

    source.subscribe(val => {

      console.log(val);

    });

Import {merge} from’rxjs/operators’; 实例方法

 const one = interval(4000).pipe(

      mapTo('111'),

      take(1),

    );

    const two = interval(2000).pipe(

      mapTo('222'),

      take(1),

    );

    const source = one.pipe(merge(two));

    source.subscribe(val => {

      console.log(val);

    });

 

Import {mergeAll}from’rxjs/operators’;

mergeAll 收集并订阅所有的ovservables; 并发所有返回的结果

const mypromise = val => new Promise(resolve => setTimeout(() => resolve(val), 2000));

    const source = of(1, 2, 3, 4, 54);

    source.pipe(

      delay(1000),

      map(val => mypromise(val)),

      mergeAll(),

    );

    source.subscribe(val => {

      console.log(val);

    });

 

Import {pairwise}from’rxjs/operators’;

Pairwish 用数组返回前一个及当前值;

  interval(1000)

      .pipe(

        pairwise(),

        take(5),

      )

      .subscribe(console.log);

 

Import{race}from’rxjs’;

Race 比速度,首先返回先完成的数值,忽略之后的数据

race(interval(2000), interval(1000), interval(3500), of(2, 3, 4, 'dfsd')).subscribe(val => console.log(val));

 

 

 

 

import { startWith } from 'rxjs/operators';

startWith 给定第一个数值

const source = interval(1000);

const example = source.pipe(startWith(-3, -2, -1));

const subscribe = example.subscribe(val => console.log(val));

 

import { withLatestFrom} from 'rxjs/operators';

withLatestFrom 给定另一个值observeable

    const source = interval(1500);

    const info = interval(4500);

    source

      .pipe(

        withLatestFrom(info),

        map(([one, two]) => {

          return `first:${one},sconed:${two}`;

        }),

      )

      .subscribe(val => console.log(val));

 

Import {zip}from’rxjs’

Zip 订阅所有的ovserveables 待发出后统一以数组返回结果

const sourceOne = of('Hello');

const sourceTwo = of('World!');

const sourceThree = of('Goodbye');

const sourceFour = of('World!');

const example = zip(

  sourceOne,

  sourceTwo.pipe(delay(1000)),

  sourceThree.pipe(delay(2000)),

  sourceFour.pipe(delay(3000))

);

const subscribe = example.subscribe(val => console.log(val));

 

Import{defaultIfEmpty}from’rxjs/operators’;

defaultIfEmpty 当返回的observeables为空时,返回默认的数据

    const exampleOne = of().pipe(defaultIfEmpty('Observable.of() Empty!'));

    const subscribe = exampleOne.subscribe(val => console.log(val));

 

Import{every}from’rxjs/operators’;

Every 断言所有返回的值,如果通过则返回true ,否则返回false;

    const source = from([444, 333, 111, 222, 555, 666]);

    source.pipe(every(val => val > 100)).subscribe(val => {

      console.log(val);

    });

 

Import {debounce}from’rxjs’;

Import{debounceTime}from’rxjs/operators’;

Debounce 输出给定时间范围的数值

    const source = interval(1000);

    source.pipe(debounce(val => timer(val * 200))).subscribe(val => console.log(val));

DebounceTime 输出一定时间返回内的数据,可用于放反跳;多次触发不必要的事件

  ngAfterViewInit() {

    const btn = document.getElementById('btn');

    fromEvent(btn, 'click')

      .pipe(

        debounceTime(100),

        mergeMap(val => this.http.post('/admin/transactions/list', fiterObj(this.form.value))),

      )

      .subscribe(i => console.log(i));

  }

 

debouneceTime与switchMap同样可作为防止多余数据请求的方法,而switchMap可以取消在一定范围内的请求,debouneceTime(debounece)则只会在一定范围内生效一条数据请求

 

Import{distinctUntilChanged}from’rxjs/operators’;

distinctUntilChanged 检测栈数据是否重复,数据发生变化才返回,堆中数据不变;

const source = from([1, 2, 3, 3, 4, 4, 5, 5, '11112k,3', { name: 'ppp' }, { name: 'ppp' }, { name: 'aaa' }]);

    source.pipe(distinctUntilChanged()).subscribe(val => console.log(val));

 

Import {filter}from’rxjs/operators’;

Filter过滤各类数据返回需要数值

    const source = from([1, 2, 3, 3, 4, 4, 5, 5, '11112k,3', { name: 'ppp' }, { name: 'ppp' }, { name: 'aaa' }]);

    source.pipe(filter(val => val > 2)).subscribe(val => console.log(val));

 

Import {first}from’rxjs’;

First 没有参数是则返回第一项,有参数则返回通过断言的第一项

const source = from([1, 2, 3, 4, 5]);

// 没有参数则发出第一个值

const example = source.pipe(first());

// 输出: "First value: 1"

const subscribe = example.subscribe(val => console.log(`First value: ${val}`));

First(num =>num ===5)

const source = from([1, 2, 3, 4, 5]);

// 发出通过测试的第一项

const example = source.pipe(first(num => num === 5));

// 输出: "First to pass test: 5"

const subscribe = example.subscribe(val =>

  console.log(`First to pass test: ${val}`)

);

 

Import{ignoreElements}from’rxjs/operators’;

ignoreElements 忽略所有的next(),只返回error()和complete()

const source = interval(100);

// 略所有值,只发出 complete

const example = source.pipe(

  take(5),

  ignoreElements()

);

// 输出: "COMPLETE!"

const subscribe = example.subscribe(

  val => console.log(`NEXT: ${val}`),

  val => console.log(`ERROR: ${val}`),

  () => console.log('COMPLETE!')

);

 

Import{last}from’rxjs/operators’;

last没有参数则返回最后一个值,具体参照first()

const source = from([1, 2, 3, 4, 5]);

// 没有参数则发出最后一个值

const example = source.pipe(last());

// 输出: "Last value: 5"

const subscribe = example.subscribe(val => console.log(`Last value: ${val}`));

 

Import{sample}from’rxjs/operators’;

sample从内部发出源从取样;

const source = interval(1000);

// 每2秒对源 observable 最新发出的值进行取样

const example = source.pipe(sample(interval(2000)));

// 输出: 2..4..6..8..

const subscribe = example.subscribe(val => console.log(val));

 

Import{single}from’rxjs/operators’;

Single 发出通过表达式的一项内容,且返回的结果只有一项,没什么卵用

    const source = from([1, 2, 3, 4, 5]);

    const example = source.pipe(single(val => val === 3));

    example.subscribe(val => console.log(val));

 

Import{skip}from’rxjs/operators’;

skip跳过参数内的数据

const source = interval(1000);

const example = source.pipe(skip(5));

// 输出: 5...6...7...8........

const subscribe = example.subscribe(val => console.log(val));

 

Import{skipUntil}from’rxjs/operators’;

skipUntil 跳过源obeservable中的值,知道直到的observable发出值;

const source = interval(1000);

const example = source.pipe(skipUntil(timer(6000)));

// 输出: 5...6...7...8........

const subscribe = example.subscribe(val => console.log(val));

 

Import{skipWhile}from’rxjs/operators’;

skipWhile 与skipUntil相反 ,跳过源obeservable的值,直到内部的断言为false时

const source = interval(1000);

const example = source.pipe(skipWhile(val => val < 5));

// 输出: 5...6...7...8........

const subscribe = example.subscribe(val => console.log(val));

 

Import{take}from’rxjs/operators’;

Take 从源obeservable中取指定参数的值

const interval$ = interval(1000);

const example = interval$.pipe(take(5));

// 输出: 0,1,2,3,4

const subscribe = example.subscribe(val => console.log(val));

 

Import{takeUntil}from’rxjs/operators’;

takeUntil 从源obeservable发出值,直到内部obeservable发出值为止

const source = interval(1000);

const timer$ = timer(5000);

const example = source.pipe(takeUntil(timer$));

// 输出: 0,1,2,3

const subscribe = example.subscribe(val => console.log(val));

 

Import{takeWhile}from’rxjs/operators’;

takeWhile 发出值,直到内部obeservable发出值为止

const source = of(1, 2, 3, 4, 5);

const example = source.pipe(takeWhile(val => val <= 4));

// 输出: 1,2,3,4

const subscribe = example.subscribe(val => console.log(val));

 

Import{throttle}from’rxjs/operatolrs’;

Throttle 以某个时间为阈值,抑制源obeservable发出值

const source = interval(1000);

const example = source.pipe(throttle(val => interval(2000)));

// 输出: 0...3...6...9

const subscribe = example.subscribe(val => console.log(val));

 

Import{throttleTime}from’rxjs/operators’;

torottleTime为throttle的简写

const example = source.pipe(throttleTime(5000));

 

 

TransFrommation 转化

Import{buffer}from’rxjs/operators’;

Buffer 收集源obeservable发出的值直到提供的obeserveable发出才将数值以数组形式发出

 

    const myInterval = interval(1000);

    const bufferBy = fromEvent(document, 'click');

    const myBufferedInterval = myInterval.pipe(buffer(bufferBy));

    // 例如 输出: [1,2,3] ... [4,5,6,7,8]

    const subscribe = myBufferedInterval.subscribe(val => console.log(' Buffered Values:', val));

 

 

Import {bufferCount}from’rxjs/operators’;

bufferCount收集发出的值,直到收集完提供数量的值才以数组发出

    const myInterval = interval(1000);

    const myBufferedInterval = myInterval.pipe(bufferCount(3));

    myBufferedInterval.subscribe(val => console.log(' Buffered Values:', val));

 

Import{bufferTime}from’rxjs/operators’;

bufferTime收集指定时间内的值,直到指定时间内才以数组发出

const myInterval = interval(1000);

    const myBufferedInterval = myInterval.pipe(bufferTime(3000));

    myBufferedInterval.subscribe(val => console.log(' Buffered Values:', val));

 

Import{bufferToggle}from’rxjs/operators’;

bufferToggle 开启开关捕获observable的数值,关闭开关将数值以数组发出

    const sourceInterval = interval(1000);

    // 5秒后开启第一个缓冲区,然后每5秒钟开启新的缓冲区

    const startInterval = interval(5000);

    // 3秒后发出值以关闭相应的缓冲区

    const closingInterval = val => {

      console.log(`Value ${val} emitted, starting buffer! Closing in 3s!`);

      return interval(3000);

    };

    // 每5秒会开启一个新的缓冲区以收集发出的值,3秒后发出缓冲的值

    const bufferToggleInterval = sourceInterval.pipe(bufferToggle(startInterval, closingInterval));

    // 输出到控制台

    // 输出: Emitted Buffer: [4,5,6]...[9,10,11]

    const subscribe = bufferToggleInterval.subscribe(val => console.log('Emitted Buffer:', val));

 

Import {bufferWhen}from’rxjs/operators’;

bufferWhen 缓冲指定时间内的值,关闭缓冲以数组发出数值

    const oneSecondInterval = interval(1000);

    // 返回的 observable 每5秒发出值

    const fiveSecondInterval = () => interval(5000);

    // 每5秒发出缓冲的值

    const bufferWhenExample = oneSecondInterval.pipe(bufferWhen(fiveSecondInterval));

    // 输出值

    // 输出: [0,1,2,3]...[4,5,6,7,8]

    const subscribe = bufferWhenExample.subscribe(val => console.log('Emitted Buffer: ', val));

 

上一篇:javascript-RxJs处理异常而不终止


下一篇:javascript-如何按顺序使用RxJS Observables?