分类
- 生产者丢失: 生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能
- MQ中丢失: 就是 RabbitMQ 自己弄丢了数据
- 消费端丢失: 你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。
生产者端丢失消息
一种方法是用RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ事务channel.txSelect
,然后发送消息,如果消息没有成功被RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback
,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit
。吞吐量会下来,因为太耗性能。而且事务机制是同步的,你提交一个事务之后会阻塞在那儿。
最常用的方法是开启confirm模式:消息发送完之后会执行confirm方法,并回返回一个boolean类型的变量ack,如果为true表示这个消息 ok 了,如果为false,说明消息接收失败,你可以在confirm函数里面进行重新发送。confirm机制是异步的,你发送一个消息之后就可以发送下一个消息,然后那个消息RabbitMQ 接收了之后会异步执行confirm方法通知你这个消息接收的状态。
而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
xml文件配置:
server:
port: 8000
spring:
rabbitmq:
host: 192.168.107.128
username: root
password: root@123
virtual-host: /
template:
retry:
enabled: true
listener:
simple:
acknowledge-mode: manual
type: simple
publisher-returns: true
publisher-confirm-type: correlated
发送方:
@Service
@Slf4j
public class MQSender implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
public MQSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);
}
public void sendSeckillMessage(String msg){
log.info("发送消息"+msg);
CorrelationData correlationData = new CorrelationData(msg);
rabbitTemplate.convertAndSend("seckillExchange","seckill.message",msg,correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
//correlationData.getId()就是msg
if(!ack){
sendSeckillMessage(correlationData.getId());
}
}
}
消费方:
@Component
@Slf4j
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "seckillExchange",type = "topic"),
value = @Queue(value = "seckillQueue",durable = "true"),
key = "seckill.message"
))
public class MQReceiver {
@RabbitHandler
public void receive(String msg){
log.info("接收消息"+msg);
//执行下面的逻辑
}
}
MQ中丢失消息
在@Queue和@Exchange注解中都有autoDelete属性,值是布尔类型的字符串。如:autoDelete=“false”
。
-
@Queue:当所有消费客户端断开连接后,是否自动删除队列: true:删除,false:不删除。
-
@Exchange:当所有绑定队列都不在使用时,是否自动删除交换器: true:删除,false:不删除。
-
durable: 参数为false,消息在重启rabbitmq后丢失;修改为true后重新启动项目
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "seckillExchange", type = "topic",durable = "true",autoDelete="false"),
value = @Queue(value = "seckillQueue",durable = "true",autoDelete="false"),
key = "seckill.message"
))
持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。
消费端丢失
你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。
这个时候得用 RabbitMQ 提供的ack机制,简单来说,就是你关闭 RabbitMQ 的自动ack,设置为手动ack
rabbitmq.listener.direct:
acknowledge-mode: manual
rabbitmq.listener.simple:
acknowledge-mode: manual
然后每次你自己代码里确保处理完的时候,再在程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的
@RabbitHandler
public void receive(Message message, Channel channel, String msg) throws IOException{
log.info("接收消息"+msg);
boolean isAck = true;
try {
int f=1/0;
} catch (Exception e) {
e.printStackTrace();
isAck=false;
log.info("ack:"+message.getMessageProperties().getDeliveryTag());
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
}
if(isAck){
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}