AMQP

AMQP是什么

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议。

在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,AMQP就是这样的一种协议,消息发送与接受的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。


AMQP模型

AMQP模型

AMQP

1)Broker
表示消息队列服务器实体(一个进程)。
一个server,接受客户端的连接,上线AMQP实体服务。

2)Connection
连接
应用程序与broker的网络连接,TCP/IP套接字连接。

3)Channel
消息通道
几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对多个
Channel,每个Channel代表一个会话任务。

4)Message
消息,消息是不具名的,它由消息头和消息体组成。消息是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)> ,priority(相对于其他消优先权),delivery-mode(指出该消息可能需要持久性存储)等

5)Exchange
交换机,用来接受生产者发送的消息,并将这些消息路由转发到某个队列。

6)Queue
消息队列,存储消息,用于发送给消费者。
它是消息的容器,也是消息的终点。一个消息可以投入多个队列。
消息一直在队列里面,等待消费者连接到这个队列将其取走。

7)Binding
绑定,消息队列和交换器之间的关联。
一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

8)Routing Key
路由关键字
一个消息头,交换机可以用这个消息头决定如何路由某条消息。

9)Publisher
消息生产者,是一个向交换器发布消息的客户端应用程序(进程)。

10)Consumer
消息消费者,是一个从消息队列中取得消息的客户端应用程序(进程)。

11)Virtual Host
虚拟主机


工作流程

生产者工作流程

AMQP

生产者发布消息流程:
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道发送消息给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)

消费者工作流程

AMQP

消费者消费消息流程:
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
6、ack回复

Exchange交换机

交换机是用来发送消息的AMQP实体。
交换机拿到一个消息之后将它路由给一个或零个队列。
它使用哪种路由算法是由交换机类型和绑定(Bindings)规则所决定的。

AMQP 0-9-1的代理提供了四种交换机:
AMQP

除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:
Name
Durability(消息代理重启后,交换机是否还存在)
Auto-delete(当所有与之绑定的消息队列都完成了对此交换机的使用后,删除它)
Arguments(依赖代理本身)

交换机可以有两个状态:持久(durable),暂存(transient)。
持久化的交换机会在消息代理(broker)重启后依旧存在。
暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。
并不是所有的应用场景都需要持久化的交换机。

默认交换机(default exchange)

默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。

它有一个特殊属性使得它对于简单应用特别有用处:
那就是每新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。

举个栗子:
当你声明了一个名为"search-indexing-online"的队列,AMQP代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为"search-indexing-online"。因此,当携带着名为"search-indexing-online"的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为"search-indexing-online"的队列中。换句话说,默认交换机看起来貌似能够直接将消息投递给队列,尽管技术上并没有做相关的操作。


直连交换机(direct exchange)

直连交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。

下面介绍它是如何工作的:
1)将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)
2)当一个携带着路由键为R的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R的队列。

直连交换机经常用来循环分发任务给多个消费者(我们称之为轮询)。
比如说有3个消费者,4个任务。分别分发每个消费者一个任务后,第4个任务又分发给了第一个消费者。综上,我们很容易得出一个结论:在AMQP 0-9-1中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。

直连型交换机图例:
AMQP

当生产者(P)发送消息时 Rotuing key=booking 时,这时候将消息传送给 Exchange,Exchange 获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的 Queue,这时发现 Queue1 和 Queue2 都符合,就会将消息传送给这两个队列。

如果我们以 Rotuing key=create 和 Rotuing key=confirm 发送消息时,这时消息只会被推送到 Queue2 队列中,其他 Routing Key 的消息将会被丢弃。


扇型交换机(funout exchange)

扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。

因为扇型交换机投递消息会拷贝到所有绑定到它的队列,所以他的应用案例都极其相似:
1)大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
2)体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
3)分发系统使用它来广播各种状态和配置更新
4)在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP没有内置presence的概念,因此XMPP可能会是个更好的选择)

扇型交换机图例:
AMQP

上图所示,生产者(P)生产消息 1 将消息 1 推送到 Exchange,由于 Exchange Type=fanout 这时候会遵循 fanout 的规则将消息推送到所有与它绑定 Queue,也就是图上的两个 Queue 最后两个消费者消费。


主题交换机(topic exchanges)

主题交换机(topic exchanges)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。

主题交换机规则:
前面提到的 direct 规则是严格意义上的匹配,换言之 Routing Key 必须与 Binding Key 相匹配的时候才将消息传送给 Queue.
而Topic 的路由规则是一种模糊匹配,可以通过通配符满足一部分规则就可以传送。

