消息队列 RabbitMq 的学习和应用

消息队列 RabbitMq 的学习和应用

MQ简介

队列类似一种List结构,专门来存储数据的一个队列

使用场景

消息队列 RabbitMq 的学习和应用
消息队列 RabbitMq 的学习和应用

RabbitMq简介

消息队列 RabbitMq 的学习和应用
消息队列 RabbitMq 的学习和应用
JMS和AMQP的区别
消息队列 RabbitMq 的学习和应用
消息队列 RabbitMq 的学习和应用

RabbitMQ的概念

消息队列 RabbitMq 的学习和应用
消息队列 RabbitMq 的学习和应用
消息队列 RabbitMq 的学习和应用
生产者和消费者都是建立在长链接上
消息队列 RabbitMq 的学习和应用

docker 安装

消息队列 RabbitMq 的学习和应用
下载并启动这个容器
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
容器自启
docker update rabbitmq --restart=always

测试使用 Exchange类型

一个交换机可以绑定很多队列,队列可以被多个交换机绑定
消息队列 RabbitMq 的学习和应用
有4种类型的交换机,只学习前三种
消息队列 RabbitMq 的学习和应用
直接交换机,将消息直接交给一个指定的队列,消息最终只能到达一个队列

消息队列 RabbitMq 的学习和应用

广播模式交换机,不分路由键,直接把消息交给所有绑定了的队列
消息队列 RabbitMq 的学习和应用
主题模式交换机,部分广播,匹配式

消息队列 RabbitMq 的学习和应用
创建测试交换机
消息队列 RabbitMq 的学习和应用
type:选择交换机类型
Durability:持久化和临时的,默认持久,重启虚拟机交换机不被删除
Auto delete: 是否自动删除,如果没绑定队列会自动删除
Internal:是否是内部交换机,只供内部使用的

创建一个队列

消息队列 RabbitMq 的学习和应用
点进刚才创建的交换机,进行绑定

消息队列 RabbitMq 的学习和应用

Direct-Exchange 测试

根据这个图创建几个交换机和队列来测试
消息队列 RabbitMq 的学习和应用
消息队列 RabbitMq 的学习和应用
消息队列 RabbitMq 的学习和应用
发送一个测试消息
消息队列 RabbitMq 的学习和应用
路由建和队列名同名
消息队列 RabbitMq 的学习和应用
点进队列获取消息
消息队列 RabbitMq 的学习和应用
重点:Ack Mode 消息的获取模式,Nack 获取消息后又放回了队列,可重复获取,
消息队列 RabbitMq 的学习和应用
ack 自动删除,获取后消息就删除了,不会再回到队列中

Fanout-Exchange 测试

消息队列 RabbitMq 的学习和应用
同样点进创建好的交换机,绑定好刚创建的4个队列

消息队列 RabbitMq 的学习和应用
测试一个发送
消息队列 RabbitMq 的学习和应用
发现有绑定的队列都有收到消息,这是fanout扇出交换机,不写路由建也能发给全部绑定的队列
消息队列 RabbitMq 的学习和应用

Topic-Exchange 测试

创建一个测试 Topic 交换机消息队列 RabbitMq 的学习和应用
区别绑定 ,atguigu开头的 绑定路由建为 atguigu.#

消息队列 RabbitMq 的学习和应用
以路由建 atguigu.news发送消息
消息队列 RabbitMq 的学习和应用
发现都能接受到,因为经过这个路由建匹配,所有队列都匹配上了
消息队列 RabbitMq 的学习和应用
再测试,以hello.news为路由建发送消息
消息队列 RabbitMq 的学习和应用
发现只有绑定关系为 *.news 的 队列收到
消息队列 RabbitMq 的学习和应用

SpringBoot整合RabbitMQ

消息队列 RabbitMq 的学习和应用

1、 引入springboot已经准备好的场景启动器

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

查看 RabbitAutoConfiguration 发现已经给容器中通过 @Bean自动 配置了 RabbitTemplate AmqpAdmin CachingConnectionFactory RabbitMessagingTemplate

2、 配置文件配置信息
消息队列 RabbitMq 的学习和应用
3、启动类 @EnableRabbit注解开启

AmqpAdmin使用

1、如何创建 Exchange、Queue、Binding

@Autowired
AmqpAdmin amqpAdmin;

@Test
public void createExchange() {
    //String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
    // durable 是否持久化
    DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
    amqpAdmin.declareExchange(directExchange);
    System.out.println("Exchange[hello-java-exchange]创建成功");
}

@Test
public void createQueue() {
    //String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
    //exclusive 排他,一条链接连上后 别人就连不上
    Queue queue = new Queue("hello-java-queue", true, false, false);
    amqpAdmin.declareQueue(queue);
    System.out.println("Queue[hello-java-queue]创建成功");
}

