RabbitMQ如何保证发送的消息可靠(重要)

RabbitMQ如何保证发送的消息可靠

  • 1、消息可靠性
    • 1.1、消息可靠性投递概念
    • 1.2、确保消息发送到RabbitMQ服务器的交换机上(需开启Confirm(确认)模式)
    • 1.3、确保消息路由到正确的队列(使用return模式或备用交换机)
    • 1.4、确保消息在队列正确地存储
      • 1.4.1、队列持久化
      • 1.4.2、交换机持久化
      • 1.4.3、消息持久化
      • 1.4.4、集群,镜像队列,高可用
    • 1.5、确保消息从队列正确地投递到消费者
  • 2、RabbitMQ消息Confirm模式(保证从生产者到交换机的消息可靠)
    • 2.1、Confirm模式简介
    • 2.2、具体代码实现
      • 2.2.1、application.yml 开启确认模式
      • 2.2.2、生产者
        • 方式1:实现RabbitTemplate.ConfirmCallback
          • 生产者发送消息
        • 方式2:直接写在生产者发送消息类,实现RabbitTemplate.ConfirmCallback
        • 方式3:匿名内部类写法
        • 方式4:lambda表达式写法
      • 2.2.3、RabbitConfig做交换机和队列的绑定
      • 2.2.4、pom.xml配置文件
      • 2.2.5、测试
  • 3、RabbitMQ消息Return模式(保证从交换机的到队列的消息可靠)
    • 3.1、具体代码实现
      • 3.1.1、applicaton.yml
      • 3.1.2、pom.xml
      • 3.1.3、启动类
      • 3.1.4、业务层
        • 方式1:实现RabbitTemplate.ReturnsCallback
          • 回调类MyReturnCallback
          • 配置类
          • service业务层
        • 方式2:MessageService类实现RabbitTemplate.ReturnsCallback
        • 方式3:匿名内部类实现RabbitTemplate.ReturnsCallback
        • 方式4:lambda表达式实现RabbitTemplate.ReturnsCallback
      • 3.1.5、测试
  • 4、确保消息正确地存储(需保证交换机,队列,消息持久化)
    • 4.1、交换机持久化
    • 4.2、队列持久化
    • 4.3、消息持久化
  • 5、确保消息从队列正确地投递到消费者(需开启消费者手动确认模式)
  • 6、总结

1、消息可靠性

1.1、消息可靠性投递概念

消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,那么这肯定会牺牲一些性能,性能与可靠性是无法兼得的;
如果业务实时一致性要求不是特别高的场景,可以牺牲一些可靠性来换取性能。
在这里插入图片描述
① 代表消息从生产者发送到Exchange;
② 代表消息从Exchange路由到Queue;
③ 代表消息在Queue中存储;
④ 代表消费者监听Queue并消费消息;

1.2、确保消息发送到RabbitMQ服务器的交换机上(需开启Confirm(确认)模式)

可能因为网络或者Broker的问题导致①失败,而此时应该让生产者知道消息是否正确发送到了Broker的exchange中;
有两种解决方案:
第一种是开启Confirm(确认)模式;(异步)
第二种是开启Transaction(事务)模式;(性能低,实际项目中很少用)

在这里插入图片描述

1.3、确保消息路由到正确的队列(使用return模式或备用交换机)

可能因为路由关键字错误,或者队列不存在,或者队列名称错误导致②失败。
使用return模式,可以实现消息无法路由的时候返回给生产者;
当然在实际生产环境下,我们不会出现这种问题,我们都会进行严格测试才会上线(很少有这种问题);

另一种方式就是使用备份交换机(alternate-exchange)(备份交换机的创建可以参考:https://editor.****.net/md/?articleId=143615895),无法路由的消息会发送到这个备用交换机上。(备份交换机选择扇形交换机,扇形交换机消息可以发送到所有与之连接的队列,再通过消费者监听该队列,如果该队列有消息,证明正常交换机没有正常发送消息到正常队列,此时消费者可以邮件、短信、警告,记录日志方式提醒系统运维人员)
在这里插入图片描述

1.4、确保消息在队列正确地存储

可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题;
解决方案:

1.4.1、队列持久化

代码:

QueueBuilder.durable(QUEUE).build();

1.4.2、交换机持久化

代码:

ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();

1.4.3、消息持久化

代码:
默认持久化

MessageProperties messageProperties = new MessageProperties();
//设置消息持久化,当然它默认就是持久化,所以可以不用设置,可以查看源码
 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); 

1.4.4、集群,镜像队列,高可用

1.5、确保消息从队列正确地投递到消费者

在这里插入图片描述

采用消息消费时的手动ack确认机制来保证;
如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。
为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement);

开启手动ack消息消费确认

// appLication.yml配置文件配置
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消费者在订阅队列时,通过上面的配置,不自动确认,采用手动确认,RabbitMQ会等待消费者显式地回复确认信号后才从队列中删除消息;
如果消息消费失败,也可以调用basicReject()或者basicNack()来拒绝当前消息而不是确认。如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况,可以投递到新的队列中,或者只打印异常日志);

2、RabbitMQ消息Confirm模式(保证从生产者到交换机的消息可靠)

2.1、Confirm模式简介

消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器Broker里面的exchange交换机,exchange交换机会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障。
在这里插入图片描述

2.2、具体代码实现

