概述
在上文中,我们讨论了消费者对于消息拉取的实现,对于 RocketMQ 这个黑盒的心脏部分,我们顺着消息的发送流程已经将其剖析了大半部分。本章我们不妨乘胜追击,接着讨论各种不同的消息的原理与实现。
事务消息
概念
RocketMQ 中的事务消息功能,实际上是 分布式事务中的本地事务表 的实现,只不过,在这里用消息中间件来代替了数据库,同时也帮我们做好了回查的操作。
在这点上,RocketMQ 和 Kafka 是截然不同的,kafka 的事务是用来实现 Exacltly Once 语义,且该语义主要用来流计算中,即在 "从 Topic 中读 -> 计算 -> 存到 Topic" 保证不被重复计算。
事务流程
- 客户端发送 half 消息
吐槽一下为什么要叫半消息(half message),叫 prepare 消息不是更直观吗
- Broker 将 half 消息持久化
- 客户端根据事务执行结果,发送 Commit / Rollback 消息
- Broker 收到 Commit 时,将事务消息对消费者可见。收到 Rollback 时,将消息丢弃
补偿
- Broker 过久未收到事务执行结果,询问客户端执行结果
- 客户端收到结果查询请求,执行回查方法,发送 Commit / Rollback 方法
- Broker 根据事务执行结果做出对应处理
源码流程
第一步
在设置好了事务监听器后(执行事务 与 事务回查),就可以发送事务消息
在将事务消息交给发送方法后,客户端首先会为消息添加事务消息的标识
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
然后将该事务消息会像普通的同步消息一样发送(且是同步发送)
sendResult = this.send(msg);
第二步
在 Broker 端接收到消息以后,会走与普通消息相同的底层通道(因为这个消息本身就只是个加上了 事务flag 的普通消息),然后由 TransactionalMessageService
来对这个消息进行额外处理。
首先会对该消息放入 real topic
属性和 real queue
属性,然后将消息 Topic 替换为用于处理所有事务消息的特殊的 Topic,当然该 Topic 对消费者是不可见的。
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
// 设置标记为未收到结果
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 替换到特殊的 Topic (RMQ_SYS_TRANS_HALF_TOPIC)
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
完成后,会送到 MessageStore 像普通消息一样处理
普通消息的具体流程见 RocketMQ源码详解 | Broker篇 · 其二:文件系统
第三步
回到 Producer 端,在事务消息发送完成后,该方法会使用专门的线程池执行事务
// 2.执行本地事务,更新事务获取状态
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
然后对本地的事务执行状态进行处理,也就是将该执行状态上报
this.endTransaction(msg, sendResult, localTransactionState, localException);
这里会发送一条 oneway 命令给 Broker 端,且使用的是 RequestCode.END_TRANSACTION
请求码
// 事务结果报告(可能是 commit 或 rollback)
public static final int END_TRANSACTION = 37;
完成处理后,该方法会将事务的发送结果和本地事务的执行结构都返回给上层 API
第四步
在 Broker 端,这里会由 EndTransactionProcessor
处理器来处理该请求码
然后,根据事务的执行结果来做不同的处理
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// 事务执行成功,尝试完成事务
// 获取 half 消息
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
if (res.getCode() == ResponseCode.SUCCESS) {
// 将 half 消息取出,构造真实消息,然后投入实际上的 Topic
/* pass */
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
/*
* 找到半消息,进行删除
* 删除并不是物理上的删除,因为物理上的删除的代价十分的高昂,而是写入一条具有相同事务id的消息到 op Topic
*/
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
}
如果需要回滚,则对相应的半消息进行删除,且和上面一样,并不是物理上的删除,而是发送具有相同事务 id 的消息到 OP Topic,来标记这个事务已经完成了(Commit/Rollback), OP Topic 也是一个特殊的 Topic,同样对消费者不可见。
if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
// 事务执行失败,进行 half 消息的回滚
// 首先找到 half 消息
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 进行删除
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
当这些都做完后,一次事务就完成了。
补偿
当然啦,以上是顺利的情况,我们当然不能指望事务每一次都能执行成功、网络分区和宕机事件永远不会发生。
在一段时间后,如果客户端没有对事务的状态进行上报(或者上报的状态不是 Commit 或 Rollback,而是 Unknown), Broker 端当然就要进行事务状态的回查。
在 BrokerController
启动的时候,会开启事务状态检测服务,该服务会通过循环调用 TransactionalMessageServiceImpl.check()
方法,不断的扫描未结束的事务,同时对超过指定时间还不知道状态的事务进行回查操作。
check()
方法是事务回查的核心,由于很长,我们先来看第一部分(删减了没人在意的 Log)
// 首先找到存储所有 half 消息的 Topic
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
// 对其中每一个 queue 进行检查
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
// 获得对应的 op 消息所在的 queue
MessageQueue opQueue = getOpQueue(messageQueue);
// 获取未处理的 half 消息的起始偏移量
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
// 获取 op 消息的 queue 的起始偏移量
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
// 用来记录已经被处理了的 op 消息的偏移量
List<Long> doneOpOffset = new ArrayList<>();
// 用来记录已经完成了的 half 消息的偏移量
// key: halfOffset, value: opOffset
HashMap<Long, Long> removeMap = new HashMap<>();
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
在 fillOpRemoveMap
方法中,主要是将 op 消息取出,来标记可以被移除的 half 消息(op 消息的存在代表对应事务的结束)
/**
* 读取op消息,解析op消息,填充removeMap
*
* @param removeMap 要删除的半消息,key: halfOffset,value: opOffset
* @param opQueue Op message queue.
* @param pullOffsetOfOp op message queue 的起始偏移量
* @param miniOffset half message queue 的当前最小偏移量
* @param doneOpOffset 存储已处理的 op 消息
* @return 获取到的 Op 消息
*/
private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,
MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
// 首先通过 queue 获取 op 消息,最大数量为 32 条
PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
/* pass: pullResult 消息的意外状态的处理 */
List<MessageExt> opMsg = pullResult.getMsgFoundList();
for (MessageExt opMessageExt : opMsg) {
// op 消息的 body 存储的是对应的 half 消息的偏移量, 现在将其取出
Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
// 感觉这里的 Tag 并没有什么意义,无论是 Commit 还是 Rollback 都会加入这个 Tag
if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
// 在 已处理偏移量 之前的话则可直接放入 已处理偏移量集合
if (queueOffset < miniOffset) {
doneOpOffset.add(opMessageExt.getQueueOffset());
} else {
// 否则放入需要移除的 half 的消息的集合
removeMap.put(queueOffset, opMessageExt.getQueueOffset());
}
}
}
return pullResult;
}
然后进入到 check
方法的第二部分
while (true) {
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) break;
// 推进最小已处理偏移量
if (removeMap.containsKey(i)) /* 如果该 half 消息存在对应的 op 消息,说明已经被处理了(commit/rollback) */ {
// 取出放入到已处理偏移量队列
Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
} else /* 否则说明当前 half 消息悬而未决 */ {
// 取出对应的半消息
GetResult getResult = getHalfMsg(messageQueue, i);
/* pass: 半消息不存在时的意外处理 */
/*
* 检测是否要丢弃或跳过
* 丢弃条件: 当前事务已经超过了最大回查次数(15次)
* 跳过条件: 已经超过了过期文件最大保留时间(72小时)
*/
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
// 处理并推进偏移量
// 具体的处理方法是: 投入 TRANS_CHECK_MAX_TIME_TOPIC 这个 Topic,等待手动处理
listener.resolveDiscardMsg(msgExt);
// 进入到下一个 half 消息
newOffset = i + 1;
i++;
continue;
}
if (msgExt.getStoreTimestamp() >= startTime) {
break;
}
上面的方法很好理解,只是对于已经被标记结束的事务的处理、和未结束事务的补足
接下来是第三部分,这里将继续对未结束事务的补足,与进行可能的回查操作
// half 消息具有最小的检查时间(免疫时间), 检测时间以内可以跳过回查, 重新投入 half 消息的 Topic
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
long checkImmunityTime = transactionTimeout;
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1;
i++;
continue;
}
}
} else {
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
break;
}
}
/*
* 对于当前事务的回查操作,需要满足三个条件之一
* 1.当前 op 消息的集合为空,且已经超过了最小检查时间(免疫时间)
* 2.最大偏移量的 op 消息的生成时间 已经超过了 最小检查时间
* 3.关闭最小检查时间
*/
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
// 先将当前 half 消息放回
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
// 然后向 Product 发送检测消息
listener.resolveHalfMsg(msgExt);
} else {
// 否则更新 op 消息集合,以确保能够断言该 half 消息的状态
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
continue;
}
}
newOffset = i + 1;
i++;
}
上面这段代码主要围绕 "是否进行回查" 展开,且涉及到 "免疫时间"。
在一个事务消息被发送后,对应事务的执行当然需要一定的执行时间,如果我们不设置这个时间立刻进行回查,那么很有可能时候事务还没执行完,对于大多数情况下还没执行完的事务进行回查,毫无疑问带来的收益很低。所以我们需要设定一个时间,在这个时间内的事务先暂时不回查,这个时间就叫做"免疫时间"。
然后再来看下需要进行回查的三种情况:
- 当 op 消息的集合为空,说明当前还没有收到让当前事务结束的通知,且超过了"免疫时间",故回查
- 当前 op 消息最大偏移量的生成时间超过了"免疫时间",说明该事务的提交消息可能丢失了,故回查
- 不启用 "免疫时间"
其中发送的回查消息的请求码为 RequestCode.CHECK_TRANSACTION_STATE
,发送的也是 oneway 消息
最后的第四部分,同时更新 half 和 op 消息在 Queue 中的偏移量
// 对所有的 half 消息计算完成后,更新偏移量
if (newOffset != halfOffset) {
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
// 根据已经被标记为完成的 op 消息更新偏移量
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
// 如果不等,说明并不是所有的 op 消息都被标记为完成了
// 所以我们只将偏移量更新到第一个未完成的 op 消息的位置,其后面的 op 消息会在下次重复处理
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
然后在 Producer 这边,将由 ClientRemotingProcessor.checkTransactionState()
来处理回查操作
// 获取事务 ID
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
messageExt.setTransactionId(transactionId);
}
final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null) {
// 从 MQClientFactory 找到注册的对应 Producer
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
if (producer != null) {
final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
// 让 Producer 检查在对应 IP 上的事务状态
producer.checkTransactionState(addr, messageExt, requestHeader);
} else {
log.debug("checkTransactionState, pick producer by group[{}] failed", group);
}
} else {
log.warn("checkTransactionState, pick producer group failed");
}
再进入 producer.checkTransactionState()
看看 Producer 是怎样检查事务状态的
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
// 取出当前 Producer 的事务监听器
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
if (transactionCheckListener != null) {
// 调用其的事务回查方法
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");
localTransactionState = transactionListener.checkLocalTransaction(message);
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
}
// 再将事务执行结果其发回给 Broker
this.processTransactionState(
localTransactionState,
group,
exception);
} else {
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
最后发回的方法做的事情和在一开始发送事务状态的方法,所做的事情是一样的。Broker 做的处理也是一样的。
这样,补偿流程就执行完了。
批量消息
概念
在消息队列中,批量消息也是一个重要的部分,将消息压缩在一起发送不仅可以减少带宽的消耗,还能节省头部占用的空间。
有点失望的是,RocketMQ 对于批量消息的实现有点"粗糙"了
源码流程
首先,在调用 send()
的 batch 版本后,会先对批量消息进行校验
批量消息不允许延时、不允许发送到重试 Topic,且要求发送到的 Topic 必须是同一个 Topic
List<Message> messageList = new ArrayList<Message>(messages.size());
Message first = null;
for (Message message : messages) {
if (message.getDelayTimeLevel() > 0) {
throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching");
}
if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
throw new UnsupportedOperationException("Retry Group is not supported for batching");
}
if (first == null) {
first = message;
} else {
if (!first.getTopic().equals(message.getTopic())) {
throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
}
if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
}
}
messageList.add(message);
}
MessageBatch messageBatch = new MessageBatch(messageList);
在校验完成,且都放到一个 List 之后,接下来的步骤和普通的消息发送都差不多,只是在编码上理所当然的存在着不同
public static byte[] encodeMessages(List<Message> messages) {
//TO DO refactor, accumulate in one buffer, avoid copies
List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
int allSize = 0;
for (Message message : messages) {
// 编码每一个消息
byte[] tmp = encodeMessage(message);
encodedMessages.add(tmp);
allSize += tmp.length;
}
// 放到最后的大集合中
byte[] allBytes = new byte[allSize];
int pos = 0;
for (byte[] bytes : encodedMessages) {
System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
pos += bytes.length;
}
return allBytes;
}
然后使用 RequestCode.SEND_BATCH_MESSAGE
这个状态码发送出去。
在 Broker 端,其投入的过程大体上和普通消息类似,但是其最后的持久化到硬盘时,这块批量消息被拆分为了普通的单条消息。
即 RocketMQ 使用批量消息只减少了发送时的宽带传输,对于存储与交给消费者的部分并没有获得优化
// 拆分批量消息为每一个普通消息
while (messagesByteBuff.hasRemaining()) {
// 1 TOTALSIZE
final int msgPos = messagesByteBuff.position();
final int msgLen = messagesByteBuff.getInt();
final int bodyLen = msgLen - 40; //only for log, just estimate it
/* pass: 当作普通消息存储 */
queueOffset++;
msgNum++;
messagesByteBuff.position(msgPos + msgLen);
}
延时消息
概念
在业务中,有时候有一些延时提交任务的需求,这时候就可以使用延时消息,即在投递一部分时间后才对消费者可见。
不过,在 RocketMQ 中,延迟级别并不支持自定义,而是具有固定的延迟级别。
不过商业版的 阿里云MQ 可以支持秒精度的自定义延迟时间,果然是为了阉割社区版来赚钱吗
源码流程
RocketMQ 对于延时消息的处理主要在于 Broker 端,所以我们只需要看在 Broker 对延时级别的处理。
首先,在 CommitLog
的 put 中,会对延迟级别进行判断,如果存在,会在这进行进行 Topic 的替换,将其存储到对应的延迟级别的 Queue
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
然后会被在 DefaultMessageStore
中初始化的 ScheduleMessageService
处理
首先,该服务在启动时会进行初始化
public void start() {
// 保证只被执行一次
if (started.compareAndSet(false, true)) {
// 加载本地快照
super.load();
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
// 取出每一个级别
Integer level = entry.getKey();
// 当前延迟级别对应的延迟时间
Long timeDelay = entry.getValue();
// 该延迟级别之前消费到的自己的队列的偏移量
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
// 每一个延迟级别设置一个定时任务
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// 定时持久化各个延迟级别的偏移量
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
每一个延迟级别的 Queue 都有对应的定时任务,且都会执行以下方法
public void executeOnTimeup() {
// 找到自己延迟级别的消费队列
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
// 根据消费偏移量将指定的 MappedFile 文件加载进来
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
// 遍历每一个消息的索引
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
/* pass */
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown <= 0) /* 目标时间小于当起时间,可以执行 */ {
// 根据偏移量取出消息
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
// 将延迟消息恢复成原本消息的样子
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
/* pass */
// 投入真实的 Topic
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
/* pass: 更新度量信息 */
} catch (Exception e) {
/* pass */
}
}
} else /* 否则,这个消息需要被消费的时间到了再通知我 */ {
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
// 更新消费偏移量
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
// 走到这里,说明暂时没有需要消费的延时消息
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
// 小睡一会
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
/* pass */
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
可以看出,延迟消息的实现还是十分简单的,由于先投入的延时消息必先快于后投入的消息的到期,所以只需要不断的拉取各个延迟级别对应的队列 的头部的延迟消息即可。这也是只支持固定级别的延迟消息带来的好处。