RabbitMQ

1、RabbitMQ的作用

异步处理

RabbitMQ

应用解耦

RabbitMQ

流量控制、削峰

RabbitMQ


2、概述

1)消息中间件有两个重要的概念:消息代理和目的地

当消息发送者发送消息后,将由消息代理接管,消息代理保证消息传递到指定目的地


2)消息队列主要有两种形式的目的地

队列(queue)点对点的消息通信(point to point)

主题(topic) 发布(publish) / 订阅(subscribe)消息通信


3)点对点模式

消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列获取消息,消息读取后被移出队列

消息只有唯一的发送者和接收者,并不是说只能有一个接收者


4)发布订阅模式

发送者发送消息到主题,多个接收者监听这个主题,那么会在消息到达时同时收到消息


5)JMS(Java Message Service)

基于JVM消息代理的规范。ActiveMQ是JMS的实现


6)AMQP(Advanced Message Queuing Protocal)

高级消息队列协议,也是一个消息代理的规范,兼容JMS

RabbitMQ是AMQP的实现

JMS和AMQP的区别:

RabbitMQ


7)Spring支持

spring-jms提供了对JMS的支持

spring-rabbit提供了对AMQP的支持

需要ConnectionFactory的实现来连接消息代理

提供JmsTemplate、RabbitTemplate来发送消息

@JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息

@EnableJms、@EnableRabbit开启支持


8)Spring Boot自动配置

JmsAutoConfiguration RabbitAutoConfiguration


9)市面的MQ产品

ActiveMQ、RabbitMQ、RocketMQ、Kafka


3、RabbitMQ的概念

RabbitMQ


1)Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等


2)Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序


3)Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别


4)Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走


5)Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 Exchange 和Queue的绑定可以是多对多的关系。


6)Connection

网络连接,比如一个TCP连接


7)Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接


8)Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。


9)Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /


10)Broker



4、Docker安装RabbitMQ

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

4369, 25672 (Erlang发现&集群端口) 5672, 5671 (AMQP端口)

15672 (web管理后台端口) 61613, 61614 (STOMP协议端口)

1883, 8883 (MQTT协议端口)


5、RabbitMQ中的Exchange类型

1)Direct.Exchange

交换机会根据消息的路由键将消息发送到某个队列上,是完全匹配单播的模式

RabbitMQ


2)Fanout.Exchange

RabbitMQ


3)Topic Exchange

RabbitMQ


6、SpringBoot整合RabbitMQ

1)pom文件中引入spring-boot-starter-amqp

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


2)yml文件中配置RabbitMQ的地址和端口

spring:  #配置数据源
  rabbitmq:
    host: 121.40.182.123
    port: 5672   #RabbitMQ客户端需要连接的端口


3)创建交换机Exchange,队列Queue,绑定 Binding

①、在RabbitMQ管理端进行手动创建

RabbitMQ


②、通过amqpAdmin在代码中进行创建

/**
 * 创建交换机
 */
@Test
public void createExchange(){
    DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
    amqpAdmin.declareExchange(directExchange);
    log.info("Exchange[{}]创建成功","hello-java-exchange");
}


/**
 * 创建队列
 */
@Test
public void createQueue(){
    Queue queue = new Queue("hello-java-queue",false,false,false);
    amqpAdmin.declareQueue(queue);
    log.info("Queue[{}]创建成功","hello-java-queue");
}


/**
 * @Description: 将交换机和队列进行绑定
 * @param
 * @author  houChen
 * @date  2022/1/24 20:51
 */
@Test
public void createBinding(){
    // public Binding(String destination【目的地】,
    // Binding.DestinationType destinationType【目的地的类型:Queue Exchange】, String exchange【交互机】, String routingKey【路由键】, Map<String, Object> arguments【其他参数】)

    // 整体就是: 将exchange指定的交换机和destination目的地进行绑定,使用routingKey作为指定的路由键
    Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null);
    amqpAdmin.declareBinding(binding);
    log.info("Binding[{}]创建成功","hello-java-binding");


4)发送消息

@Test
public void sendMessage(){
    OrderEntity orderEntity = new OrderEntity();
    orderEntity.setId(1L);
    orderEntity.setCreateTime(new Date());
    orderEntity.setTotalAmount(new BigDecimal(39));
    rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderEntity);
    log.info("消息发送成功: {}","hello world");
}


