RabbitMQ核心思想
MQ是干什么用的?
应用解耦、异步、流量削锋、数据分发、错峰流控、日志收集等等...
- 当前最主流的消息中间件。
- 高可用性,支持发送确认,投递确认等特性
- 高可用,支持镜像队列
- 支持插件
优点:
- 基于 Erlang, 支持高并发
- 支持多种平台,多种客户端,文档齐全
- 可靠性高
- 在互联网公司有较大规模等使用,社区活跃度高
1. AMQP协议介绍
Broker :接受和分发消息等的应用,RabbitMQ就是Message
Virtual Host : 虚拟机Broker , 将多个单元隔离开
Connection : publisher / consumer 和 broker之间的TCP连接
Channel : connection内部建立的逻辑连接,通常没个线程创建单独的channel
Rounting Key : 路由键,用来只是消息的路由转发,相当于快递的地址
Exchange : 交换机 ,相当于快递的分拨中心
Queue : 消息队列,消息最终被送到这里等待consumer 取走
Binding : exchange 和 queue之间的虚拟连接,用于message的分发依据
AMQP协议的核心概念-Exchange
- 在AMQP协议或者是RabbitMQ实现中,最核心的组件是Exchange
- Exchange 承担 RabbitMQ 的核心功能 --- 路由转发
- Exchange 有多个种类,配置多变,需要详细讲解
RabbitMQ核心 -- Exchange解析
- Exchange是 AMQP 协议和RabbitMQ的 核心组件
- Exchange的功能是根据 绑定关系 和 路由键为消息提供路由,将消息转发至相应的队列
- Exchange有4种类型 :Direct / Topic / Fabout /Headers
Direct Exchange (直接路由)
- Message中的Routing Key 如果和 Binding Key 一致, Direct Exchange 则将 message 发到对应的 queue中
Fanout Exchange (广播路由)
- 每个发到 Fanout Exchange 的 message 都会分发到所有绑定到queue上去
Topit Exchange (话题路由)
- 根据 Routing Key 及通配规则,Topic Exchange 将消息分发目标 Queue中
- 全匹配 :与Direct 类似
- Binding Key 中的 #:匹配任意个数的word
2. Docker 安装 RabbitMQ
docker pull rabbitmq
这里是直接安装最新的
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
访问 : http://IP地址:15672
用户名和密码默认都是guest
3. RabbitMQ保证消息的可靠性
- 需要使用RabbitMQ发送端确认机制,确认消息成功发送到RabbitMQ并被处理
- 需要使用RabbitMQ消息返回机制,若没发现目标队列,中间件会通知发送方
- 需要使RabbitMQ消息端确认消息,确认消息没有发生异常
- 需要使用RabbitMQ消费端限流机制,限制消息推送速度 ,保障接受端服务稳定
- 大量到堆积消息会给RabbitMQ产生很大到压力,需要使用RabbitMQ消息过期时间,防止消息大量积压
- 过期后会直接丢弃, 不符合业务逻辑,需要使用RabbitMQ死信队列,收集过期消息,以供分析
4. 发送确认机制原理
消息真的发出去了吗?
- 消息发送后,发送端不知道RabbitMQ是否真的收到了消息,若RabbitMQ异常,消息丢失,业务异常,这个时候我们就需要使用RabbitMQ发送端确认机制,确认消息发送
三种确认机制
1. 单条同步确认
- 配置channel,开启确认模式:channel.confirmSelect()
- 每发送一条消息,调用channel.waitForConfirms()方法等待确认
//建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String message = objectMapper.writeValueAsString(orderMessageDTO);
channel.confirmSelect();
channel.basicPublish(
"exhange.order.restaurant",
"key.restaurant",
null,
message.getBytes());
if(channel.waitForConfirms()){
//表示发送确认处理逻辑
}else{
//发送失败
}
2. 多条同步确认
- 配置channel,开启确认模式:channel.confirmSelect()
- 发送多条消息后,调用channel.waitForConfirms()方法等待确认
//建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String message = objectMapper.writeValueAsString(orderMessageDTO);
channel.confirmSelect();
channel.basicPublish(
"exhange.order.restaurant",
"key.restaurant",
null,
message.getBytes());
if(channel.waitForConfirms()){
//表示发送确认处理逻辑
}else{
//发送失败
}
3. 异步确认
- 配置channel,开启确认模式:channel.confirmSelect()
- 在channel上添加监听: addConfirmListener , 发送消息后,会回调此方法,通知是否发送成功
- 异步确认有可能是单条,也有可能是多条,取决于MQ
//建立连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String message = objectMapper.writeValueAsString(orderMessageDTO);
channel.confirmSelect();
channel.basicPublish(
"exhange.order.restaurant",
"key.restaurant",
null,
message.getBytes());
ConfirmListener confirmListener = new ConfirmListener(){
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Ack " + deliveryTag + multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack " + deliveryTag + multiple);
}
};
channel.addConfirmListener(confirmListener);
5. 消息返回机制
消息真被路由了吗?
- 消息发送后,发送端不知道消息是否被正确路由,若路由异常,消息会被丢弃,业务异常,需要使用RabbitMQ消息返回机制,确认消息被正确路由
消息的开启方法:
- 在RabbitMQ基础配置中又一个关键配置项:Mandatory
- Mandatory若为false,RabbitMQ将直接丢弃无法路由的消息
- Mandatory若为true,RabbitMQ才会处理无法路由的消息
DeliverCallback deliverCallback = ((consumerTag, message) -> {
//拿到消息
String messageBody = new String(message.getBody());
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
log.info("Message Return:");
//处理失败的业务逻辑
}
});
channel.basicPublish(
"exhange.order.restaurant",
"key.restaurant",
true,
null,
messageBody.getBytes());
}catch (Exception e){
log.error(e.getMessage());
}
});
6. 消费端确认机制
消费端处理异常怎么办?
- 默认情况下,消费端接收消息时,消息会被自动确认(ACK),发生异常时,发送端与消息中间件无法得知消息处理情况,需要使用RabbitMQ 消息端确认机制,确认消息被正确处理
消费端ACK类型
手动ACK:消费端收到消息后,不会自动签收消息,需要我们在业务代码中显式签收消息
- 单条手动ACK : multiple = false
- 多条手动ACK : multiple = true
- 推荐使用单条ACK
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
channel.basicNack(message.getEnvelope().getDeliveryTag(),false,false);
自动ACK:消费端收到消息后,会自动签收消息
7. 消费端限流机制
业务高峰期,可能出现发送端与接收端性能不一致,大量消息被同时推给接受端,造成接受端服务奔溃
在高并发端场景下,有个微服务奔溃了,本科期间队列挤压了大量消息,微服务上线后,收到大量并发消息。将同样多端消息推给能力不同端副本,会导致部分副本异常
针对以上问题,RabbitMQ 开发了 Qos (服务质量保证) 功能,Qos功能保证了在一定树木消息违背确认前,不消费新的消息
//这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。
channel.basicQos(1);
8. RabbitMQ的过期时间(TTL)
- RabbitMQ的过期时间称为 TTL (time to live), 生存时间
- RabbitMQ的过期时间分为消息TTL 和 队列 TTL
- 消息TTL设置了单条消息的过期时间
- 队列TTL设置了队列中所有消息的过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) //deliveryMode=1代表不持久化,deliveryMode=2代表持久化
.contentEncoding("UTF-8") // 编码方式
.expiration("10000") // 过期时间
.headers(headers) //自定义属性
.build();
String messageBody = "发送的消息"
//发送通道
channel.basicPublish(
"exhange.order.restaurant",
"key.restaurant",
true,
properties,
messageBody.getBytes());
9. 死信队列
如何转移过期的消息?
- 消息被设置了过期时间,过期后会直接被丢弃,直接被丢弃的消息无法对系统运行异常发出警报,需要使用RabbitMQ死信队列,收集过期消息,以供分析
什么是死信队列
- 队列被配置了DLX属性 (Dead-Letter-Exchange) 当一个消息变成死信(dead message)后,能重新被发布到另一个 Exchange , 这个Exchange也是一个普通交换机,死信被死信交换机路由后,一般进入一个固定队列
怎么变成死信
- 消息被拒绝 (reject / nack) 并且 requeue = false
- 消息过期(TTL到期)
- 队列达到最大长度
个人博客地址:http://blog.yanxiaolong.cn/