概念
确认机制--》可靠抵达
发送端确认
#配置文件中 开启发送端到达服务器确认
spring.rabbitmq.publisher-confirms = true
#开启发送端消息抵达队列确认
spring.rabbitmq.publisher-returns = true
#只要抵达队列,以异步发送优先回调我们这个returnConfirm
spring.rabbitmq.template.mandatory = true
@Configuration
public class MyRabbitMQConfig {
// @Resource
// private RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/* *//**
* 定制RabbitTemplate
* 1、服务收到消息就会回调
* 1、spring.rabbitmq.publisher-confirms: true
* 2、设置确认回调
* 2、消息正确抵达队列就会进行回调
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、设置确认回调ReturnCallback
*
* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
*
*//*
@PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法
public void initRabbitTemplate() {
*//**
* 1、只要消息抵达Broker就ack=true
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*//*
//设置确认回调
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});
*//**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*//*
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}*/
}
消费端确认
1、默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息,
问题:
我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,宕机了。发生消息丢失;
解决方法:消费者手动确认模式,
#手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RbbitHandler
public void recieveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws InterruptedException{
System,out.println("接收到消息",+content);
byte[] body = message.getBody();
//消息头属性信息
MessageProperties properties = message.getMessageProperties();
//Thread.sleep(3000);
System.out.println("消息处理完成==》"+content.getName());
//deliveryTag 是 Channel内按顺序自增的
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag==>"+deliveryTag);
//手动确认,批量模式
try{
if(deliveryTag%2==0){
channel.basicAck(deliveryTag, false);
}else{
//退货 requeue=false?丢弃:发回服务器,重新入队
//long deliveryTag, boolean multiple, boolean requeue
channel.basicNack(deliveryTag, false, true);
//long deliveryTag, boolean requeue
//channle.basicReject();
}
}catch(Exception e){
//网络中断
}
}