RabbitMQ入门

RabbitMQ入门

本篇博客的思维导图

RabbitMQ入门

本篇解决的问题:

  • RabbitMQ的模型架构是什么?
  • AMQP协议是什么?
  • 两者之间有什么紧密关系?
  • 消息从生产者发出到消费者这一过程经历了什么?

1. RabbitMQ的模型架构

RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。

我们可以把消息传递的过程想象成:你把一个包裹送到邮局邮局会暂存并最终将邮件通过邮递员送到收件人手上,RabbitMQ就好比邮局、邮箱、邮递员组成的一个系统。

从计算机术语层面来说,RabbitMQ模型更像一种交换机模型

RabbitMQ入门

1.1 生产者和消费者

生产者 Producer

定义:创建/投递消息的一方

消息包含两个部分:

  • 消息体(payload)---带有业务逻辑结构的数据,例如一个JSON字符串
  • 标签(Label)----用来表述这条消息,例如交换机名称和路由键

消费者 Consumer

定义:接收消息的一方

消费者订阅队列,并且消费消息体(payload),消费者并不知道此条消息的生产者是谁,当然也不需要知道

服务节点 Broker

定义:消息中间件的服务节点

大多数情况下一台RabbitMQ服务器就是一个Broker

消息队列运转流程

RabbitMQ入门

1.2 队列 Queue

定义

RabbitMQ内部对象,用于【存储消息】

特点:

  • RabbitMQ中的消息都只能存储在队列中
  • 消费者从队列中获取消息并消费
  • 多个消费者可以订阅同一个队列
  • 当多个消费者订阅同一个队列的时候消息被平均分摊(轮询),而不是每个消费者都收到所有消息

声明一个队列

C#语言声明一个队列

        //声明队列
        channel.QueueDeclare(
            queue: QUEUE_NAME,
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: null);

1.3 交换机、路由键、绑定

交换机 Exchange

RabbitMQ入门

定义:RabbitMQ中一个简单的实体。

作用:

  • 生产者将消息发送到Exchange
  • Exchange将消息路由到一个或者多个队列中

类型:

1.fanout
会把发到该交换机上的所有消息都路由到与该交换机绑定的队列中
说明消息的RoutingKey在这种交换机中并没有什么作用

2.direct
将发送至该交换机的消息路由到RoutingKey和BindingKey完全匹配的队列中

3.topic (经常使用)
RoutingKey和BindingKey【模糊匹配】
特点:
单词
被"."隔开的独立字符串称为单词
com.rabbiq.client(三个单词)
分隔符 【 . 】
分割字符串
通配符【 # 】
匹配任意多个单词(可以是0个)
例如 com.hidden.client会被路由到 com.#
通配符【 * 】
匹配一个单词
例如 test.rabbitmq.client会被路由到 *.rabbitmq.client

4.headers
不依赖路由键匹配,而是依赖消息的headers
性能很差,不实用

声明一个交换机:

声明一个Direct类型的交换机

channel.ExchangeDeclare(
            exchange: EXCHANGE_NAME,
            type: ExchangeType.Direct,
            durable: true,
            autoDelete: false,
            arguments: null);

路由键 RoutingKey

作用:指定消息路由规则

消息发送给交换机的时候,一般会指定一个RoutingKey(相当于邮件上的地址),RoutingKey 需要与交换机类型BindingKey联合使用才能最终生效。

绑定 BindingKey

作用:RabbitMQ通过绑定(Bind)交换机队列关联起来,在绑定的时候一般指定一个BindingKey

BindingKey可以和RoutingKey相同。

1.4 生产者完整代码

