我的角度应用程序中有一个Firebase订阅,它会触发多次.
ich如何实现将任务作为队列处理,以便每个任务可以同步运行一次?
this.tasks.subscribe(async tasks => {
for (const x of tasks)
await dolongtask(x); // has to be sync
await removetask(x);
});
问题在于,当longtask仍在处理时,将触发subribe事件.
解决方法:
我根据您提供的代码做出了一些假设,
>其他应用程序将任务添加到firebase数据库(异步),并且此代码正在实现任务处理器.
>您的firebase查询返回(集合中)所有未处理的任务,并且每次添加新任务时都会发出完整列表.
>仅在运行removeTask()后,查询才会删除任务
如果是这样,则需要在处理器之前执行重复数据删除机制.
出于说明的目的,我模拟了带有主题的firebase查询(将其重命名为taskQuery $),并且在脚本底部模拟了一系列firebase事件.
我希望它不会太混乱!
console.clear()
const { mergeMap, filter } = rxjs.operators;
// Simulate tasks query
const tasksQuery$= new rxjs.Subject();
// Simulate dolongtask and removetask (assume both return promises that can be awaited)
const dolongtask = (task) => {
console.log( `Processing: ${task.id}`);
return new Promise(resolve => {
setTimeout(() => {
console.log( `Processed: ${task.id}`);
resolve('done')
}, 1000);
});
}
const removeTask = (task) => {
console.log( `Removing: ${task.id}`);
return new Promise(resolve => {
setTimeout(() => {
console.log( `Removed: ${task.id}`);
resolve('done')
}, 200);
});
}
// Set up queue (this block could be a class in Typescript)
let tasks = [];
const queue$= new rxjs.Subject();
const addToQueue = (task) => {
tasks = [...tasks, task];
queue$.next(task);
}
const removeFromQueue = () => tasks = tasks.slice(1);
const queueContains = (task) => tasks.map(t => t.id).includes(task.id)
// Dedupe and enqueue
tasksQuery$.pipe(
mergeMap(tasks => tasks), // flatten the incoming task array
filter(task => task && !queueContains(task)) // check not in queue
).subscribe(task => addToQueue(task) );
//Process the queue
queue$.subscribe(async task => {
await dolongtask(task);
await removeTask(task); // Assume this sends 'delete' to firebase
removeFromQueue();
});
// Run simulation
tasksQuery$.next([{id:1},{id:2}]);
// Add after delay to show repeated items in firebase
setTimeout(() => {
tasksQuery$.next([{id:1},{id:2},{id:3}]);
}, 500);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>