RabbitMQ问题分析

一、如何避免消息重复投递或重复消费?

在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重的依据(消息投递失败并重传),避免重复的消息进入队列;

在消息消费时,要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID 等)作为去重的依据,避免同一条消息被重复消费。

二、如何确保消息正确地发送至RabbitMQ?如何确保消息接收方消费了消息?

1、发送方确认模式

(1)将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。

(2)一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。

(3)如果 RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(notacknowledged,未确认)消息。发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

2、接收方确认机制

消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。保证数据的最终一致性;

下面罗列几种特殊情况

  • 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)
  • 如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。

三、消息基于什么传输?

由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传输数据。信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制。

四、消息怎么路由?

  • 消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。

  • 通过队列绑定建,可以把队列绑定到交换器上。

  • 消息到达交换器后,RabbitMQ 会将消息的路由键与队列的绑定键进行匹配(针对不同的交换器有不同的路由规则);

常用的交换器主要分为一下三种

  • fanout:如果交换器收到消息,将会广播到所有绑定的队列上

  • direct:如果路由键完全匹配,消息就被投递到相应的队列

  • topic:可以使来自不同源头的消息能够到达同一个队列。 使用topic交换器时,可以使用通配符

五、如何保证RabbitMQ消息的可靠传输?

消息不可靠的情况可能是消息丢失,劫持等原因;丢失又分为:

  • 生产者丢失消息

  • 消息列表丢失消息

  • 消费者丢失消息

1、生产者丢失消息(发送方确认模式)

从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息;

  • transaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;

  • confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

2、消息队列丢数据(消息持久化)

消息持久化,当然前提是队列必须持久化。RabbitMQ确保持久性消息能从服务器重启中恢复的方式是,将它们写入磁盘上的一个持久化日志文件,当发布一条持久性消息到持久交换器上时,Rabbit会在消息提交到日志文件后才发送响应。一旦消费者从持久队列中消费了一条持久化消息,RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集。如果持久化消息在被消费之前RabbitMQ重启,那么Rabbit会自动重建交换器和队列(以及绑定),并重新发布持久化日志文件中的消息到合适的队列。

那么如何持久化呢?其实也很容易,就下面两步:

  • 将queue的持久化标识durable设置为true,则代表是一个持久的队列

  • 发送消息的时候将deliveryMode=2这样设置以后,即使rabbitMQ挂了,重启后也能恢复数据

3、消费者丢失消息(接收方确认模式)

消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;如果这时处理消息失败,就会丢失该消息;

解决方案:处理消息成功后,手动回复确认消息。

六、消息的顺序问题

拆分多个queue(消息队列),每个queue(消息队列) 一个consumer(消费者),就是多一些queue(消息队列)而已,确实是麻烦点;或者就一个queue (消息队列)但是对应一个consumer(消费者),然后这个consumer(消费者)内部用内存队列做排队,然后分发给底层不同的worker来处理。

七、为什么不对所有的message都使用持久化机制?

首先,必然导致性能的下降,因为写磁盘比写RAM慢的多,message的吞吐量可能有10倍的差距。其次,message的持久化机制用在RabbitMQ的内置cluster方案时会出现“坑爹”问题。矛盾点在于,若message设置了persistent属性,但queue未设置durable属性,那么当该queue的owner node出现异常后,在未重建该queue前,发往该queue 的message将被 blackholed;若 message 设置了 persistent属性,同时queue也设置了durable属性,那么当queue的owner node异常且无法重启的情况下,则该queue无法在其他node上重建,只能等待其owner node重启后,才能恢复该 queue的使用,而在这段时间内发送给该queue的message将被 blackholed 。所以,是否要对message进行持久化,需要综合考虑性能需要,以及可能遇到的问题。若想达到100,000 条/秒以上的消息吞吐量(单RabbitMQ服务器),则要么使用其他的方式来确保message的可靠delivery ,要么使用非常快速的存储系统以支持全持久化(例如使用SSD)。另外一种处理原则是:仅对关键消息作持久化处理(根据业务重要程度),且应该保证关键消息的量不会导致性能瓶颈。

八、如何保证高可用的?RabbitMQ的集群?

RabbitMQ有三种模式:

  • 单机模式

  • 普通集群模式

  • 镜像集群模式

1、单机模式

就是Demo级别的,一般就是你本地启动了玩玩儿的?,没人生产用单机模式

2、普通集群模式

意思就是在多台机器上启动多个RabbitMQ实例,每个机器启动一个。你创建的queue,只会放在一个RabbitMQ实例上,但是每个实例都同步queue的元数据(元数据可以认为是queue的一些配置信息,通过元数据,可以找到queue所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从queue所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个queue的读写操作。

3、镜像集群模式

这种模式,才是所谓的RabbitMQ的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个RabbitMQ节点都有这个queue的一个完整镜像,包含queue的全部数据的意思。然后每次你写消息到queue的时候,都会自动把消息同步到多个实例的queue上。RabbitMQ有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。这样的好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个queue的完整数据,别的consumer都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!RabbitMQ一个queue的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个queue的完整数据。

九、消息积压在消息队列问题?

1、场景

几千万条数据在MQ里积压了七八个小时,从下午4点多,积压到了晚上很晚,10点多,11点多。线上故障了,这个时候要不然就是修复consumer的问题,让他恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不行。一个消费者一秒是1000条,一秒3个消费者是3000条,一分钟是18万条,1000多万条。所以如果你积压了几百万到上千万的数据,即使消费者恢复了,也需要大概1小时的时间才能恢复过来。

2、解决方案

这种时候只能操作临时扩容,以更快的速度去消费数据了。具体操作步骤和思路如下:

  1. 先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。

  2. 临时建立好原先10倍或者20倍的queue数量(新建一个topic,partition是原来的10倍)。

  3. 然后写一个临时分发消息的consumer程序,这个程序部署上去消费积压的消息,消费之后不做耗时处理,直接均匀轮询写入临时建好分10数量的queue里面。

  4. 紧接着征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的消息。

  5. 这种做法相当于临时将queue资源和consumer资源扩大10倍,以正常速度的10倍来消费消息。

  6. 等快速消费完了之后,恢复原来的部署架构,重新用原来的consumer机器来消费消息。

十、消息设置了过期时间,过期就丢失怎么办?

1、问题描述

rabbitmq是可以设置过期时间的,就是TTL,如果消息在queue中积压超过一定的时间就会被rabbitmq给清理掉,这个数据就没了,大量的数据会直接搞丢。

2、解决方案

这种情况下,实际上没有什么消息挤压,而是丢了大量的消息。所以第一种增加consumer肯定不适用。这种情况可以采取 “批量重导” 的方案来进行解决。在流量低峰期(比如夜深人静时),写一个程序,手动去查询丢失的那部分数据,然后将消息重新发送到mq里面,把丢失的数据重新补回来。

十一、积压消息长时间没有处理,mq放不下怎么办

1、问题描述

如果走的方式是消息积压在mq里,那么如果你很长时间都没处理掉,此时导致mq都快写满了,咋办?这个还有别的办法吗?

2、解决方案

这个就没有办法了,肯定是第一方案执行太慢,这种时候只好采用 “丢弃+批量重导” 的方式来解决了。首先,临时写个程序,连接到mq里面消费数据,收到消息之后直接将其丢弃,快速消费掉积压的消息,降低MQ的压力,然后走第二种方案,在晚上夜深人静时去手动查询重导丢失的这部分数据。

上一篇:RabbitMQ复习


下一篇:RabbitMQ,RocketMQ,Kafka 事务性,消息丢失和消息重复发送的处理策略