启动过程
通过单元测试中的代码可以看到,在init()和terminate()这两个测试方法中,分别执行了Producer的start和shutdown方法。
说明RocketMQ Producer是个有状态服务,在发送消息前需要先启动Producer。这个启动过程,实际上就是为了发消息做的准备工作,所以,在分析发消息流程之前,我们需要先理清Producer中维护了哪些状态,在启动过程中,Producer都做了哪些初始化的工作。有了这个基础才能分析其发消息的实现流程。
init()
- 创建DefaultMQProducer实例
- 设置一些参数值
- 然后调用start。
跟进start方法的实现,继续分析其初始化过程。
- DefaultMQProducer#start()直接调用DefaultMQProducerImpl#start()
ocketMQ使用一个成员变量serviceState来记录和管理自身的服务状态,这实际上是(State Pattern)设计模式的变体。
状态模式允许一个对象在其内部状态改变时改变它的行为,对象看起来就像是改变了它的类。
与标准的状态模式不同的是,它没有使用状态子类,而是使用分支流程(switch-case)来实现不同状态下的不同行为,在管理比较简单的状态时,使用这种设计会让代码更加简洁。这种模式非常广泛地用于管理有状态的类,推荐你在日常开发中使用。
在设计状态的时候,有两个要点是需要注意的
不仅要设计正常的状态,还要设计中间状态和异常状态,否则,一旦系统出现异常,你的状态就不准确了,很难处理这种异常状态。比如在这段代码中,RUNNING和SHUTDOWN_ALREADY是正常状态,CREATE_JUST是一个中间状态,START_FAILED是一个异常状态。
这些状态之间的转换路径考虑清楚,并在进行状态转换的时候,检查上一个状态是否能转换到下一个状态。
比如这里,只有处于CREATE_JUST态才能转为RUNNING状,可确保这服务一次性,只能启动一次。避免了多次启动服务。
启动过程的实现:
通过一个单例模式(Singleton Pattern)的MQClientManager获取MQClientInstance的实例mQClientFactory,没有则自动创建新的实例
在mQClientFactory中注册自己
启动mQClientFactory
给所有Broker发送心跳。
其中实例mQClientFactory对应的类MQClientInstance是RocketMQ客户端中的顶层类,大多数情况下,可以简单地理解为每个客户端对应类MQClientInstance的一个实例。这个实例维护着客户端的大部分状态信息,以及所有的Producer、Consumer和各种服务的实例,想要学习客户端整体结构的同学可以从分析这个类入手,逐步细化分析下去。
我们进一步分析一下MQClientInstance#start()中的代码:
DefaultMQProducerImpl:Producer的内部实现类,大部分Producer的业务逻辑,也就是发消息的逻辑,都在这类。
消息发送过程
接下来我们一起分析Producer发送消息的流程。
在Producer的接口MQProducer中,定义了19个不同参数的发消息的方法,按照发送方式不同可以分成三类:
单向发送(Oneway):发送消息后立即返回,不处理响应,不关心是否发送成功;
同步发送(Sync):发送消息后等待响应;
异步发送(Async):发送消息后立即返回,在提供的回调方法中处理响应。
这三类发送实现基本上是相同的,异步发送稍微有一点儿区别,我们看一下异步发送的实现方法"DefaultMQProducerImpl#send()"(对应源码中的1132行):
@Deprecated public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException { final long beginStartTime = System.currentTimeMillis(); ExecutorService executor = this.getAsyncSenderExecutor(); try { executor.submit(new Runnable() { @Override public void run() { long costTime = System.currentTimeMillis() - beginStartTime; if (timeout > costTime) { try { try { sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout - costTime); } catch (MQBrokerException e) { throw new MQClientException("unknownn exception", e); } } catch (Exception e) { sendCallback.onException(e); } } else { sendCallback.onException(new RemotingTooMuchRequestException("call timeout")); } } }); } catch (RejectedExecutionException e) { throw new MQClientException("exector rejected ", e); } }
我们可以看到,RocketMQ使用了一个ExecutorService来实现异步发送:使用asyncSenderExecutor的线程池,异步调用方法sendSelectImpl(),继续发送消息的后续工作,当前线程把发送任务提交给asyncSenderExecutor就可以返回了。单向发送和同步发送的实现则是直接在当前线程中调用方法sendSelectImpl()。
我们来继续看方法sendSelectImpl()的实现:
// 省略部分代码 MessageQueue mq = null; // 选择将消息发送到哪个队列(Queue)中 try { List<MessageQueue> messageQueueList = mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); Message userMessage = MessageAccessor.cloneMessage(msg); String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); userMessage.setTopic(userTopic); mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); } catch (Throwable e) { throw new MQClientException("select message queue throwed exception.", e); } // 省略部分代码 // 发送消息 if (mq != null) { return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime); } else { throw new MQClientException("select message queue return null.", null); } // 省略部分代码
方法sendSelectImpl()中主要的功能就是选定要发送的队列,然后调用方法sendKernelImpl()发送消息。
选择哪个队列发送由MessageQueueSelector#select决定。
RocketMQ使用策略模式解决不同场景下需要使用不同队列选择算法问题。
RocketMQ提供了很多MessageQueueSelector的实现,例如随机选择策略,哈希选择策略和同机房选择策略等
也可以自己实现选择策略。如果要保证相同key消息的严格顺序,你需要使用哈希选择策略,或提供一个自己实现的选择策略。
再看方法