死信消息和死信队列

死信消息和死信队列定义

Dead Letter Exchange 死信队列(DLX)队列的简称。

另外对于死信消息:通常如果我们的一个消息存在以下的情况下的话则这消息被称为死信消息:

  • 1: 消息被消费端拒绝,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false

  • 2: 消息在队列的存活时间超过设置的TTL时间。

  • 3:消息队列的消息数量已经超过最大队列长度,无法再继续新增消息到MQ中

  • 4:一个队列中的消息的TTL对其他队列中同一条消息的TTL没有影响

对于死信消息的处理,Rabbitmq会依据是否配置死信队列的配置来决定消息的去留! 如果开启了配置死信队列信息,则消息会被转移到这个 死信队列(DLX)中,如果没有配置,则此消息会被丢弃!

死信队列配置

  • 可以为每一个需要使用死信业务的队列配置一个死信交换机

  • 每个队列都可以配置专属自己的死信队列,相关消息的进入死信队列需要经过死信交换机来进程归纳处理

  • 死信交换机也只是一个普通的交换机,只是它是用来专门处理死信的交换机

  • 创建队列时可以给这个队列附带一个死信的交换机,在这个队列里因各自情况出现问题的作废的消息会被重新发到附带的交换机,然后让这个交换机重新路由这条消息。

具体的图示: 死信消息和死信队列

 

 若要使用策略指定DLX,请将键“死信交换”添加到策略定义中。例如:

rabbitmqctl    
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues

Rabbitmqctl(Windows)    
rabbitmqctl set_policy DLX ".*" "{""dead-letter-exchange"":""my-dlx""}" --apply-to queues

上面的策略将DLX队列“my-dlx”应用于所有队列。上面只是一个例子,实际上不同的队列可能会使用不同的死字设置(或者根本不使用)。

其他配置死信队里的方式有:

x-dead-letter-exchange:出现死信(dead letter)之后将死信(dead letter)重新发送到指定exchange

x-dead-letter-routing-key:出现死信(dead letter)之后将死信(dead letter)重新按照指定的routing-key发送
PS:当指定了死信交换机后时,除了通常对声明队列的配置权限外,用户还需要对该队列具有读取权限,并对死信交换机具有写权限。权限在队列声明时进行验证。

完整的一个简单的示例:

下面的示例主要是演示里: 1:设置消息的过期的时间为2s,2s之后就变为我们的死信

2:变为死信的消息,会被转移到我们的另一个死信交换机的队列上

# !/usr/bin/env python
import pika
import sys

# 创建用户登入的凭证,使用rabbitmq用户密码登录
credentials = pika.PlainCredentials("guest","guest")
# 创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',credentials=credentials))
# 通过连接创建信道
channel = connection.channel()

# ========
#   创建异常交换器和队列,用于存放没有正常处理的消息。
channel.exchange_declare(exchange='xz-dead-letter-exchange',exchange_type='fanout',durable=True)
channel.queue_declare(queue='xz-dead-letter-queue',durable=True)
# 绑定队列到指定的交换机
channel.queue_bind(queue='xz-dead-letter-queue',exchange= 'xz-dead-letter-exchange',routing_key= 'xz-dead-letter-queue')

# =========

# 通过信道创建我们的队列 其中名称是task_queue,并且这个队列的消息是需要持久化的!PS:持久化存储存到磁盘会占空间,
# 队列不能由持久化变为普通队列,反过来也是!否则会报错!所以队列类型创建的开始必须确定的!
arguments = {}
# TTL: ttl的单位是us,ttl=60000 表示 60s
# arguments['x-message-ttl'] = 2000
# 指定死信转移到另一个交换机上具体的交换机的名称
arguments['x-dead-letter-exchange'] = 'xz-dead-letter-exchange'
#  auto_delete=False,  # 最后一个队列解绑则删除  durable
# durable 和 x-message-ttl 不能同时的存在
channel.queue_declare(queue='task_queue', durable=True,arguments=arguments,auto_delete=False)
# 定义需要发的消息内容
# 开始发布消息到我们的代理服务器上,注意这里没有对发生消息进行确认发生成功!!!
import time
for i in range(1,100):
    time.sleep(1)
    properties = pika.BasicProperties(delivery_mode=2,)
    # expiration 字段以微秒为单位表示 TTL 值,6 秒的 message
    properties.expiration='2000'
    body = '小钟同学你好!{}'.format(i).encode('utf-8')
    print(body.decode('utf-8'))
    channel.basic_publish(
        # 默认使用的/的交换机
        exchange='',
        # 默认的匹配的key
        routing_key='task_queue',
        # 发送的消息的内容
        body=body,
        # 发现的消息的类型
        properties=properties# pika.BasicProperties中的delivery_mode=2指明message为持久的,1 的话 表示不是持久化 2:表示持久化
    )

