RabbitMQ入门
本篇博客的思维导图
本篇解决的问题:
- RabbitMQ的模型架构是什么?
- AMQP协议是什么?
- 两者之间有什么紧密关系?
- 消息从生产者发出到消费者这一过程经历了什么?
1. RabbitMQ的模型架构
RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。
我们可以把消息传递的过程想象成:你把一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人手上,RabbitMQ就好比邮局、邮箱、邮递员组成的一个系统。
从计算机术语层面来说,RabbitMQ模型更像一种交换机模型。
1.1 生产者和消费者
生产者 Producer
定义:创建/投递消息的一方
消息包含两个部分:
- 消息体(
payload
)---带有业务逻辑结构的数据,例如一个JSON字符串 - 标签(
Label
)----用来表述这条消息,例如交换机名称和路由键
消费者 Consumer
定义:接收消息的一方
消费者订阅队列,并且消费消息体(payload
),消费者并不知道此条消息的生产者是谁,当然也不需要知道
服务节点 Broker
定义:消息中间件的服务节点
大多数情况下一台RabbitMQ服务器就是一个Broker
消息队列运转流程
1.2 队列 Queue
定义
RabbitMQ内部对象,用于【存储消息】
特点:
- RabbitMQ中的消息都只能存储在队列中
- 消费者从队列中获取消息并消费
- 多个消费者可以订阅同一个队列
- 当多个消费者订阅同一个队列的时候消息被
平均分摊(轮询)
,而不是每个消费者都收到所有消息
声明一个队列
C#语言声明一个队列
//声明队列
channel.QueueDeclare(
queue: QUEUE_NAME,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
1.3 交换机、路由键、绑定
交换机 Exchange
定义:RabbitMQ中一个简单的实体。
作用:
- 生产者将消息发送到Exchange
- Exchange将消息路由到一个或者多个队列中
类型:
1.fanout
会把发到该交换机上的所有消息都路由到与该交换机绑定的队列中
说明消息的RoutingKey在这种交换机中并没有什么作用
2.direct
将发送至该交换机的消息路由到RoutingKey和BindingKey完全匹配的队列中
3.topic (经常使用)
RoutingKey和BindingKey【模糊匹配】
特点:
单词
被"."隔开的独立字符串称为单词
com.rabbiq.client(三个单词)
分隔符 【 . 】
分割字符串
通配符【 # 】
匹配任意多个单词(可以是0个)
例如 com.hidden.client会被路由到 com.#
通配符【 * 】
匹配一个单词
例如 test.rabbitmq.client
会被路由到 *.rabbitmq.client
上
4.headers
不依赖路由键匹配,而是依赖消息的headers
性能很差,不实用
声明一个交换机:
声明一个Direct
类型的交换机
channel.ExchangeDeclare(
exchange: EXCHANGE_NAME,
type: ExchangeType.Direct,
durable: true,
autoDelete: false,
arguments: null);
路由键 RoutingKey
作用:指定消息路由规则
消息发送给交换机的时候,一般会指定一个RoutingKey
(相当于邮件上的地址),RoutingKey
需要与交换机类型
和BindingKey
联合使用才能最终生效。
绑定 BindingKey
作用:RabbitMQ通过绑定(Bind)
将交换机和队列关联起来,在绑定的时候一般指定一个BindingKey
BindingKey
可以和RoutingKey
相同。
1.4 生产者完整代码
下例使用C#作为代码演示,这个例子中我们申明了一个Direct
类型的交换机,并且用于队列和交换机绑定的BindingKey
与RoutingKey
相同,这样做的潜台词是:这里的 RoutingKey
和 BindingKey
是同个东西。在direct
交换器类型下,RoutingKey
和 BindingKey
需要完全匹配才能使用,所以上面代码中采用了此种写法会显得方便许多。
//定义交换机名称、队列名称、路由键
const string EXCHANGE_NAME = "exchange_name";
const string QUEUE_NAME = "QUEUE_NAME";
const string ROUTING_KEY = "ROUTING_KEY";
//声明一个Direct类型交换机
channel.ExchangeDeclare(
exchange: EXCHANGE_NAME,
type: ExchangeType.Direct,
durable: true,
autoDelete: false,
arguments: null);
//声明队列
channel.QueueDeclare(
queue: QUEUE_NAME,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
//通过交换机名称、队列名称和绑定键 绑定交换机和队列 (可以看到这里绑定键就是路由键)
channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, ROUTING_KEY);
//发布消息至指定交换机,通过路由键路由消息至匹配的队列
string message = "Hello World";
channel.BasicPublish(
exchange: EXCHANGE_NAME,
routingKey: ROUTING_KEY,
basicProperties: channel.CreateBasicProperties(),
Encoding.UTF8.GetBytes(message));
1.5 Connection和Channel
Connection
- 消费者和生产者通过Connection连接Broker
- 是一条TCP连接
- 多个Channel复用一个Connection
Channel
- 客户端可以创建AMQP信道(Channel)
- 信道被指派一个唯一的ID
- 建立在Connection之上的虚拟连接
- RabbitMQ通过Channel处理AMQP指令
引入信道的意义:
因为建立和关闭一个Connection
开销很大,如果遇到使用高峰,性能瓶颈也随之显现,RabbitMQ采用类似NIO
的做法,选择TCP复用,减少性能开销,也便于管理。
每个线程把持一个信道,如果当信道的流量本身就很大时,就需要开辟多个Connection
,将信道均摊到这些Connection
中。
代码演示
下面的代码演示了使用C#语言创建一个Connection和Channel
//创建连接工厂,提供用户名等信息
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = "test";
factory.Password = "123465";
factory.VirtualHost = "virtual";
factory.HostName = "127.0.0.1";
factory.Port = 2020;
//创建连接
IConnection connection = factory.CreateConnection();
//创建信道
var channel = connection.CreateModel();
//关闭Channel和Connection
connection.Close();
channel.Close();
1.6 RabbitMQ的运转流程
生产者发送消息过程
- 生产者连接到
Broker
,建立一个连接(Connection)
,开启一个信道(Channel)
- 生产者声明一个
Exchange
,设置交换机属性,例如交换机类型,是否持久化等 - 生产者声明一个
Queue
,设置队列属性,例如是否排他,是否持久化等 - 生产者通过路由键将交换机和队列进行
绑定(Bind)
- 生产者发送消息至
Broker
,消息的Label中包含路由键、交换机等信息 - 相应交换机根据消息的路由键来查找匹配对应的队列
- 如果找到,就将消息存入相应队列
- 没有找到,则根据生产者配置的属性选择丢弃或者回退消息给生产者
- 关闭信道
- 关闭连接
消费者接受消息过程
- 消费者连接到
Broker
,并且建立一个连接,开启一个信道 - 消费者向
Broker
请求消费队列中的消息,可能会设置相应的回调函数,并且做一些准备工作 - 等待
Broker
回应并投递订阅的队列消息,消费者接收消息 - 消费者处理消息后确认
(ack)
消息。 - RabbitMQ从队列重删除相应的已经被确认的消息
- 关闭信道
- 关闭连接
2.AMQP协议
RabbitMQ是遵从AMQP
协议的,换句话说,RabbitMQ就是AMQP
协议的Erlang
语言的实现,(当然RabbitMQ也支持STOMP、MQTT等协议),AMQP
的模型架构和RabbitMQ的模型架构是一样的。
2.1 AMQP的三层
Module Layer
协议最高层
定义了一些供客户调用的命令,实现业务逻辑
Session Layer
中间层
接受转发响应命令
为客户端与服务器之间提供可靠性同步机制和错误处理
Transport Layer
最底层
传输二进制数据流,提供帧处理、信道复用、错误检测和数据表示