RabbitMQ核心思想

RabbitMQ核心思想

RabbitMQ核心思想

MQ是干什么用的?

应用解耦、异步、流量削锋、数据分发、错峰流控、日志收集等等...


  • 当前最主流的消息中间件。
  • 高可用性,支持发送确认,投递确认等特性
  • 高可用,支持镜像队列
  • 支持插件

优点:

  1. 基于 Erlang, 支持高并发
  2. 支持多种平台,多种客户端,文档齐全
  3. 可靠性高
  4. 在互联网公司有较大规模等使用,社区活跃度高

1. AMQP协议介绍

RabbitMQ核心思想

Broker :接受和分发消息等的应用,RabbitMQ就是Message

Virtual Host : 虚拟机Broker , 将多个单元隔离开

Connection : publisher / consumer 和 broker之间的TCP连接

Channel : connection内部建立的逻辑连接,通常没个线程创建单独的channel

Rounting Key : 路由键,用来只是消息的路由转发,相当于快递的地址

Exchange : 交换机 ,相当于快递的分拨中心

Queue : 消息队列,消息最终被送到这里等待consumer 取走

Binding : exchange 和 queue之间的虚拟连接,用于message的分发依据


AMQP协议的核心概念-Exchange

  • 在AMQP协议或者是RabbitMQ实现中,最核心的组件是Exchange
  • Exchange 承担 RabbitMQ 的核心功能 --- 路由转发
  • Exchange 有多个种类,配置多变,需要详细讲解

RabbitMQ核心 -- Exchange解析

  1. Exchange是 AMQP 协议和RabbitMQ的 核心组件
  2. Exchange的功能是根据 绑定关系 和 路由键为消息提供路由,将消息转发至相应的队列
  3. Exchange有4种类型 :Direct / Topic / Fabout /Headers

Direct Exchange (直接路由)

  • Message中的Routing Key 如果和 Binding Key 一致, Direct Exchange 则将 message 发到对应的 queue中

RabbitMQ核心思想

Fanout Exchange (广播路由)

  • 每个发到 Fanout Exchange 的 message 都会分发到所有绑定到queue上去

RabbitMQ核心思想

Topit Exchange (话题路由)

  • 根据 Routing Key 及通配规则,Topic Exchange 将消息分发目标 Queue中
  • 全匹配 :与Direct 类似
  • Binding Key 中的 #:匹配任意个数的word

RabbitMQ核心思想

2. Docker 安装 RabbitMQ


    docker pull rabbitmq
    

这里是直接安装最新的


    docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq

访问 : http://IP地址:15672

RabbitMQ核心思想
用户名和密码默认都是guest


3. RabbitMQ保证消息的可靠性

  • 需要使用RabbitMQ发送端确认机制,确认消息成功发送到RabbitMQ并被处理
  • 需要使用RabbitMQ消息返回机制,若没发现目标队列,中间件会通知发送方
  • 需要使RabbitMQ消息端确认消息,确认消息没有发生异常
  • 需要使用RabbitMQ消费端限流机制,限制消息推送速度 ,保障接受端服务稳定
  • 大量到堆积消息会给RabbitMQ产生很大到压力,需要使用RabbitMQ消息过期时间,防止消息大量积压
  • 过期后会直接丢弃, 不符合业务逻辑,需要使用RabbitMQ死信队列,收集过期消息,以供分析


4. 发送确认机制原理



消息真的发出去了吗?

  • 消息发送后,发送端不知道RabbitMQ是否真的收到了消息,若RabbitMQ异常,消息丢失,业务异常,这个时候我们就需要使用RabbitMQ发送端确认机制,确认消息发送

三种确认机制


1. 单条同步确认
  • 配置channel,开启确认模式:channel.confirmSelect()
  • 每发送一条消息,调用channel.waitForConfirms()方法等待确认
         //建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String message = objectMapper.writeValueAsString(orderMessageDTO);
        channel.confirmSelect();
        channel.basicPublish(
                "exhange.order.restaurant",
                "key.restaurant",
                null,
                message.getBytes());
        if(channel.waitForConfirms()){
            //表示发送确认处理逻辑
        }else{
            //发送失败
        }
2. 多条同步确认
  • 配置channel,开启确认模式:channel.confirmSelect()
  • 发送多条消息后,调用channel.waitForConfirms()方法等待确认
         //建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String message = objectMapper.writeValueAsString(orderMessageDTO);
        channel.confirmSelect();
        channel.basicPublish(
                "exhange.order.restaurant",
                "key.restaurant",
                null,
                message.getBytes());
        if(channel.waitForConfirms()){
            //表示发送确认处理逻辑
        }else{
            //发送失败
        }
