RabbitMQ知识总结
AMQP协议
AMQP协议是一个提供统一消息服务的应用层标准协议,并不会受到客户端/中间件不同产品、不同开发语言等条件的影响。RabbitMQ则是基于该协议实现的。
举个例子来说,如下图,生产者将消息发送到交换机上,交换机接收到信息以后按照相应的路由键路由到队列中,这里的交换机只是起到了路由的功能,实际上消息存储在消息队列中,接下来在队列中的的消息可以将其散发到消费者或者消费者主动去获取消息。
三层协议
该协议分为三层,分别是Module Layer、Session Layer、Transport Layer,如下图:
- Module Layer:决定基本域模型所产生的行为,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。
- Session Layer:主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端和服务器之间通信提供可靠性、同步机制和错误处理。
- Transport Layer:主要用于二进制数据流的传输。
RabbitMQ
我们首先了解一些概念:
- Broker:接收和分发消息的应用,像是RabbitMQ、ZeroMQ或者Redis等等。
- Connection:publisher/consumer和Broker之间的TCP连接
- Channel:两个AMQP结点之间双向通信流,通道是多路复用的,因此单个网络连接可以支撑多个通道,各个通道之间是相互隔离的,其极大的减少了操作系统建立多个TCP连接的开销。
- Message:消息
- Exchange:服务器中接收来自生产者程序的消息的实体,并可选择将这些消息路由到服务器中的消息队列中
- Message queue:保存消息并将它们转发给消费者
- Routing key:一个虚拟地址,虚拟机可用它来确定如何路由一个特定消息
交换机
属性
交换机常用的属性有:
- Name (名称)
- Durability (消息代理重启后,交换机是否还存在)
- Auto-delete (当所有与此交换机绑定的队列都不再使用此交换机时是否自动删除)
类型
Exchange存在多种类型,其中最常使用的包括Direct
、Fanout
、Topic
三种类型,还有一种特殊的Headers
,它们分别的用处是:
-
Direct:Message中的routing key 如果和Binding中的 binding key一致的话,则Exchange会将Message散发到对应的Message queue中。
在默认的情况下创建的就是该类型的exchange,通常使用在需要发送消息到具体的队列的情况,比如下面这张图中,最后交换机会将消息路由到名字为green的消息队列中。
-
Fanout:Exchange会将所有的Message散发到关联的Message queue中。
即便是已经提供了routing key,该类型交换机也会忽略掉,并且将消息发送到所有关联的队列中,通常用来实现发布/订阅模式,比如说体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端,分发系统使用它来广播各种状态和配置更新等等。
-
Topic:根据正则表达式将Message散发到匹配的Message queue中。
比如在下图中,根据Routing key
first.green.fast
能够匹配到的队列有.green.
和.*.fast
,该模式主要用来实现消息多播路由。比如说分发有关于特定地理位置的数据,例如销售点,涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)等等。 -
Headers:类似于Direct,但是当涉及到多个属性的时候,一个Routing key 无法完全表达,其只能是个字符串,而多属性需要使用到多个键值对表示,因此使用消息属性来代替路由键作为路由规则,通过判断消息头中的值来和队列中的值来确定消息的路由位置。
如果x-match为any的时候, 表示消息头的任意一个值被匹配后就可以满足条件,而当x-match为 all的时候,表示消息头的所有的值都需要被匹配到才能满足条件,比如在下图中,因为第一个队列为any,并且存在匹配的值,所以可以路由到绿色的路由中,因为第三个队列为all,消息头并没有都匹配所有的键值对,因此消息只会发送到绿色的消息队列中。
队列
同样的,和交换机一样,队列也有常用的属性:
- Name (名称)
- Durability (消息代理重启后,交换机是否还存在)
- Auto-delete (当所有与此交换机绑定的队列都不再使用此交换机时是否自动删除)
这里需要注意的是,如果在消息代理中队列并不存在,则会根据声明去定义;如果已经存在,但是这次声明的队列的属性和之前已经存在的属性存在差异,则会返回一个通道级异常!这里的持久化设置并不会让在队列中的消息也同样持久化,其只会在消息代理重启后重新定义该队列,如果没有设置消息持久化,那么队列中的消息则不会存在。
消息
如果消息无法路由到具体的队列中,那么该消息可能会被销毁或是重新返回给发布者,这取决于设置的属性。
消息存在三个动作,分别是确认、拒绝、预取
消息确认
消息确认存在两种模式,分别是1. 自动确认模式:消息队列发送完后就销毁;2.显示确认模式:在消费者进行确认后再进行销毁;
在显示确认模式情况下,如果消费者不幸挂掉了,该机制会使消息重新回到消息队列中,等待下一位消费者。
拒绝消息
分为重新放回队列还是立即销毁,像是在kombo中,它们分别通过message.requeue()
和message.reject()
实现。
预取
如果在多个消费者共享一个消息队列的情况下,可以在消费者发送消息的时候设置预取的消息个数,起到一个负载均衡的效果。
连接和通道
连接即Connection
,内部则是通过TCP长连接实现,关闭连接时最好优雅的关闭AMQP连接,即connection.close()
,而不是简单粗暴的关闭TCP连接,否则会报出相关的连接错误信息。
如果存在多个连接的情况下,为了避免创建多个TCP而造成系统资源的浪费和超载,因此使用Channel
通道,其本质是共享了TCP的连接,将其连接分为多个互不影响的连接通道,从而有效的利用TCP连接。
使用
具体参考官方文档
比较
现在比较流行的消息队列有ZeroMQ和RabbitMQ,其中ZeroMQ拥有更出色的氢能,不过是建立在允许消息数据丢失的情况下,适用于高吞吐量/低延迟的应用场景中。与ZeroMQ不同,RabbitMQ完全实现了AMQP协议,使用上类似于邮箱服务,支持消息的持久化、事务、拥塞控制、负载均衡等特性,使得RabbitMQ拥有更加广泛的应用场景。
功能 | RabbitMQ | ZeroMQ |
---|---|---|
消息持久化 | 支持 | 不支持 |
事务 | 支持 | 不支持 |
类似 | 邮箱 | socket |
性能 | 低 | 高 |
稳定性 | 高 | 不稳定 |
支持AMQP协议 | 支持 | 不支持 |
适用场景 | 不允许数据丢失 | 高吞吐 |