@Test
public void createBinding() {
    // String destination【目的地】,
    // Binding.DestinationType destinationType【目的地类型】可以指定是交换机还是队列,
    // String exchange【交换机】,
    // String routingKey【路由键】,
    // Map<String, Object> arguments【自定义参数】
    // 将exchange指定的交换机和destination目的地进行绑定,使用routeingKey作为指定的路由键
    Binding binding = new Binding("hello-java-queue",
            Binding.DestinationType.QUEUE,
            "hello-java-exchange",
            "hello.java",
            null);
    amqpAdmin.declareBinding(binding);
    System.out.println("Binding[hello-Binding]创建成功");
}

如何收发消息 RabbitTemplate使用

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void sendMessageTest() {
    MsgTest res = new MsgTest();
    res.setCreateTime(new Date());
    res.setName("哈哈");
    //1、发送消息
    rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", res);

}

对发出的对象消息,要实现序列化接口

如何让发出的对象消息是一个json
在容器放一个消息类型转换器

@Configuration
public class MyRabbitConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

RabbitListener&RabbitHandler接收消息

RabbitListener

在任意服务的实现中测试,监听"hello-java-queue"队列

@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Object message){
    System.out.println("接收到消息。。。内容:"+message+"类型"+message);
}

启动服务,会自动接收到之前发送到还未消费的消息
类型为 类型class org.springframework.amqp.core.Message

也可以直接将发送出的消息的对象写在方法参数中,直接获取

    @RabbitListener(queues = {"hello-java-queue"})
    public void receiveMessages(Message message,
                               OrderEntity orderEntity) {
        byte[] body = message.getBody();
        MessageProperties properties = message.getMessageProperties();
        System.out.println("接收到消息。。。内容:" + orderEntity + "类型" + message.getClass());
    }

Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息

场景:订单服务启动多个

复制服务,模拟启动多个相同的服务
消息队列 RabbitMq 的学习和应用

//发送多条消息
@Test
public void sendMessageTest() {
    for (int i = 0; i < 10; i++) {
        OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
        reasonEntity.setId(1L);
        reasonEntity.setCreateTime(new Date());
        reasonEntity.setName("哈哈"+i);
        //1、发送消息
        rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", reasonEntity);
    }
}



发现只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息

RabbitHandler

RabbitListener :标在类 + 方法上
RabbitHandler :表在方法上

就可以发送不同对象,接收时就可以通过方法参数重载

@Test
public void sendMessageTest() {

    for (int i = 0; i < 10; i++) {
        if (i%2 == 0){
            OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
            reasonEntity.setId(1L);
            reasonEntity.setCreateTime(new Date());
            reasonEntity.setName("哈哈"+i);
            rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", reasonEntity);
        }else {
            OrderEntity orderEntity = new OrderEntity();
            orderEntity.setOrderSn(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderEntity);
        }
    }

}

重载


@RabbitHandler
public void receiveMessage(Message message, OrderEntity order, Channel channel) {}

@RabbitHandler
public void receiveMessage(Message message, OrderReturnReasonEntity order, Channel channel) {}

消息确认机制

可靠投递-发送端确认

消息队列 RabbitMq 的学习和应用
RabbitMq有三段确认机制
1、publisher confirmCallback
触发时机,生产者发送消息到服务器,服务器接收到消息了,如果是集群模式,需要所有broker收到

消息队列 RabbitMq 的学习和应用
要使用confirmCallback ,需要定制 rabbitTemplate,在配置类中

2、publisher returnCallback

消息队列 RabbitMq 的学习和应用
测试 消息抵达队列失败,最简单的错误就是 routeKing 写错了

发送时可以指定一个唯一id

//1、发送消息
rabbitTemplate.convertAndSend("hello-java-exchange", "hello22.java", orderEntity, new CorrelationData(UUID.randomUUID().toString()));

两个回调的打印

