我有一个套接字连接,用于发送带有标识符的消息.我想为每种类型的消息创建一个单独的observable.我有一些解决方案,但他们都感到笨拙或有可能的性能问题.我对RxJS比较陌生,所以我不知道可能陷入的陷阱.
我的第一直觉是为每种类型创建一个过滤的observable:
const receive_message = Rx.fromEvent(socket, 'data').pipe(share());
const message_type_a = receive_message.pipe(filter(message => message.type === 'a'));
const message_type_b = receive_message.pipe(filter(message => message.type === 'b'));
const message_type_c = receive_message.pipe(filter(message => message.type === 'c'));
const message_type_d = receive_message.pipe(filter(message => message.type === 'd'));
我认为这会导致性能问题,因为每次收到任何消息时,它都会对每种消息类型执行此检查.
我想过做一个像这样的多级分区:
const receive_message = Rx.fromEvent(socket, 'data');
const [message_type_a, not_a] = receive_message.pipe(partition(message => message.type === 'a'));
const [message_type_b, not_b] = not_a.pipe(partition(message => message.type === 'b'));
const [message_type_c, message_type_d] = not_b.pipe(partition(message => message.type === 'c'));
这非常笨重,我不确定它是否比过滤器解决方案更高效.
接下来我尝试使用这样的主题:
const message_type_a = new Rx.Subject();
const message_type_b = new Rx.Subject();
const message_type_c = new Rx.Subject();
const message_type_d = new Rx.Subject();
Rx.fromEvent(socket, 'data').subscribe(function (message) {
switch (message.type) {
case 'a':
message_type_a.next(message);
break;
case 'b':
message_type_b.next(message);
break;
case 'c':
message_type_c.next(message);
break;
case 'd':
message_type_d.next(message);
break;
default:
console.log('Uh oh');
}
},
console.log,
function () {
message_type_a.complete();
message_type_b.complete();
message_type_c.complete();
message_type_d.complete();
}
);
再次,这是笨重的,每当我使用主题时,我会问自己这是否是“Rx”做事方式.
理想情况下,我可以做这样的事情:
const [
message_type_a,
message_type_b,
message_type_c,
message_type_d
] = Rx.fromEvent(socket, 'data').pipe(partitionMany(message.type));
有没有任何优雅的解决方案,或者我的整体方法是如何从根本上分析源可观察的缺陷?
这是我的第一个问题,所以我希望我做得很好.提前致谢!
解决方法:
我将您的交换机案例解决方案更改为更高性能的解决方案.
const message_type_a = new Rx.Subject();
const message_type_b = new Rx.Subject();
const message_type_c = new Rx.Subject();
const message_type_d = new Rx.Subject();
subjects = {
'a': message_type_a,
'b': message_type_b,
'c': message_type_c,
'd': message_type_d
}
Rx.fromEvent(socket, 'data').pipe(tap(message =>
subjects[message.type].next(message))).subscribe();