一.集群和广播区别
1.集群消费方式
一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息。例如某个Topic有九条消息,其中一个Consumer Group有三个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息,Consumer不指定消费方式的话默认是集群消费的,适用于大部分消息的业务
2.广播消费方式
一条消息被多个Consumer消费,几十这些Consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer消费一次,广播消费中的ConsumerGroup概念可以认为在消息划分层面没有意义,适用于一些分发消息的场景,比如我订单下单成功了,需要通知财务系统,客服系统等等这种分发的场景,可以通过修改Consumer中的MessageModel来设置消费方式为广播消费
二.代码实现广播
1. package cn.baocl.rocketmq.consumer; 2. 3. import cn.baocl.rocketmq.processor.MQConsumeMsgListenerProcessor; 4. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; 5. import com.alibaba.rocketmq.client.exception.MQClientException; 6. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; 7. import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; 8. import org.slf4j.Logger; 9. import org.slf4j.LoggerFactory; 10. import org.springframework.beans.factory.annotation.Autowired; 11. import org.springframework.beans.factory.annotation.Value; 12. import org.springframework.boot.SpringBootConfiguration; 13. import org.springframework.context.annotation.Bean; 14. import org.springframework.util.StringUtils; 15. 16. 17. @SpringBootConfiguration 18. public class MQConsumerConfiguration { 19. 20. public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class); 21. @Value("${rocketmq.consumer.namesrvAddr}") 22. private String namesrvAddr; 23. @Value("${rocketmq.consumer.groupName}") 24. private String groupName; 25. @Value("${rocketmq.consumer.consumeThreadMin}") 26. private int consumeThreadMin; 27. @Value("${rocketmq.consumer.consumeThreadMax}") 28. private int consumeThreadMax; 29. @Value("${rocketmq.consumer.topics}") 30. private String topics; 31. @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}") 32. private int consumeMessageBatchMaxSize; 33. @Autowired 34. private MQConsumeMsgListenerProcessor mqMessageListenerProcessor; 35. 36. @Bean 37. public DefaultMQPushConsumer testRocketMQConsumer() throws Exception { 38. if (StringUtils.isEmpty(groupName)){ 39. throw new Exception("groupName is null !!!"); 40. } 41. if (StringUtils.isEmpty(namesrvAddr)){ 42. throw new Exception("namesrvAddr is null !!!"); 43. } 44. if(StringUtils.isEmpty(topics)){ 45. throw new Exception("topics is null !!!"); 46. } 47. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); 48. consumer.setNamesrvAddr(namesrvAddr); 49. consumer.setConsumeThreadMin(consumeThreadMin); 50. consumer.setConsumeThreadMax(consumeThreadMax); 51. consumer.registerMessageListener(mqMessageListenerProcessor); 52. 53. /** 54. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 55. * 如果非第一次启动,那么按照上次消费的位置继续消费 56. */ 57. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); 58. /** 59. * 设置消费模型,集群还是广播,默认为集群 60. */ 61. //广播 62. consumer.setMessageModel(MessageModel.BROADCASTING); 63. //集群 64. //consumer.setMessageModel(MessageModel.CLUSTERING); 65. 66. /** 67. * 设置一次消费消息的条数,默认为1条 68. */ 69. consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); 70. 71. try { 72. /** 73. * 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3 74. */ 75. String[] topicTagsArr = topics.split(";"); 76. for (String topicTags : topicTagsArr) { 77. String[] topicTag = topicTags.split("~"); 78. consumer.subscribe(topicTag[0],topicTag[1]); 79. } 80. consumer.start(); 81. LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr); 82. }catch (MQClientException e){ 83. LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e); 84. throw new Exception(e); 85. } 86. return consumer; 87. } 88. }
只需要以下代码
1. //广播 2. consumer.setMessageModel(MessageModel.BROADCASTING); 3. //集群 4. //consumer.setMessageModel(MessageModel.CLUSTERING);