Fail message[(Body:'{"id":null,"memberId":null,"orderSn":null,"couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":null,"deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":null,"billReceiverPhone":null,"billReceiverEmail":null,"receiverName":"嘿嘿","receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":null,"receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":null,"modifyTime":null}' MessageProperties [headers={spring_returned_message_correlation=000d3613-d316-4483-abfa-6a4e611555e5, __TypeId__=com.atguigu.gulimall.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])] replyCode[312] replyText[NO_ROUTE] exchange[hello-java-exchange] routingKey[hello22.java]
confirm...correlationData[CorrelationData [id=000d3613-d316-4483-abfa-6a4e611555e5]],=>b[true],=>s[null]

两个回调的设置源码

@Configuration
public class MyRabbitConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;
    /**
     * 定制 RabbitTemplate
     * 1、服务器收到消息就回调
     *      1、spring.rabbitmq.publisher-confirms=true
     *      2、设置确认回调 ConfirmCallback
     * 2、消息正确抵达队列
     *      1、spring.rabbitmq.publisher-returns=true
     *      spring.rabbitmq.template.mandatory=true
     *      2、设置确认回调 ReturnCallback
     * 3、消费端确认(保证每个消息被正确消费,此时broker才可以删除这个消息)
     *      1、默认是自动确认的,只要消息被接收到,客户端会自动确认,服务端就会移除这个消息
     *
     * @PostConstruct MyRabbitConfig对象创建完成后执行这个方法
     */
    @PostConstruct
    public void initRabbitTemplate() {
        //设置消息抵达Broker的确认回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 1、只要消息抵达Broker ack就为true 不管是否有消费者都会回调
             * @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id)
             * @param ack 消息是否成功收到
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm...correlationData[" + correlationData + "],=>b[" + ack + "],=>s[" + cause + "]");

            }
        });
        //设置消息抵达队列的确认回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 只有消息没有投递给指定的队列,才会触发这个失败回调
             * @param message    投递失败的消息详细信息
             * @param replyCode  恢复的状态码
             * @param replyText  恢复的文本内容
             * @param exchange   当时发给哪个交换机
             * @param routingKey 当时用的路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("Fail message[" + message + "] replyCode[" + replyCode + "] replyText[" + replyText + "] exchange[" + exchange + "] routingKey[" + routingKey + "]");
            }
        });
    }
}

可靠投递-消费端确认

消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)

测试:
发送了5个消息,队列中有5个消息待消费,debug模拟消息接收,在接收第一个消息时就将服务停止,模拟宕机,发现5个消息全部消失了。原因就是自动 ACK机制。

所以,默认是自动确认的,只要接收到消息,客户端会自动确认,服务端就会移除这个消息

为了保证消息不丢失,手动确认。
配置文件中设置为手动模式

spring.rabbitmq.listener.simple.acknowledge-mode=manual

手动确认模式下,只要我们没有明确告诉MQ,货物被签收,消息就一直是unacked状态,即使Consumer宕机,消息也不会丢失,会重新变为Ready,下次有新的Consumer链接进来就发给他

如何签收:

//这个标识 是 channel 内自增的,不会重复
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// false 非批量签收
channel.basicAck(deliveryTag, false);

拒绝签收:两个方法

// long deliveryTag, 
// boolean requeue 是否批量拒收
channel.basicReject(deliveryTag, false);

//long deliveryTag, boolean multiple, 
// boolean requeue 是否重新入队
channel.basicNack(deliveryTag, false, false);

RabbitMQ 的应用

RabbitMQ延时队列

消息队列 RabbitMq 的学习和应用

消息TTL就是消息的存活时间
消息队列 RabbitMq 的学习和应用
TTL消息到一定时间会被服务器丢弃,也就死亡了,可以单独处理
消息队列 RabbitMq 的学习和应用
死了的消息可以进入一个 路由,我们称这个路由为死信路由,是一个普通的交换机
消息队列 RabbitMq 的学习和应用
流程为

一个延迟队列,该队列不能被任何人监听,待该队列消息过期后经死信路由交换机,进入到一个专门处理过期信息的队列,监听该队列的服务就可以进行处理了
消息队列 RabbitMq 的学习和应用

延时队列定时关单模拟

消息队列 RabbitMq 的学习和应用
创建这样一个构造关系的交换机 order-event-exchange ,该交换机绑定两个队列,连个绑定关系分别是 order.create.orderorder.release.order ,对应的两个队列 order.delay.queueorder.release.order.queue 其中 order.delay.queue为延迟队列,
设置的死信交换机为 x-dead-letter-exchange:order-event-exchange 死信路由为 x-dead-letter-routing-key:order.release.order 存活时间 x-message-ttl:60000

消息的一个周期分析:

消费者下单后 产生一个延时关单信息,到 交换机 order-event-exchange 路由为order.create.order,所以消息路由到 order.delay.queue 这个延时队列,存活时间到期后,这个延时队列根据设置的 死信交换机order-event-exchange 和死信路由 order.release.order 将到期消息发送到 order.release.order.queue 队列,这个队列就有服务监听,处理这个关单信息。

在spring中可以通过往容器放@Bean的方式创建 队列和交换机,

@Configuration
public class MyMQConfig {

    @RabbitListener(queues = "order.release.order.queue")
    public void listener(OrderEntity entity, Channel channel, Message message) throws IOException {
        System.out.println("收到过期订单信息,准备关闭订单"+entity.getOrderSn());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }


