2. RocketMQ Source Code: NameServer Startup Flow

1. First, locate the main class that starts the NameServer

        This class lives in the namesrv module, and its fully qualified name is org.apache.rocketmq.namesrv.NamesrvStartup. To start the NameServer, we only need to run this class.

2. Starting from the main method, what does NameServer startup actually do?

public static void main(String[] args) {
    main0(args);
}

main does nothing but delegate to main0, so step into main0 next:

public static NamesrvController main0(String[] args) {
    try {
        // The key call: builds the controller from the command-line arguments
        NamesrvController controller = createNamesrvController(args);
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

2.1 Call createNamesrvController to create the NamesrvController instance

createNamesrvController builds a NameServer controller, which is mainly responsible for managing the NameServer's components. Step into createNamesrvController:

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //PackageConflictDetect.detectFastjson();

    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }

    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876);

    // -c configFile: the -c option specifies the path of a configuration file
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    // The arguments in args were parsed into commandLine above; they take the form "--propertyName propertyValue".
    // Any arguments passed this way are assigned to the namesrvConfig object.
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }

    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);

    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);

    return controller;
}

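createNamesrvController does four main things:

    1) Loads the properties file given by the "-c configFile" option and assigns its values to the matching fields of NamesrvConfig and NettyServerConfig;

    2) Checks whether the command line contains "-p"; if so, prints the configuration loaded in step 1 and exits;

    3) Parses arguments of the form "--propertyName propertyValue" and assigns them to the matching fields of NamesrvConfig;

    4) Creates a NamesrvController instance.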
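
The layering in steps 1 to 3 (defaults in code, then the "-c" file, then command-line arguments) can be sketched in plain Java. This is only a simplified stand-in for a setter-based binder in the spirit of MixAll.properties2Object, not RocketMQ's implementation. ConfigPrecedenceDemo, DemoConfig and apply are hypothetical names invented for this illustration, and a single config object is used for brevity even though the real code binds command-line arguments to NamesrvConfig only.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical demo; none of these names come from RocketMQ.
final class ConfigPrecedenceDemo {

    // Stand-in for a config bean such as NettyServerConfig.
    public static class DemoConfig {
        private int listenPort = 8888;                 // default declared in the class
        private String kvConfigPath = "kvConfig.json"; // illustrative default

        public int getListenPort() { return listenPort; }
        public void setListenPort(int listenPort) { this.listenPort = listenPort; }
        public String getKvConfigPath() { return kvConfigPath; }
        public void setKvConfigPath(String kvConfigPath) { this.kvConfigPath = kvConfigPath; }
    }

    // For every public single-argument setXxx method, look up property "xxx"
    // and, if it is present, convert the text and call the setter.
    static void apply(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() < 4 || m.getParameterCount() != 1) {
                continue;
            }
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String raw = props.getProperty(key);
            if (raw == null) {
                continue; // property not supplied: keep the current value
            }
            Class<?> type = m.getParameterTypes()[0];
            Object value;
            if (type == int.class) {
                value = Integer.parseInt(raw);
            } else if (type == long.class) {
                value = Long.parseLong(raw);
            } else if (type == boolean.class) {
                value = Boolean.parseBoolean(raw);
            } else if (type == String.class) {
                value = raw;
            } else {
                continue; // unsupported type in this sketch
            }
            try {
                m.invoke(target, value);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set " + key, e);
            }
        }
    }

    public static void main(String[] args) {
        DemoConfig cfg = new DemoConfig();
        cfg.setListenPort(9876);                   // startup code overrides the class default

        Properties fromFile = new Properties();    // pretend this came from "-c configFile"
        fromFile.setProperty("listenPort", "9877");
        apply(fromFile, cfg);

        Properties fromCli = new Properties();     // pretend this came from "--kvConfigPath ..."
        fromCli.setProperty("kvConfigPath", "/tmp/kv.json");
        apply(fromCli, cfg);

        System.out.println(cfg.getListenPort() + " " + cfg.getKvConfigPath());
    }
}
```

Each later source only overwrites the properties it actually supplies, which is why the hard-coded 9876 survives unless the config file names a different listenPort.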

What settings do NamesrvConfig and NettyServerConfig contain?
public class NamesrvConfig {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    // RocketMQ home directory; can be set with -Drocketmq.home.dir=path
    // or through the ROCKETMQ_HOME environment variable
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    // Path where the NameServer persists its KV configuration
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    // Default config file path; it has no effect by default. To configure via a file, use the -c option
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    private boolean clusterTest = false;
    // Switch that enables ordered messages; defaults to false (disabled)
    private boolean orderMessageEnable = false;
}



public class NettyServerConfig implements Cloneable {
    private int listenPort = 8888;
    private int serverWorkerThreads = 8;
    private int serverCallbackExecutorThreads = 0;
    private int serverSelectorThreads = 3;
    private int serverOnewaySemaphoreValue = 256;
    private int serverAsyncSemaphoreValue = 64;
    private int serverChannelMaxIdleTimeSeconds = 120;

    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private boolean serverPooledByteBufAllocatorEnable = true;

    /**
     * make make install
     *
     *
     * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
     * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
     */
    private boolean useEpollNativeSelector = false;
}
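
The rocketmqHome default in NamesrvConfig relies on the two-argument form of System.getProperty: the system property wins, and the environment variable is only the fallback. A minimal sketch of that lookup follows. HomeDirResolver is a hypothetical class name; the property key and environment variable name are the ones mentioned in the comments above.

```java
// Hypothetical helper that mirrors the lookup order of the rocketmqHome field.
final class HomeDirResolver {
    static final String HOME_PROPERTY = "rocketmq.home.dir"; // set with -Drocketmq.home.dir=path
    static final String HOME_ENV = "ROCKETMQ_HOME";          // environment variable fallback

    // Returns the system property if it is set, otherwise the environment
    // variable, otherwise null (the case in which startup exits with -2).
    static String resolve() {
        return System.getProperty(HOME_PROPERTY, System.getenv(HOME_ENV));
    }

    public static void main(String[] args) {
        String home = resolve();
        if (home == null) {
            System.out.println("neither " + HOME_PROPERTY + " nor " + HOME_ENV + " is set");
        } else {
            System.out.println("RocketMQ home: " + home);
        }
    }
}
```

Note also that NettyServerConfig declares listenPort = 8888, but NamesrvStartup calls setListenPort(9876) before any file is loaded, so 9876 is the port in effect unless a config file overrides it.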

2.2 Call start to initialize and launch the controller

public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        // Register a JVM shutdown hook so the controller is shut down when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        controller.start();

        return controller;
    }
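
The ordering in start (initialize, register the shutdown hook, then start) can be tried out without RocketMQ. The sketch below is an assumption-laden imitation: ShutdownHookDemo and DemoController are hypothetical names, a plain Thread replaces ShutdownHookThread, and a failed initialize throws an exception instead of calling System.exit(-3) so that the code stays testable.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo of the initialize -> hook -> start ordering.
final class ShutdownHookDemo {

    // Stand-in for NamesrvController with just the lifecycle methods.
    static final class DemoController {
        private final AtomicBoolean running = new AtomicBoolean(false);

        boolean initialize() { return true; }

        void start() { running.set(true); }

        // Idempotent: only the first call after start() does any work.
        void shutdown() {
            if (running.compareAndSet(true, false)) {
                System.out.println("controller stopped");
            }
        }

        boolean isRunning() { return running.get(); }
    }

    // Returns the registered hook so a caller (or a test) can remove it again.
    static Thread startWithHook(DemoController controller) {
        if (controller == null) {
            throw new IllegalArgumentException("controller is null");
        }
        if (!controller.initialize()) {
            controller.shutdown();
            throw new IllegalStateException("initialize failed"); // the real code exits with -3
        }
        Thread hook = new Thread(controller::shutdown, "demo-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        controller.start();
        return hook;
    }

    public static void main(String[] args) {
        startWithHook(new DemoController());
        // When main returns, the JVM begins shutdown and runs the hook.
    }
}
```

Registering the hook before start means that a shutdown signal arriving during startup still triggers controller.shutdown().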

start first calls NamesrvController#initialize to perform the initialization:

public boolean initialize() {

        this.kvConfigManager.load();

        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        this.registerProcessor();

        // Scheduled task: after a 5-second delay, scan the Brokers every 10 seconds and remove the inactive ones
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        // Scheduled task: after a 1-minute delay, print the KV configuration every 10 minutes
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }

        return true;
    }
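
To make the periodic Broker scan more concrete, here is a rough sketch of what such a task might look like. It is not RouteInfoManager's code: InactiveBrokerScanDemo, heartbeat, scanNotActive and scheduleScan are hypothetical names, the liveness table is reduced to a map from Broker address to last heartbeat time, and the expiry threshold is a constructor parameter because the listing above does not show the value RocketMQ uses.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified liveness table for Brokers.
final class InactiveBrokerScanDemo {
    private final Map<String, Long> lastHeartbeatMillis = new ConcurrentHashMap<>();
    private final long expireMillis;

    InactiveBrokerScanDemo(long expireMillis) {
        this.expireMillis = expireMillis;
    }

    // Record that a Broker reported in at the given time.
    void heartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeatMillis.put(brokerAddr, nowMillis);
    }

    // Remove every Broker whose last heartbeat is older than expireMillis;
    // returns how many entries were removed.
    int scanNotActive(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastHeartbeatMillis.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > expireMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return lastHeartbeatMillis.size();
    }

    // Same timing as the listing above: first run after 5 s, then every 10 s.
    ScheduledFuture<?> scheduleScan(ScheduledExecutorService scheduler) {
        return scheduler.scheduleAtFixedRate(
            () -> scanNotActive(System.currentTimeMillis()), 5, 10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        // 120 seconds is an arbitrary threshold chosen for the demo.
        InactiveBrokerScanDemo table = new InactiveBrokerScanDemo(120_000L);
        table.heartbeat("broker-a", 0L);
        table.heartbeat("broker-b", 100_000L);
        // Simulated clock: at t = 130 s only broker-a has been silent too long.
        System.out.println("removed: " + table.scanNotActive(130_000L));
    }
}
```

Passing the clock value into scanNotActive keeps the expiry rule testable without waiting for real time to pass; scheduleScan only wires that rule to a timer.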

It then calls NamesrvController#start:

public void start() throws Exception {
        this.remotingServer.start();

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }

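Taken together, the start phase does the following:
1) In NamesrvController#initialize: loads the KVConfigManager configuration
2) In NamesrvController#initialize: creates the NettyRemotingServer instance
3) In NamesrvController#initialize: creates a scheduled task that scans the Brokers every ten seconds and removes the inactive ones
4) In NamesrvController#initialize: creates a scheduled task that prints the KV configuration every ten minutes
5) In NamesrvStartup#start: registers a JVM shutdown hook
6) In NamesrvController#start: starts the NettyRemotingServer
7) In NamesrvController#start: starts the FileWatchService thread if it is not null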
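
The FileWatchService started in item 7 uses the listener registered in initialize, and its reload rule is easy to miss: a change to the trust certificate reloads immediately, while the server certificate and private key must both have changed before a reload happens. The sketch below isolates that rule. CertReloadRuleDemo is a hypothetical class, the file names are made up, and a counter stands in for the real loadSslContext call.

```java
// Hypothetical isolation of the reload rule used by the TLS file listener.
final class CertReloadRuleDemo {
    private final String certPath;
    private final String keyPath;
    private final String trustCertPath;
    private boolean certChanged;
    private boolean keyChanged;
    private int reloadCount; // stands in for reloading the SSL context

    CertReloadRuleDemo(String certPath, String keyPath, String trustCertPath) {
        this.certPath = certPath;
        this.keyPath = keyPath;
        this.trustCertPath = trustCertPath;
    }

    // Called once per changed file, as FileWatchService.Listener#onChanged is.
    void onChanged(String path) {
        if (path.equals(trustCertPath)) {
            reloadCount++; // the trust certificate alone is enough
        }
        if (path.equals(certPath)) {
            certChanged = true;
        }
        if (path.equals(keyPath)) {
            keyChanged = true;
        }
        if (certChanged && keyChanged) {
            // The certificate and key form a pair: reload only once both are new.
            certChanged = false;
            keyChanged = false;
            reloadCount++;
        }
    }

    int getReloadCount() {
        return reloadCount;
    }

    public static void main(String[] args) {
        CertReloadRuleDemo rule = new CertReloadRuleDemo("server.pem", "server.key", "ca.pem");
        rule.onChanged("server.pem"); // no reload yet, the key is still the old one
        rule.onChanged("server.key"); // both changed, so reload
        System.out.println("reloads so far: " + rule.getReloadCount());
    }
}
```

Waiting for both files avoids loading a new certificate together with an old private key while the pair is being replaced on disk.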

3. This completes the NameServer startup (the details are covered later). A simple flow chart:

[Figure: NameServer startup flow chart]

 