5)接收消息

/*
接收消息

        参数可以写一下类型
        1、Message message : 原生消息详细信息 消息头 + 消息体
        2、T<发送的消息的类型> t
        3、Channel channel : 当前传输数据的通道

        【注意】
           Queue: 一个Queue可以很多个客户端来监听,只要有一个客户端收到消息,Queue就会删除消息,有且仅有一个客户端会接收到消息

           场景:
           1、当订单服务启动多个时,发出一个消息时,有且仅有一个客户端会收到消息
           2、一个客户端,只有当一个消息处理完,才会接受另一个消息

 */
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel){
    System.out.println("接收到消息:" + content);
    //channel内按顺序递增的
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    // deliveryTag: 消息投递标签   multi: 是否批量确认
    try {
        if(deliveryTag%2==0){
            System.out.println("手动签收");
            //参数: long deliveryTag,boolean multiple
            channel.basicAck(deliveryTag,false);
        }else{
            System.out.println("拒绝签收");
            //参数  : long deliveryTag,boolean multiple,boolean require
            //require = false : 直接将该消息丢弃    require = true: 将该消息发回服务器,服务器重新入队
            channel.basicNack(deliveryTag,false,false);
            //channel.basicReject();
        }
    } catch (IOException e) {
        //网络中断
        e.printStackTrace();
    }
}


7、RabbitMQ消息确认机制 - 可靠到达

RabbitMQ



1)broker收到消息进行回调

①、yml文件中添加配置:

spring.rabbitmq.publisher-confirms= true # 开启发送端抵达broker的确认


②、rabbitTemplate中设置confirmCallback()

@Slf4j
@Configuration
public class RabbitConfg {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 消息发送的序列化机制
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制RabbitTemplate
     */
    @PostConstruct
    public void initRabbitTemplate(){
        //设置确认回调
        RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            /**
             * @param correlationData  当前消息的唯一关联数据(消息的唯一ID)
             * @param ack              消息是否成功收到  (生产者的消息是否正确投递到Exchange)
             * @param cause            消息投递失败的原因
             */
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("confirm...correlationData[{}] ack[{}] cause[{}]",correlationData,ack,cause);
            }
        };
        rabbitTemplate.setConfirmCallback(confirmCallback);
    }
}


2)消息正确抵达队列进行回调

①、yml文件进行配置

spring.rabbitmq.publisher-returns= true # 开启发送端抵达队列的确认

spring.rabbitmq.template.mandatory= true # 只要抵达队列,以异步发送优先回调我们的这个return


②、设置消息抵达队列的回调

//设置消息抵达队列的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    @Override
    /**
     * 只要消息没有投递给指定的队列,就会触发这个失败回调
     * @param message      投递失败消息的详细信息
     * @param replyCode    回复的状态码
     * @param replyText    回复的文本内容
     * @param exchange     当时这个消息发送给哪个交换机
     * @param routingKey   当时这个消息使用的是哪个路由键
     */
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("Fail message[{}] replyCode[{}] replyText[{}] exchange[{}] routingKey[{}]",message,replyCode,replyText,exchange,routingKey);
    }
});


3)消费端消息确认

yml文件中进行配置

spring.rabbitmq.listener.simple.acknowledge-mode: manual


消费端代码对消息进行处理后,进行手动确认

@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel){
    System.out.println("接收到消息:" + content);
    //channel内按顺序递增的
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    // deliveryTag: 消息投递标签   multi: 是否批量确认
    try {
        if(deliveryTag%2==0){
            System.out.println("手动签收");
            //参数: long deliveryTag,boolean multiple
            channel.basicAck(deliveryTag,false);
        }else{
            System.out.println("拒绝签收");
            //参数  : long deliveryTag,boolean multiple,boolean require
            //require = false : 直接将该消息丢弃    require = true: 将该消息发回服务器,服务器重新入队
            channel.basicNack(deliveryTag,false,false);
            //channel.basicReject();
        }
    } catch (IOException e) {
        //网络中断
        e.printStackTrace();
    }
}
上一篇:开发工具 -- gcc编译器使用


下一篇:基于 Rancher 的企业 CICD 环境搭建