RocketMQ-producer

首先调用start方法。完成各个类的初始化,启动多个定时任务,其中一个定时任务是updateTopicRouteInfoFromNameServer,这个方法里面和nameService建立长连接,同时维护了topicRouteTable和brokerAddrTable等缓存。topicRouteTable里面维护了这个topic包括有哪些queue和broker。这样producer才可以知道要发往哪里。

recketMq的长连接主要实现方式:

通过netty进行通信,会获取channel,然后会把channel缓存起来,避免每次都建立新连接。之后的通信都用这个channel,实现长连接。

启动的流程主要在这个方法中:

MQClientInstance#start

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

发送消息的过程。

  1. 先找到TopicPublishInfo。TopicPublishInfo里面有一个MessageQueue的list。
  2. 从MessageQueueList里面拿到一个messageQueue。(messageQueue是Queue和Borker的交集,通过查询nameService进行维护) 如果没有开启sendLatencyFaultEnable,默认就是采用轮询方法。具体的轮询方式就是,TopicPublishInfo里面维护了一个序号index,每次index自增1,然后通过index去MessageQueueList里面拿一个。
  3. 拿到了MessageQueue之后,里面有broker的name,根据name去找broker的ip地址,发送数据。这个ip地址也是updateTopicRouteInfoFromNameServer方法里面维护的。
上一篇:006-Golang1.17源码分析之select


下一篇:【C# Task】System.Threading.Channels 生产者和消费者模式