五分钟带你玩转rocketMQ(五)实战广播与集群


一.集群和广播区别

1.集群消费方式

一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息。例如某个Topic有九条消息,其中一个Consumer Group有三个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息,Consumer不指定消费方式的话默认是集群消费的,适用于大部分消息的业务

2.广播消费方式

一条消息被多个Consumer消费,几十这些Consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer消费一次,广播消费中的ConsumerGroup概念可以认为在消息划分层面没有意义,适用于一些分发消息的场景,比如我订单下单成功了,需要通知财务系统,客服系统等等这种分发的场景,可以通过修改Consumer中的MessageModel来设置消费方式为广播消费

二.代码实现广播

1. package cn.baocl.rocketmq.consumer;
2. 
3. import cn.baocl.rocketmq.processor.MQConsumeMsgListenerProcessor;
4. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
5. import com.alibaba.rocketmq.client.exception.MQClientException;
6. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
7. import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
8. import org.slf4j.Logger;
9. import org.slf4j.LoggerFactory;
10. import org.springframework.beans.factory.annotation.Autowired;
11. import org.springframework.beans.factory.annotation.Value;
12. import org.springframework.boot.SpringBootConfiguration;
13. import org.springframework.context.annotation.Bean;
14. import org.springframework.util.StringUtils;
15. 
16. 
17. @SpringBootConfiguration
18. public class MQConsumerConfiguration {
19. 
20. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
21. @Value("${rocketmq.consumer.namesrvAddr}")
22. private String namesrvAddr;
23. @Value("${rocketmq.consumer.groupName}")
24. private String groupName;
25. @Value("${rocketmq.consumer.consumeThreadMin}")
26. private int consumeThreadMin;
27. @Value("${rocketmq.consumer.consumeThreadMax}")
28. private int consumeThreadMax;
29. @Value("${rocketmq.consumer.topics}")
30. private String topics;
31. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
32. private int consumeMessageBatchMaxSize;
33. @Autowired
34. private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
35. 
36. @Bean
37. public DefaultMQPushConsumer testRocketMQConsumer() throws Exception {
38. if (StringUtils.isEmpty(groupName)){
39. throw new Exception("groupName is null !!!");
40.         }
41. if (StringUtils.isEmpty(namesrvAddr)){
42. throw new Exception("namesrvAddr is null !!!");
43.         }
44. if(StringUtils.isEmpty(topics)){
45. throw new Exception("topics is null !!!");
46.         }
47.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
48.         consumer.setNamesrvAddr(namesrvAddr);
49.         consumer.setConsumeThreadMin(consumeThreadMin);
50.         consumer.setConsumeThreadMax(consumeThreadMax);
51.         consumer.registerMessageListener(mqMessageListenerProcessor);
52. 
53. /**
54.          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
55.          * 如果非第一次启动,那么按照上次消费的位置继续消费
56.          */
57.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
58. /**
59.          * 设置消费模型,集群还是广播,默认为集群
60.          */
61. //广播 
62.         consumer.setMessageModel(MessageModel.BROADCASTING);
63. //集群
64. //consumer.setMessageModel(MessageModel.CLUSTERING);
65. 
66. /**
67.          * 设置一次消费消息的条数,默认为1条
68.          */
69.         consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
70. 
71. try {
72. /**
73.              * 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3
74.              */
75.             String[] topicTagsArr = topics.split(";");
76. for (String topicTags : topicTagsArr) {
77.                 String[] topicTag = topicTags.split("~");
78.                 consumer.subscribe(topicTag[0],topicTag[1]);
79.             }
80.             consumer.start();
81.             LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);
82.         }catch (MQClientException e){
83.             LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);
84. throw new Exception(e);
85.         }
86. return consumer;
87.     }
88. }

只需要以下代码 

1. //广播 
2.         consumer.setMessageModel(MessageModel.BROADCASTING);
3.         //集群
4.         //consumer.setMessageModel(MessageModel.CLUSTERING);


上一篇:调整ceph集群镜像源


下一篇:springboot统一处理返回实体与异常抛出