关于消息的Time To Live(TTL)生存时间

TTL 其实就是一个消息存在有效时间,也可以说是最大存活时间,通常单位是毫秒

RabbitMQ的TTL的设置,RabbitMQ可以针对消息也可以针对队列来设置TTL:

  • 关于消息的设置:对于特定消息的过期时间的设置,在消息发送的时候可以进行指定,每条消息的过期时间可以不同。

  • 关于队列的设置:RabbitMQ支持设置队列的过期时间,从消息入队列开始计算,直到超过了队列的超时时间配置,那么消息会变成死信,自动清除(没配死信队列的情况下)。

  • 混合双打的情况设置:如果两种方式一起使用,则过期时间以两者中较小的那个数值为准。

  • 不设置TT的情况:,不设置表示消息不会过期;如果TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。

配置TTL的时间和方式:

使用策略为队列定义消息TTL 使用命令行进行配置设置:

rabbitmqctl rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues

rabbitmqctl (Windows)    
rabbitmqctl set_policy TTL ".*" "{""message-ttl"":60000}" --apply-to queues

上面的设置将对对所有队列应用60秒的TTL配置!

还可以通过使用接口请求进行设置:

curl -i -u guest:guest -H "content-type:application/json"  -XPUT 
-d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}'
http://localhost:15672/api/queues/{vhost}/{queuename}
代码设置python代码: queue_declare 中设置 x-message-ttl 参数,可以控制被 publish 到 queue 中的 message 被丢弃前能够存活的时间,当某个 message 在 queue 留存的时间超过了配置的 TTL 值时,我们说该 message “已死”。
import pika
import sys

# 创建用户登入的凭证,使用rabbitmq用户密码登录
credentials = pika.PlainCredentials("guest","guest")
# 创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',credentials=credentials))
# 通过连接创建信道
channel = connection.channel()
# 通过信道创建我们的队列 其中名称是task_queue,并且这个队列的消息是需要持久化的!PS:持久化存储存到磁盘会占空间,
# 队列不能由持久化变为普通队列,反过来也是!否则会报错!所以队列类型创建的开始必须确定的!
channel.queue_declare(queue='task_queue', durable=True)
# 定义需要发的消息内容
# 开始发布消息到我们的代理服务器上,注意这里没有对发生消息进行确认发生成功!!!
import time
for i in range(1,100):
    time.sleep(1)
    properties = pika.BasicProperties(delivery_mode=2,)
    # expiration 字段以微秒为单位表示 TTL 值,6 秒的 message
    properties.expiration='2000'
    body = '小钟同学你好!{}'.format(i).encode('utf-8')
    print(body)
    channel.basic_publish(
        # 默认使用的/的交换机
        exchange='',
        # 默认的匹配的key
        routing_key='task_queue',
        # 发送的消息的内容
        body=body,
        # 发现的消息的类型
        properties=properties# pika.BasicProperties中的delivery_mode=2指明message为持久的,1 的话 表示不是持久化 2:表示持久化
    )

# 关于消息持久化需要注意几个点,并非百分百的可达!主要原因有几个点:
# 1:消息可达率,和网络可通性和抖动之类有关!
# 2:还有和我们的RabbitMQ写入磁盘时候的有关
# 可持久的需要条件是:durable=True+ properties=pika.BasicProperties(delivery_mode=2,)
# 关闭链接
connection.close()

上面的代码是一个生产者端的代码:其中最关键的代码设置是:

   properties = pika.BasicProperties(delivery_mode=2,)
    # expiration 字段以微秒为单位表示 TTL 值,6 秒的 message
    properties.expiration='2000'

设置每个消息的过期时间是2秒,观察我们的队列的信息,一直发的情况下,他的待消费的内容还是比较少,那是因为的已经过期了!被丢弃了!

关于消息的Time To Live(TTL)生存时间

单独对某消息的设置过期时间,和队列的持久化的特性不冲突!!!

但是对队列设置过期的时间话,那么和队列的持久化的特性就会产生冲突!!!

针对队列中所有消息设置TTL的方式:

出现的问题:

pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'task_queue' in vhost '/': received the value '2000' of type 'longstr' but current is none")

原因是:

关于消息的Time To Live(TTL)生存时间

我们即设置队列为需持久化,但是又设置了过期时间!所以产生的冲突!!!

修改后还是继续出现:

pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'task_queue' in vhost '/': received 'false' but current is 'true'")

这个是因为我们的一开始创建的队列,本来就有这个属性了!它是不能动态的修改这个队列的属性的!最好的的方式就是删除这个队列咯!如果还想继续用这个队列名称的话!!或重新的新建一个! 还有一点:生产者和消费者对queue的声明函数里,这个durable也记得需要保持一致!!!!

删除队列后出现新的问题:

pika.exceptions.ChannelClosedByBroker: (406, 'PRECONDITION_FAILED - invalid arg \'x-message-ttl\' for queue \'task_queue\' in vhost \'/\': "expected integer, got longstr"')

原因是:

关于消息的Time To Live(TTL)生存时间

不能设置为字符设置为字符串的类型!!!

完整的针对队列设置TTL的示例代码:

# !/usr/bin/env python
import pika
import sys

# 创建用户登入的凭证,使用rabbitmq用户密码登录
credentials = pika.PlainCredentials("guest","guest")
# 创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',credentials=credentials))
# 通过连接创建信道
channel = connection.channel()
# 通过信道创建我们的队列 其中名称是task_queue,并且这个队列的消息是需要持久化的!PS:持久化存储存到磁盘会占空间,
# 队列不能由持久化变为普通队列,反过来也是!否则会报错!所以队列类型创建的开始必须确定的!
arguments = {}
# TTL: ttl的单位是us,ttl=60000 表示 60s
arguments['x-message-ttl'] = 2000
#  auto_delete=False,  # 最后一个队列解绑则删除  durable
# durable 和 x-message-ttl 不能同时的存在
channel.queue_declare(queue='task_queue', durable=False,arguments=arguments,auto_delete=False)
# 定义需要发的消息内容
# 开始发布消息到我们的代理服务器上,注意这里没有对发生消息进行确认发生成功!!!
import time
for i in range(1,100):
    time.sleep(1)
    properties = pika.BasicProperties(delivery_mode=2,)
    # expiration 字段以微秒为单位表示 TTL 值,6 秒的 message
    # properties.expiration='2000'
    body = '小钟同学你好!{}'.format(i).encode('utf-8')
    print(body)
    channel.basic_publish(
        # 默认使用的/的交换机
        exchange='',
        # 默认的匹配的key
        routing_key='task_queue',
        # 发送的消息的内容
        body=body,
        # 发现的消息的类型
        properties=properties# pika.BasicProperties中的delivery_mode=2指明message为持久的,1 的话 表示不是持久化 2:表示持久化
    )

# 关于消息持久化需要注意几个点,并非百分百的可达!主要原因有几个点:
# 1:消息可达率,和网络可通性和抖动之类有关!
# 2:还有和我们的RabbitMQ写入磁盘时候的有关
# 可持久的需要条件是:durable=True+ properties=pika.BasicProperties(delivery_mode=2,)
# 关闭链接
connection.close()

如果综合存在的话,验证一下,比如我设置队列的过期时间是1秒,消息的时间2秒: 混合双打的情况设置:如果两种方式一起使用,则过期时间以两者中较小的那个数值为准。这个是可以看得到!不贴代码了!

上一篇:Redis进阶——事务、TTL、排序、消息通知、管道


下一篇:区块链 PBFT最多多少个节点