3. 异步确认
  • 配置channel,开启确认模式:channel.confirmSelect()
  • 在channel上添加监听: addConfirmListener , 发送消息后,会回调此方法,通知是否发送成功
  • 异步确认有可能是单条,也有可能是多条,取决于MQ
        //建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String message = objectMapper.writeValueAsString(orderMessageDTO);
        channel.confirmSelect();
        channel.basicPublish(
                "exhange.order.restaurant",
                "key.restaurant",
                null,
                message.getBytes());
        ConfirmListener confirmListener = new ConfirmListener(){

            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Ack  " + deliveryTag + multiple);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Nack  " + deliveryTag + multiple);
            }
        };
        channel.addConfirmListener(confirmListener);

5. 消息返回机制


消息真被路由了吗?

  • 消息发送后,发送端不知道消息是否被正确路由,若路由异常,消息会被丢弃,业务异常,需要使用RabbitMQ消息返回机制,确认消息被正确路由

消息的开启方法:

  1. 在RabbitMQ基础配置中又一个关键配置项:Mandatory
  2. Mandatory若为false,RabbitMQ将直接丢弃无法路由的消息
  3. Mandatory若为true,RabbitMQ才会处理无法路由的消息

RabbitMQ核心思想

DeliverCallback deliverCallback = ((consumerTag, message) -> {
        //拿到消息
        String messageBody = new String(message.getBody());
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        try {
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    log.info("Message Return:");
                    //处理失败的业务逻辑
                }
            });
            channel.basicPublish(
                    "exhange.order.restaurant",
                    "key.restaurant",
                    true,
                    null,
                    messageBody.getBytes());

        }catch (Exception e){
            log.error(e.getMessage());
        }

    });

6. 消费端确认机制


消费端处理异常怎么办?

  • 默认情况下,消费端接收消息时,消息会被自动确认(ACK),发生异常时,发送端与消息中间件无法得知消息处理情况,需要使用RabbitMQ 消息端确认机制,确认消息被正确处理

消费端ACK类型

手动ACK:消费端收到消息后,不会自动签收消息,需要我们在业务代码中显式签收消息


- 单条手动ACK : multiple = false

- 多条手动ACK : multiple = true

- 推荐使用单条ACK

    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);


    channel.basicNack(message.getEnvelope().getDeliveryTag(),false,false);

自动ACK:消费端收到消息后,会自动签收消息


7. 消费端限流机制

业务高峰期,可能出现发送端与接收端性能不一致,大量消息被同时推给接受端,造成接受端服务奔溃

在高并发端场景下,有个微服务奔溃了,本科期间队列挤压了大量消息,微服务上线后,收到大量并发消息。将同样多端消息推给能力不同端副本,会导致部分副本异常

针对以上问题,RabbitMQ 开发了 Qos (服务质量保证) 功能,Qos功能保证了在一定树木消息违背确认前,不消费新的消息

//这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。
channel.basicQos(1);

8. RabbitMQ的过期时间(TTL)

  • RabbitMQ的过期时间称为 TTL (time to live), 生存时间
  • RabbitMQ的过期时间分为消息TTL 和 队列 TTL
  • 消息TTL设置了单条消息的过期时间
  • 队列TTL设置了队列中所有消息的过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
            .deliveryMode(2) //deliveryMode=1代表不持久化,deliveryMode=2代表持久化
            .contentEncoding("UTF-8") // 编码方式
            .expiration("10000") // 过期时间
            .headers(headers) //自定义属性
            .build();


String messageBody = "发送的消息"

//发送通道
channel.basicPublish(
                    "exhange.order.restaurant",
                    "key.restaurant",
                    true,
                    properties,
                    messageBody.getBytes());

9. 死信队列

如何转移过期的消息?

  • 消息被设置了过期时间,过期后会直接被丢弃,直接被丢弃的消息无法对系统运行异常发出警报,需要使用RabbitMQ死信队列,收集过期消息,以供分析

什么是死信队列

  • 队列被配置了DLX属性 (Dead-Letter-Exchange) 当一个消息变成死信(dead message)后,能重新被发布到另一个 Exchange , 这个Exchange也是一个普通交换机,死信被死信交换机路由后,一般进入一个固定队列

RabbitMQ核心思想

怎么变成死信
  • 消息被拒绝 (reject / nack) 并且 requeue = false
  • 消息过期(TTL到期)
  • 队列达到最大长度

个人博客地址:http://blog.yanxiaolong.cn/

上一篇:Redisson官方文档 - 13. 工具


下一篇:Redis主从复制与优化