它的约定是:
1)binding key 中可以存在两种特殊字符 “” 与“#”,用于做模糊匹配,其中 “” 用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
2)routing key 为一个句点号 “.” 分隔的字符串(我们将被句点号 “. ” 分隔开的每一段独立的字符串称为一个单词),如“stock.usd.> nyse”、“nyse.vmw”、“quick.> orange.rabbit”
3)binding key 与 routing key 一样也是句点号 “.” 分隔的字符串

主题交换机图例:
AMQP

当生产者发送消息 Routing Key=F.C.E 的时候,这时候只满足 Queue1,所以会被路由到 Queue 中,如果 Routing Key=A.C.E 这时候会被同是路由到 Queue1 和 Queue2 中,如果 Routing Key=A.F.B 时,这里只会发送一条消息到 Queue2 中。


使用场景

主题交换机拥有非常广泛的用户案例。无论何时,当一个问题涉及到那些想要有针对性的选择需要接收消息的 多消费者/多应用(multiple consumers/applications) 的时候,主题交换机都可以被列入考虑范围。

使用案例:
1)分发有关于特定地理位置的数据,例如销售点
2)由多个工作者(workers)完成的后台任务,每个工作者负责处理某些特定的任务
3)股票价格更新(以及其他类型的金融数据更新)
4)涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)
5)云端的不同种类服务的协调
6)分布式架构/基于系统的软件封装,其中每个构建者仅能处理一个特定的架构或者系统。


头交换机(headers exchanges)

有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。
头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

我们可以绑定一个队列到头交换机上,并给他们之间的绑定使用多个用于匹配的头(header)。这个案例中,消息代理得从应用开发者那儿取到更多一段信息,换句话说,它需要考虑某条消息(message)是需要部分匹配还是全部匹配。上边说的“更多一段消息”就是"x-match"参数。当"x-match"设置为“any”时,消息头的任意一个值被匹配就可以满足条件,而当"x-match"设置为“all”的时候,就需要消息头的所有值都匹配成功。

头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。
路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。


Queue队列

AMQP中的队列(Queue)跟其他消息队列或任务队列中的队列是很相似的,它们存储着即将被应用消费掉的消息。

队列属性

队列跟交换机共享某些属性,但是队列也有一些另外的属性。
1)Name
2)Durable(消息代理重启后,队列依旧存在)
3)Exclusive(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)
4)Auto-delete(当最后一个消费者退订后即被删除)
5)Arguments(一些消息代理用他来完成类似与 TTL 的某些额外功能)


队列创建

队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为406的通道级异常就会被抛出。


队列名称

队列的名字可以由应用(application)来取,也可以让消息代理(broker)直接生成一个。队列的名字可以是最多255字节的一个utf-8字符串。若希望AMQP消息代理生成队列名,需要给队列的name参数赋值一个空字符串:在同一个通道(channel)的后续的方法(method)中,我们可以使用空字符串来表示之前生成的队列名称。之所以之后的方法可以获取正确的队列名是因为通道可以默默地记住消息代理最后一次生成的队列名称。
以"amq."开始的队列名称被预留做消息代理内部使用。如果试图在队列声明时打破这一规则的话,一个通道级的403 (ACCESS_REFUSED)错误会被抛出。


队列持久化

持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。并不是所有的场景和案例都需要将队列持久化。
持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。


Binding绑定

绑定(Binding)是交换机(exchange)将消息(message)路由给队列(queue)所需遵循的规则。
如果要指示交换机“E”将消息路由给队列“Q”,那么“Q”就需要与“E”进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。

打个比方:
队列(queue)是我们想要去的位于纽约的目的地
交换机(exchange)是JFK机场
绑定(binding)就是JFK机场到目的地的路线。能够到达目的地的路线可以是一条或者多条
拥有了交换机这个中间层,很多由发布者直接到队列难以实现的路由方案能够得以实现,并且避免了应用开发者的许多重复劳动。

如果AMQP的消息无法路由到队列(例如,发送到的交换机没有绑定队列),消息会被就地销毁或者返还给发布者。如何处理取决于发布者设置的消息属性。

最终解释:绑定是消息队列和交换器之间的关联,绑定是交换器和消息队列连接起来的路由规则。


Message消息机制

消息属性

AMQP 模型中的消息(Message)对象是带有属性(Attributes)的。有些属性及其常见,以至于 AMQP 0-9-1 明确的定义了它们,并且应用开发者们无需费心思思考这些属性名字所代表的具体含义。

例如:
1)Content type(内容类型)
2)Content encoding(内容编码)
3)Routing key(路由键)
4)Delivery mode (persistent or not)
5)投递模式(持久化 或 非持久化)
6)Message priority(消息优先权)
7)Message publishing timestamp(消息发布的时间戳)
8)Expiration period(消息有效期)
9)Publisher application id(发布应用的 ID)

