消息队列 RabbitMq 的学习和应用
- MQ简介
- 使用场景
- RabbitMq简介
- RabbitMQ的概念
- docker 安装
- 测试使用 Exchange类型
- Direct-Exchange 测试
- Fanout-Exchange 测试
- Topic-Exchange 测试
- SpringBoot整合RabbitMQ
- AmqpAdmin使用
- 如何收发消息 RabbitTemplate使用
- RabbitListener&RabbitHandler接收消息
- 消息确认机制
- 可靠投递-消费端确认
- RabbitMQ 的应用
MQ简介
队列类似一种List结构,专门来存储数据的一个队列
使用场景
RabbitMq简介
JMS和AMQP的区别
RabbitMQ的概念
生产者和消费者都是建立在长链接上
docker 安装
下载并启动这个容器docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
容器自启docker update rabbitmq --restart=always
测试使用 Exchange类型
一个交换机可以绑定很多队列,队列可以被多个交换机绑定
有4种类型的交换机,只学习前三种
直接交换机,将消息直接交给一个指定的队列,消息最终只能到达一个队列
广播模式交换机,不分路由键,直接把消息交给所有绑定了的队列
主题模式交换机,部分广播,匹配式
创建测试交换机
type:选择交换机类型
Durability:持久化和临时的,默认持久,重启虚拟机交换机不被删除
Auto delete: 是否自动删除,如果没绑定队列会自动删除
Internal:是否是内部交换机,只供内部使用的
创建一个队列
点进刚才创建的交换机,进行绑定
Direct-Exchange 测试
根据这个图创建几个交换机和队列来测试
发送一个测试消息
路由建和队列名同名
点进队列获取消息
重点:Ack Mode 消息的获取模式,Nack 获取消息后又放回了队列,可重复获取,
ack 自动删除,获取后消息就删除了,不会再回到队列中
Fanout-Exchange 测试
同样点进创建好的交换机,绑定好刚创建的4个队列
测试一个发送
发现有绑定的队列都有收到消息,这是fanout扇出交换机,不写路由建也能发给全部绑定的队列
Topic-Exchange 测试
创建一个测试 Topic 交换机
区别绑定 ,atguigu开头的 绑定路由建为 atguigu.#
以路由建 atguigu.news发送消息
发现都能接受到,因为经过这个路由建匹配,所有队列都匹配上了
再测试,以hello.news为路由建发送消息
发现只有绑定关系为 *.news 的 队列收到
SpringBoot整合RabbitMQ
1、 引入springboot已经准备好的场景启动器
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
查看 RabbitAutoConfiguration
发现已经给容器中通过 @Bean
自动 配置了 RabbitTemplate AmqpAdmin CachingConnectionFactory RabbitMessagingTemplate
2、 配置文件配置信息
3、启动类 @EnableRabbit
注解开启
AmqpAdmin使用
1、如何创建 Exchange、Queue、Binding
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createExchange() {
//String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
// durable 是否持久化
DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
amqpAdmin.declareExchange(directExchange);
System.out.println("Exchange[hello-java-exchange]创建成功");
}
@Test
public void createQueue() {
//String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
//exclusive 排他,一条链接连上后 别人就连不上
Queue queue = new Queue("hello-java-queue", true, false, false);
amqpAdmin.declareQueue(queue);
System.out.println("Queue[hello-java-queue]创建成功");
}
@Test
public void createBinding() {
// String destination【目的地】,
// Binding.DestinationType destinationType【目的地类型】可以指定是交换机还是队列,
// String exchange【交换机】,
// String routingKey【路由键】,
// Map<String, Object> arguments【自定义参数】
// 将exchange指定的交换机和destination目的地进行绑定,使用routeingKey作为指定的路由键
Binding binding = new Binding("hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java-exchange",
"hello.java",
null);
amqpAdmin.declareBinding(binding);
System.out.println("Binding[hello-Binding]创建成功");
}
如何收发消息 RabbitTemplate使用
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest() {
MsgTest res = new MsgTest();
res.setCreateTime(new Date());
res.setName("哈哈");
//1、发送消息
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", res);
}
对发出的对象消息,要实现序列化接口
如何让发出的对象消息是一个json
在容器放一个消息类型转换器
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
RabbitListener&RabbitHandler接收消息
RabbitListener
在任意服务的实现中测试,监听"hello-java-queue"
队列
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Object message){
System.out.println("接收到消息。。。内容:"+message+"类型"+message);
}
启动服务,会自动接收到之前发送到还未消费的消息
类型为 类型class org.springframework.amqp.core.Message
也可以直接将发送出的消息的对象写在方法参数中,直接获取
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessages(Message message,
OrderEntity orderEntity) {
byte[] body = message.getBody();
MessageProperties properties = message.getMessageProperties();
System.out.println("接收到消息。。。内容:" + orderEntity + "类型" + message.getClass());
}
Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息
场景:订单服务启动多个
复制服务,模拟启动多个相同的服务
//发送多条消息
@Test
public void sendMessageTest() {
for (int i = 0; i < 10; i++) {
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("哈哈"+i);
//1、发送消息
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", reasonEntity);
}
}
发现只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息
RabbitHandler
RabbitListener
:标在类 + 方法上RabbitHandler
:表在方法上
就可以发送不同对象,接收时就可以通过方法参数重载
@Test
public void sendMessageTest() {
for (int i = 0; i < 10; i++) {
if (i%2 == 0){
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("哈哈"+i);
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", reasonEntity);
}else {
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderEntity);
}
}
}
重载
@RabbitHandler
public void receiveMessage(Message message, OrderEntity order, Channel channel) {}
@RabbitHandler
public void receiveMessage(Message message, OrderReturnReasonEntity order, Channel channel) {}
消息确认机制
可靠投递-发送端确认
RabbitMq有三段确认机制
1、publisher confirmCallback
触发时机,生产者发送消息到服务器,服务器接收到消息了,如果是集群模式,需要所有broker收到
要使用confirmCallback ,需要定制 rabbitTemplate,在配置类中
2、publisher returnCallback
测试 消息抵达队列失败,最简单的错误就是 routeKing 写错了
发送时可以指定一个唯一id
//1、发送消息
rabbitTemplate.convertAndSend("hello-java-exchange", "hello22.java", orderEntity, new CorrelationData(UUID.randomUUID().toString()));
两个回调的打印
Fail message[(Body:'{"id":null,"memberId":null,"orderSn":null,"couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":null,"deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":null,"billReceiverPhone":null,"billReceiverEmail":null,"receiverName":"嘿嘿","receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":null,"receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":null,"modifyTime":null}' MessageProperties [headers={spring_returned_message_correlation=000d3613-d316-4483-abfa-6a4e611555e5, __TypeId__=com.atguigu.gulimall.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])] replyCode[312] replyText[NO_ROUTE] exchange[hello-java-exchange] routingKey[hello22.java]
confirm...correlationData[CorrelationData [id=000d3613-d316-4483-abfa-6a4e611555e5]],=>b[true],=>s[null]
两个回调的设置源码
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 定制 RabbitTemplate
* 1、服务器收到消息就回调
* 1、spring.rabbitmq.publisher-confirms=true
* 2、设置确认回调 ConfirmCallback
* 2、消息正确抵达队列
* 1、spring.rabbitmq.publisher-returns=true
* spring.rabbitmq.template.mandatory=true
* 2、设置确认回调 ReturnCallback
* 3、消费端确认(保证每个消息被正确消费,此时broker才可以删除这个消息)
* 1、默认是自动确认的,只要消息被接收到,客户端会自动确认,服务端就会移除这个消息
*
* @PostConstruct MyRabbitConfig对象创建完成后执行这个方法
*/
@PostConstruct
public void initRabbitTemplate() {
//设置消息抵达Broker的确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 1、只要消息抵达Broker ack就为true 不管是否有消费者都会回调
* @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id)
* @param ack 消息是否成功收到
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm...correlationData[" + correlationData + "],=>b[" + ack + "],=>s[" + cause + "]");
}
});
//设置消息抵达队列的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只有消息没有投递给指定的队列,才会触发这个失败回调
* @param message 投递失败的消息详细信息
* @param replyCode 恢复的状态码
* @param replyText 恢复的文本内容
* @param exchange 当时发给哪个交换机
* @param routingKey 当时用的路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("Fail message[" + message + "] replyCode[" + replyCode + "] replyText[" + replyText + "] exchange[" + exchange + "] routingKey[" + routingKey + "]");
}
});
}
}
可靠投递-消费端确认
消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)
测试:
发送了5个消息,队列中有5个消息待消费,debug模拟消息接收,在接收第一个消息时就将服务停止,模拟宕机,发现5个消息全部消失了。原因就是自动 ACK机制。
所以,默认是自动确认的,只要接收到消息,客户端会自动确认,服务端就会移除这个消息
为了保证消息不丢失,手动确认。
配置文件中设置为手动模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
手动确认模式下,只要我们没有明确告诉MQ,货物被签收,消息就一直是unacked状态,即使Consumer宕机,消息也不会丢失,会重新变为Ready,下次有新的Consumer链接进来就发给他
如何签收:
//这个标识 是 channel 内自增的,不会重复
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// false 非批量签收
channel.basicAck(deliveryTag, false);
拒绝签收:两个方法
// long deliveryTag,
// boolean requeue 是否批量拒收
channel.basicReject(deliveryTag, false);
//long deliveryTag, boolean multiple,
// boolean requeue 是否重新入队
channel.basicNack(deliveryTag, false, false);
RabbitMQ 的应用
RabbitMQ延时队列
消息TTL就是消息的存活时间
TTL消息到一定时间会被服务器丢弃,也就死亡了,可以单独处理
死了的消息可以进入一个 路由,我们称这个路由为死信路由,是一个普通的交换机
流程为
一个延迟队列,该队列不能被任何人监听,待该队列消息过期后经死信路由交换机,进入到一个专门处理过期信息的队列,监听该队列的服务就可以进行处理了
延时队列定时关单模拟
创建这样一个构造关系的交换机 order-event-exchange
,该交换机绑定两个队列,连个绑定关系分别是 order.create.order
和 order.release.order
,对应的两个队列 order.delay.queue
和 order.release.order.queue
其中 order.delay.queue
为延迟队列,
设置的死信交换机为 x-dead-letter-exchange:order-event-exchange
死信路由为 x-dead-letter-routing-key:order.release.order
存活时间 x-message-ttl:60000
消息的一个周期分析:
消费者下单后 产生一个延时关单信息,到 交换机 order-event-exchange
路由为order.create.order
,所以消息路由到 order.delay.queue
这个延时队列,存活时间到期后,这个延时队列根据设置的 死信交换机order-event-exchange
和死信路由 order.release.order
将到期消息发送到 order.release.order.queue
队列,这个队列就有服务监听,处理这个关单信息。
在spring中可以通过往容器放@Bean的方式创建 队列和交换机,
@Configuration
public class MyMQConfig {
@RabbitListener(queues = "order.release.order.queue")
public void listener(OrderEntity entity, Channel channel, Message message) throws IOException {
System.out.println("收到过期订单信息,准备关闭订单"+entity.getOrderSn());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
//容器中的组建Queue Exchange Binding 都会自动创建(前提是RabbitMQ没有)
@Bean
public Queue orderDelayQueue() {
// 延时队列
// String name, boolean durable, boolean exclusive, boolean autoDelete,
// @Nullable Map<String, Object> arguments :
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "order-event-exchange");//死信交换机
arguments.put("x-dead-letter-routing-key", "order.release.order");//死信路由键
arguments.put("x-message-ttl", 60000);//消息过期时间 ms 1分钟
return new Queue("order.delay.queue", true, false, false, arguments);
}
@Bean
public Queue orderReleaseOrderQueue() {
//普通队列,接收已经到期的延时消息
return new Queue("order.release.order.queue", true, false, false);
}
@Bean
public Exchange orderEventExchange() {
// String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
// 普通交换机
return new TopicExchange("order-event-exchange", true, false);
}
@Bean
public Binding orderCreateOrderBinding() {
//和延时队列绑定
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
}
@Bean
public Binding orderReleaseOrderBinding() {
//和普通队列绑定
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
}
随便写一个生产消息的方法
@Controller
public class HelloController {
@Autowired
RabbitTemplate rabbitTemplate;
@ResponseBody
@GetMapping("/test/creatPOrder")
public String creatPOrderTest() {
OrderEntity entity = new OrderEntity();
entity.setOrderSn("10010");
entity.setModifyTime(new Date());
//给MQ发消息
rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", entity);
return "ok";
}
}
创建业务交换机&队列
为锁库存业务加上延时设计
创建各个队列交换机绑定关系
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "stock.release.stock.queue")
public void handle(Message message) {
}
/**
* 使用JSON序列化机制,进行消息转换
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public Exchange stockEventExchange() {
return new TopicExchange("stock-event-exchange", true, false);
}
@Bean
public Queue stockReleaseStockQueue() {
return new Queue("stock.release.stock.queue", true, false, false);
}
@Bean
public Queue stockDelayQueue() {
//延迟队列
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "stock-event-exchange");//死信交换机
arguments.put("x-dead-letter-routing-key", "stock.release");//死信路由
arguments.put("x-message-ttl", 120000);//消息过期时间 ms 1分钟
return new Queue("stock.delay.queue", true, false, false, arguments);
}
@Bean
public Binding stockReleaseBinding() {
//和延时队列绑定
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.release.#",
null);
}
@Bean
public Binding stockLockedBinding() {
//和普通队列绑定
return new Binding("stock.delay.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.locked",
null);
}
}
监听库存解锁
库存解锁的场景
1、下单成功,订单过期没有支付系统被系统自动取消和被用户手动取消。需要解锁库存/
2、下订单成功,库存锁定成功接下来的业务调用失败,导致订单回滚,之前锁定的库存就要自动解锁。
库存服务中,确认商品库存成功后,锁定这个商品的库存,发送延时队列的消息,同时在mysql中保存这次锁单的详情,wms_ware_order_task
表保存这次锁单的详情,对应此表的子表wms_ware_order_task_detail
保存此次锁单中的各个商品
库存解锁逻辑
监听释放了的消息,结合存入mysql的锁单详情。
记录一个报错
Caused by: org.springframework.web.client.RestClientException: Could not extract response: no suitable HttpMessageConverter found for response type [class com.atguigu.common.utils.R] and content type [text/html;charset=UTF-8]
无法返回一个页面,因为远程调用订单服务的请求,被订单服务的拦截器拦截,跳转至了登陆页面,所以R对象无法返回一个页面
解决办法,在订单服务的拦截器中,获取一个匹配路径,放行指定匹配路径的请求,不做拦截
String uri = request.getRequestURI();
boolean match = new AntPathMatcher().match("/order/order/status/**", uri);
if (match){
return true;
}
定时关单完成
消息丢失、积压、重复解决方案
try {
//TODO 保证消息一定发送出去,每一个消息都可以做一个日志记录(给数据库保存每一个消息的详细信息)
//定期扫描数据库将失败的信息再发一次
rabbitTemplate.convertAndSend("order-event-exchange","order.release.other",orderEntity);
}catch (Exception e){
//TODO 将没发送成功的消息进行重试发送
}
1、做好消息确认机制(publisher、consumer【手动ack】)
2、每一个发送的消息都在数据库做好记录。定期将失败的消息再发送一遍
end