connection.close()

运行上面的生产者的代码后观察我们的输出: 中国发出了8个消息

小钟同学你好!1
小钟同学你好!2
小钟同学你好!3
小钟同学你好!4
小钟同学你好!5
小钟同学你好!6
小钟同学你好!7
小钟同学你好!8

结果这个8个消息都没有人去消费的时候:最后都转移到了死信的队列里面:

死信消息和死信队列

 

 死信消息和死信队列

 

 关于死信队列需要注意的点(来自官网的说明):

消息在发布到死信队列后DLX目标队列后会立即从原始队列中删除。这确保没有可能出现过多的消息积累,从而耗尽代理资源,但这确实意味着,如果目标队列无法接受消息,消息可能会丢失。

死信队列里面的死信的消费

当我们的死信消费者去消费死信消息时候,需要注意点有:

我们的“死信”消息消息的properties里面的header字段信息中增加一个叫做“x-death"的数组内容,包含了以下字段内容:

死信消息和死信队列

<BasicProperties(['delivery_mode=2', "headers={'x-death': [{'count': 1L, 'reason': 'expired', 'queue': 'task_queue', 'time': datetime.datetime(2021, 6, 22, 8, 40, 1), 'exchange': '', 'routing-keys': ['task_queue'], 'original-expiration': '2000'}], 'x-first-death-exchange': '', 'x-first-death-queue': 'task_queue', 'x-first-death-reason': 'expired'}"])>

其中我们的'x-death'内容为::

{'x-death': [{'count': 1L, 'reason': 'expired', 'queue': 'task_queue', 'time': datetime.datetime(2021, 6, 22, 8, 40, 1), 'exchange': '', 'routing-keys': ['task_queue'], 'original-expiration': '2000'}], 'x-first-death-exchange': '', 'x-first-death-queue': 'task_queue', 'x-first-death-reason': 'expired'}

具体每个字段的意思是:

  • queue :进入死信队列之前来自于哪个的消息队列名称
  • reason:这个消息变为死信的原因?expired 表示是因为过期!变为死信!
  • count:这个消息在这个队列中被死了多少次
  • time:该消息发布时间
  • exchange :消息已发布到哪些交换机上,PS:如果这个消息是多次变为死信的话,这个地方最后就是死信的交换机
  • routing-keys 消息发不来来源的路由keys
  • original-expiration:原消息的过期时间属性,PS:(如果消息是死信的话)每条消息ttl):。这个过期属性将从死信中删除,以防止它在被路由到的任何队列中再次过期。
  • x-first-death-exchange:第一次变成死死信的时候来源的交换机
  • x-first-death-queue:第一次变成死信的时候来源队列
  • x-first-death-reason:第一次变成死信的原因:expired 表示是因为过期!
其他变为死信的原因的说明:
rejected: 消息被消费者拒收且回放到消息独立
expired: 消息的设置来TTL时间到期
maxlen: 超过了队列运行的最大的值

延迟队列

RabbitMQ本身没有直接支持延迟队列功能,但是通过对死信队列和过期时间的使用,其实我们可以综合起上面的两个特性来实现一个所谓的延迟队列,延迟队列的意思就是:

某个消息再某个固定的时间后失效后,则进入到死信队列里面,其他死信的消费者实时的处理这些过期的消息,这个就可以起到一个延迟处理的效果!

延迟队列加上惰性队列这种组合吧!其实也是可以考虑的!,即可以减小内存占用,又可以实现消息的延迟处理

 

 

上一篇:10个提高python水平的高级知识点


下一篇:进程间通信实现