TTL 其实就是一个消息存在有效时间,也可以说是最大存活时间,通常单位是毫秒
RabbitMQ的TTL的设置,RabbitMQ可以针对消息也可以针对队列来设置TTL:
-
关于消息的设置:对于特定消息的过期时间的设置,在消息发送的时候可以进行指定,每条消息的过期时间可以不同。
-
关于队列的设置:RabbitMQ支持设置队列的过期时间,从消息入队列开始计算,直到超过了队列的超时时间配置,那么消息会变成死信,自动清除(没配死信队列的情况下)。
-
混合双打的情况设置:如果两种方式一起使用,则过期时间以两者中较小的那个数值为准。
-
不设置TT的情况:,不设置表示消息不会过期;如果TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。
配置TTL的时间和方式:
使用策略为队列定义消息TTL 使用命令行进行配置设置:
rabbitmqctl rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues rabbitmqctl (Windows) rabbitmqctl set_policy TTL ".*" "{""message-ttl"":60000}" --apply-to queues
上面的设置将对对所有队列应用60秒的TTL配置!
还可以通过使用接口请求进行设置:
curl -i -u guest:guest -H "content-type:application/json" -XPUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}' http://localhost:15672/api/queues/{vhost}/{queuename}代码设置python代码: queue_declare 中设置 x-message-ttl 参数,可以控制被 publish 到 queue 中的 message 被丢弃前能够存活的时间,当某个 message 在 queue 留存的时间超过了配置的 TTL 值时,我们说该 message “已死”。
import pika import sys # 创建用户登入的凭证,使用rabbitmq用户密码登录 credentials = pika.PlainCredentials("guest","guest") # 创建连接 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',credentials=credentials)) # 通过连接创建信道 channel = connection.channel() # 通过信道创建我们的队列 其中名称是task_queue,并且这个队列的消息是需要持久化的!PS:持久化存储存到磁盘会占空间, # 队列不能由持久化变为普通队列,反过来也是!否则会报错!所以队列类型创建的开始必须确定的! channel.queue_declare(queue='task_queue', durable=True) # 定义需要发的消息内容 # 开始发布消息到我们的代理服务器上,注意这里没有对发生消息进行确认发生成功!!! import time for i in range(1,100): time.sleep(1) properties = pika.BasicProperties(delivery_mode=2,) # expiration 字段以微秒为单位表示 TTL 值,6 秒的 message properties.expiration='2000' body = '小钟同学你好!{}'.format(i).encode('utf-8') print(body) channel.basic_publish( # 默认使用的/的交换机 exchange='', # 默认的匹配的key routing_key='task_queue', # 发送的消息的内容 body=body, # 发现的消息的类型 properties=properties# pika.BasicProperties中的delivery_mode=2指明message为持久的,1 的话 表示不是持久化 2:表示持久化 ) # 关于消息持久化需要注意几个点,并非百分百的可达!主要原因有几个点: # 1:消息可达率,和网络可通性和抖动之类有关! # 2:还有和我们的RabbitMQ写入磁盘时候的有关 # 可持久的需要条件是:durable=True+ properties=pika.BasicProperties(delivery_mode=2,) # 关闭链接 connection.close()
上面的代码是一个生产者端的代码:其中最关键的代码设置是:
properties = pika.BasicProperties(delivery_mode=2,) # expiration 字段以微秒为单位表示 TTL 值,6 秒的 message properties.expiration='2000'
设置每个消息的过期时间是2秒,观察我们的队列的信息,一直发的情况下,他的待消费的内容还是比较少,那是因为的已经过期了!被丢弃了!
单独对某消息的设置过期时间,和队列的持久化的特性不冲突!!! 但是对队列设置过期的时间话,那么和队列的持久化的特性就会产生冲突!!!
针对队列中所有消息设置TTL的方式:
出现的问题:
pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'task_queue' in vhost '/': received the value '2000' of type 'longstr' but current is none")
原因是:
我们即设置队列为需持久化,但是又设置了过期时间!所以产生的冲突!!!
修改后还是继续出现:
pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'task_queue' in vhost '/': received 'false' but current is 'true'")
这个是因为我们的一开始创建的队列,本来就有这个属性了!它是不能动态的修改这个队列的属性的!最好的的方式就是删除这个队列咯!如果还想继续用这个队列名称的话!!或重新的新建一个! 还有一点:生产者和消费者对queue的声明函数里,这个durable也记得需要保持一致!!!!
删除队列后出现新的问题:
pika.exceptions.ChannelClosedByBroker: (406, 'PRECONDITION_FAILED - invalid arg \'x-message-ttl\' for queue \'task_queue\' in vhost \'/\': "expected integer, got longstr"')
原因是:
不能设置为字符设置为字符串的类型!!!
完整的针对队列设置TTL的示例代码:
# !/usr/bin/env python import pika import sys # 创建用户登入的凭证,使用rabbitmq用户密码登录 credentials = pika.PlainCredentials("guest","guest") # 创建连接 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',credentials=credentials)) # 通过连接创建信道 channel = connection.channel() # 通过信道创建我们的队列 其中名称是task_queue,并且这个队列的消息是需要持久化的!PS:持久化存储存到磁盘会占空间, # 队列不能由持久化变为普通队列,反过来也是!否则会报错!所以队列类型创建的开始必须确定的! arguments = {} # TTL: ttl的单位是us,ttl=60000 表示 60s arguments['x-message-ttl'] = 2000 # auto_delete=False, # 最后一个队列解绑则删除 durable # durable 和 x-message-ttl 不能同时的存在 channel.queue_declare(queue='task_queue', durable=False,arguments=arguments,auto_delete=False) # 定义需要发的消息内容 # 开始发布消息到我们的代理服务器上,注意这里没有对发生消息进行确认发生成功!!! import time for i in range(1,100): time.sleep(1) properties = pika.BasicProperties(delivery_mode=2,) # expiration 字段以微秒为单位表示 TTL 值,6 秒的 message # properties.expiration='2000' body = '小钟同学你好!{}'.format(i).encode('utf-8') print(body) channel.basic_publish( # 默认使用的/的交换机 exchange='', # 默认的匹配的key routing_key='task_queue', # 发送的消息的内容 body=body, # 发现的消息的类型 properties=properties# pika.BasicProperties中的delivery_mode=2指明message为持久的,1 的话 表示不是持久化 2:表示持久化 ) # 关于消息持久化需要注意几个点,并非百分百的可达!主要原因有几个点: # 1:消息可达率,和网络可通性和抖动之类有关! # 2:还有和我们的RabbitMQ写入磁盘时候的有关 # 可持久的需要条件是:durable=True+ properties=pika.BasicProperties(delivery_mode=2,) # 关闭链接 connection.close()
如果综合存在的话,验证一下,比如我设置队列的过期时间是1秒,消息的时间2秒: 混合双打的情况设置:如果两种方式一起使用,则过期时间以两者中较小的那个数值为准。这个是可以看得到!不贴代码了!