1 配置文件application.yml 开启确认模式:spring.rabbitmq.publisher-confirm-type=correlated
2 写一个类实现RabbitTemplate.ConfirmCallback,判断成功和失败的ack结果,可以根据具体的结果,如果ack为false,对消息进行重新发送或记录日志等处理;
设置rabbitTemplate的确认回调方法
3 rabbitTemplate.setConfirmCallback(messageConfirmCallBack);

在这里插入图片描述

2.2.1、application.yml 开启确认模式

在这里插入图片描述

server:
  port: 8080
spring:
  application:
    name: confirm-test01

  rabbitmq:
    host: 你的服务器IP
    port: 5672
    username: 你的账号
    password: 你的密码
    virtual-host: power
    publisher-confirm-type: correlated #开启生产者的确认模式,设置关联模式

my:
  exchangeName: exchange.confirm.01
  queueName: queue.confirm.01

2.2.2、生产者

方式1:实现RabbitTemplate.ConfirmCallback

单独写一个类,实现RabbitTemplate.ConfirmCallback

package com.power.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            log.info("消息正确到达交换机");
            return;
        }
        //ack为false,消息没有到达交换机
        log.error("消息没有到达交换机,原因是:{}",cause);
    }
}
生产者发送消息
package com.power.service;

import com.power.config.MyConfirmCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;

@Service
@Slf4j
public class MessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Resource
    private MyConfirmCallback confirmCallback;

    @PostConstruct//构造方法后执行,相当于初始化作用
    public void init(){
        rabbitTemplate.setConfirmCallback(confirmCallback);
    }

    @Bean
    public void sendMsg(){
        Message message = MessageBuilder.withBody("hello world".getBytes()).build();
        CorrelationData correlationData = new CorrelationData();//关联数据
        correlationData.setId("order_123456");//发送订单信息
        rabbitTemplate.convertAndSend("exchange.confirm.01","info",message,correlationData);
        log.info("消息发送完毕,发送时间是:{}",new Date());
    }
}
方式2:直接写在生产者发送消息类,实现RabbitTemplate.ConfirmCallback
package com.power.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;

@Service
@Slf4j
public class MessageService implements RabbitTemplate.ConfirmCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct//构造方法后执行,相当于初始化作用
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }

    @Bean
    public void sendMsg(){
        Message message = MessageBuilder.withBody("hello world".getBytes()).build();
        CorrelationData correlationData = new CorrelationData();//关联数据
        correlationData.setId("order_123456");//发送订单信息
        rabbitTemplate.convertAndSend("exchange.confirm.01","info",message,correlationData);
        log.info("消息发送完毕,发送时间是:{}",new Date());
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            log.info("消息正确到达交换机");
            return;
        }
        //ack为false,消息没有到达交换机
        log.error("消息没有到达交换机,原因是:{}",cause);
    }
}
方式3:匿名内部类写法

在这里插入图片描述

package com.power.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;

@Service
@Slf4j
public class MessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct//构造方法后执行,相当于初始化作用
    public void init(){
        rabbitTemplate.setConfirmCallback(
            new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    if(ack){
                        log.info("消息正确到达交换机");
                        return;
                    }
                    //ack为false,消息没有到达交换机
                    log.error("消息没有到达交换机,原因是:{}",cause);
                }
            }
        );
    }

    @Bean
    public void sendMsg(){
        Message message = MessageBuilder.withBody("hello world".getBytes()).build();
        CorrelationData correlationData = new CorrelationData();//关联数据
        correlationData.setId("order_123456");//发送订单信息
        rabbitTemplate.convertAndSend("exchange.confirm.01","info",message,correlationData);
        log.info("消息发送完毕,发送时间是:{}",new Date());
    }
   
}
方式4:lambda表达式写法

在这里插入图片描述

package com.power.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;

@Service
@Slf4j
public class MessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct//构造方法后执行,相当于初始化作用
    public void init(){
        rabbitTemplate.setConfirmCallback(
             //lambda表达式写法
            (correlationData, ack, cause)->{
                log.info("关联id:{}",correlationData.getId());
                if(ack){
                    log.info("消息正确到达交换机");
                    return;
                }
                //ack为false,消息没有到达交换机
                log.error("消息没有到达交换机,原因是:{}",cause);
            }
        );
    }

    @Bean
    public void sendMsg(){
        Message message = MessageBuilder.withBody("hello world".getBytes()).build();
        CorrelationData correlationData = new CorrelationData();//关联数据
        correlationData.setId("order_123456");//发送订单信息
        rabbitTemplate.convertAndSend("exchange.confirm.03","info",message,correlationData);
        log.info("消息发送完毕,发送时间是:{}",new Date());
    }

}

2.2.3、RabbitConfig做交换机和队列的绑定

package com.power.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Value("${my.exchangeName}")
    private String exchangeName;

    @Value("${my.queueName}")
    private String queueName;

    //创建直连交换机
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(exchangeName).build();
    }

    //创建队列
    @Bean
    public Queue queue(){
        return QueueBuilder.durable(queueName).build();
    }

    //交换机绑定队列
    @Bean
    public Binding binding(DirectExchange exchangeName,Queue queueName){
        return BindingBuilder.bind(queueName).to(exchangeName).with("info");
    }
}

2.2.4、pom.xml配置文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.power</groupId>
  <artifactId>rabbit_08_confirm01</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>rabbit_08_confirm01</name>

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  
上一篇:嵌入式新手必读好文,常见传感器类型中,LM393的作用,及模块原理(看不懂来问我)!!!


下一篇:Kafka面试题 part-1