rocketmq核心源码分析第二十三篇一顺序消息

文章目录

顺序消息

使用demo

  • 通过MessageQueueSelector对mqs进行选择
  • 一般按业务维度保障分区顺序
defaultMQProducer.send(msg,new MessageQueueSelector(){
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    	通常用法: msg的业务唯一键相同的消息发送到同一队列
    	保障分区顺序性
        int i = (int)arg % mqs.size();
        return mqs.get(i);
    }
},16/* 为select方法的arg参数*/ );

源码分析

  • 顺序消息通过sendSelectImpl实现发送
  • 获取topic对应的TopicPublishInfo
  • 获取topic消息队列集合
  • 根据selector选择指定的消息队列
  • 消息发送
private SendResult sendSelectImpl(
        Message msg,
        MessageQueueSelector selector,
        Object arg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    获取topic发布信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        try {
            获取topic消息队列集合
            List<MessageQueue> messageQueueList =
                    mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
            Message userMessage = MessageAccessor.cloneMessage(msg);
            String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
            userMessage.setTopic(userTopic);
            根据selector选择指定的消息队列
            mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
        } catch (Throwable e) {
            throw new MQClientException("select message queue throwed exception.", e);
        }

        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTime) {
            throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
        }
        消息发送
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
        } else {
            throw new MQClientException("select message queue return null.", null);
        }
    }
    validateNameServerSetting();
    throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

总结

  • 顺序消息是指将同一业务键的消息发往同一消息队列MessageQueue
  • 只保障分区顺序性
  • 消费端不管是并发消费还是顺序消费都是按照MessageQueue的维度进行拉取,但并发消费由于多线程干扰[所以顺序消费最好是采用ConsumeMessageOrderlyService]
上一篇:SpringBoot实战(二)之计划任务


下一篇:for循环定义数组