broker处理发送消息判断逻辑

SendMessageProcessor

asyncSendMessage

private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
	final RemotingCommand response = preSend(ctx, request, requestHeader); // 创建response的RemotingCommand
	.........
}

preSend

判断是否还没到服务时间

 final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
// 现在的时间 < startTimestamp (startAcceptSendRequestTimeStamp) 相当于现在是非工作时间 还没到服务时间
if (this.brokerController.getMessageStore().now() < startTimestamp) {
	response.setCode(ResponseCode.SYSTEM_ERROR);
	response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));
	return response;
}

AbstractSendMessageProcessor

msgCheck

验证topic名称是否符合

// 验证名称是否合法  是否为空、是否合法的字符、长度不能大于127
if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
	return response;
}

验证当前topic是否在不允许发送队列中

默认有SCHEDULE_TOPIC_XXXX 如果存在是不允许发送

if (TopicValidator.isNotAllowedSendTopic(requestHeader.getTopic(), response)) {
	return response;
}

验证topic信息最终是否存在

# 如果存在直接返回topicConfig
# 如果topicConfig 不存在,DefaultTopic也不存在直接返回null
# 如果topicConfig 不存在,DefaultTopic存在,则会创建topic配置信息,从DefaultTopic复制信息
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
	requestHeader.getTopic(),
	requestHeader.getDefaultTopic(),
	RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
	requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
# 如果是重试topic,则会创建RETRY队列
if (null == topicConfig) { // 消息消费失败时,消费者会将消息发往retry队列,等待重试
	if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { //topic是%RETRY%开头的
		topicConfig = // 这里创建不判断
			this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
				requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
				topicSysFlag); // topicConfigTable存在则直接返回,不存在则创建RETRY队列配置信息
	}
}
# 最终还是没有,则返回失败ResponseCode.TOPIC_NOT_EXIST
if (null == topicConfig) { // 到这里还是没有配置信息,则返回失败
	response.setCode(ResponseCode.TOPIC_NOT_EXIST); // topic是否存在
	response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
		+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
	return response;
}

验证queueId

验证queutId 不能大于等于该broker的读或写的最大queueId

int queueIdInt = requestHeader.getQueueId();
int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
if (queueIdInt >= idValid) { // 验证queutId 不能大于等于读或写的最大queueId  因为queueId从0开始
	String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
		queueIdInt,
		topicConfig.toString(),
		RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

	log.warn(errorInfo);
	response.setCode(ResponseCode.SYSTEM_ERROR);
	response.setRemark(errorInfo);

	return response;
}

queueIdInt <0 ,则从该broker的写队列随机取1个索引

// 无效的queutId,则随机取1个

if (queueIdInt < 0) {
	queueIdInt = randomQueueId(topicConfig.getWriteQueueNums()); 
}

handleRetryAndDLQ

// 非重试Topic则返回true
private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
								  RemotingCommand request,
								  MessageExt msg, TopicConfig topicConfig) {
	String newTopic = requestHeader.getTopic();
	if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {  // 判断是否包含重试topic前缀(%RETRY%)
		String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); // 消费组
		SubscriptionGroupConfig subscriptionGroupConfig = // 获取订阅组配置
			this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
		if (null == subscriptionGroupConfig) {
			response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
			response.setRemark(
				"subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
			return false;
		}
		// 获取订阅组配置的最大重试次数  默认:16 SubscriptionGroupConfig
		int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
		if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
			maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); // 从请求头获取最大重试次数
		}
		int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); // 获取当前重试次数
		if (reconsumeTimes >= maxReconsumeTimes) { //条件满足,则已经达到最大重试次数16
			newTopic = MixAll.getDLQTopic(groupName); // 超过最大重试次数,生成死信队列topic,  生成规则: %DLQ% + groupName
			int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
			topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,  // 不存在则创建死信队列,存在则直接返回
				DLQ_NUMS_PER_GROUP, // 1
				PermName.PERM_WRITE, 0
			);
			msg.setTopic(newTopic);
			msg.setQueueId(queueIdInt);
			if (null == topicConfig) { // 不存在死信队列,返回错误信息
				response.setCode(ResponseCode.SYSTEM_ERROR);
				response.setRemark("topic[" + newTopic + "] not exist");
				return false;
			}
		}
	}
	int sysFlag = requestHeader.getSysFlag();
	if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
		sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
	}
	msg.setSysFlag(sysFlag);
	return true;
}
上一篇:centos7上搭建http服务器以及设置目录访问


下一篇:Spring Boot + Redis 实现延时队列,写得太好了!