一、需求切入点
在公司做的一个系统业务需要有个定时提醒的功能(数据在mysql中),要求提醒的时间差精准到分钟
解决方案有:
- 使用定时器,每分钟执行一次,查符合提醒的数据,发起提醒(数据库连接与系统的负载都承受不住的!!)
- 将待提醒数据提前查出存进redis中,根据提醒时间设置过期时间,做redis的过期监听,监听到过期的数据再做业务处理(优点 : 不用实时查数据库,一定程度上减少系统压力 缺点: 一旦系统重启或者系统出现异常,可能导致一些过期的数据没有监听到,造成数据没有推送)
- 使用一个延时队列,利用redis的zset(sort set,有序不重复集合,关联分数score进行排序),将提醒时间作为分数,提取符合条件的score对应的集合发起提醒(本文所述也是围绕这个方案)
二、延时队列的基本操作流程- 基本流程图
- 代码实现
生产者,只关心数据进队列
- 基本流程图
public class MessageProvider {
// 延时队列的服务 通过这个bean来统一管理数据
private final DelayingQueueService delayingQueueService;
private static String APPOINTMENT_REMIND = "APPOINTMENT_REMIND";
/**
* 往队列中添加消息
* @param messageContent
*/
public void sendMessage(String messageContent, long delay) {
try {
//业务代码 ……
// 将分装好的数据写进队列
delayingQueueService.push(queueMessage);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 撤回消息 业务延伸
* @param members
*/
public void withdrawMessage(Long members){
delayingQueueService.removeByMembersId(members);
}
}
消费者,只关心需要消费的数据
// 从延伸队列拉取符合消费的数据
List<QueueMessage> msgList = delayingQueueService.pull();
```
msgList.stream().forEach(msg -> {
// 拿出已经到期的预约提示 发起提醒
if (current >= msg.getDelayTime()) {
try {
// 进行业务消费 ……
//成功消费后移除消息
delayingQueueService.remove(msg);
}
```
延时队列实现
public class DelayingQueueService {
private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build();
//key:membersId value:message
private static ConcurrentHashMap <Long,String> membersMap= new ConcurrentHashMap<>();
private final StringRedisTemplate redisTemplate;
/**
* 可以不同业务用不同的key
*/
public static final String QUEUE_NAME = "message:queue";
/**
* 锁key
*/
public static final String LOCK_KEY="message_lock_key";
/**
* 插入消息
*
* @param queueMessage
* @return
*/
@SneakyThrows
public Boolean push(QueueMessage queueMessage) {
String messageStr = mapper.writeValueAsString(queueMessage);
Boolean addFlag = redisTemplate.opsForZSet().add(QUEUE_NAME, messageStr, queueMessage.getDelayTime());
membersMap.put(membersId,messageStr);
return addFlag;
}
/**
* 移除消息
*
* @param queueMessage
* @return
*/
@SneakyThrows
public Boolean remove(QueueMessage queueMessage) {
Long remove = redisTemplate.opsForZSet().remove(QUEUE_NAME, mapper.writeValueAsString(queueMessage));
if(remove>0){
membersMap.remove(membersId);
}
return remove > 0 ? true : false;
}
/**
* 拉取最新需要
* 被消费的消息
* rangeByScore 根据score范围获取 0-当前时间戳可以拉取当前时间及以前的需要被消费的消息
*
* @return
*/
public List<QueueMessage> pull() {
List<QueueMessage> msgList =new ArrayList<>();
try {
Set<String> strings = redisTemplate.opsForZSet().rangeByScore(QUEUE_NAME, 0, System.currentTimeMillis());
if (strings == null) {
return null;
}
msgList = strings.stream().map(msg -> {
QueueMessage message = null;
try {
message = mapper.readValue(msg, QueueMessage.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return message;
}).collect(Collectors.toList());
} catch (Exception e) {
log.error(e.toString());
}
return msgList;
}
//获得锁
public Boolean getLock(){
boolean lock = false;
//获得锁
lock = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY,QUEUE_NAME+"is locking !",30, TimeUnit.SECONDS);
return lock;
}
public void releaseLock(){
redisTemplate.delete(LOCK_KEY);
}
}
三、结束语
本文所述的方法也是存在一些小的缺点,比如,数据的正常操作依赖于第三方组件,如果redis挂掉了,这个服务就down掉了,实现延时队列的方法有很多种,基于业务与系统本身的情况,兼容利弊去做一些取舍,以达到最好的效果