[RabbitMQ] 确保消息不丢失

分类

  1. 生产者丢失: 生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能
  2. MQ中丢失: 就是 RabbitMQ 自己弄丢了数据
  3. 消费端丢失: 你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。

[RabbitMQ] 确保消息不丢失

生产者端丢失消息

一种方法是用RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ事务channel.txSelect,然后发送消息,如果消息没有成功被RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。吞吐量会下来,因为太耗性能。而且事务机制是同步的,你提交一个事务之后会阻塞在那儿。

最常用的方法是开启confirm模式:消息发送完之后会执行confirm方法,并回返回一个boolean类型的变量ack,如果为true表示这个消息 ok 了,如果为false,说明消息接收失败,你可以在confirm函数里面进行重新发送。confirm机制是异步的,你发送一个消息之后就可以发送下一个消息,然后那个消息RabbitMQ 接收了之后会异步执行confirm方法通知你这个消息接收的状态。

而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

xml文件配置:

server:
  port: 8000
spring:
  rabbitmq:
    host: 192.168.107.128
    username: root
    password: root@123
    virtual-host: /
    template:
      retry:
        enabled: true
    listener:
      simple:
        acknowledge-mode: manual
      type: simple
    publisher-returns: true
    publisher-confirm-type: correlated

发送方:


@Service
@Slf4j
public class MQSender implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    public MQSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }
    public void sendSeckillMessage(String msg){
        log.info("发送消息"+msg);
        CorrelationData correlationData = new CorrelationData(msg);
        rabbitTemplate.convertAndSend("seckillExchange","seckill.message",msg,correlationData);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        //correlationData.getId()就是msg
        if(!ack){
            sendSeckillMessage(correlationData.getId());
        }
    }
}

消费方:

@Component
@Slf4j
@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(value = "seckillExchange",type = "topic"),
        value = @Queue(value = "seckillQueue",durable = "true"),
        key = "seckill.message"
))
public class MQReceiver {
    @RabbitHandler
    public void receive(String msg){
        log.info("接收消息"+msg);
        //执行下面的逻辑
    }
}

MQ中丢失消息

在@Queue和@Exchange注解中都有autoDelete属性,值是布尔类型的字符串。如:autoDelete=“false”

  • @Queue:当所有消费客户端断开连接后,是否自动删除队列: true:删除,false:不删除。

  • @Exchange:当所有绑定队列都不在使用时,是否自动删除交换器: true:删除,false:不删除。

  • durable: 参数为false,消息在重启rabbitmq后丢失;修改为true后重新启动项目

@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange(value = "seckillExchange", type = "topic",durable = "true",autoDelete="false"),
        value = @Queue(value = "seckillQueue",durable = "true",autoDelete="false"),
        key = "seckill.message"
))

持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。

消费端丢失

你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。

这个时候得用 RabbitMQ 提供的ack机制,简单来说,就是你关闭 RabbitMQ 的自动ack,设置为手动ack

rabbitmq.listener.direct:
	acknowledge-mode: manual
rabbitmq.listener.simple:
	acknowledge-mode: manual

然后每次你自己代码里确保处理完的时候,再在程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的

    @RabbitHandler
    public void receive(Message message, Channel channel, String msg) throws IOException{
        log.info("接收消息"+msg);
        boolean isAck = true;
        try {
            int f=1/0;
        } catch (Exception e) {
            e.printStackTrace();
            isAck=false;
            log.info("ack:"+message.getMessageProperties().getDeliveryTag());
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
        }
        if(isAck){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
上一篇:每周分享(3)


下一篇:数据包拓展