先推荐3篇文章,讲解延迟队列的实现原理:
代码实现:
@Service public class KafkaServiceImpl implements KafkaService, InitializingBean { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaServiceImpl.class); @Resource private KafkaTemplate<String, String> stringKafkaTemplate; @Resource private RedissonClient redissonClient; private RBlockingDeque<String> rBlockingDeque; private RDelayedQueue<String> rDelayedQueue; @Override public void send(String topic,String key,String jsonObject) { stringKafkaTemplate.send(new ProducerRecord<>(topic,key, jsonObject)); LOGGER.info("send to topic[{}], key[{}], jsonObject[{}]",topic, key, jsonObject); } @Override public void delaySend(String topic, String key, String jsonObject, Long delay, TimeUnit timeUnit) { KafkaInfo kafkaInfo = new KafkaInfo(); kafkaInfo.setTopic(topic); kafkaInfo.setKey(key); kafkaInfo.setJsonObject(jsonObject); this.rDelayedQueue.offer(JSON.toJSONString(kafkaInfo), delay, timeUnit); LOGGER.info("send delay [{}], timeUnit[{}] to topic[{}], key[{}], jsonObject[{}]",delay , timeUnit, topic, key, jsonObject); } @Override public void afterPropertiesSet() { this.rBlockingDeque = redissonClient.getBlockingDeque(KAFKA_DELAY_QUEUE); if (this.rBlockingDeque == null) { return; } this.rDelayedQueue = redissonClient.getDelayedQueue(rBlockingDeque); if (this.rDelayedQueue == null) { return; } this.startConsumerDelayQueue(); } private static class KafkaInfo implements Serializable { private static final long serialVersionUID = -5517223779255526862L; private String topic; private String key; private String jsonObject; public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getKey() { return key; } public void setKey(String key) { this.key = key; } public String getJsonObject() { return jsonObject; } public void setJsonObject(String jsonObject) { this.jsonObject = jsonObject; } } private void startConsumerDelayQueue() { Thread thread = new Thread(() -> { while (true) { try { String jsonObject = this.rBlockingDeque.take(); LOGGER.info("--> 延迟队列获取数据:{}",jsonObject); KafkaInfo kafkaInfo = JSON.parseObject(jsonObject, KafkaInfo.class); this.send(kafkaInfo.getTopic(), kafkaInfo.getKey(), kafkaInfo.getJsonObject()); } catch (InterruptedException e) { LOGGER.error("延迟队列获取数据异常...."); } } }); thread.setDaemon(true); thread.start(); } }
注意:
放入队列是使用的RDelayedQueue,获取队列是使用RQueue而不是RDelayedQueue。