下例使用C#作为代码演示,这个例子中我们申明了一个Direct类型的交换机,并且用于队列和交换机绑定的BindingKeyRoutingKey相同,这样做的潜台词是:这里的 RoutingKey BindingKey 是同个东西。在direct交换器类型下,RoutingKeyBindingKey 需要完全匹配才能使用,所以上面代码中采用了此种写法会显得方便许多。

        //定义交换机名称、队列名称、路由键
        const string EXCHANGE_NAME = "exchange_name";
        const string QUEUE_NAME = "QUEUE_NAME";
        const string ROUTING_KEY = "ROUTING_KEY";
        //声明一个Direct类型交换机
        channel.ExchangeDeclare(
            exchange: EXCHANGE_NAME,
            type: ExchangeType.Direct,
            durable: true,
            autoDelete: false,
            arguments: null);
        //声明队列
        channel.QueueDeclare(
            queue: QUEUE_NAME,
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: null);
        //通过交换机名称、队列名称和绑定键 绑定交换机和队列 (可以看到这里绑定键就是路由键)
        channel.QueueBind(queue: QUEUE_NAME, exchange: EXCHANGE_NAME, ROUTING_KEY);

        //发布消息至指定交换机,通过路由键路由消息至匹配的队列
        string message = "Hello World";
        channel.BasicPublish(
            exchange: EXCHANGE_NAME,
            routingKey: ROUTING_KEY,
            basicProperties: channel.CreateBasicProperties(),
            Encoding.UTF8.GetBytes(message));

1.5 Connection和Channel

RabbitMQ入门

Connection

  • 消费者和生产者通过Connection连接Broker
  • 是一条TCP连接
  • 多个Channel复用一个Connection

Channel

  • 客户端可以创建AMQP信道(Channel)
  • 信道被指派一个唯一的ID
  • 建立在Connection之上的虚拟连接
  • RabbitMQ通过Channel处理AMQP指令

引入信道的意义:

因为建立和关闭一个Connection开销很大,如果遇到使用高峰,性能瓶颈也随之显现,RabbitMQ采用类似NIO的做法,选择TCP复用,减少性能开销,也便于管理。

每个线程把持一个信道,如果当信道的流量本身就很大时,就需要开辟多个Connection,将信道均摊到这些Connection中。

代码演示

下面的代码演示了使用C#语言创建一个Connection和Channel

        //创建连接工厂,提供用户名等信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.UserName = "test";
        factory.Password = "123465";
        factory.VirtualHost = "virtual";
        factory.HostName = "127.0.0.1";
        factory.Port = 2020;
        //创建连接
        IConnection connection = factory.CreateConnection();
        //创建信道
        var channel = connection.CreateModel();

        //关闭Channel和Connection
        connection.Close();
        channel.Close();

1.6 RabbitMQ的运转流程

生产者发送消息过程

  1. 生产者连接到Broker,建立一个连接(Connection),开启一个信道(Channel)
  2. 生产者声明一个Exchange,设置交换机属性,例如交换机类型,是否持久化等
  3. 生产者声明一个Queue,设置队列属性,例如是否排他,是否持久化等
  4. 生产者通过路由键将交换机和队列进行绑定(Bind)
  5. 生产者发送消息至 Broker,消息的Label中包含路由键、交换机等信息
  6. 相应交换机根据消息的路由键来查找匹配对应的队列
  7. 如果找到,就将消息存入相应队列
  8. 没有找到,则根据生产者配置的属性选择丢弃或者回退消息给生产者
  9. 关闭信道
  10. 关闭连接

消费者接受消息过程

  1. 消费者连接到Broker,并且建立一个连接,开启一个信道
  2. 消费者向Broker请求消费队列中的消息,可能会设置相应的回调函数,并且做一些准备工作
  3. 等待Broker回应并投递订阅的队列消息,消费者接收消息
  4. 消费者处理消息后确认(ack)消息。
  5. RabbitMQ从队列重删除相应的已经被确认的消息
  6. 关闭信道
  7. 关闭连接

2.AMQP协议

RabbitMQ是遵从AMQP协议的,换句话说,RabbitMQ就是AMQP协议的Erlang语言的实现,(当然RabbitMQ也支持STOMP、MQTT等协议),AMQP的模型架构和RabbitMQ的模型架构是一样的。

2.1 AMQP的三层

Module Layer

​ 协议最高层
​ 定义了一些供客户调用的命令,实现业务逻辑

Session Layer

​ 中间层
​ 接受转发响应命令
​ 为客户端与服务器之间提供可靠性同步机制和错误处理

Transport Layer

​ 最底层
​ 传输二进制数据流,提供帧处理、信道复用、错误检测和数据表示

2.2 生产者AMQP详细流转过程

RabbitMQ入门

2.3 消费者AMQP详细流转过程

RabbitMQ入门

2.4 一些AMQP命令

RabbitMQ入门

RabbitMQ入门

RabbitMQ入门

上一篇:消息中间件介绍


下一篇:alpine的docker环境中php安装amqp扩展