1. 什么是消息队列
消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
2. 为什么要使用消息队列
消息队列是一种应用间的异步协作机制,常用于以下场景:
2.1 异步
可以将一些非核心流程,如日志,短信,邮件等,通过MQ的方式异步去处理。这样做的好处是缩短主流程的响应时间,提升用户体验。
2.2 解耦
按照不同的功能, 把一个服务, 拆分成多个系统。比如下订单的过程:扣减库存、生成相应单据、发红包、发短信通知等。每个不同的过程互不影响,通过 MQ 来实现任务的推送和消费。
2.3 削峰
当一个活动瞬时流量过大,对于不需要及时反馈,并且服务处理比较耗时可能会造成服务响应的缓慢甚至奔溃时,可以将对应的需求先放入队列中,服务顺序去处理对应的请求,后续通知用户结果即可。
异步,解耦,消峰,MQ的三大主要应用场景。
3. RabbitMQ 的特点
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
-
可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 -
灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。 -
消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 -
高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 -
多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 -
多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 -
管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。 -
跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。 -
插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
4. RabbitMQ 中的模型概念
4.1 消息模型
所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。
4.2 基本概念
RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:
-
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。 -
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。 -
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 -
Connection
网络连接,比如一个 TCP 连接。 -
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 -
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 -
Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。 -
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 -
Virtual Host
虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。 -
Broker
表示消息队列服务器实体。
4.3 Exchange 类型
Exchange 分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多。
- 直连交换机:Direct exchange
- 扇形交换机:Fanout exchange
- 主题交换机:Topic exchange
- 首部交换机:Headers exchange
创建交换机
Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
交换机具有以下属性:
- exchange – 名称
- type – 类型
- durable – 如果我们声明持久交换,则为真(交换将在服务器重启后继续存在)
- autoDelete – 如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
- internal – 如果交换是内部的,则为 true,即不能由客户端直接发布。
- arguments- 扩展参数
4.3.1 Direct 直连交换机
直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个 routing_key
,当消息被发送的时候,需要指定一个 binding_key
,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个 binding_key
也是支持应用到多个队列中的。
这样当一个交换机绑定多个队列,就会被送到对应的队列去处理。
4.3.2 Fanout 扇形交换机
扇形交换机是最基本的交换机类型,它所能做的事情非常简单 ——— 广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要 “思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。
4.3.3 Topic 主题交换机
主题交换机的 routing_key
需要有一定的规则,交换机和队列的 binding_key
需要采用 .#...... 的格式,每个部分用. 分开,其中:
*
表示一个单词#
表示任意数量(零个或多个)单词。
假设有一条消息的 routing_key 为 fast.rabbit.white, 那么带有这样 binding_key 的几个队列都会接收这条消息:
fast..
..white
fast.#
……
当一个队列的绑定键为#的时候,这个队列将会无视消息的路由键,接收所有的消息。
4.3.4 Headers 首部交换机
首部交换机是忽略 routing_key
的一种路由方式。路由器和交换机路由的规则是通过 Headers
信息来交换的,这个有点像 HTTP
的 Headers
。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个 Hash
的数据结构,消息发送的时候,会携带一组 hash 数据结构的信息,当 Hash 的内容匹配上的时候,消息就会被写入队列。
绑定交换机和队列的时候,Hash
结构中要求携带一个键 “x-match”,这个键的 Value
可以是 any
或者 all
,这代表消息携带的 Hash
是需要全部匹配 (all),还是仅匹配一个键 (any) 就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串 (string)。
匹配规则 x-match 有下列两种类型:
- x-match = all :表示所有的键值对都匹配才能接受到消息
- x-match = any :表示只要有键值对匹配就能接受到消息
4.3.4 headers 直连交换机
Headers Exchange 不同于上面三种 Exchange,它是根据 Message 的一些头部信息来分发过滤 Message,忽略routing key 的属性,如果 Header 信息和 message 消息的头信息相匹配,那么这条消息就匹配上了。
5. RabbitMQ 服务搭建
6. RabbitMQ 使用介绍
6.1 流程介绍
pom.xml
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
Producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* rabbitmq 生产者
*/
public class Producer {
public static void main(String[] args) throws Exception {
//1. 创建一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("user");
factory.setPassword("user");
//也可以使用url来配置
//factory.setUri("amqp://guest:guest@localhost:5672");
//2. 通过连接工厂来创建连接
Connection connection = factory.newConnection();
//3. 通过 Connection 来创建 Channel
Channel channel = connection.createChannel();
//4. 发送消息
String exchangeName = "exchangeName";
String routingKey1 = "cat";
String routingKey2 = "dong";
for (int i = 0; i < 100; i++) {
String msg = "this is msg";
if (i % 2 == 0) {
msg = "cat:" + msg;
channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
} else {
msg = "dong:" + msg;
channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
}
System.out.println("发送了消息:" + msg);
Thread.sleep(1000);
}
//5. 关闭连接
channel.close();
connection.close();
}
}
Consumer
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* rabbitmq 消费者
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//1. 创建一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("user");
factory.setPassword("user");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2. 通过连接工厂来创建连接
Connection connection = factory.newConnection();
//3. 通过 Connection 来创建 Channel
Channel channel = connection.createChannel();
//4. 创建交换机
String exchangeName = "exchangeName";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, false, true, null);
// 5.创建队列
String queueName1 = "queueCat";
String queueName2 = "queueDong";
channel.queueDeclare(queueName1, false, false, true, null);
channel.queueDeclare(queueName2, false, false, true, null);
//5. 通过不同的key 绑定队列
String routingKey1 = "cat";
String routingKey2 = "dong";
channel.queueBind(queueName1, exchangeName, routingKey1);
channel.queueBind(queueName2, exchangeName, routingKey2);
// 消费者处理
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String format = String.format("[%s]-收到消息:【%s】", envelope.getRoutingKey(), new String(body, StandardCharsets.UTF_8));
System.out.println(format);
//消息消费确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 6.绑定消费者进行消费
while (true) {
channel.basicConsume(queueName1, false, defaultConsumer);
channel.basicConsume(queueName2, false, defaultConsumer);
Thread.sleep(1000);
}
}
}
先启动 消费者 创建交换机、队列,进行指定队列的监听,然后启动生产者进行生产消息。
6.2 RabbitMQ 在 SpringBoot 中的使用
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${rabbitmq.version}</version>
</dependency>
application.yml
# rabbitmq
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: guest
password: guest
6.2.1 声明 exchange、queue、Binding
SpringAMQP 项目对 RabbitMQ 做了很好的封装,可以很方便的手动声明队列,交换器,绑定。
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* 声明一个直连交换机
* 声明两个队列
* 将两个队列通过 routingKey 绑定到交换机
*/
@Component
public class DirectDeclare {
@Bean(RabbitConfig.Direct.EXCHANGE)
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange(RabbitConfig.Direct.EXCHANGE).durable(true).build();
}
@Bean(RabbitConfig.Direct.QUEUE_ONE)
public Queue directQueueOne() {
return QueueBuilder.durable(RabbitConfig.Direct.QUEUE_ONE).build();
}
@Bean(RabbitConfig.Direct.QUEUE_TWO)
public Queue directQueueTwo() {
return QueueBuilder.durable(RabbitConfig.Direct.QUEUE_TWO).build();
}
@Bean
public Binding bindingQueueOne(@Qualifier(RabbitConfig.Direct.EXCHANGE) DirectExchange exchange,
@Qualifier(RabbitConfig.Direct.QUEUE_ONE) Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.Direct.QUEUE_ONE_KEY);
}
@Bean
public Binding bindingQueueTwo(@Qualifier(RabbitConfig.Direct.EXCHANGE) DirectExchange exchange,
@Qualifier(RabbitConfig.Direct.QUEUE_TWO) Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.Direct.QUEUE_TWO_KEY);
}
}
6.2.2 生产者发送消息
RabbitTemplate 是 Spring 用于简化同步 RabbitMQ 访问(发送和接收消息)的 Helper 类。
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class RabbitService {
@Autowired
RabbitTemplate rabbitTemplate;
public void send(RabbitSendVo sendVo) {
String exchange = sendVo.getExchange();
String routingKey = sendVo.getRoutingKey();
String message = sendVo.getMessage();
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}
6.2.3 消费者监听消息
消息的监听使用 @RabbitListener 来完成,使用 queues 指定需要监听的队列。
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
public class DirectListener {
@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_ONE})
public void listenerOne(Message message) throws IOException {
log.info("【directQueueOne】收到消息:{}", new String(message.getBody()));
}
}
同时,@RabbitListener 也可以作用于 class 上,然后使用 @RabbitHandler 配合使用。
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型
@Slf4j
@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_ONE})
public class DirectListener {
@RabbitHandler
public void listenerOne(Message message) throws IOException {
log.info("【directQueueOne】收到消息:{}", new String(message.getBody()));
}
}
6.2.4 @RabbitListener、@Exchange、@Queue、@QueueBinding
使用 @RabbitListener 可以搭配 @Exchange、@Queue、@QueueBinding 等注解简化使用的过程,exchange、queue 的一些属性也可以在这里配置。
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 通过注解实现一个主题交换机
*/
@Slf4j
@Component
public class TopicListener {
/**
* 主题交换机
* 处理 Cat 相关消息
* 通过注解方式创建,命名的都是持久的
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(name = RabbitConfig.Topic.EXCHANGE, type = ExchangeTypes.TOPIC),
key = RabbitConfig.Topic.QUEUE_CAT_KEY,
value = @Queue(RabbitConfig.Topic.QUEUE_CAT)
))
public void topicCat(Channel channel, Message message) throws IOException {
log.info("【topic-cat】收到消息:{}", new String(message.getBody());
}
}
6.2.5 MessageConverter
-
涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte [] 数组的解析
-
RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等
-
当调用了 convertAndSend 方法时会使用 MessageConvert 进行消息的序列化
-
SimpleMessageConverter 对于要发送的消息体 body 为 byte [] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含 class 类名,类相应方法等信息。因此性能较差
-
当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能
public interface MessageConverter {
/**
* Convert a Java object to a Message.
* @param object the object to convert
* @param messageProperties The message properties.
* @return the Message
* @throws MessageConversionException in case of conversion failure
*/
Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException;
/**
* Convert a Java object to a Message.
* The default implementation calls {@link #toMessage(Object, MessageProperties)}.
* @param object the object to convert
* @param messageProperties The message properties.
* @param genericType the type to use to populate type headers.
* @return the Message
* @throws MessageConversionException in case of conversion failure
* @since 2.1
*/
default Message toMessage(Object object, MessageProperties messageProperties, @Nullable Type genericType)
throws MessageConversionException {
return toMessage(object, messageProperties);
}
/**
* Convert from a Message to a Java object.
* @param message the message to convert
* @return the converted Java object
* @throws MessageConversionException in case of conversion failure
*/
Object fromMessage(Message message) throws MessageConversionException;
}
6.2.5.1 SimpleMessageConverter
SpringAMQP 默认使用 SimpleMessageConverter 来实现 消息的转换,是一个 可以处理字符串、可序列化实例或字节数组的MessageConverter实现。
SimpleMessageConverter.createMessage:
// SimpleMessageConverter.createMessage
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
byte[] bytes = null;
if (object instanceof byte[]) {
bytes = (byte[]) object;
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
}
else if (object instanceof String) {
try {
bytes = ((String) object).getBytes(this.defaultCharset);
}
catch (UnsupportedEncodingException e) {
throw new MessageConversionException(
"failed to convert to Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
messageProperties.setContentEncoding(this.defaultCharset);
}
else if (object instanceof Serializable) {
try {
bytes = SerializationUtils.serialize(object);
}
catch (IllegalArgumentException e) {
throw new MessageConversionException(
"failed to convert to serialized Message content", e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
}
if (bytes != null) {
messageProperties.setContentLength(bytes.length);
return new Message(bytes, messageProperties);
}
throw new IllegalArgumentException(getClass().getSimpleName()
+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}
消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:
- application/octet-stream:二进制字节数组存储,使用 byte []
- application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
- text/plain:文本数据类型存储,使用 String
- application/json:JSON 格式,使用 Object、相应类型
普通消息实例:
contentType=text/plain
(Body:‘测试消息‘ MessageProperties [headers={spring_listener_return_correlation=d4e4e756-e85e-4ec3-accb-e6b08a7c2f80}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=direct, receivedRoutingKey=one, deliveryTag=2, consumerTag=amq.ctag-7eT4L6V5TKzC4GMDE_hOcg, consumerQueue=directQueueOne])
Java Bean 消息实例:
contentType=application/x-java-serialized-object
(Body:‘[B@3823efd5(byte[151])‘ MessageProperties [headers={spring_listener_return_correlation=0dc30aa6-b8cd-4f2f-9129-0fc8b729e8ea}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=direct, receivedRoutingKey=one, deliveryTag=1, consumerTag=amq.ctag-U0u4SdugbrO9-SbQ3bD6gg, consumerQueue=directQueueOne])
6.2.5.2 修改默认 MessageConverter
消息处理方法参数是由 MessageConverter
转化,若使用自定义 MessageConverter 则需要在
RabbitListenerContainerFactory实例中去设置(默认 Spring 使用的实现是
SimpleRabbitListenerContainerFactory`)
使用 SpringAMQP
提供的 Jackson2JsonMessageConverter
实现 消息的 Json 转换。
publisher 和 consumers 使用相同的 MessageConverter。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
/**
* 消息序列化方式
*/
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
直接注入是最方便的方法了,版本 spring-boot-starter-amqp 2.4.4
.
当然也可以使用另一种方式,在生产者和消费者两边都设置同样的 MessageConverter, 同时可以配置一些其他业务相关的属性。
@Configuration
public class RabbitConfig {
/**
* consumers 监听容器
* @param listenerContainerFactoryConfigurer listener configurer
* @param connectionFactory 连接工厂
* @return RabbitListenerContainerFactory
*/
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, CachingConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
//使用配置
listenerContainerFactoryConfigurer.configure(listenerContainerFactory, connectionFactory);
//配置消息反序列方式
listenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
return listenerContainerFactory;
}
/**
* publisher template
* @return RabbitTemplate
*/
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory, RabbitTemplateConfigurer templateConfigurer) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
//使用配置
templateConfigurer.configure(rabbitTemplate, connectionFactory);
//消息序列化方式
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//避免阻塞,将 publish 和 consumer 的 connection 分开
rabbitTemplate.setUsePublisherConnection(true);
return rabbitTemplate;
}
}
6.2.5.3 @Payload 与 @Headers
使用 @Payload 和 @Headers 注解可以获取消息中的 body 与 headers 信息
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
System.out.println("body:"+body);
System.out.println("Headers:"+headers);
}
也可以获取单个 Header 属性
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
System.out.println("body:"+body);
System.out.println("token:"+token);
}
6.2.6 消息确认机制
RabbitMQ 的消息确认有两种。
一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。
第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。
6.2.6.1 消息发送确认
6.2.6.1.1 ConfirmCallback
通过实现 ConfirmCallBack 接口,消息发送到交换器 Exchange 后触发回调。
使用该功能需要开启确认,spring-boot 中配置如下:
spring.rabbitmq.publisher-confirm-type= correlated
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
* mq 消息投递到exchange 确认回调
* 处理失败消息
*/
@Slf4j
@Component("rabbitConfirmCallback")
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("消息投递到exchange失败,cause={}", cause);
}
}
}
-
correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
-
ack:消息投递到 broker 的状态,true 表示成功。
-
cause:表示投递失败的原因。
消息投递到exchange失败,cause=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘direct1‘ in vhost ‘/‘, class-id=60, method-id=40)
不管有没有成功发送到 exchange
,都会走 ConfirmCallback
的回调,参数 boolean ack 表示是否成功,case 是失败的原因,例如:NOT_FOUND - no exchange ‘direct1‘ in vhost ‘/‘
6.2.6.1.2 ReturnsCallback
通过实现 ReturnCallback
接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的 routingKey
找不到队列时会触发)
使用该功能需要开启确认,spring-boot 中配置如下:
spring.rabbitmq.template.mandatory: true
/**
* 消息没有匹配到合适的队列时,退回回调处理
*/
@Slf4j
@Component("rabbitReturnsCallback")
public class ReturnsCallback implements RabbitTemplate.ReturnsCallback {
@Override
public void returnedMessage(ReturnedMessage returned) {
String exchange = returned.getExchange();
String routingKey = returned.getRoutingKey();
Message message = returned.getMessage();
log.error("消息丢失[exchange={},routingKey={},message={}]", exchange, routingKey, message);
}
}
[exchange=direct,routingKey=one1,message=(Body:‘[B@669a964c(byte[151])‘ MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])]
6.2.6.1.3 配置使用 ConfirmCallback、ReturnsCallback
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
/**
* publisher template
* @param connectionFactory 连接工厂
* @param templateConfigurer configurer
* @param confirmCallback confimCallback
* @param returnsCallback returnsCallback
* @return RabbitTemplate
*/
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory, RabbitTemplateConfigurer templateConfigurer,
RabbitTemplate.ConfirmCallback confirmCallback, RabbitTemplate.ReturnsCallback returnsCallback) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
//使用配置
templateConfigurer.configure(rabbitTemplate, connectionFactory);
//消息序列化方式
//rabbitTemplate.setMessageConverter(jsonMessageConverter());
//避免阻塞,将 publish 和 consumer 的 connection 分开
rabbitTemplate.setUsePublisherConnection(true);
//confirm 回调,开启publisher-confirm-type:correlated
//connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
rabbitTemplate.setConfirmCallback(confirmCallback);
//return 回调,开启template.mandatory=true
//消息根据 routingKey 无法找到合适的 queue 时,将消息退回,而不是丢失。
//rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returnsCallback);
return rabbitTemplate;
}
/**
* 消息序列化方式
*/
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
6.2.6.2 消息接收确认
6.2.6.2.1 确认模式
- AcknowledgeMode.NONE:不确认
- AcknowledgeMode.AUTO:自动确认
- AcknowledgeMode.MANUAL:手动确认
spring-boot 中配置方法:
spring.rabbitmq.listener.simple.acknowledge-mode = manual
6.2.6.2.2 手动确认
- 确认成功
void basicAck(long deliveryTag, boolean multiple) throws IOException;
-
deliveryTag: 该消息的 index
-
multiple:是否批量. true:将一次性 ack 所有小于
deliveryTag
的消息。
假设我先发送三条消息 deliveryTag 分别是 5、6、7,可它们都没有被确认,当我发第四条消息此时 deliveryTag 为 8,multiple 设置为 true,会将 5、6、7、8 的消息全部进行确认。
消费者成功处理后,调用 channel.basicAck (message.getMessageProperties ().getDeliveryTag (), false);
方法对消息进行确认。
@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_ONE})
public void listenerOne(Channel channel, Message message, @Payload Mail mail) throws IOException {
log.info("【directQueueOne】收到消息:{}", mail);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
- 确认失败
确认失败,可以使用 requeue参数
选择是否将消息重新放回队列。
//第一种方式
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
-
deliveryTag: 该消息的 index,递增的。
-
multiple:是否批量. true:将一次性拒绝所有小于
deliveryTag
的消息。 -
requeue:被拒绝的是否重新入队列。
//第二种方式
void basicReject(long deliveryTag, boolean requeue) throws IOException;
-
deliveryTag: 该消息的 index。
-
requeue:被拒绝的是否重新入队列。
如果没有其他接收者监控这个
queue
的话,要注意一直无限循环发送的危险。
当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行 回滚,如此反复进行。
建议为队列设置私信队列,当出现异常时,由死信队列监听后再进行处理。
@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_TWO})
public void listenerTwo(Channel channel, Message message, @Payload Mail mail) throws Exception {
try {
log.info("【directQueueTwo】收到消息:{}", mail);
int i = 1 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}
}
basicNack、basicReject区别:
channel.basicNack
与 channel.basicReject
的区别在于 basicNack
可以批量拒绝多条消息(一次性拒绝所有小于 deliveryTag 的消息),而 basicReject 一次只能拒绝一条消息。
6.2.7 死信队列
为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ
的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
6.2.7.1 死信队列是什么
死信
,在官网中对应的单词为 “Dead Letter”。死信
是 RabbitMQ
中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:
- 消息被否定确认,使用
channel.basicNack
或channel.basicReject
,并且此时requeue
属性被设置为false
。 - 消息在队列的存活时间超过设置的 TTL 时间。
- 消息队列的消息数量已经超过最大队列长度。
那么该消息将成为 “死信”。
TTL - 通过 channel.queueDeclare 方法中的 x-expires 参数可以控制队列被自动删除前处于未使用状态的时间。
设置了 TTL 后,在设置的时间内,没有消费者消费这条消息,那么判定这条消息为过期。
“死信” 消息会被 RabbitMQ 进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
6.2.7.2 配置使用死信队列
大概可以分为以下步骤:
- 配置业务队列,绑定到业务交换机上
- 为业务队列配置死信交换机和路由 key
- 为死信交换机配置死信队列
注意,并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由 key。
有了死信交换机和路由 key 后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。一般来说,会为每个业务队列分配一个独有的路由 key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。
声明 exchange、queue,将 exchange、queue 绑定,然后在其他队列创建时,指定扩展参数设定其死信消息都发送到该队列。
import com.yohaps.mq.rabbittemplate.config.RabbitConfig;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* 声明死信队列
* 就是普通的交换机和队列,接收其他队列的死信消息
*/
@Component
public class DeadLetterDeclare {
@Bean(RabbitConfig.DeadLetter.EXCHANGE)
public DirectExchange deadLetterExchange() {
return ExchangeBuilder.directExchange(RabbitConfig.DeadLetter.EXCHANGE).durable(true).build();
}
@Bean(RabbitConfig.DeadLetter.QUEUE_ONE)
public Queue deadLetterQueueOne() {
return QueueBuilder.durable(RabbitConfig.DeadLetter.QUEUE_ONE).build();
}
@Bean(RabbitConfig.DeadLetter.QUEUE_TWO)
public Queue deadLetterQueueTwo() {
return QueueBuilder.durable(RabbitConfig.DeadLetter.QUEUE_TWO).build();
}
@Bean
public Binding bindingDeadLetterQueueOne(@Qualifier(RabbitConfig.DeadLetter.EXCHANGE) DirectExchange exchange,
@Qualifier(RabbitConfig.DeadLetter.QUEUE_ONE) Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.DeadLetter.QUEUE_ONE_KEY);
}
@Bean
public Binding bindingDeadLetterQueueTwo(@Qualifier(RabbitConfig.DeadLetter.EXCHANGE) DirectExchange exchange,
@Qualifier(RabbitConfig.DeadLetter.QUEUE_TWO) Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.DeadLetter.QUEUE_TWO_KEY);
}
}
关键:
在需要配置死信队列的队列创建时,指定死信交换机和key.
队列属性:
- x-dead-letter-exchange:指定死信交换机
- x-dead-letter-routing-key:指定死信 routingKey
- x-message-ttl:消息过期时间 ms
- alternate-exchange:备份交换机
import com.yohaps.mq.rabbittemplate.config.RabbitConfig;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* 声明一个直连交换机
* 声明两个队列
* 将两个队列通过 routingKey 绑定到交换机
*/
@Component
public class DirectDeclare {
@Bean(RabbitConfig.Direct.EXCHANGE)
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange(RabbitConfig.Direct.EXCHANGE).durable(true).build();
}
@Bean(RabbitConfig.Direct.QUEUE_ONE)
public Queue directQueueOne() {
//声明当前队列绑定的死信交换机与指定的key
return QueueBuilder.durable(RabbitConfig.Direct.QUEUE_ONE)
.withArgument("x-dead-letter-exchange", RabbitConfig.DeadLetter.EXCHANGE)
.withArgument("x-dead-letter-routing-key", RabbitConfig.DeadLetter.QUEUE_ONE_KEY)
.build();
}
@Bean(RabbitConfig.Direct.QUEUE_TWO)
public Queue directQueueTwo() {
//声明当前队列绑定的死信交换机与指定的key
return QueueBuilder.durable(RabbitConfig.Direct.QUEUE_TWO)
.withArgument("x-dead-letter-exchange", RabbitConfig.DeadLetter.EXCHANGE)
.withArgument("x-dead-letter-routing-key", RabbitConfig.DeadLetter.QUEUE_TWO_KEY)
.withArgument("x-message-ttl",20000)
.build();
}
@Bean
public Binding bindingQueueOne(@Qualifier(RabbitConfig.Direct.EXCHANGE) DirectExchange exchange,
@Qualifier(RabbitConfig.Direct.QUEUE_ONE) Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.Direct.QUEUE_ONE_KEY);
}
@Bean
public Binding bindingQueueTwo(@Qualifier(RabbitConfig.Direct.EXCHANGE) DirectExchange exchange,
@Qualifier(RabbitConfig.Direct.QUEUE_TWO) Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(RabbitConfig.Direct.QUEUE_TWO_KEY);
}
}
死信队列的监听,也是跟普通队列没区别
@Slf4j
@Component
public class DeadLetterListener {
@RabbitListener(queues = {RabbitConfig.DeadLetter.QUEUE_ONE})
public void listenerOne(Channel channel, Message message, @Payload Mail mail) throws IOException {
log.info("【DeadLetterListener-One】收到消息:{}", mail);
String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
System.out.println(receivedRoutingKey);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = {RabbitConfig.DeadLetter.QUEUE_TWO})
public void listenerTwo(Channel channel, Message message, @Payload Mail mail) throws IOException {
log.info("【DeadLetterListener-Two】收到消息:{}", mail);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
测试:在业务处理中,发生异常,捕捉到异常后,设置 queues=false
,消息会进入到死信队列。
@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_TWO})
public void listenerTwo(Channel channel, Message message, @Payload Mail mail) throws Exception {
try {
log.info("【directQueueTwo】收到消息:{}", mail);
int i = 1 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}
}
利用 死信队列 + TTL 的特性,可以实现延迟消息
比如:登录后,发送一条消息(推荐商品,发邮件通知等),设置 TTL 而没有消费者进行消费,过期后就会进入死信队列,在死信队列处理业务。
6.2.8 重试机制
在消费端在处理消息过程中发生异常,进行重新尝试。
开启重试机制:
# rabbitmq
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 自动ack
retry:
enabled: true
max-attempts: 5 # 重试次数
max-interval: 10000 # 重试最大间隔时间
initial-interval: 2000 # 重试初始间隔时间
multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
消费端代码模拟发生异常:
@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_TWO})
public void listenerTwo(Channel channel, Message message, @Payload Mail mail) throws Exception {
log.info("【directQueueTwo】收到消息:{}", mail);
int i = 1 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
此时启动程序,发送消息后可以看到控制台输出内容如下:
可以看到重试次数是 5 次(包含自身消费的一次),重试时间依次是 2s,4s,8s,10s(上一次间隔时间 * 间隔时间乘子),最后一次重试时间理论上是 16s,但是由于设置了最大间隔时间是 10s,因此最后一次间隔时间只能是 10s,和配置相符合。
注意:
重试并不是 RabbitMQ 重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟 mq 没有任何关系;
因此上述消费者代码不能添加 try {} catch (){},一旦捕获了异常,在自动 ack 模式下,就相当于消息正确处理了,消息直接被确认掉了,不会触发重试的;
重试次数耗尽后,依然没有正确消费的处理查看 《MessageReCoverer》
6.2.9 MessageReCoverer
在消费端处理业务的过程中发生异常,会使用 MessageRecoverer
处理。
public interface MessageRecoverer {
/**
* 已消费但所有重试尝试失败的消息的回调。
* Callback for message that was consumed but failed all retry attempts.
*
* @param message the message to recover
* @param cause the cause of the error
*/
void recover(Message message, Throwable cause);
}
MessageRecoverer 有以下几个实现类:
- RejectAndDontRequeueRecoverer:拒绝并且不会将消息重新发回队列
- RepublishMessageRecoverer:重新发布消息
- ImmediateRequeueMessageRecoverer:立即重新返回队列
默认使用 RejectAndDontRequeueRecoverer 配置。
6.2.9.1 配置MessageReCoverer
/**
* 消费端处理消息,发生异常后,将消息发送到新的队列
*/
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "errorExchange","errorRoutingKey");
}
/**
* 消费端处理消息,发生异常后,将消息重新放入队列
* 周而复始直到不抛出异常为止,这样还是会影响后续的消息消费。
*/
@Bean
public MessageRecoverer messageRecoverer(){
return new ImmediateRequeueMessageRecoverer();
}
6.2.9.2 特别注意
在自动 ACK 模式下,消费端发生异常并且在重试次数耗尽后,默认情况 下使用 RejectAndDontRequeueRecoverer
处理(拒绝消息并且不会将消息重新发回队列),也就是 requeue=false
,如果配置了 死信队列 会将消息发送到 死信队列。
一般来说重试实在 自动 ACK
模式下进行的,如果是 手动 ACK
,重试也是正常进行的,但是尝试将消息使用 RepublishMessageRecoverer
重新发布到新的队列会失败,配置了死信队列也不会进入,而在自动 ACK 模式下,三种实习方式都是正常的。
6.2.10 手动ACK实现重试,超出后放入死信
在手动ACK模式下,尝试在重试次数耗尽后,将其放入新的队列处理,出现异常。于是,手动实现重试机制:
- 在出现异常时,进行捕获
- 将当前消息ACK
- 将当前消息手动重新放入队列,并在其 Header 记录重试次数
- 达到重试次数后,利用 TTL 过期机制,将其放入 死信队列
@RabbitListener(queues = {RabbitConfig.Direct.QUEUE_TWO})
public void listenerTwo(Channel channel, Message message, @Payload Mail mail) throws IOException, Int
try {
log.info("【directQueueTwo】收到消息:{}", mail);
int i = 1 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//发生异常后,拒绝消息
//channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("catch");
/**
* 以下手动实现重试,耗尽次数后再进行处理
* 发生异常后,先 ACK ,然后在消息的 header 中记录重试次数,重新将消息发回队列
*/
Map<String, Object> headers = message.getMessageProperties().getHeaders();
//重试次数
Integer retryCount;
String mapKey = "retry-count";
if (!headers.containsKey(mapKey)) {
retryCount = 0;
} else {
retryCount = (Integer) headers.get(mapKey);
}
if (++retryCount < 3) {
/**
* 当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部。
* 这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行 回滚,如此反复进行
* 而比较理想的方式是,出现异常时,消息到达消息队列尾部,这样既保证消息不回丢失,又保证了正常业务的进行。
* 因此我们采取的解决方案是,将消息进行应答。
* 这时消息队列会删除该消息,同时我们再次发送该消息 到消息队列,这时就实现了错误消息进行消息队列尾部的方案
*/
log.info("已经重试 " + retryCount + " 次");
headers.put("retry-count", retryCount);
//1.应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//2.重新发送到MQ中
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().contentType("
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), basicProperties,
message.getBody());
} else {
log.info("现在重试次数为:" + retryCount);
/**
* 不重要的操作放入 死信队列
* 消息异常处理:消费出现异常后,延时几秒,然后从新入队列消费,直到达到ttl超时时间,再转到死信,证明这个信息有问题需要人工干预
*/
//休眠2s 延迟写入队列,触发转入死信队列
Thread.sleep(2000);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
7. RabbitMQ 进阶
7.1 消息幂等性
目前实现的最多的就是 At least one
语义,也就是保证消费者至少接收到一次消息,所以会存在重复消息的情况,需要我们业务代码进行幂等的处理.
可以给发送的消息一个唯一 ID,将 ID 存储在分布式缓存(或数据库)中,这样在每次接收到消息之后,进行比较,看是否是重复的消息。
@Service
public class RabbitService {
@Autowired
RabbitTemplate rabbitTemplate;
public void send(RabbitSendVo sendVo) {
String exchange = sendVo.getExchange();
String routingKey = sendVo.getRoutingKey();
Mail mail = sendVo.getMail();
//默认使用 UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData();
//correlationData.setId("1");
rabbitTemplate.convertAndSend(exchange, routingKey, mail,correlationData);
}
}
需要保证缓存一定是高可用的
7.2 消息顺序
一个 queue
,有多个 consumer
去消费,这样就会造成顺序的错误,consumer
从 MQ 里面读取数据是有序的,但是每个 consumer
的执行时间是不固定的,无法保证先读到消息的 consumer
一定先完成操作,这样就会出现消息并没有按照顺序执行,造成数据顺序错误。
这时,可以采用单消费者模式,一个队列只绑定一个消费者。
7.3 限流
在订单高峰期,rabbitmq
上已经堆积了很多消息等待消费,如果没有任何限流措施,贸然启动一个消费者时,如此多的消息瞬间推送给消费者,消费者可能因无法处理这么多的消息而承受巨大压力,甚至崩溃!
在 SpringBoot 中:
spring.rabbitmq.listener.prefetch=1
// 表示不限制消息大小, 一次只处理一条消息, 限制只是当前消费者有效
channel.basicQos(0, 1, false);
7.4 队列最大限制
默认情况下,rabbitmq
中的 queue
的最大长度和总字节数不受限制的(仅受全局内存,磁盘阈值的影响)。
当 queue
达到限制的阈值时,如果此 queue
配置了 dead-lettered
,则 queue
最前面的消息将被转发到 dead-lettered
,如果没有配置则最前面的消息将会被直接丢弃。从而达到通过转发或丢弃 queue
最前面的消息,来为新消息腾出空间的目的。
可以通过参数来配置 queue 的最大长度和最大总字节数:
- x-max-length:控制 queue 的最大长度 —— 非负整数
- x-max-length-bytes:控制 queue 中所有消息的最大总字节数 —— 非负整数
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length", 10); // 设置queue的最大长度10
args.put("x-max-length-bytes", 1024); // 设置最大总字节数1KB
channel.queueDeclare("myqueue", false, false, false, args);