RabbitMQ 的优化
channel
生产者,消费者和 RabbitMQ 都会建立连接。为了避免建立过多的 TCP 连接,减少资源额消耗。
AMQP 协议引入了信道(channel),多个 channel 使用同一个 TCP 连接,起到对 TCP 连接的复用。
不过 channel 的连接数是有上限的,过多的连接会导致复用的 TCP 拥堵。
const (
maxChannelMax = (2 << 15) - 1
defaultChannelMax = (2 << 10) - 1
)
通过http://github.com/streadway/amqp
这个client来连接 RabbitMQ,这里面定义了最大值65535和默认最大值2047。
prefetch Count
什么是prefetch Count
,先举个栗子:
假定 RabbitMQ 队列有 N 个消费队列,RabbitMQ 队列中的消息将以轮询的方式发送给消费者。
消息的数量是 M,那么每个消费者得到的数据就是 M%N。如果某一台的机器中的消费者,因为自身的原因,或者消息本身处理所需要的时间很久,消费的很慢,但是其他消费者分配的消息很快就消费完了,然后处于闲置状态,这就造成资源的浪费,消息队列的吞吐量也降低了。
这时候prefetch Count
就登场了,通过引入prefetch Count
来避免消费能力有限的消息队列分配过多的消息,而消息处理能力较好的消费者没有消息处理的情况。
RabbitM 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息。直到消费者确认了某条消息之后 RabbitMQ 将相应的计数减1,之后消费者可以继续接收消息,直到再次到达计数上限。这种机制可以类比于 TCP/IP
中的"滑动窗口"。
所以消息不会被处理速度很慢的消费者过多霸占,能够很好的分配到其它处理速度较好的消费者中。通俗的说就是消费者最多从 RabbitMQ 中获取的未消费消息的数量。
prefetch Count
数量设置为多少合适呢?大概就是30吧,具体可以参见Finding bottlenecks with RabbitMQ 3.3
谈到了prefetch Count
,我们还要看了 global 这个参数,RabbitMQ 为了提升相关的性能,在 AMQPO-9-1
协议之上重新定义了 global 这个参数
global 参数 | AMQPO-9-1 | RabbitMQ |
---|---|---|
false | 信道上所有的消费者都需要遵从 prefetchC unt 的限 | 信道上新的消费者需要遵从 prefetchCount 的限定值定值 |
true | 当前通信链路(Connection) 上所有的消费者都要遵从 prefetchCount 的限定值,就是同一Connection上的消费者共享 | 信道上所有的消费者都需要遵从 prefetchCunt 的上限,就是同一信道上的消费者共享 |
prefetchSize:预读取的单条消息内容大小上限(包含),可以简单理解为消息有效载荷字节数组的最大长度限制,0表示无上限,单位为 B。
如果prefetch Count
为 0 呢,表示预读取的消息数量没有上限。
举个错误使用的栗子:
之前一个队列的消费者消费速度过慢,prefetch Count
为0,然后新写了一个消费者,prefetch Count
设置为30,并且起了10个pod,来处理消息。老的消费者还没有下线也在处理消息。
但是发现消费速度还是很慢,有大量的消息处于 unacked 。如果明白prefetch Count
的含义其实就已经可以猜到问题的原因了。
老的消费者prefetch Count
为0,所以很多 unacked 消息都被它持有了,虽然新加了几个新的消费者,但是都处于空闲状态,最后停掉了prefetch Count
为0的消费者,很快消费速度就正常了。
死信队列
什么是死信队列
一般消息满足下面几种情况就会消息变成死信
-
消息被否定确认,使用
channel.basicNack
或channel.basicReject
,并且此时 requeue 属性被设置为false; -
消息过期,消息在队列的存活时间超过设置的 TT L时间;
-
队列达到最大长度,消息队列的消息数量已经超过最大队列长度。
当一个消息满足上面的几种条件变成死信(dead message)之后,会被重新推送到死信交换器(DLX ,全称为 Dead-Letter-Exchange)。绑定 DLX 的队列就是死信队列。
所以死信队列也并不是什么特殊的队列,只是绑定到了死信交换机中了,死信交换机也没有什么特殊,我们只是用这个来处理死信队列了,和别的交换机没有本质上的区别。
对于需要处理死信队列的业务,跟我们正常的业务处理一样,也是定义一个独有的路由key,并对应的配置一个死信队列进行监听,然后 key 绑定的死信交换机中。
使用场景
当消息的消费出现问题时,出问题的消息不被丢失,进行消息的暂存,方便后续的排查处理。
代码实现
死信队列的使用,可参看下文,配合延迟队列实现消息重试的机制。
延迟队列
什么是延迟队列
延迟队列就是用来存储进行延迟消费的消息。
什么是延迟消息?
就是不希望消费者马上消费的消息,等待指定的时间才进行消费的消息。
使用场景
1、关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭;
2、清理过期数据业务上。比如缓存中的对象,超过了空闲时间,需要从缓存中移出;
3、任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求;
4、下单之后如果三十分钟之内没有付款就自动取消订单;
5、订餐通知:下单成功后60s之后给用户发送短信通知;
6、当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存;
7、定期检查处于退款状态的订单是否已经退款成功;
8、新创建店铺,N天内没有上传商品,系统如何知道该信息,并发送激活短信;
9、定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行。
总结下来就是一些延迟处理的业务场景
实现延迟队列的方式
RabbitMQ 中本身并没有直接提供延迟队列的功能,可以通过死信队列和 TTL 。来实现延迟队的功能。
先来了解下过期时间 TTL,消息一旦超过设置的 TTL 值,就会变成死信。这里需要注意的是 TTL 的单位是毫秒。设置过期时间一般与两种方式
-
1、通过队列属性设置,队列中的消息有相同的过期时间;
-
2、通过消息本身单独设置,每条消息有自己的的过期时间。
如果两种一起设置,消息的 TTL 以两者之间较小的那个数值为准。
上面两种 TTL 过期时间,消息队列的处理是不同的。第一种,消息一旦过期就会从消息队列中删除,第二种,消息过期了不会马上进行删除操作,删除的操作,是在投递到消费者之前进行判断的。
第一种方式中相同过期时间的消息是在同一个队列中,所以过期的消息总是在头部,只要在头部进行扫描就好了。第二种方式,过期的时间不同,但是消息是在同一个消息队列中的,如果要清理掉所有过期的时间就需要遍历所有的消息,当然这也是不合理的,所以会在消息被消费的时候,进行过期的判断。这个处理思想和 redis 过期 key 的清理有点神似。
Queue TTL
通过 channel.queueDeclare
方法中的 x-expires
参数可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过 Basic.Get
命令。
if _, err := channel.QueueDeclare("delay.3s.test",
true, false, false, false, amqp.Table{
"x-dead-letter-exchange": b.exchange,
"x-dead-letter-routing-key": ps.key,
"x-expires": 3000,
},
); err != nil {
return err
}
Message TTL
对于 Message TTL
设置有两种方式
Per-Queue Message TTL
通过在 queue.declare
中设置 x-message-ttl
参数,可以控制在当前队列中,消息的过期时间。不过同一个消息被投到多个队列中,设置x-message-ttl
的队列,里面消息的过期,不会对其他队列中相同的消息有影响。不同队列处理消息的过期是隔离的。
if _, err := channel.QueueDeclare("delay.3s.test",
true, false, false, false, amqp.Table{
"x-dead-letter-exchange": b.exchange,
"x-dead-letter-routing-key": ps.key,
"x-message-ttl": 3000,
},
); err != nil {
return err
}
Per-Message TTL
通过 expiration 就可以设置每条消息的过期时间,需要注意的是 expiration 是字符串类型。
delayQ := "delay.3s.test"
if _, err := channel.QueueDeclare(delayQ,
true, false, false, false, amqp.Table{
"x-dead-letter-exchange": b.exchange,
"x-dead-letter-routing-key": ps.key,
},
); err != nil {
return err
}
if err := channel.Publish("", delayQ, false, false, amqp.Publishing{
Headers: amqp.Table{"x-retry-count": retryCount + 1},
Body: d.Body,
DeliveryMode: amqp.Persistent,
Expiration: "3000",
}); err != nil {
return err
}
通过延迟队列来处理延迟消费的场景,可以借助于死信队列来处理
延迟队列通常的使用:消费者订阅死信队列 deadQueue,然后需要延迟处理的消息都发送到 delayNormal 中。然后 delayNormal 中的消息 TTL 过期时间到了,消息会被存储到死信队列 deadQueue。我们只需要正常消费,死信队列 deadQueue 中的数据就行了,这样就实现对数据延迟消费的逻辑了。
使用 Queue TTL 设置过期时间
举个线上处理消息重传的的栗子:
消费者处理队列中的消息,一个消息在处理的过程中,会出现错误,针对某些特性的错误,希望这些消息能够退回到队列中,过一段时间在进行消费。当然,如果不进行 Ack,或者 Ack 之后重推到队列中,消费者就能再次进行重试消费。但是这样会有一个问题,消费队列中消息消费很快,刚重推的消息马上就到了队列头部,消费者可能马上又拿到这个消息,然后一直处于重试的死循环,影响其他消息的消费。这时候延迟队列就登场了,我们可以借助于延迟队列,设置特定的延迟时间,让这些消息的重试,发生到之后某个时间点。并且重试一定次数之后,就可以选择丢弃这个消息了。
来看下流程图:
具体的处理步骤:
1、生产者推送消息到 work-exchange 中,然后发送到 work-queue 队列;
2、消费者订阅 work-queue 队列,这是正常的业务消费;
3、对于需要进行延迟重试的消息,发送到延迟队列中;
4、延迟队列会绑定一个死信系列,死信队列的 exchange 和 routing-key,就是上面正常处理业务 work-queue 消息队里的 exchange 和 routing-key,这样过期的消息就能够重推到业务的队列中,每次重推到延迟队列的时候会记录消息重推的次数,如果达到我们设定的上限,就可以丢弃数据,落库或其他的操作了;
5、所以消费者只需要监听处理 work-queue 队列就可以了;
6、无用的延迟队列,到了删除的时间节点,会进行自动的删除。
上代码,文中 Demo 的地址