MQ概述
什么是消息队列?
使用队列来通信的组件,把要传输的消息放在队列中。
为什么要使用消息队列(优点)?
- 系统解耦:解耦消息生产者和消费者之间的关系
- 异步调用:用户调用接口时,由于接口之间调用导致用时时间比较久,用户体验不好。调用接口后将消息放入到MQ后就返回,用户体验好,最终一致性由MQ来保证
- 流量削峰:减少高峰时期对服务器压力,先把请求放到MQ中,系统根据实际能处理的并发量来消费请求
解耦的面试技巧:你需要思考一下,在你自己的系统里面有没有类似的情况,一个系统或者模块,调用了多个系统或者模块,它们互相之间的调用非常复杂,并且维护起来很麻烦,但其实这个调用是不需要直接同步调用接口的,如果用MQ给它异步化解耦也是可以的,你就需要思考在你的项目里,是不是可以用MQ给它进行系统的解耦,可以自己组织一下语言回答。
会带来什么问题(缺点)?
- 系统可用性降低。如果MQ挂了,整个系统就不能服务了
- 系统复杂性提高。消息丢失、消息重复消费、消息重复发送、消息顺序错乱等问题
- 一致性问题。将消息放到MQ后就返回给用户成功的信息,但是其他系统消费消息时,若某个系统失败了,导致数据不一致
RabbitMQ
什么是RabbitMQ?
它是实现了高级消息队列协议(AMQP)的消息中间件。
RabbitMQ 组件
- Broker:一个Broker可以看做一个RabbitMQ服务器
- Connection:实际的连接(TCP连接)
- Channel:虚拟连接。一个Connection上可以有多个Channel(信道),信道没有数量限制,消息在信道上传输。
- Message
- Virtual host:虚拟broker。其内部含有独立的queue、exchange、binding,以及独立的权限系统
- Exchange:交换机。生产者将消息发送到交换机,由交换机将消息路由到一个或多个队列中。路由不到时,返回给生产者或直接丢弃(mandatory 为 true,返回消息给生产者;为 false 丢弃消息)
- Queue:存储消息的数据结构。可以设置长度,处于ready状态的消息会被计数,不统计处于unack的消息。消息重新入队会处于队头。多个消费者可以订阅同一个队列,队列中的消息会平摊给各个消费者处理
RabbitMQ工作模型
- 简单队列(Simple Queue):生产者将消息投递到队列里,消息者从队列里取消息
- 工作队列(Worker Queue):一个生产者,一个队列,多个消费者,用于消息消费耗时的场景
- 发布订阅(fanout)
- Direct:消息根据路由键投递相应的队列中
- Topic:消息根据路由键和绑定键的匹配,投递到相应的队列中。如果多个键匹配成功,且目标队列是同一个队列,队列只会收到一条消息
- Headers:用键值对做匹配,匹配方式有all和any,不常使用
其中3-6是交换机的四种工作模式(四种交换机类型)
RabbitMQ消费模式包括哪些?
推送(push)
- 通过
channel.basicConsume
将 channel(信道)设置为推模式,消息可用时自动将消息推送给消费者 - 消费者需要设置缓冲区缓存消息
- 实时性好(长连接)
拉取(pull)
- 使用
channel.basicGet
拉消息 - 轮询模型,消费者发送 get 请求获取消息,如果队列中没有消息,则获得空的回复
- 需要时才去拉取消息,实时性差,耗费资源(短连接)
Qos(质量服务)
消息发送给消费者后,默认是自动确认,如果消费者未能消费成功,则消息丢失。
通过显式确认可以保证只有当消息处理完成并收到Ack后才从队列中删除。但是存在的问题是:1)消息太多全部传给消费者,可能造成消费者内存爆满;2)消息处理慢时,想让别的消费者一起处理,但是这些消息都被原来的消费者接收了,这些消息不会再发送给新添加的消费者
Qos可以解决上述问题,需要开启消息的显式确认,设置每次传输给消费者的消息条数为n,消费者处理完n条消息后再获取n条消息进行处理;而新增消费者时,消息可以立即发送给新的消费者。
多个消费者监听一个队列,消息如何分发?
- 轮询:默认策略
- 公平分发(QoS):给空闲的消费者发送更多的消息(当消费者有x条消息没有响应时,不再给该消费者发消息)
RabbitMQ如何保证消息的可靠性?
-
生产者到队列:事务机制和Confirm机制,注意:事务机制和Confirm机制是互斥的,两者不能共存,会导致RabbitMQ报错。
-
队列自身:持久化、集群、普通模式、镜像模式。
-
队列到消费者:basicAck机制、死信队列、消息补偿机制
生产者如何可靠地将消息投递到队列中?
confirm机制:
- 生产者生产的消息带有唯一 ID
- 消息被投递到目标队列后,发送Ack消息(包含消息的唯一ID)给生产者
- 有可能因为网络问题导致Ack消息无法发送到生产者,那么生产者在等待超时后,会重传消息;或者RabbitMQ内部错误导致消息丢失,则发送nack消息
- 生产者收到Ack消息后,认为消息已经投递成功
队列自身不弄丢消息
队列开启持久化,消息的diliveryMode = 2
队列如何将消息可靠投递到消费者?
手动确认:
- 队列将消息push给消费者(或消费者来pull消息)
- 消费者得到消息并做完业务逻辑
- 消费者发送Ack消息给队列 ,通知队列删除该消息(队列会一直等待直到得到ack消息,队列通过消费者的连接是否中断来确认是否需要重新发送消息,只要连接不中断,消费者有足够长的时间来处理消息,保证数据的最终一致性)
- 队列将已消费的消息删除
RabbitMQ如何保证消息的可靠性?如何避免消息重复投递或重复消费?
重复投递的原因:等待超时后,需要重试。
避免重复投递:消息生产时,生产者发送的消息携带一个Message ID
(全局唯一ID),作为去重和幂等的依据,避免重复的消息进入队列
重复消费的原因:消费者接收消息后,在确认之前断开了连接或者取消订阅,消息会被重新分发给下一个订阅的消费者。
避免重复消费:消息消费时,要求消息体中必须要有一个全局唯一ID,作为去重和幂等的依据,避免同一条消息被重复消费
RabbitMQ消息的状态
-
alpha
:消息内容(包括消息体、属性和headers) 和消息索引都存储在内存中 -
beta
:消息内容保存在磁盘中,消息索引保存在内存中 -
gamma
:消息内容保存在磁盘中,消息索引在磁盘和内存中都有 -
delta
:消息内容和索引都在磁盘中
高级特性
死信
消息成为死信消息的原因:
- 消息被拒(
Basic.Reject/Basic.Nack
)且不重新入队 - 消息TTL过期
- 队列满了,无法再添加
死信消息的headers
字段名 | 含义 |
---|---|
x-first-death-exchange | 第一次成为死信前交换机的名称 |
x-first-death-reason | 第一次成为死信的原因。rejected:消息被拒,且 default-requeue-rejected 参数为false。expired:消息过期。maxlen: 队列内消息数量超过队列最大容量 |
x-first-death-queue | 第一次成为死信前所在队列名称 |
x-death | 历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新 |
https://cloud.tencent.com/developer/article/1463065
如何使用?
为队列配置死信交换机,再为死信交换机配置死信队列,消息成为死信信息后,会由死信交换机投递到死信队列中。死信交换机、死信队列和普通的交换机、队列没有区别。例子如下:
@Bean("queueA")
public Queue queueA(){
Map<String, Object> args = new HashMap<>(4);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key。如果不设置的话,保留原来的路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
// 绑定
return QueueBuilder.durable(QUEUEA_NAME).withArguments(args).build();
}
延迟队列
存储延迟消息,消息被发送后不想让消费者立即拿到消息,要等待特定时间后才能拿到消息。
实现方式
-
3.6.x之前,死信队列+TTL过期时间可以实现延迟队列。消息先进入一个队列,该队列没有消费者,那么消息过期后进入死信队列
这种方式存在的缺陷:如果队头消息没有过期而队中的某个消息过期了,队中的该消息只有走到队头才会进入死信队列。解决的方法是多级队列。
如果所有消息的延迟时间是相同的话,可以设置队列的消息过期时间(
x-expires
) -
3.6.x开始,延迟队列插件
应用场景
- x天自动确认收货、自动默认评价。签收商品后物流系统在x天的延迟后发送消息给支付系统,通知系统打款给商家。x天后通知评价系统进行默认评价
- 12306购票支付确认页面,如果30分钟没有付款将自动取消订单。在下订单的那一刻购票系统会发送一个延时消息给订单系统。如果在30分钟内完成了订单,则可以通过逻辑代码忽略消息
优先级队列
优先级高的队列会先被消费(设置 x-max-priority
参数)。当消费速度大于生成速度且 Broker 没有堆积的情况下,没有意义。
集群
普通集群
每台机器上启动一个RabbitMQ实例,而创建的queue只放在一个实例上,其他实例同步queue的元数据。消费时如果连接到了另一个实例上,则实例会从queue所在的实例上拉取数据
多个实例服务一个queue
镜像集群
RabbitMQ的元数据和queue里的消息都会存在于多个实例上,每次消息进入队列,会自动把消息同步到多个实例的队列中
分为一个master和多个slave。所有的操作最终都会到master上操作
- 生产者可任意选择一个节点连接,如果该节点不是master,则转发给master。master向slave发送消息,收到半数以上回复后本地提交,再让slave提交
- 消费者可任意选择一个节点连接,如果该节点不是master,则转发给master。消费者消费后进行 ack 确认,master收到ack后删除,并让slave删除。
- 如果master掉线,自动选出一个节点(slave中消息队列最长的节点)作为新的master
https://blog.csdn.net/jing956899449/article/details/107064395
场景题
如何保证消息的顺序性?
- 只有一个消费者,可以保证顺序
- 多个队列,每个队列对应一个消费者,同一个用户的操作hash到同一个队列上
- 每个消息有一个全局ID,同时去关联一个parentMsgId,在前一条消息未消费时不处理下一条消息
消息积压在消息队列中会导致什么结果?产生的原因是?如何解决?
原因:消费者消费速度慢,或者出现了问题
导致的结果:1)磁盘空间满了;2)海量消息堆积,消费者需要很长时间消费
解决办法:
- 磁盘空间满的情况:在其他机器上建立临时的消息队列,再写一个临时的消费者,把积压的消息放到临时队列里去
- 海量消息堆积的情况:修复消费者问题,停掉现有的消费者,临时建立10倍的消息队列,再用一个临时的消费者将消息分发到临时消息队列中,临时征用10倍的机器部署消费者。等积压消息消费完成后,再恢复成之前的架构
Kafka和RabbitMQ的对比
保证消息可靠性:
-
Kafka
1)使用acks=all,Leader收到消息后,等待所有ISR列表的Follower返回后再发送ack给生产者(分区副本数和ISR最少副本数配置成大于1)
2)生产者使用同步发送消息(默认是异步,多个请求先放在缓冲区,再合并发送)
3)消费者使用手动提交offset,从而保证消息至少消费一次 -
RabbitMQ:RabbitMQ如何保证消息的可靠性?
RabbitMQ的优缺点:
优点
- 延迟很低(微秒级)
- 可靠性:使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等。
- 灵活的路由:对于典型的路由功能,RabbitMQ己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
- 管理界面 : RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
- 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
- 多种协议:RabbitMQ除了原生支持AMQP协议,还支持STOMP、MQTT等多种消息中间件协议。
- 支持多种语言:如Java、Python、Ruby、PHP、C#、JavaScript等。
- 插件机制 : RabbitMQ提供了许多插件,以实现从多方面进行扩展,也可以自定义插件。
缺点
- erlang开发,难以维护
- 相比较于其他消息中间件,吞吐量较低(万级)
Kafka的优点和缺点
优点:
- 吞吐量十万级
- 可用性非常高(一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用)
- 消息可以做到0丢失
缺点:
- topic增多会导致吞吐量大幅下降(几百个topic),如果要支持大规模topic,需要更多的机器资源
- 依赖ZooKeeper进行元数据管理(额外的复杂性)