RabbitMQ——3、使用概念

1.1 AMQP

1.1.1 AMQP是什么?

RabbitMQ就是 AMQP 协议的 Erlang 的实现(当然 RabbitMQ 还支持 STOMP2、 MQTT3 等协议 ) AMQP 的模型架构 和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定 。

RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相 应的概念。目前 RabbitMQ 最新版本默认支持的是 AMQP 0-9-1。

1.1.2 AMQP协议3层?

i. Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。

ii. Session Layer:中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。

iii. TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。

1.1.3 AMQP模型的几大组件?

交换器 (Exchange):消息代理服务器中用于把消息路由到队列的组件。

队列 (Queue):用来存储消息的数据结构,位于硬盘或内存中。

绑定 (Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。

1.1.4 Spring AMQP

在本项目中,使用的是spring AMQP,使用方法是:

1、 导入依赖

2、 配置RabbitMQ的地址

3、 生产者:可直接通过@Autowired注入AmqpTemplate,直接调用方法,用来发送消息,当然,同时要指定交换机的名字,和routing key

4、 创建消费者:直接在封装处理消息的逻辑上,使用注解指定绑定的消息队列和交换机的名字,指定是否可持久化,交换机的类型,routing key

可以在这个类中创建多个方法,添加注解后,表示多个消费者

1.2 队列

1.2.1 队列结构?

通常由以下两部分组成?

\1) rabbit_amqqueue_process:负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的 confirm 和消费端的 ack) 等。

\2) backing_queue:是消息存储的具体形式和引擎,并向 rabbit amqqueue process提供相关的接口以供调用。

1.2.2 死信队列?

DLX,全称为 Dead-Letter-Exchange,死信交换器,死信邮箱。当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。

1.2.2.1 导致的死信的几种原因?

\1) 消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false。

\2) 消息TTL过期。

\3) 队列满了,无法再添加。

1.2.3 优先级队列?

优先级高的队列会先被消费。

可以通过x-max-priority参数来实现。

当消费速度大于生产速度,且Broker没有堆积的情况下,优先级显得没有意义。

1.2.4 延迟队列?

存储对应的延迟消息,指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

1.2.4.1 实现延时队列的两种方式

Rabbitmq本身是没有延迟队列的,要实现延迟消息,一般有两种方式:

1、 通过Rabbitmq本身队列的特性来实现,需要使用Rabbitmq的死信交换机(Exchange)和消息的存活时间TTL(Time To Live)

若想不借助插件实现rabbitMQ的延迟消息,实际就是利用一个没有消费者的Queue1,等待消息过期后,通过交换机转发到Queue2来进行消费,消息的延迟时间就是消息在Queue1中的存活时间

RabbitMQ——3、使用概念

这里相当于利用了死信队列,即当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列

2、 在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上

1.2.4.2 完成半小时再支付的设计

利用死信队列

导致的死信的几种原因?

\1) 消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false。

\2) 消息TTL过期。

\3) 队列满了,无法再添加。

理解:

代码编写流程是:

有一个(n个)正常业务的Exchange,比如为user-exchange。

有一个(n个)正常业务的Queue,比如为user-queue。(因为该队列需要绑定死信交换机,所以需要加俩参数:死信交换机:x-dead-letter-exchange,死信消息路由键:x-dead-letter-routing-key)

进行正常业务的交换机和队列绑定。

定义一个死信交换机,比如为common-dead-letter-exchange。

将正常业务的队列绑定到死信交换机(队列设置了x-dead-letter-exchange即会自动绑定)。

定义死信队列user-dead-letter-queue用于接收死信消息,绑定死信交换机。

消息会变成死信消息的场景:

\1. 消息被(basic.reject() or basic.nack()) and requeue = false,即消息被消费者拒绝签收,并且重新入队为false。

有一种场景需要注意下:消费者设置了自动ACK,当重复投递次数达到了设置的最大retry次数之后,消息也会投递到死信队列,但是内部的原理还是调用了nack/reject。

\2. 消息过期,过了TTL存活时间。

\3. 队列设置了x-max-length最大消息数量且当前队列中的消息已经达到了这个数量,再次投递,消息将被挤掉,被挤掉的是最靠近被消费那一端的消息。

上一篇:ExtJS智能提示工具spket安装与破解


下一篇:第二章 大话MQ消息中间件+JMS+AMQP核心知识