RocketMQ源码分析之Broker(3)

初始化关于通信安全的文件监听模块,用来观察网络加密配置文件的更改。

if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
    // Register a listener to reload SslContext
    try {
        fileWatchService = new FileWatchService(
            new String[] {
                TlsSystemConfig.tlsServerCertPath,
                TlsSystemConfig.tlsServerKeyPath,
                TlsSystemConfig.tlsServerTrustCertPath
            },
            new FileWatchService.Listener() {
                boolean certChanged, keyChanged = false;

                @Override
                public void onChanged(String path) {
                    if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                        log.info("The trust certificate changed, reload the ssl context");
                        reloadServerSslContext();
                    }
                    if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                        certChanged = true;
                    }
                    if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                        keyChanged = true;
                    }
                    if (certChanged && keyChanged) {
                        log.info("The certificate and private key changed, reload the ssl context");
                        certChanged = keyChanged = false;
                        reloadServerSslContext();
                    }
                }

                private void reloadServerSslContext() {
                    ((NettyRemotingServer) remotingServer).loadSslContext();
                    ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                }
            });
    } catch (Exception e) {
        log.warn("FileWatchService created error, can't load the certificate dynamically");
    }
}

初始化事务消息检查模块

private void initialTransaction() {
    this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
    if (null == this.transactionalMessageService) {
        this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
        log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
    }
    this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
    if (null == this.transactionalMessageCheckListener) {
        this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
        log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
    }
    this.transactionalMessageCheckListener.setBrokerController(this);
    this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}

初始化访问验证模块,主要验证这个请求签名是否正确,初始化rpc钩子模块

 private void initialAcl() {
    if (!this.brokerConfig.isAclEnable()) {
        log.info("The broker dose not enable acl");
        return;
    }

    List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
    if (accessValidators == null || accessValidators.isEmpty()) {
        log.info("The broker dose not load the AccessValidator");
        return;
    }

    for (AccessValidator accessValidator: accessValidators) {
        final AccessValidator validator = accessValidator;
        this.registerServerRPCHook(new RPCHook() {

            @Override
            public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                //Do not catch the exception
                validator.validate(validator.parse(request, remoteAddr));
            }

            @Override
            public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
            }
        });
    }
}


private void initialRpcHooks() {

    List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
    if (rpcHooks == null || rpcHooks.isEmpty()) {
        return;
    }
    for (RPCHook rpcHook: rpcHooks) {
        this.registerServerRPCHook(rpcHook);
    }
}

DefaultMessageStore

初始化关于消息存储的核心类

public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
    final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
    this.messageArrivingListener = messageArrivingListener;
    this.brokerConfig = brokerConfig;
    this.messageStoreConfig = messageStoreConfig;
    this.brokerStatsManager = brokerStatsManager;
    this.allocateMappedFileService = new AllocateMappedFileService(this);
    this.commitLog = new CommitLog(this);
    this.consumeQueueTable = new ConcurrentHashMap<>(32);

    this.flushConsumeQueueService = new FlushConsumeQueueService();
    this.cleanCommitLogService = new CleanCommitLogService();
    this.cleanConsumeQueueService = new CleanConsumeQueueService();
    this.storeStatsService = new StoreStatsService();
    this.indexService = new IndexService(this);
    this.haService = new HAService(this);

    this.reputMessageService = new ReputMessageService();

    this.scheduleMessageService = new ScheduleMessageService(this);

    this.transientStorePool = new TransientStorePool(messageStoreConfig);

    if (messageStoreConfig.isTransientStorePoolEnable()) {
        this.transientStorePool.init();
    }

    this.allocateMappedFileService.start();

    this.indexService.start();

    this.dispatcherList = new LinkedList<>();
    this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
    this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

    File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
    MappedFile.ensureDirOK(file.getParent());
    lockFile = new RandomAccessFile(file, "rw");
}

AllocateMappedFileService

主要用来接收创建文件的请求以及执行创建文件MappedFile的请求。

CommitLog

消息存储的具体操作类,初始化每一种文件的综合类MappedFileQueue,同步刷盘线程GroupCommitService,异步刷盘线程FlushRealTimeService,异步刷盘并且是主broker并且transientStorePoolEnable有效情况下才有效的消息异步提交线程,他和FlushRealTimeService组合起来使用,前者是往文件通道中写消息,后者是把文件通道中的数据落盘,所以异步刷盘建议开启TransientStorePoolEnable,DefaultAppendMessageCallback主要是把消息放入内存中,文件的映射内存MappedByteBuffer或者直接内存ByteBuffer.allocateDirect(fileSize),ThreadLocal<MessageExtBatchEncoder>主要用于对批量消息进行编码,PutMessageLock是在消息放入内存时所需要添加的锁,分为重入锁和自旋锁,异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁,调整Broker配置项useReentrantLockWhenPutMessage,默认为false;因为同步刷盘本来竞争就比较大所以使用自旋锁的话就容易占用cpu资源,还不如使用重入锁等待唤醒。

public CommitLog(final DefaultMessageStore defaultMessageStore) {
    this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
        defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
    this.defaultMessageStore = defaultMessageStore;

    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new GroupCommitService();
    } else {
        this.flushCommitLogService = new FlushRealTimeService();
    }

    this.commitLogService = new CommitRealTimeService();

    this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
    batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
        @Override
        protected MessageExtBatchEncoder initialValue() {
            return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
        }
    };
    this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();

}

初始化每个topic对应队列id的消息消费队列。ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable,每一个ConsumeQueue代表的也是一组文件,里面主要包含的是消息在commitLog物理文件中的偏移量offset以及消息大小和消息的tagsCode,该文件主要用于消费者拉取消息时使用。

FlushConsumeQueueService

定时持久化消费队列的数据

CleanCommitLogService

定时清理过期的消息存储文件,超过磁盘对应比例大小或者超过保存时间的文件

CleanConsumeQueueService

定期清理过期的消息消费队列文件以及索引文件,因为过期的消息已经被删除,这两个依赖消息文件的索引文件也就过期了。

IndexService

索引文件,提供了一种可以通过key或时间区间来查询消息的方法

HAService

主从通信服务,主要作用是把消息同步给从服务器。

public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
    this.defaultMessageStore = defaultMessageStore;
    this.acceptSocketService =
        new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
    this.groupTransferService = new GroupTransferService();
    this.haClient = new HAClient();
}

 

上一篇:ASP.NET Core - 依赖注入


下一篇:Dynamics 365 CE中插件的调试