《RocketMQ源码系列》nameserver启动流程

创建nameserver

  可以看到我们启动 nameserver,就是执行 NamesrvStartup 类的main方法。看起来比较简单,应该就是创建了一个nameserver的控制器然后启动,这样 broker  就可以注册上来了。

《RocketMQ源码系列》nameserver启动流程

  首先,我们就看看 createNamesrvController() 方法,他具体是怎么创建的。

《RocketMQ源码系列》nameserver启动流程

   我们启动 nameserver,就是执行 runserver.sh 脚本,它里面封装了一堆 jvm 启动命令,其实就和我们自己部署 jar 到服务器上没什么两样。整段命令简化出来差不多就是 java -server -Xms4g -Xmx4g -Xmn2g org.apache.rocketmq.namesrv.NamesrvStartup 这么一个意思。

  一开是就是解析我们的 jvm 命令,然后创建了 namesreConfig、NettyServerConfig 对象,然后就基于这两个对象创建完 NamesrvController 直接返回了。其他的代码看起来似乎不重要,就是解析我们的启动参数、然后赋值到config对象而已。

  现在我们就来看看,上面创建的 namesreConfig、NettyServerConfig 2个对象是干嘛的。

 《RocketMQ源码系列》nameserver启动流程

  可以看到,这两个类里面就是一些基本的参数属性,没有其他逻辑了,那么基于 这两个对象 创建的 NamesrvController 应该就是拿到这些配置信息进行初始化,底层我们猜测应该是基于 nettyserver 监听 9876 端口,然后 brocker 进行注册、producer 拉取元数据。

《RocketMQ源码系列》nameserver启动流程

  也就是说到此位置,namesrv 已经创建完成,但此时还没有办法对外提供服务,所以我们接下来应该看看 start() 方法了。

启动netty服务器

  整个代码也比较简洁,initialize() 完之后,就直接 start() 启动了。一个好的开源框架、主流程一定是比较清晰的。我之前看 eureka-client 代码的时候,那代码是真的一言难尽,层次含糊不清、到处硬编码、逻辑也不严谨。

《RocketMQ源码系列》nameserver启动流程

initialize()

  我们看看 NamesrvController 的 initialize() 方法,首先是load()方法,应该是在之前创建 NamesrvController  的时候给的一些配置,然后直接就创建出  NettyRemotingServer 网络服务组件 扔到线程池里面去了,NettyRemotingServer 在创建的时候,构造方法 new 了一个 ServerBootstrap ,他才是netty服务器的核心,所以可以猜测应该它是 对外提供服务的组件。

《RocketMQ源码系列》nameserver启动流程

   然后后面代码就是 心跳机制线程了,和我们启动没啥关系,现在我们就可以看看 start()干了些啥了。

《RocketMQ源码系列》nameserver启动流程

start()

  我们刚刚初始化的时候,就创建了 NettyRemotingServer ,现在刚好就是启动这个组件,那么我们启动 nameserver 的核心逻辑,必定就是这个无疑了。

《RocketMQ源码系列》nameserver启动流程

    @Override
    public void start() {
        // 又是一个后台执行的线程,主要是netty网络上的配置,先跳过
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
        // 准备环境
        prepareSharableHandlers();

        ServerBootstrap childHandler =
            // 这里都是netty的一些配置。
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                // 是设置了Netty服务器要监听的端口号,默认就是9876
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                // 这里是一大堆网络请求处理器。netty服务器收到一个请求,就会一次使用下面处理器来处理请求
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                // 这是负责编码解码的
                                new NettyDecoder(),
                                // 这是负责连接空闲管理的
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                // 这是负责网络连接管理的
                                connectionManageHandler,
                                // 这是负责关键的网络请求处理的
                                serverHandler
                            );
                    }
                });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            // 这里就是启动netty服务器了,bind方法就是绑定和监听一个端口号
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

可以看到 nameserver 的启动并不复杂,画个图简单梳理下。

《RocketMQ源码系列》nameserver启动流程

 。

上一篇:rocketmq常见问题及使用 新手篇


下一篇:redis专题九:redis持久化(上)