首先调用start方法。完成各个类的初始化,启动多个定时任务,其中一个定时任务是updateTopicRouteInfoFromNameServer,这个方法里面和nameService建立长连接,同时维护了topicRouteTable和brokerAddrTable等缓存。topicRouteTable里面维护了这个topic包括有哪些queue和broker。这样producer才可以知道要发往哪里。
recketMq的长连接主要实现方式:
通过netty进行通信,会获取channel,然后会把channel缓存起来,避免每次都建立新连接。之后的通信都用这个channel,实现长连接。
启动的流程主要在这个方法中:
MQClientInstance#start
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
发送消息的过程。
- 先找到TopicPublishInfo。TopicPublishInfo里面有一个MessageQueue的list。
- 从MessageQueueList里面拿到一个messageQueue。(messageQueue是Queue和Borker的交集,通过查询nameService进行维护) 如果没有开启sendLatencyFaultEnable,默认就是采用轮询方法。具体的轮询方式就是,TopicPublishInfo里面维护了一个序号index,每次index自增1,然后通过index去MessageQueueList里面拿一个。
- 拿到了MessageQueue之后,里面有broker的name,根据name去找broker的ip地址,发送数据。这个ip地址也是updateTopicRouteInfoFromNameServer方法里面维护的。