有些属性是被 AMQP 代理所使用的,但是大多数是开放给接收它们的应用解释器用的。有些属性是可选的也被称作消息头(headers)。他们跟 HTTP 协议的 X-Headers 很相似。消息属性需要在消息被发布的时候定义。


消息主体

AMQP 的消息除属性外,也含有一个有效载荷 - Payload(消息实际携带的数据),它被 AMQP 代理当作不透明的字节数组来对待。

消息代理不会检查或者修改有效载荷。消息可以只包含属性而不携带有效载荷。它通常会使用类似 JSON 这种序列化的格式数据,为了节省,协议缓冲器和 MessagePack 将结构化数据序列化,以便以消息的有效载荷的形式发布。AMQP 及其同行者们通常使用 “content-type” 和 “content-encoding” 这两个字段来与消息沟通进行有效载荷的辨识工作,但这仅仅是基于约定而已。


消息持久化

消息能够以持久化的方式发布,AMQP 代理会将此消息存储在磁盘上。如果服务器重启,系统会确认收到的持久化消息未丢失。

简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:它完全取决与消息本身的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能造成一定的影响(就像数据库操作一样,健壮性的存在必定造成一些性能牺牲)。


消息消费

消息如果只是存储在队列里是没有任何用处的。被应用消费掉,消息的价值才能够体现。

在AMQP 0-9-1 模型中,有两种途径可以达到此目的:
1)将消息投递给应用 ("push API")
2)应用根据需要主动获取消息 ("pull API")

使用push API,应用(application)需要明确表示出它在某个特定队列里所感兴趣的,想要消费的消息。如是,我们可以说应用注册了一个消费者,或者说订阅了一个队列。一个队列可以注册多个消费者,也可以注册一个独享的消费者(当独享消费者存在时,其他消费者即被排除在外)。

每个消费者(订阅者)都有一个叫做消费者标签的标识符。它可以被用来退订消息。消费者标签实际上是一个字符串。

消息确认

消费者应用(Consumer applications)用来接受和处理消息的应用 - 在处理消息的时候偶尔会失败或者有时会直接崩溃掉。而且网络原因也有可能引起各种问题。这就给我们出了个难题,AMQP 代理在什么时候删除消息才是正确的?

AMQP 0-9-1 规范给我们两种建议:
1)自动确认模式:当消息代理(broker)将消息发送给应用后立即删除。(使用 AMQP 方法:basic.deliver 或 basic.get-ok))
2)显式确认模式:待应用(application)发送一个确认回执(acknowledgement)后再删除消息。(使用 AMQP 方法:basic.ack)

如果一个消费者在尚未发送确认回执的情况下挂掉了,那 AMQP 代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。


拒绝消息

当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用可以向消息代理表明,本条消息由于“拒绝消息(Rejecting Messages)”的原因处理失败了(或者未能在此时完成)。

当拒绝某条消息时,应用可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。

当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。


预取消息

在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息是非常有用的。这可以在试图批量发布消息的时候起到简单的负载均衡和提高消息吞吐量的作用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,如果生产应用每分钟才发送一条消息,这说明处理工作尚在运行。)

注意,RabbitMQ 只支持通道级的预取计数,而不是连接级的或者基于大小的预取。


其他

连接

AMQP连接通常是长连接。AMQP是一个使用TCP提供可靠投递的应用层协议。AMQP使用认证机制并且提供TLS(SSL)保护。当一个应用不再需要连接到AMQP代理的时候,需要优雅的释放掉AMQP连接,而不是直接将TCP连接关闭。


通道

有些应用需要与AMQP代理建立多个连接。无论怎样,同时开启多个TCP连接都是不合适的,因为这样做会消耗掉过多的系统资源并且使得防火墙的配置更加困难。AMQP 0-9-1提供了通道(channels)来处理多连接,可以把通道理解成共享一个TCP连接的多个轻量化连接。

在涉及多线程/进程的应用中,为每个线程/进程开启一个通道(channel)是很常见的,并且这些通道不能被线程/进程共享。

一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个AMQP方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。


虚拟主机

为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟Web servers虚拟主机概念非常相似,这为AMQP实体提供了完全隔离的环境。当连接被建立的时候,AMQP客户端来指定使用哪个虚拟主机。


参考资料

http://rabbitmq.mr-ping.com/
https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html

上一篇:[RabbitMQ]AMQP 0-9-1:模型


下一篇:Linux-CPU使用率查看