RabbitMQ消息可靠性分析

消息中间件的可靠性是指对消息不丢失的保障程度;而消息中间件的可用性是指无故障运行的时间百分比,通常用几个 9 来衡量。不存在绝对的可靠性只能尽量趋向完美。并且通常可靠性也意味着影响性能和付出更大的成本,因此实际应用时还要根据业务需求,对真正关键的信息来做可靠性保证,并要从生产者、消息队列、消费者三个维度来努力。

1、生产者发送信息的可靠性 

生产者客户端发送出去之后可以发生网络丢包、网络故障等造成消息丢失。一般情况下如果不采取措施,生产者无法感知消息是否已经正确无误的发送到交换器中。如果消息在传输到Exchange的过程中发生失败而可以让生产者感知的话,生产者可以进行进一步的处理动作,比如重新投递相关消息以确保消息的可靠性。为此AMQP协议在建立之初就考虑到这种情况而提供了事务机制。

事务确实能够解决消息发送方和RabbitMQ之间消息确认的问题,只有消息成功被RabbitMQ接收,事务才能提交成功,否则我们便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制的话会“吸干”RabbitMQ的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢?从AMQP协议层面来看并没有更好的办法,但是RabbitMQ提供了一个改进方案,即发送方确认机制(publisher confirm)。事务机制在一条消息发送之后会使发送端阻塞,以等待RabbitMQ的回应,之后才能继续发送下一条消息。相比之下,发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该nack命令。事务机制和publisher confirm机制两者是互斥的,不能共存。

 2、消息路由到队列的可靠性

消息发送到交换器,如果没有和它匹配队列的话,消息也会丢失,mandatory或者AE可以让消息在路由到队列之前得到极大的可靠性保障。当mandatory参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列的话,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当mandatory参数设置为false时,出现上述情形的话,消息直接被丢弃。

备份交换器,英文名称Alternate Exchange,简称AE,或者更直白的可以称之为“备胎交换器”。生产者在发送消息的时候如果不设置mandatory参数,那么消息在未被路由的情况下将会丢失,如果设置了mandatory参数,那么需要添加ReturnListener的编程逻辑,生产者的代码将变得复杂化。如果你不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息。 可以通过在声明交换器(调用channel.exchangeDeclare方法)的时候添加alternate-exchange参数来实现,也可以通过策略的方式实现。如果两者同时使用的话,前者的优先级更高,会覆盖掉Policy的设置。

3、消息在队列中持久化

持久化可以提高队列的可靠性,以防在异常情况(重启、关闭、宕机等)下的数据丢失。队列的持久化是通过在声明队列时将durable参数置为true实现的,如果队列不设置持久化,那么在RabbitMQ服务重启之后,相关队列的元数据将会丢失,此时数据也会丢失。

队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。通过将消息的投递模式(BasicProperties中的deliveryMode属性)设置为2即可实现消息的持久化。

如果在发送消息时采用了事务机制或者publisher confirm机制的话,服务端的返回是在消息落盘之后执行的,这样可以进一步的提高了消息的可靠性。但是即便如此也无法避免单机故障且无法修复(比如磁盘损毁)而引起的消息丢失,这里就需要引入镜像队列。镜像队列相当于配置了副本,绝大多数分布式的东西都有多副本的概念来确保HA。在镜像队列中,如果主节点(master)在此特殊时间内挂掉,可以自动切换到从节点(slave),这样有效的保证了高可用性,除非整个集群都挂掉。虽然这样也不能完全的保证RabbitMQ消息不丢失(比如机房被炸。。。),但是配置了镜像队列要比没有配置镜像队列的可靠性要高很多,在实际生产环境中的关键业务队列一般都会设置镜像队列。

4、消息消费阶段的可靠性

为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正的消费到了这些消息。RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

如果消息消费失败,也可以调用Basic.Reject或者Basic.Nack来拒绝当前消息而不是确认,如果只是简单的拒绝那么消息会丢失,需要将相应的requeue参数设置为true,那么RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者。如果requeue参数设置为false的话,RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。

还有一种情况需要考虑: requeue的消息是存入队列头部的,即可以快速的又被发送给消费,如果此时消费者又不能正确的消费而又requeue的话就会进入一个无尽的循环之中。对于这种情况,笔者的建议是在出现无法正确消费的消息时不要采用requeue的方式来确保消息可靠性,而是重新投递到新的队列中,比如设定的死信队列中,以此可以避免前面所说的死循环而又可以确保相应的消息不丢失。对于死信队列中的消息可以用另外的方式来消费分析,以便找出问题的根本。

RabbitMQ消息可靠性分析

上一篇:同一台服务器启动多个driver负载机实例


下一篇:oracle的db link