延时通知的方法
- 基于内存的延迟通知
你当然使用thread和timer能简单实现这一功能,但是他的缺点是明显的,你的延时是写到内存里面的,也就是说一旦重启你的定时就全都没了,如果不能重启那么你怎么发版呢?而且当有大量的延迟的话对服务器的压力也是很大的,很难的啦。 -
基于redis的延迟通知
redis可以开启键的过期通知,那里利用这一特性我们就可以在redis设置一个键并写入过期时间,过期时间的大小可以根据通知时间和现在时间算出来,当键过期的时候你会在redis的频道里面收到这个键过期的消息,然后我们就用这个键去进行后面的通知。 - 基于MySQL的延迟通知
我们可以把要通知的时间写到redis里面,并写入通知时间,然后通过一个每秒启动一次的定时任务来读取这个表里面到期的还没被处理的通知,拿到信息后去进行后面的通知 - 基于RabbitMq的延迟通知
在消息投递到Mq的时候设置过期时间,当消息过期以后投递到私信队列,只要一直消费私信队列就好了;或者使用rabbitmq_delayed_message_exchange插件。
今天就来讲一下基于redis的延迟通知
基于Redis的延迟通知实现
由于键空间通知比较耗CPU, 所以 Redis默认是关闭键空间事件通知的, 需要手动修改redis.conf 的notify-keyspace-events开启redis key的过期提醒
K:keyspace事件,事件以__keyspace@<db>__为前缀进行发布;
E:keyevent事件,事件以__keyevent@<db>__为前缀进行发布;
g:一般性的,非特定类型的命令,比如del,expire,rename等;
$:String 特定命令;
l:List 特定命令;
s:Set 特定命令;
h:Hash 特定命令;
z:Sorted 特定命令;
x:过期事件,当某个键过期并删除时会产生该事件;
e:驱逐事件,当某个键因maxmemore策略而被删除时,产生该事件;
A:g$lshzxe的别名,因此”AKE”意味着所有事件。
notify-keyspace-events Ex 表示开启键过期事件提醒
import os
import time
import redis
import random
from concurrent.futures import ThreadPoolExecutor
from loguru import logger
def redis_init():
redis_string = os.environ.get('REDIS_URL')
redis_conn = redis.Redis.from_url(redis_string, decode_responses=True, health_check_interval=30, retry_on_timeout=True, socket_keepalive=True)
redis_conn.ping()
return redis_conn
def set_delayed_notification(redis_conn):
for i in range(0, 10):
delayed = random.randint(2, 10)
redis_conn.set(f'n_{i}', '1', delayed)
logger.info(f'n_{i} set delayed notice time {delayed}')
def hook_notice(redis_conn):
sub = redis_conn.pubsub() #获得一个订阅者
sub.psubscribe(["__keyevent@*__:expired"]) #订阅__keyevent@*__:expired频道
for item in sub.listen(): #监听这个频道,阻塞式
key = str(item['data']) #取出通知内容
logger.info(f'notice {key}')
if __name__ == "__main__":
reids_conn = redis_init()
executor = ThreadPoolExecutor(max_workers=2)
executor.submit(hook_notice, reids_conn)
executor.submit(set_delayed_notification, reids_conn)
time.sleep(10)
关于这种方式在实际应用中的缺点主要有,强依赖于redis的稳定,如果redis出现波动可能会造成某些键过期之后不通知,并且这种方式需要对通知的时间精准度要有一定的容忍度,可能会提前或滞后几百毫秒左右。
Redis的过期
redis的过期基于redisDb数据结构
typedef struct redisDb
{
dict *dict; /* 键空间 key space */
dict *expires; /* 过期字典 */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
struct evictionPoolEntry *eviction_pool; /* Eviction pool of keys */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
} redisDb;
键空间(key space):dict字典用来保存数据库中的所有键值对
过期字典(expires):保存数据库中所有键的过期时间,过期时间用UNIX时间戳表示,且值为long long整数。
- redis在判断是否过期的时候会查看expires,如果里面没有则检查下一个db的,如果有则去除dict里面key对应的过期时间,如果大于当前时间戳则过期如果不大于则不过期。
- 过期删除策略
定时删除:使用定时任务,当定时时间到则删除这个key,对内存友好,内存中不存在超时的key,但是对cpu不友好。
惰性删除:在每次获取键值得时候先检查时候过期,如果过期则删除这个键,如果没有则返回,对cpu友好但是对内存不友好,可能内存中有很多过期但是未被删除的key。
定期删除:每隔一点时间,程序就对数据库进行一次检查,删除里面的过期键,至于要删除多少过期键,以及要检查多少个数据库,则由算法决定。是上面两种方法的折中,可以根据自己情况设置定期删除的时间间隔。 - 在get key的时候检测是否过期需要删除
int expireIfNeeded(redisDb *db, robj *key) {
/* 取出键的过期时间 */
mstime_t when = getExpire(db,key);
mstime_t now; /* 没有过期时间返回0*/
if (when < 0)
return 0; /* No expire for this key */
/* 服务器loading时*/
if (server.loading)
return 0;
/* 根据一定规则获取当前时间*/
now = server.lua_caller ? server.lua_time_start : mstime();
/* 如果当前的是从(Slave)服务器 0认为key为无效 */
if (server.masterhost != NULL)
return now > when;
/* key未过期,返回 0 */
if (now <= when)
return 0;
/* 删除键 */
server.stat_expiredkeys++;
propagateexpire(db,key,server.lazyfree_lazy_expire);
notifykeyspaceevent(notify_expired, "expired",key,db->id);
return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : dbSyncDelete(db,key);
}
- 定时删除策略
for (j = 0; j < dbs_per_call; j++) {
int expired;
redisDb *db = server.db+(current_db % server.dbnum);
current_db++; /* 超过25%的key已过期,则继续. */
do {
unsigned long num, slots;
long long now, ttl_sum;
int ttl_samples; /* 如果该db没有设置过期key,则继续看下个db*/
if ((num = dictSize(db->expires)) == 0)
{
db->avg_ttl = 0;
break;
}
slots = dictSlots(db->expires);
now = mstime(); /*但少于1%时,需要调整字典大小*/
if (num && slots > DICT_HT_INITIAL_SIZE && (num*100/slots < 1))
break;
expired = 0;
ttl_sum = 0;
ttl_samples = 0;
if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;// 20
while (num--)
{
dictEntry *de;
long long ttl;
if ((de = dictGetRandomKey(db->expires)) == NULL)
break;
ttl = dictGetSignedIntegerVal(de)-now;
if (activeExpireCycleTryExpire(db,de,now))
expired++;
if (ttl > 0)
{
/* We want the average TTL of keys yet not expired. */
ttl_sum += ttl;
ttl_samples++;
}
}
/* Update the average TTL stats for this database. */
if (ttl_samples)
{
long long avg_ttl = ttl_sum/ttl_samples; /样本获取移动平均值 */
if (db->avg_ttl == 0)
db->avg_ttl = avg_ttl;
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
}
iteration++;
if ((iteration & 0xf) == 0)
{
/* 每迭代16次检查一次 */
long long elapsed = ustime()-start;
latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
if (elapsed > timelimit)
timelimit_exit = 1;
}
/* 超过时间限制则退出*/
if (timelimit_exit)
return; /* 在当前db中,如果少于25%的key过期,则停止继续删除过期key */
} while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
}
依次遍历16个db,每个db选择20个key,判断key是否过期,如果当次选的key少于少于25%过期则进行下一个db如果多余25%再2234再来一次,并且在总用时超过250ms也停止这一个过程。
- 持久化关于过期的处理
如果是RDB的持久化方式则在文件中删除这个key,如果是AOF的持久化方式则在文件中显示的增加一条DEL命令,并且如果发生AOF重写则只需要不保存到文件中就行了。