    //容器中的组建Queue Exchange Binding 都会自动创建(前提是RabbitMQ没有)
    @Bean
    public Queue orderDelayQueue() {
        // 延时队列
        // String name, boolean durable, boolean exclusive, boolean autoDelete,
        //			@Nullable Map<String, Object> arguments :
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "order-event-exchange");//死信交换机
        arguments.put("x-dead-letter-routing-key", "order.release.order");//死信路由键
        arguments.put("x-message-ttl", 60000);//消息过期时间 ms 1分钟
        return new Queue("order.delay.queue", true, false, false, arguments);
    }

    @Bean
    public Queue orderReleaseOrderQueue() {
        //普通队列,接收已经到期的延时消息
        return new Queue("order.release.order.queue", true, false, false);
    }

    @Bean
    public Exchange orderEventExchange() {

        // String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
        // 普通交换机
        return new TopicExchange("order-event-exchange", true, false);
    }

    @Bean
    public Binding orderCreateOrderBinding() {

        //和延时队列绑定
        return new Binding("order.delay.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.create.order",
                null);
    }

    @Bean
    public Binding orderReleaseOrderBinding() {

        //和普通队列绑定
        return new Binding("order.release.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.order",
                null);
    }


}

随便写一个生产消息的方法

@Controller
public class HelloController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @ResponseBody
    @GetMapping("/test/creatPOrder")
    public String creatPOrderTest() {
        OrderEntity entity = new OrderEntity();
        entity.setOrderSn("10010");
        entity.setModifyTime(new Date());

        //给MQ发消息
        rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", entity);
        return "ok";
    }
}

创建业务交换机&队列

为锁库存业务加上延时设计
消息队列 RabbitMq 的学习和应用
创建各个队列交换机绑定关系

@Configuration
public class MyRabbitConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "stock.release.stock.queue")
    public void handle(Message message) {

    }

    /**
     * 使用JSON序列化机制,进行消息转换
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public Exchange stockEventExchange() {
        return new TopicExchange("stock-event-exchange", true, false);
    }

    @Bean
    public Queue stockReleaseStockQueue() {
        return new Queue("stock.release.stock.queue", true, false, false);
    }

    @Bean
    public Queue stockDelayQueue() {
        //延迟队列
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "stock-event-exchange");//死信交换机
        arguments.put("x-dead-letter-routing-key", "stock.release");//死信路由
        arguments.put("x-message-ttl", 120000);//消息过期时间 ms 1分钟
        return new Queue("stock.delay.queue", true, false, false, arguments);
    }


    @Bean
    public Binding stockReleaseBinding() {

        //和延时队列绑定
        return new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.release.#",
                null);
    }

    @Bean
    public Binding stockLockedBinding() {

        //和普通队列绑定
        return new Binding("stock.delay.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.locked",
                null);
    }

}

监听库存解锁

库存解锁的场景
1、下单成功,订单过期没有支付系统被系统自动取消和被用户手动取消。需要解锁库存/

2、下订单成功,库存锁定成功接下来的业务调用失败,导致订单回滚,之前锁定的库存就要自动解锁。

库存服务中,确认商品库存成功后,锁定这个商品的库存,发送延时队列的消息,同时在mysql中保存这次锁单的详情,wms_ware_order_task表保存这次锁单的详情,对应此表的子表wms_ware_order_task_detail 保存此次锁单中的各个商品

库存解锁逻辑

监听释放了的消息,结合存入mysql的锁单详情。

记录一个报错

Caused by: org.springframework.web.client.RestClientException: Could not extract response: no suitable HttpMessageConverter found for response type [class com.atguigu.common.utils.R] and content type [text/html;charset=UTF-8]

无法返回一个页面,因为远程调用订单服务的请求,被订单服务的拦截器拦截,跳转至了登陆页面,所以R对象无法返回一个页面
解决办法,在订单服务的拦截器中,获取一个匹配路径,放行指定匹配路径的请求,不做拦截

String uri = request.getRequestURI();
boolean match = new AntPathMatcher().match("/order/order/status/**", uri);

if (match){
    return true;
}

定时关单完成

消息丢失、积压、重复解决方案

消息队列 RabbitMq 的学习和应用
消息队列 RabbitMq 的学习和应用

try {
    //TODO 保证消息一定发送出去,每一个消息都可以做一个日志记录(给数据库保存每一个消息的详细信息)
    //定期扫描数据库将失败的信息再发一次
    rabbitTemplate.convertAndSend("order-event-exchange","order.release.other",orderEntity);
}catch (Exception e){
    //TODO 将没发送成功的消息进行重试发送
}

1、做好消息确认机制(publisher、consumer【手动ack】)
2、每一个发送的消息都在数据库做好记录。定期将失败的消息再发送一遍

消息队列 RabbitMq 的学习和应用
消息队列 RabbitMq 的学习和应用

end

上一篇:Exchange Server 命令行


下一篇:RabbitMq 的理论及应用示例(一)