2.2 RabbitMQ广播、工作队列模式坑
消息模式是广播 Or 工作队列
- 消息广播,即希望同一消息,不同消费者都能分别消费
- 队列模式,即不同消费者共享消费同一个队列的数据,相同消息只能被某一个消费者消费一次。
比如同一用户的注册消息
- 会员服务需监听以发送欢迎短信
- 营销服务需监听以发送新用户小礼物
但会员、营销服务可能都有多实例,业务需求同一用户的消息,可同时广播给不同的服务(广播模式),但对于同一个服务的不同实例(比如会员服务1和会员服务2),不管哪个实例来处理,处理一次即可(工作队列模式):
实现代码时务必确认MQ系统的机制,确保消息的路由按期望。
RocketMQ实现类似功能比较简单直白:若消费者属于一个组,那么消息只会由同组的一个消费者消费;若消费者属不同组,每个组都能消费一遍消息。
而RabbitMQ的消息路由模式采用队列+交换器,队列是消息载体,交换器决定消息路由到队列的方式。
step1:会员服务-监听用户服务发出的新用户注册消息
若启动俩会员服务,那么同一用户的注册消息应只能被其中一个实例消费。
分别实现RabbitMQ队列、交换器、绑定三件套。
- 队列使用匿名队列
- 交换器使用DirectExchange,交换器绑定到匿名队列的路由Key是空字符串
收到消息之后,打印所在实例使用的端口。
- 消息发布者、消费者、以及MQ的配置
- 使用12345和45678两个端口启动两个程序实例后,调用sendMessage接口发送一条消息,输出的日志,显示同一会员服务两个实例都收到了消息:
- 问题在于不明
RabbitMQ直接交换器和队列的绑定关系
RabbitMQ的直接交换器根据routingKey路由消息。而程序每次启动都会创建匿名(随机命名)队列,所以每个会员服务实例都对应独立的队列,以空routingKey绑定到直接交换器。
用户服务发消息时也设置了空routingKey,所以直接交换器收到消息后,发现匹配俩队列,于是都转发消息
修复
对会员服务不要使用匿名队列,而使用同一队列。
将上面代码中的匿名队列换做普通队列:
private static final String QUEUE = "newuserQueue"; @Bean public Queue queue() { return new Queue(QUEUE); }
这样对同一消息,俩实例中只有一个实例可收到,不同消息被轮询发给不同实例。
- 现在的交换器和队列关系
step2:用户服务-广播消息给会员、营销服务
期望会员、营销服务都能收到广播消息,但会员/营销服务中的每个实例只需收到一次消息。
声明一个队列和一个FanoutExchange,然后模拟俩用户服务和俩营销服务:
@Slf4j @Configuration @RestController @RequestMapping("fanoutwrong") public class FanoutQueueWrong { private static final String QUEUE = "newuser"; private static final String EXCHANGE = "newuser"; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping public void sendMessage() { rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString()); } //声明FanoutExchange,然后绑定到队列,FanoutExchange绑定队列的时候不需要routingKey @Bean public Declarables declarables() { Queue queue = new Queue(QUEUE); FanoutExchange exchange = new FanoutExchange(EXCHANGE); return new Declarables(queue, exchange, BindingBuilder.bind(queue).to(exchange)); } //会员服务实例1 @RabbitListener(queues = QUEUE) public void memberService1(String userName) { log.info("memberService1: welcome message sent to new user {}", userName); } //会员服务实例2 @RabbitListener(queues = QUEUE) public void memberService2(String userName) { log.info("memberService2: welcome message sent to new user {}", userName); } //营销服务实例1 @RabbitListener(queues = QUEUE) public void promotionService1(String userName) { log.info("promotionService1: gift sent to new user {}", userName); } //营销服务实例2 @RabbitListener(queues = QUEUE) public void promotionService2(String userName) { log.info("promotionService2: gift sent to new user {}", userName); } }
请求四次sendMessage注册四个用户。日志发现一条用户注册的消息,要么被会员服务收到,要么被营销服务收到,这不是广播。可使用的明明是FanoutExchange,为什么没起效呢?
因为广播交换器会忽略routingKey,广播消息到所有绑定的队列。该案例的俩会员服务和两个营销服务都绑定了同一队列,所以四服务只能收到一次消息:
修复
拆分队列,会员和营销两组服务分别使用一条独立队列绑定到广播交换器
@Slf4j @Configuration @RestController @RequestMapping("fanoutright") public class FanoutQueueRight { private static final String MEMBER_QUEUE = "newusermember"; private static final String PROMOTION_QUEUE = "newuserpromotion"; private static final String EXCHANGE = "newuser"; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping public void sendMessage() { rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString()); } @Bean public Declarables declarables() { //会员服务队列 Queue memberQueue = new Queue(MEMBER_QUEUE); //营销服务队列 Queue promotionQueue = new Queue(PROMOTION_QUEUE); //广播交换器 FanoutExchange exchange = new FanoutExchange(EXCHANGE); //两个队列绑定到同一个交换器 return new Declarables(memberQueue, promotionQueue, exchange, BindingBuilder.bind(memberQueue).to(exchange), BindingBuilder.bind(promotionQueue).to(exchange)); } @RabbitListener(queues = MEMBER_QUEUE) public void memberService1(String userName) { log.info("memberService1: welcome message sent to new user {}", userName); } @RabbitListener(queues = MEMBER_QUEUE) public void memberService2(String userName) { log.info("memberService2: welcome message sent to new user {}", userName); } @RabbitListener(queues = PROMOTION_QUEUE) public void promotionService1(String userName) { log.info("promotionService1: gift sent to new user {}", userName); } @RabbitListener(queues = PROMOTION_QUEUE) public void promotionService2(String userName) { log.info("promotionService2: gift sent to new user {}", userName); } }
现在的交换器和队列结构
从日志输出可以验证,对每条MQ消息,会员服务和营销服务分别都会收到一次,一条消息广播到两个服务同时,在每一个服务的两个实例中通过轮询接收:
异步的消息路由模式一旦配置出错,轻则可能导致消息重复处理,重则可能导致重要的服务无法接收到消息,最终造成业务逻辑错误。
小结
微服务场景下不同服务多个实例监听消息的情况,一般不同服务需要同时收到相同的消息,而相同服务的多个实例只需要轮询接收消息。我们需要确认MQ的消息路由配置是否满足需求,以避免消息重复或漏发问题。