RabbitMQ 是遵从AMQP 协议的, 换句话说, RabbitMQ 就是AMQP
协议的Erlang 的实现(当然RabbitMQ 还支持STOMP2 、MQTT3 等协议) 0 AMQP 的模型架构
和RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。当生产
者发送消息时所携带的RoutingKey 与绑定时的BindingKey 相匹配时,消息即被存入相应的队
列之中。消费者可以订阅相应的队列来获取消息。
RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的AMQP 协议中相
应的概念。目前RabbitMQ 最新版本默认支持的是AMQP 0-9-1 。本书中如无特殊说明,都以
AQMP 0-9-1 为基准进行介绍。
STOMP. 即Simple (or Stre阻úng) Text Oriented Messaging Protocol. 简单(流〕文本面向消息协议,它提供了一个可互操作的
连接格式,运行STOMP 客户端与任意STOMP 消息代理C Broker ) 进行交互。STOMP 协议由于设计简单, 易于开发客户端,
因此在多种语言和平台上得到广泛的应用.
MQTT. 即Message Queuing Telemetry Transport. 消息队列遥澳~传输,是ffiM 开发的一个即时通信协议,有可能成为物联网
的重要组成部分.该协议支持所有平台,几乎可以把所有物联网和外部连接起来,被用来当作传感器和制动器的通信协议。
AMQP 协议本身包括三层:
- Module Layer: 位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利
用这些命令实现自己的业务逻辑。例如,客户端可以使用Queue . Declare 命令声明
一个队列或者使用Basic.Consume 订阅消费一个队列中的消息。
Session Layer: 位于中间层,主要负责将客户端的命令发送给服务器,再将服务端的应
答返回给客户端,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。Transport Layer: 位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错
误检测和数据表示等。
AMQP 说到底还是一个通信协议,通信协议都会涉及报文交互,从low-level 举例来说,
AMQP 本身是应用层的协议,其填充于TCP 协议层的数据部分。而从high-level 来说, AMQP
是通过协议命令进行交互的。AMQP 协议可以看作一系列结构化命令的集合,这里的命令代表
一种操作,类似于HTTP 中的方法(GET 、POST 、PUT 、DELETE 等) 。
AMQP生产者流转过程
为了形象地说明AMQP 协议命令的流转过程,这里截取代码清单1-1 中的关键代码,如代
码清单2-2 所示。
代码清单2-2 简洁版生产者代码
Connection connection = factory .n ewConnection() ; //创建连接
Channel channel = connection.createChannel() ; //创建信道
String message = "Hello World! ";
channel.basicPublish(EXCHANGE NAME , ROUTING KEY ,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
//关闭资源
channel.close() ;
connection . close();
当客户端与Broker 建立连接的时候,会调用factory .newC onnection 方法,这个方法
会进一步封装成Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用
的是AMQPO-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程
中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok , Connection.
Open/ .Open-Ok 这6 个命令的交互。
当客户端调用connection .createChannel 方法准备开启信道的时候,其包装
Channel . Open 命令发送给Broker ,等待Channel.Open-Ok 命令。
当客户端发送消息的时候,需要调用channel . basicPublish 方法,对应的AQMP 命
令为Basic.Publish ,注意这个命令和前面涉及的命令略有不同,这个命令还包含了Content
Header 和Content Body 0 Content Header 里面包含的是消息体的属性,例如,投递模式、优先级等,而Content Body 包含消息体本身。
当客户端发送完消息需要关闭资源时,涉及Channel.Close/.Close-Ok 与
Connection.Close/.Close-Ok 的命令交互。详细流转过程如图2-10 所示。
消费者的流转过程, 参考代码清单1-2 , 截取消费端的关键代码如代码清
单2-3 所示。
代码清单2-3 简洁版消费者代码
Connection connection = factory.newConnection(addresses);// 创建连接
final Channel channel = connection . createChannel() ; //创建信道
Consumer consumer = new DefaultConsumer(channel) ()//_省略实现
channel . basicQos(64) ;
channel.basicConsume(QUEUE NAME , consumer) ;
//等待回调函数执行完毕之后,关闭资源
TimeUnit . SECONDS . sleep(5) ;
channel . close();
connection.close();
其详细流转过程如图2-11 所示。
消费者客户端同样需要与Broker 建立连接,与生产者客户端一样,协议交互同样涉及
Connection.Start/ . Start-Ok 、Connection.Tune/.Tune-Ok 和Connection.
Open/ . Open-Ok 等,图2-11 中省略了这些步骤,可以参考图2-10 。
紧接着也少不了在Connection 之上建立Channe l,和生产者客户端一样,协议涉及
Channel . Open/Open-Oko
如果在消费之前调用了channel . basicQos(int prefetchCount) 的方法来设置消费
者客户端最大能"保持"的未确认的消息数,那么协议流转会涉及Basic.Qos/.Qos-Ok 这
两个AMQP 命令。
在真正消费之前,消费者客户端需要向Broker 发送Basic.Consume 命令(即调用
channel.basicConsume 方法〉将Channel 置为接收模式,之后Broker 回执
Basic . Consume - Ok 以告诉消费者客户端准备好消费消息。紧接着Broker 向消费者客户端推
送(Push) 消息,即Basic.Deliver 命令,有意思的是这个和Basic.Publish 命令一样会
携带Content Header 和Content Body 0
消费者接收到消息并正确消费之后,向Broker 发送确认,即Basic.Ack 命令。
在消费者停止消费的时候,主动关闭连接,这点和生产者一样,涉及
Channel . Close/ . Close-Ok 手口Connection.Close/ . Close-Ok 。
AMQP 命令概览
AMQP 0-9-1 协议中的命令远远不止上面所涉及的这些,为了让读者在遇到其他命令的时候
能够迅速查阅相关信息,下面列举了AMQP 0-9-1 协议主要的命令,包含名称、是否包含内容
体(Content Body) 、对应客户端中相应的方法及简要描述等四个维度进行说明,具体如表2-1
所示。