如果用一句话阐述对于observable的简单理解,我会这么说:事件流(event stream) + 观察订阅模式(observe/subscribe pattern)。
observable相比promise,可以更好地实现函数式编程、支持取消、可以有多个事件的订阅者,等等。 在以往的项目实践中,如react,我们会用axios发起异步请求,在then中处理返回结果。当嵌套多层后,代码就会变得混乱。我们当然可以用async/await来避免回调嵌套,不过本文旨在说明如何在RxJS框架下以函数式编程的方式实现多层的异步调用。
建立Node项目
npm init -y
npm install @types/node rxjs typescript axios --save-dev
结果如下
{
"name": "test",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"@types/node": "^16.9.1",
"rxjs": "^7.3.0",
"typescript": "^4.4.3"
},
"devDependencies": {
"axios": "^0.21.4"
}
}
示例1:自定义实现Observable对Promise的包装
import { Observable } from 'rxjs';
import axios from 'axios';
let task = new Observable( ( observer: any ) => {
axios.get( 'https://jsonplaceholder.typicode.com/users' )
.then( ( response ) => {
observer.next( response.data );
observer.complete();
} )
.catch( ( error ) => {
observer.error( error );
} );
} );
task.subscribe({
next(data) { console.log('data: ', data); },
error(err) { console.log('something wrong occurred: ' + err); },
complete() { console.log('done'); }
});
运行方式:
tsc [文件名].ts
node [文件名].js
示例2:用RxJS提供的from方法将Promise转换为Observable
import { from, map } from 'rxjs';
import axios from 'axios';
const promise = axios.get('https://jsonplaceholder.typicode.com/users')
const observable = from(promise).pipe(map(d => d.data));
observable.subscribe({
next(data) { console.log('data: ', data); },
error(err) { console.log('something wrong occurred: ' + err); },
complete() { console.log('done'); }
})
此处在获取到数据后,再使用map将数据进行格式转换(示例中原封不动地返回)
示例3:多个Observable并行执行
import { Observable, from, forkJoin, map, of } from 'rxjs';
import { mergeMap, reduce, filter } from 'rxjs/operators';
import axios from 'axios';
const task1 = from(axios.get('https://jsonplaceholder.typicode.com/users')).pipe(map(d => d.data));;
const task2 = from(new Promise(function(resolve) {
setTimeout(()=>resolve('Hello!'), 1500);
}))
forkJoin([task1, task2]).subscribe({
next(data) { console.log('data: ', data); },
error(err) { console.log('something wrong occurred: ' + err); },
complete() { console.log('done'); }
})
const squareOdd = of(1, 2, 3, 4, 5)
.pipe(
filter(n => n % 2 !== 0),
map(n => n * n)
);
// Subscribe to get values
squareOdd.subscribe(x => console.log(x));
读者可能会问,Promise.all也可以实现同样的结果,async/await也能避免回调嵌套。 那么RxJS的优势到底在哪里呢?
的确,简单的应用场景下并无明显差别,后续我们给出更高级的应用场景,再做分享。