写在前面:Redis的消息队列并不是专业的消息队列,没有ACK保证,没有特别多的高级特性,如果对消息的可靠性有很高的要求,就放弃它吧。
1.Redis消息队列
Redis通过内部的list数据结构来实现异步消息队列。通过`rpush`和`lpop`操作结合构成类似队列(先进先出)的效果;也可以通过`rpush`与`rpop`构成堆栈(后进先出)的效果。但一般消息队列都采用队列效果的组合形式。
>rpush hobby basketball football tennis
(integer) 3
>lpop hobby
"basketball"
>lpop hobby
"football"
>lpop tennis
"tennis"
>lpop hobby
(nil)
客户端通过队列的pop操作获取消息,然后进行处理,处理完了再接着获取消息,然后再处理,一直循环,这就是队列消费者客户端的生命周期。
如上所述,假如队列空了,pop操作拿不到数据,客户端因此陷入一直pop的死循环产生一堆无用的空轮询,同时消耗着客户端和Redis服务端的资源,怎么办?
首先能想到的方法可能会是让队列为空的请求,通过sleep
命令让线程停下来。更好的做法是通过blpop
和brpop
代替之前的lpop
和rpop
,即用阻塞读(在队列没有数据时,立即进入休眠状态;当数据到来,立刻重新请求)的方式,解决因为sleep命令带来的延迟。
但这样就会有新的问题产生:当列表长时间没有数据时,客户端线程会一直被阻塞,形成闲置连接,而这种情况下,服务器为了节省资源通常会断开这条链接。客户端的blpop
/brpop
就会报错。
所以编写客户端消费者时要捕获异常,进行重试。
2.延时队列
延时队列的出现是为了解决在分布式开发中,客户端在处理请求时加锁失败的问题。这种方式是将当前冲突的请求扔进另一个队列延后处理以避开锁的冲突。
在Redis中延时队列的实现是由zset
(有序列表)来实现,基本使用方式为zset key score value
。在延时队列中常常把消息序列化成一个字符串作为 zset
的value
,消息的过期时间作为score
,然后用多个线程(保证可用性)来轮询zset
来获取到期的任务进行处理。考虑到并发抢任务,任务就有可能被多次执行。
def delay(msg):
msg.id = str(uuid4()) #序列化保证value的唯一性
value = json.dumps(msg)
retry_ts = time.time() + 5 # 5s后重试
redis.zadd('delay-queue',retry_ts,value)
def loop():
while True:
# 最多取一条
value_list = redis.zrangebyscore("delay-queue",0,time.time(),start=0,num=1):
if not value_list:
time.sleep(1)
continue
value = value_list[0]
# 将取到的消息从redis消息队列中删除
success = redis.zrem("delay-queue",value)
# 多进程并发下,只有一个线程可以抢到消息
if success:
msg = json.loads(value)
handle_msg(msg)
Redis中的`zrem`方法是保证多线程争抢任务不发生混乱的关键。
需要注意的是在执行*handle_msg()* 时一定要进行异常捕获,避免因单个任务出现问题导致循环异常退出。