Spring Boot 整合 RabbitMQ
Spring Boot 提供了 spring-boot-starter-amqp 组件对实现了 AMQP 协议的消息队列(RabbitMQ)的快速整合。
#1. hello world
提示
我们分发送和接收 2 部分来学习 Spring Boot 和 RabbitMQ 的整合。
-
在 pom.xml 中引入 spring-boot-starter-amqp
Copied!<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
注意
虽然你在界面上选择的是 RabbitMQ ,但是本质上引入的是 AMQP ,因为 RabbitMQ 是 AMQP 的一种实现,也是默认实现。
-
启用自动配置
老规矩,使用 @EnableRabbit 注解标注于配置类上,以表示使用 RabbitMQ 的注解功能。
-
配置文件
配置 RabbitMQ 的连接地址、端口以及账户信息:
Copied!spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=root spring.rabbitmq.password=123456 spring.rabbitmq.virtual-host=hemiao ## ----------------------------------------------- logging.level.root=INFO logging.level.xxx.yyy.zzz=DEBUG logging.pattern.console=${CONSOLE_LOG_PATTERN:\ %clr(${LOG_LEVEL_PATTERN:%5p}) \ %clr([%15.15t]){faint} \ %clr(%-40.40logger{39}){cyan} \ %clr(:){faint} %m%n\ ${LOG_EXCEPTION_CONVERSION_WORD:%wEx}}
-
编写消息接收者/消费者的代码:HelloReceiver.java
Copied!@Slf4j @Component public class HelloReceiver { @RabbitListener(queues = "Q1") public void process(String hello) { log.info("Receiver : {}", hello); } }
-
验证
在 RabbitMQ 的管理台页面上,直接向 Exchange 发送消息,确保 Exchange 会把消息转到 Q1 队列,随后,你会发现你写的代码自动触发执行了。
#2. 创建 Exchange、Queue 和 Binding
提示
类似于 Hibernate/JPA 和 spring-data-elasticsearch 的自动建表建库功能,spring-boot-starter-amqp 可以帮我们去创建 Exchange、Queue 以及它俩之间的 Binding 关系。但是,这个功能有利有弊,有人喜欢,有人不喜欢。
#创建 Exchange
@Bean
public Exchange exchange() {
// return new TopicExchange("test-exchange-1");
return new TopicExchange("test-exchange-1", true, false);
}
Copied!
参数说明:
参数 | 说明 |
---|---|
name | 字符串值,exchange 的名称。 |
durable | 布尔值,表示该 exchage 是否持久化。 它决定了当 RabbitMQ 重启后,你是否还能 “看到” 重启前创建的 exchange 。 |
autoDelete | 布尔值,表示当该 exchange 没“人”(queue)用时,是否会被自动删除。 即,实现逻辑上的临时交换机。项目启动时连接到 RabbitMQ ,创建交换机;项目停止时断开连接,RabbitMQ 自动删除交换机。 |
不指定 durable 和 autoDelete 时,默认分别是 true
和 false
。表示持久化、不用自动删除。
补充,这背后调用的是原生 API 中的 Channel
的 .exchangeDeclare()
方法。
#创建 Queue
@Bean
public Queue queue() {
// return new Queue("test-queue-1");
return new Queue("test-queue-1", true, false, false);
}
Copied!
参数说明:
参数 | 说明 |
---|---|
name | 字符串值,queue 的名称。 |
durable | 布尔值,表示该 queue 是否持久化。 它决定了当 RabbitMQ 重启后,你是否还能 “看到” 重启前创建的 queue 。 另外,需要注意的是,queue 的持久化不等于其中的消息也会被持久化。 |
exclusive | 布尔值,表示该 queue 是否排它式使用。排它式使用意味着仅声明他的连接可见/可用,其它连接不可见/不可用。 |
autoDelete | 布尔值,表示当该 queue 没“人”(connection)用时,是否会被自动删除。 即,实现逻辑上的临时队列。项目启动时连接到 RabbitMQ ,创建队列;项目停止时断开连接,RabbitMQ 自动删除队列。 |
不指定 durable、exclusive 和 autoDelete 时,默认为 true 、 false 和 false 。表示持久化、非排它、不用自动删除。
补充,这背后调用的是原生 API 中的 Channel 的 .queueDeclare() 方法。
#创建 Binding
@Bean
public Binding binding(Exchange exchange, Queue queue) {
return BindingBuilder
.bind(queue).to(exchange).with("*.orange.*")
.noargs();
}
Copied!
#3. 发送消息
spring-rabbit 提供了 RabbitTemplate 来简化原生 API 的消息发送方法。
(最简单的情况下),你可以直接要求 Spring 给你注入一个 RabbitTemplate,通过它来发送消息:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void demo() {
rabbitTemplate.convertAndSend("queue-demo-1", "hello world");
}
Copied!
.convertAndSend 方法的第一个参数是 routing-key,第二个参数是你所要发送的消息。
在没有明确指定 Exchange 的情况下,该消息发送给了 RabbitMQ 的 default-exchange。而 default-exchage 是将 routing-key 视为 queue-name 。
也就是说,上述代码中的 routing-key 是 queue-demo-1,那么该消息最终是发送给 queue-demo-1 队列。
提示
.convertAndSend 方法是 .send 方法的包装/简化。.send 方法的调用相对比较繁琐。
#4. 接收/消费消息(PUSH 型)
接收/消费消息的方式有两种:Push 型和 Pull 型。
Push 型表示由 RabbitMQ Broker 负责将消息推送给消费者。消费者在一开始指定/配置监听哪个队列的消息后,就无需考虑其它。当该队列收到消息后,消费者的指定方法就会被触发执行。
PUSH 消费的配置非常简单,对你的消费者类的 “被触发方法” 标注 @RabbitListener 注解。当然,前提是消费者类要托管给 Spring:
@Component
public class Consumer1 {
private static final Logger log = LoggerFactory.getLogger(Consumer1.class);
@RabbitListener(queues = "queue-demo-1")
public void process(String message) {
log.info("Consumer 1: {}", message);
}
}
Copied!
#5. 对象的支持
Spring Boot 已经完美支持对象的发送和接收,不需要额外的配置。
警告
所传递的对象必须要实现 Serializable 接口。
!声明队列
@Bean
public Queue departmentQueue() {
return new Queue("hello");
}
Copied!
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void demo() {
rabbitTemplate.convertAndSend("hello", LocalDate.now());
}
@Slf4j
@Component
public class MessageReceiver {
@RabbitListener(queues = "hello")
public void process(LocalDate date) {
log.info("Receiver : {}", date);
}
}
Copied!
#6. Topic Exchange
Topic 是 RabbitMQ 中最灵活的一种方式,可以根据 routing_key *地绑定不同的队列。
考虑到环境中残留的之前的相关信息对测试的影响,如果发现测试代码的执行结果『莫名其妙』,记得在 RabbitMQ 的 web 管理系统中将相关内容清除干净,构造一个纯净的测试环境测试。
首先对 Topic 规则配置:
/* 两个 Queue */
@Bean("Q1")
public Queue queue1() { return new Queue("Q1"); }
@Bean("Q2")
public Queue queue2() { return new Queue("Q2"); }
/* 一个 Exchange */
@Bean
public TopicExchange topicExchange() { return new TopicExchange("topic-exchange"); }
/* 三个 Binding:关联 Exchange 和 Queue */
@Bean
public Binding binding1(@Qualifier("Q1") Queue queue, TopicExchange topicExchange) {
return BindingBuilder
.bind(queue).to(topicExchange).with("*.orange.*")
.noargs();
}
@Bean
public Binding binding21(@Qualifier("Q2") Queue queue, TopicExchange topicExchange) {
return BindingBuilder
.bind(queue).to(topicExchange).with("*.*.rabbit")
.noargs();
}
@Bean
public Binding binding22(@Qualifier("Q2") Queue queue, TopicExchange topicExchange) {
return BindingBuilder
.bind(queue).to(topicExchange).with("lazy.#")
.noargs();
}
Copied!
即便不编写消费者,你也可以在 15672 管理台页面上,直接看到各个 Queue 中有多少条消息。
创建两个消费者:
@Slf4j
@Component
public class C1 {
@RabbitListener(queues = "Q1")
public void process(String message) {
log.info("C1: {}", message);
}
}
@Slf4j
@Component
public class C2 {
@RabbitListener(queues = "Q2")
public void process(String message) {
log.info("C2: {}", message);
}
}
Copied!
测试:(这里偷了个懒,没有去创建发送者类,直接在 Junit 中使用了 AmqpTemplate 发送消息)。
@Autowired
private AmqpTemplate rabbitTemplate;
@Test
public void demo1() throws InterruptedException {
rabbitTemplate.convertAndSend("testTopic", "hello.orange", "hello orange");
rabbitTemplate.convertAndSend("testTopic", "hello.orange.world", "hello orange world");
rabbitTemplate.convertAndSend("testTopic", "hello.world.rabbit", "hello world rabbit");
rabbitTemplate.convertAndSend("testTopic", "lazy", "lazy");
rabbitTemplate.convertAndSend("testTopic", "lazy.good", "good");
rabbitTemplate.convertAndSend("testTopic", "lazy.good.bye", "goodbye");
Thread.sleep(1000L);
}
Copied!
#7. Fanout Exchange
@Bean("green")
public Queue greenQueue() { return new Queue("green"); }
@Bean("red")
public Queue redQueue() { return new Queue("red"); }
@Bean("orange")
public Queue orangeQueue() { return new Queue("orange"); }
@Bean
public FanoutExchange exchange() { return new FanoutExchange("testFanout"); }
@Bean
public Binding binging1(FanoutExchange exchange, @Qualifier("green") Queue queue) {
return BindingBuilder
.bind(queue).to(exchange).with("")
.noargs();
}
@Bean
public Binding binging2(FanoutExchange exchange, @Qualifier("red") Queue queue) {
return BindingBuilder
.bind(red).to(exchange).with("")
.noargs();
}
@Bean
public Binding binging3(FanoutExchange exchange, @Qualifier("orange") Queue queue) {
return BindingBuilder
.bind(orange).to(exchange).with("")
.noargs();
}
Copied!
@Test
public void demo2() throws InterruptedException {
rabbitTemplate.convertAndSend("blue", "", "green");
rabbitTemplate.convertAndSend("blue", "", "red");
rabbitTemplate.convertAndSend("blue", "", "orange");
Thread.sleep(1000L);
}
Copied!
Customer-A、Customer-B、Customer-C 都会收到这三条消息,即,控制台会打印出 9 条日志。
#8. 接收/消费消息(PULL 型)
PULL 型消费意味着需要消费者主动从 RabbitMQ Broker 上『取』消息。
PULL 型消费『不依靠』@RabbitListener 注解。而是需要在代码中手动调用 .receiveAndConvert 方法。
.receiveAndConvert 方法是 .receive 方法的简化版。
@Test
public void demo5() {
rabbitTemplate.convertAndSend("queue-demo-1", "hello world");
}
@Test
public void demo4() {
log.info("{}", rabbitTemplate.receiveAndConvert("queue-demo-1"));
}
Copied!
#9. 发送者确认
注意
发送者如何知道自己所发送的消费成功抵达了 RabbitMQ Broker 中的 Exchange 中,乃至成功抵达了 RabbitMQ Broker 中的 Queue 中?
生产者确认
#确认消息已到 Exchange
RabbitMQ 有一个配置属性 spring.rabbitmq.publisher-confirm-type
控制是否开启确认功能。该属性默认值是 NONE ,表示不开启消息确认。
-
publisher-confirm-type = SIMPLE
当改属性的值为 SIMPLE 时,表示支持以简单(同步阻塞等待)方式获得确认与否的信息。
这里会调用 Template#waitForConfirms 方法,不过这个方法有个要求,它必须在 Template#invoke 方法中使用。
Copied!String str = rabbitTemplate.invoke((operations) -> { // 参数 operations 实际上就是 Template 。 operations.convertAndSend("red", "hello world"); log.info("{}", operations.waitForConfirms(1000)); // 阻塞等待 1 秒,以获得确认信息。 return "over"; // lambda 表达式的值将成为 invoke 方法的返回值。 }); log.info("{}", str);
你可以向不存在的 Exchange 发送消息已验证效果。
-
publisher-confirm-type = CORRELATED
当改属性的值为 CORRELATED 时,表示支持以异步回调方式获得确认与否的信息。
在之前的代码中,是 spring-rabbit 帮我们创建 ConnectionFactory,再进一步创建 RabbitTemplate,并注入到我们的代码中进而被我们使用。
现在由于需要对 RabbitTemplate 进行设置,因此,我们需要自己创建并设置 RabbitTemplate。(不过,还是需要 spring-rabbit 帮我们创建 Connection Factory,并注入)
Copied!@Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); // 当 Exchange 收到消息后,这里设置的回调方法会被触发执行 rabbitTemplate.setConfirmCallback( ... ); return rabbitTemplate; }
你可以使用 lamda 表达式来简化下列匿名实现类。
Copied!rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 该方法无论 Exchange 能否收到消息都会执行。 */ @Override public void confirm(CorrelationData correlationData, boolean ack, java.lang.String cause) { if (ack) log.info("消息已发送至 Exchange"); else log.info("消息未能发送到 Exchange。{}", cause); } });
#确认消息已到 Message Queue
## 确认消息已发送到队列(Queue)
spring.rabbitmq.publisher-returns=true
Copied!
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// 设置开启 Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
...
// 当消息『走不到』RabbitMQ 的 Queue 时会被触发执行。
rabbitTemplate.setReturnCallback( ... );
return rabbitTemplate;
}
Copied!
你可以使用 lamda 表达式来简化下列匿名实现类。
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 该方法在 Queue 无法收到消息时被触发执行。Queue 能收到消息,反而不会执行。
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("ReturnCallback 消息:{}", message);
log.info("ReturnCallback 回应码:{}", replyCode);
log.info("ReturnCallback 回应信息:{}", replyText);
log.info("ReturnCallback 交换机:{}", exchange);
log.info("ReturnCallback 路由键:{}", routingKey);
}
});
Copied!
你可以向不存在的 Exchange 和 Queue 发送消息已验证效果。
#10. 消费端的确认与拒绝
默认情况下,RabbitMQ 启用的是消费端自动(auto)回复。即,当消费端收到消息,就会给 RabbitMQ Broker 作出回复,表示已收到。
只有在消费端回复 RabbitMQ Broker 之后,RabbitMQ Broker 才会将该消息从消息队列中移除。
回复的行为除了有 AUTO 之外,还有 NONE 和 MANUAL 。
NONE 表示不回复,即,RabbitMQ Broker 永远不可能知道消费者端到底有没有收到消息。RabbitMQ Broker 发出
MANUAL 则意味着需要在消费者端手动发送回复信息。在消费者回复前,该消息在消费端未回复前在 RabbitMQ Brocker 上一直处于 Unacked 状态。如果消费者始终都不回复该消息,那么直到消费者与 RabbitMQ 断开连接之后,这条消息才会重新变为 Ready 状态。
启用消费端的确认功能需要打开配置开关:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
Copied!
于此同时,消息消费者的处理方法需要改造成以下形式:
@Component
public class Consumer2 {
@RabbitListener(queues = "queue-demo-1")
public void process(String message,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
...
}
}
Copied!
#确认消息
确认消息使用 channel 的 .basicAck
方法:
channel.basicAck(tag, false);
Copied!
basicAck 方法需要传递两个参数:
-
deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel(Channel 是比 Connection 更小的单位),RabbitMQ 通过 Channel 向消费者投递消息时,都会为该消息分配一个唯一性标识:delivery tag 。同一个 Channel 中的消息的 delivery tag 都是唯一且单调递增的。
-
multiple:是否批量确认。当参数为 false 时,意味着确认单条消息,RabbitMQ 仅从消息队列中删除该消息;当参数为 true 时,意味着批量确认,RabbitMQ 会从消息队列中删除编号小于等于该消息的所有信息。
#拒绝消息
拒绝消息使用 channel 的 .basicReject
方法:
channel.basicReject(tag, false);
Copied!
basicReject 方法也需要传力两个参数:
-
deliveryTag(唯一标识 ID):同上。
-
requeue(重入标识):标识该消息是否需要 RabbitMQ Broker 重新入队。(有可能的话,会被该队列的其它消费者消费)。
另外,拒绝的方法还有 .basicNack,表示批量拒绝。