javascript – RxJS:优雅的方式将分区源Observable分成3个或更多的Observable

我有一个套接字连接,用于发送带有标识符的消息.我想为每种类型的消息创建一个单独的observable.我有一些解决方案,但他们都感到笨拙或有可能的性能问题.我对RxJS比较陌生,所以我不知道可能陷入的陷阱.

我的第一直觉是为每种类型创建一个过滤的observable:

const receive_message = Rx.fromEvent(socket, 'data').pipe(share());
const message_type_a = receive_message.pipe(filter(message => message.type === 'a'));
const message_type_b = receive_message.pipe(filter(message => message.type === 'b'));
const message_type_c = receive_message.pipe(filter(message => message.type === 'c'));
const message_type_d = receive_message.pipe(filter(message => message.type === 'd'));

我认为这会导致性能问题,因为每次收到任何消息时,它都会对每种消息类型执行此检查.

我想过做一个像这样的多级分区:

const receive_message = Rx.fromEvent(socket, 'data');
const [message_type_a, not_a] = receive_message.pipe(partition(message => message.type === 'a'));
const [message_type_b, not_b] = not_a.pipe(partition(message => message.type === 'b'));
const [message_type_c, message_type_d] = not_b.pipe(partition(message => message.type === 'c'));

这非常笨重,我不确定它是否比过滤器解决方案更高效.

接下来我尝试使用这样的主题:

const message_type_a = new Rx.Subject();
const message_type_b = new Rx.Subject();
const message_type_c = new Rx.Subject();
const message_type_d = new Rx.Subject();

Rx.fromEvent(socket, 'data').subscribe(function (message) {
    switch (message.type) {
      case 'a':
        message_type_a.next(message);
        break;
      case 'b':
        message_type_b.next(message);
        break;
      case 'c':
        message_type_c.next(message);
        break;
      case 'd':
        message_type_d.next(message);
        break;
      default:
        console.log('Uh oh');
    }
  },
  console.log,
  function () {
    message_type_a.complete();
    message_type_b.complete();
    message_type_c.complete();
    message_type_d.complete();
  }
);

再次,这是笨重的,每当我使用主题时,我会问自己这是否是“Rx”做事方式.

理想情况下,我可以做这样的事情:

const [
  message_type_a,
  message_type_b,
  message_type_c,
  message_type_d
] = Rx.fromEvent(socket, 'data').pipe(partitionMany(message.type));

有没有任何优雅的解决方案,或者我的整体方法是如何从根本上分析源可观察的缺陷?

这是我的第一个问题,所以我希望我做得很好.提前致谢!

解决方法:

我将您的交换机案例解决方案更改为更高性能的解决方案.

const message_type_a = new Rx.Subject();
const message_type_b = new Rx.Subject();
const message_type_c = new Rx.Subject();
const message_type_d = new Rx.Subject();

subjects = {
    'a': message_type_a,
    'b': message_type_b,
    'c': message_type_c,
    'd': message_type_d
}

Rx.fromEvent(socket, 'data').pipe(tap(message => 
subjects[message.type].next(message))).subscribe();
上一篇:javascript – rxjs / Subscription没有导出成员’Subscription’


下一篇:javascript – FirebaseListObservable在查询后变为Observable