javascript-Rxjs订阅队列

我的角度应用程序中有一个Firebase订阅,它会触发多次.
ich如何实现将任务作为队列处理,以便每个任务可以同步运行一次?

this.tasks.subscribe(async tasks => {
   for (const x of tasks) 
      await dolongtask(x); // has to be sync
      await removetask(x);
   });

问题在于,当longtask仍在处理时,将触发subribe事件.

解决方法:

我根据您提供的代码做出了一些假设,

>其他应用程序将任务添加到firebase数据库(异步),并且此代码正在实现任务处理器.
>您的firebase查询返回(集合中)所有未处理的任务,并且每次添加新任务时都会发出完整列表.
>仅在运行removeTask()后,查询才会删除任务

如果是这样,则需要在处理器之前执行重复数据删除机制.

出于说明的目的,我模拟了带有主题的firebase查询(将其重命名为taskQuery $),并且在脚本底部模拟了一系列firebase事件.
我希望它不会太混乱!

console.clear()
const { mergeMap, filter } = rxjs.operators;

// Simulate tasks query  
const tasksQuery$= new rxjs.Subject();

// Simulate dolongtask and removetask (assume both return promises that can be awaited)
const dolongtask = (task) => {
  console.log( `Processing: ${task.id}`);
  return new Promise(resolve => {
    setTimeout(() => {
      console.log( `Processed: ${task.id}`);
      resolve('done')
    }, 1000);
  });
}
const removeTask = (task) => {
  console.log( `Removing: ${task.id}`);
  return new Promise(resolve => {
    setTimeout(() => {
      console.log( `Removed: ${task.id}`);
      resolve('done')
    }, 200);
  });
}

// Set up queue (this block could be a class in Typescript)
let tasks = [];
const queue$= new rxjs.Subject();
const addToQueue = (task) => {
  tasks = [...tasks, task];
  queue$.next(task);
}
const removeFromQueue = () => tasks = tasks.slice(1);
const queueContains = (task) => tasks.map(t => t.id).includes(task.id)

// Dedupe and enqueue
tasksQuery$.pipe(
  mergeMap(tasks => tasks), // flatten the incoming task array 
  filter(task => task && !queueContains(task)) // check not in queue
).subscribe(task => addToQueue(task) );

//Process the queue
queue$.subscribe(async task => {
  await dolongtask(task);
  await removeTask(task); // Assume this sends 'delete' to firebase
  removeFromQueue();
});

// Run simulation
tasksQuery$.next([{id:1},{id:2}]);
// Add after delay to show repeated items in firebase
setTimeout(() => {
  tasksQuery$.next([{id:1},{id:2},{id:3}]); 
}, 500);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>
上一篇:javascript-在可观察值中更改值


下一篇:RxJS:如何切换以在多个行为主题源之间切换