RabbitMQ 之消息应答

消息应答概念

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况?
RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ 它已经处理了,RabbitMQ 可以把该消息删除了。
RabbitMQ 的应答模式分为两种:手动应答,自动应答

/**
 * 消费者消费消息
 * var1.消费哪个队列
 * var2.消费成功之后是否要自动应答, true 代表自动应答, false 代表手动应答
 * var3.消费者未成功消费的回调
 * var4.消费者成功消费的回调
 */
channel.basicConsume(ConnectUtil.QUEUE_NAME,Boolean.TRUE,deliverCallback,cancelCallback );

第二个参数代表是否要自动应答, true 代表自动应答, false 代表手动应答

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。
当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动应答

手动应答的好处是可以批量(multiple 参数控制)应答并且减少网络拥堵

/** 
 * deliveryTag:该消息的index
 * multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
 */
channel.basicAck(deliveryTag,multiple);

用于肯定确认,RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了

/**
 * deliveryTag:该消息的index
 * multiple:是否批量应答。true:将一次性拒绝所有小于deliveryTag的消息。
 * requeue:被拒绝的是否重新入队列
 */
channel.basicNack(deliveryTag,multiple,requeue);

用于否定确认

/**
 * deliveryTag:该消息的index
 * requeue:被拒绝的是否重新入队列
 */
channel.basicReject(deliveryTag,requeue);

用于否定确认,与 channel.basicReject() 的区别在于 basicNack() 可以拒绝多条消息,而 basicReject() 一次只能拒绝一条消息

参考:https://www.cnblogs.com/piaolingzxh/p/5448927.html

上一篇:WPF实现窗体移动


下一篇:dotnet 读 WPF 源代码笔记 简单聊聊文本布局换行逻辑