文章目录
RocketMQ
官方文档:https://rocketmq.apache.org/docs/quick-start/
-
解耦 中间件AB
-
异步 下单→>成功验证短信、邮件、仓储调度
-
消息驱动/事件驱动型
1.开启rocketmq
- 添加环境变量
ROCKETMQ_HOME="D:\rocketmq"
NAMESRV_ADDR="localhost:9876"
- 在rocketmq的bin目录下开启cmd命令行
输入一下两个命令
mqnamesrv.cmd
启动第一个命令后被阻塞,重新开一个命令行继续输入
mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
2 应用
- 引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
- 添加配置
rocketmq:
topic: PlaceOrder
namesrv-addr: 127.0.0.1:9876
producer:
producer-group: SleeveProducerGroup
consumer:
consumer-group: SleeveConsumerGroup
3 延迟消息队列
预定消息与普通消息的不同之处在于,它们直到指定的时间之后才会被传递。
生产者
@Component
public class ProducerSchedule {
@Value("${rocketmq.producer.producer-group}")
private String producerGroup;
@Value("${rocketmq.namesrv-addr}")
private String namesrvAddr;
// rocketmq 的生产者
private DefaultMQProducer defaultMQProducer;
// 执行顺序: A Construct--> B @Autowired --> @PostConstruct
@PostConstruct
public void setDefaultMQProducer() {
if (defaultMQProducer == null) {
//在类实例化后,创建defaultMQProducer对象
this.defaultMQProducer = new DefaultMQProducer(producerGroup);
// 设置rockemq的地址
defaultMQProducer.setNamesrvAddr(namesrvAddr);
}
try {
// 开启生成者
defaultMQProducer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public String send(String topic, String body) throws Exception {
Message message = new Message(topic, body.getBytes());
// messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// 设置延时级别
message.setDelayTimeLevel(3);
// 发送消息
SendResult sendResult = defaultMQProducer.send(message);
System.out.println(sendResult.getMsgId());
System.out.println(sendResult.getSendStatus());
return sendResult.getMsgId();
}
}
消费者
/**
* 实现CommandLineRunner接口的方法 该类由SpringBoot调用
* 在程序初始化时,只执行一次
*/
@Component
public class ConsumerSchedule implements CommandLineRunner {
@Value("${rocketmq.consumer.consumer-group}")
private String consumerGroup;
@Value("${rocketmq.namesrv-addr}")
private String namesrvAddr;
@Value("${rocketmq.topic}")
private String topic;
@Autowired
private OrderCancelService orderCancelService;
@Autowired
private CouponBackService couponBackService;
public void getMessage() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
// 设置rockemq的地址
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 监听test主题下的所有消息
consumer.subscribe(topic, "*");
// 并发消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
String orderMessage = new String(msg.getBody());
System.out.println(orderMessage);
if (StringUtils.isEmpty(orderMessage)) break;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
@Override
public void run(String... args) throws Exception {
this.getMessage();
}
}
4 RocketMQ消费位置
public enum ConsumeFromWhere {
/**
* 一个新的订阅组第一次启动从队列的最后位置开始消费<br>
* 后续再启动接着上次消费的进度开始消费
*/
CONSUME_FROM_LAST_OFFSET,
@Deprecated
CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
@Deprecated
CONSUME_FROM_MIN_OFFSET,
@Deprecated
CONSUME_FROM_MAX_OFFSET,
/**
* 一个新的订阅组第一次启动从队列的最前位置开始消费<br>
* 后续再启动接着上次消费的进度开始消费
*/
CONSUME_FROM_FIRST_OFFSET,
/**
* 一个新的订阅组第一次启动从指定时间点开始消费<br>
* 后续再启动接着上次消费的进度开始消费<br>
* 时间点设置参见DefaultMQPushConsumer.consumeTimestamp参数
*/
CONSUME_FROM_TIMESTAMP,
}
4 rocketmq的有序消费模式和并发消费模式的区别
rocketmq消费者注册监听有两种模式,有序消费MessageListenerOrderly和并发消费MessageListenerConcurrently,这两种模式返回值不同。
MessageListenerOrderly正确消费返回ConsumeOrderlyStatus.SUCCESS,
稍后消费返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
MessageListenerConcurrently正确消费返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
稍后消费返回ConsumeConcurrentlyStatus.RECONSUME_LATER