当交换机宕机或路由不可达时,为了保证消息不丢失,需要通知到发送者。由此引出rabbitmq的消息回退机制。声明一个组件,继承内部接口,去实现rabbitmq宕机时,消息返回给发送者,消息不会丢失。
package com.zhaoye.springbootrabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author feng
* @Date 2021/11/8 8:10
* 发布确认:交换机回调确认方法
*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 仅仅实现了接口中的内部接口,并未存在在RabbitTemplate中,
* 需要进行注入到该接口的内部接口中。
*/
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this::confirm);
rabbitTemplate.setReturnsCallback(this::returnedMessage);
}
/**
* @param correlationData 保存回调消息的ID和相关信息
* @param ack 是否接收boolean
* @param cause 失败的原因
* 交换机确认回调方法
* 1 成功,true
* 2 交换机确认失败,false,失败的原因,数据
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : null;
if (ack) {
log.info("交换机已经收到id为:{}的消息", id);
} else {
log.info("交换机还未收到消息,id为:{},失败原因:{}", id, cause);
}
}
// 路由不可达,消息的回退,返回给生产者
// 只有路由不可达才回退
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息{},被交换机{}退回,退回的原因:{},路由key是:{}",new String(returnedMessage.getMessage().getBody()),
returnedMessage.getExchange(),returnedMessage.getReplyText(),returnedMessage.getRoutingKey());
}
}
生产者:
package com.zhaoye.springbootrabbitmq.controller;
import com.zhaoye.springbootrabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author feng
* @Date 2021/11/7 21:40
* 发布确认高级
*/
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message) {
CorrelationData correlationData1 = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFRIM_ROUTING_KEY, message, correlationData1);
log.info("发送消息内容:{}", message + "key1");
//测试队列宕机,消息确认接口回调
CorrelationData correlationData2 = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFRIM_ROUTING_KEY + "2", message, correlationData2);
log.info("发送消息内容:{}", message + "key2");
}
}
消费者:
package com.zhaoye.springbootrabbitmq.consumer;
import com.zhaoye.springbootrabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author feng
* @Date 2021/11/7 21:46
* 接收消息
*/
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMessage(Message message) {
String msg = new String(message.getBody());
log.info("接收到的消息:{}",msg);
}
}
组件声明:
package com.zhaoye.springbootrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author feng
* @Date 2021/11/7 21:29
* 发布确认高级
*/
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
public static final String CONFIRM_QUEUE_NAME = "confrim_queue";
public static final String CONFRIM_ROUTING_KEY = "key1";
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue")Queue confirmQueue,
@Qualifier("confirmExchange")DirectExchange confirmExchange ){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFRIM_ROUTING_KEY);
}
}
properties配置
#开启发布确认模式,交换机回调确认
spring.rabbitmq.publisher-confirm-type=correlated
#路由不可达回退
spring.rabbitmq.publisher-returns=true
测试结果:
: 发送消息内容:hellokey1
2021-11-08 09:46:21.966 INFO 23928 --- [nio-8080-exec-1] c.z.s.controller.ProducerController : 发送消息内容:hellokey2
2021-11-08 09:46:21.978 ERROR 23928 --- [nectionFactory1] c.z.s.config.MyCallBack : 消息hello,被交换机confirm_exchange退回,退回的原因:NO_ROUTE,路由key是:key12
2021-11-08 09:46:21.978 INFO 23928 --- [nectionFactory1] c.z.s.config.MyCallBack : 交换机已经收到id为:1的消息
2021-11-08 09:46:21.981 INFO 23928 --- [ntContainer#0-1] c.z.s.consumer.Consumer : 接收到的消息:hello
2021-11-08 09:46:21.981 INFO 23928 --- [nectionFactory1] c.z.s.config.MyCallBack : 交换机已经收到id为:1的消息