SendMessageProcessor
asyncSendMessage
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
final RemotingCommand response = preSend(ctx, request, requestHeader); // 创建response的RemotingCommand
.........
}
preSend
判断是否还没到服务时间
final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
// 现在的时间 < startTimestamp (startAcceptSendRequestTimeStamp) 相当于现在是非工作时间 还没到服务时间
if (this.brokerController.getMessageStore().now() < startTimestamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));
return response;
}
AbstractSendMessageProcessor
msgCheck
验证topic名称是否符合
// 验证名称是否合法 是否为空、是否合法的字符、长度不能大于127
if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
return response;
}
验证当前topic是否在不允许发送队列中
默认有SCHEDULE_TOPIC_XXXX 如果存在是不允许发送
if (TopicValidator.isNotAllowedSendTopic(requestHeader.getTopic(), response)) {
return response;
}
验证topic信息最终是否存在
# 如果存在直接返回topicConfig
# 如果topicConfig 不存在,DefaultTopic也不存在直接返回null
# 如果topicConfig 不存在,DefaultTopic存在,则会创建topic配置信息,从DefaultTopic复制信息
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
# 如果是重试topic,则会创建RETRY队列
if (null == topicConfig) { // 消息消费失败时,消费者会将消息发往retry队列,等待重试
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { //topic是%RETRY%开头的
topicConfig = // 这里创建不判断
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
topicSysFlag); // topicConfigTable存在则直接返回,不存在则创建RETRY队列配置信息
}
}
# 最终还是没有,则返回失败ResponseCode.TOPIC_NOT_EXIST
if (null == topicConfig) { // 到这里还是没有配置信息,则返回失败
response.setCode(ResponseCode.TOPIC_NOT_EXIST); // topic是否存在
response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
验证queueId
验证queutId 不能大于等于该broker的读或写的最大queueId
int queueIdInt = requestHeader.getQueueId();
int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
if (queueIdInt >= idValid) { // 验证queutId 不能大于等于读或写的最大queueId 因为queueId从0开始
String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
queueIdInt,
topicConfig.toString(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
}
queueIdInt <0 ,则从该broker的写队列随机取1个索引
// 无效的queutId,则随机取1个
if (queueIdInt < 0) {
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
}
handleRetryAndDLQ
// 非重试Topic则返回true
private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
RemotingCommand request,
MessageExt msg, TopicConfig topicConfig) {
String newTopic = requestHeader.getTopic();
if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // 判断是否包含重试topic前缀(%RETRY%)
String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); // 消费组
SubscriptionGroupConfig subscriptionGroupConfig = // 获取订阅组配置
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(
"subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return false;
}
// 获取订阅组配置的最大重试次数 默认:16 SubscriptionGroupConfig
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); // 从请求头获取最大重试次数
}
int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); // 获取当前重试次数
if (reconsumeTimes >= maxReconsumeTimes) { //条件满足,则已经达到最大重试次数16
newTopic = MixAll.getDLQTopic(groupName); // 超过最大重试次数,生成死信队列topic, 生成规则: %DLQ% + groupName
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // 不存在则创建死信队列,存在则直接返回
DLQ_NUMS_PER_GROUP, // 1
PermName.PERM_WRITE, 0
);
msg.setTopic(newTopic);
msg.setQueueId(queueIdInt);
if (null == topicConfig) { // 不存在死信队列,返回错误信息
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return false;
}
}
}
int sysFlag = requestHeader.getSysFlag();
if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
}
msg.setSysFlag(sysFlag);
return true;
}