文章目录
Broker启动
入口
/**
 * Broker process entry point: builds the controller from the command-line arguments
 * and then starts it.
 *
 * @param args command-line arguments (for example {@code -c <config file>})
 */
public static void main(String[] args) {
    final BrokerController brokerController = createBrokerController(args);
    start(brokerController);
}
/**
 * Starts an already-initialized broker controller and reports the outcome.
 *
 * <p>On success a boot message (broker name, broker address, serialize type and, when
 * configured, the name server address) is written to the broker log and to stdout.
 * Any failure prints the stack trace and terminates the JVM with exit code -1.
 *
 * @param controller the controller created by {@code createBrokerController}
 * @return the started controller; the trailing {@code null} is only reachable if
 *         {@code System.exit} were to return
 */
public static BrokerController start(BrokerController controller) {
    try {
        controller.start();

        final StringBuilder tip = new StringBuilder()
            .append("The broker[")
            .append(controller.getBrokerConfig().getBrokerName())
            .append(", ")
            .append(controller.getBrokerAddr())
            .append("] boot success. serializeType=")
            .append(RemotingCommand.getSerializeTypeConfigInThisServer());

        final String namesrvAddr = controller.getBrokerConfig().getNamesrvAddr();
        if (namesrvAddr != null) {
            tip.append(" and name server is ").append(namesrvAddr);
        }

        final String message = tip.toString();
        log.info(message);
        System.out.printf("%s%n", message);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}
创建BrokerController对象
与Namesrv的启动方式类似,首先调用createBrokerController方法创建BrokerController对象:
/**
 * Parses the command line and configuration file, builds the four configuration objects,
 * and creates and initializes (but does not start) a {@link BrokerController}.
 *
 * <p>Configuration is applied in this order:
 * <ol>
 *   <li>hard-coded defaults: 131072-byte socket buffers (unless set through system
 *       properties) and listen port 10911;</li>
 *   <li>the properties file given with {@code -c}, applied to all four config objects;</li>
 *   <li>command-line options, applied to {@code brokerConfig} only.</li>
 * </ol>
 *
 * <p>Instead of returning, the JVM is terminated on:
 * <ul>
 *   <li>an unparsable command line (-1);</li>
 *   <li>a missing ROCKETMQ_HOME (-2);</li>
 *   <li>an illegal name server address, a slave with brokerId &lt;= 0, or a failed
 *       {@code initialize()} (-3);</li>
 *   <li>the {@code -p}/{@code -m} print-only modes (0);</li>
 *   <li>any other unexpected error (-1).</li>
 * </ul>
 *
 * @param args raw command-line arguments
 * @return the initialized controller
 */
public static BrokerController createBrokerController(String[] args) {
    // Advertise the current protocol version to the remoting layer.
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    // Default the Netty socket send/receive buffer sizes unless they were set explicitly
    // through the corresponding system properties.
    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
        NettySystemConfig.socketSndbufSize = 131072;
    }
    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
        NettySystemConfig.socketRcvbufSize = 131072;
    }

    try {
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
            new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
        }

        // Broker-level settings.
        final BrokerConfig brokerConfig = new BrokerConfig();
        // Netty server side: serves Producer/Consumer requests.
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        // Netty client side: talks to the name servers.
        final NettyClientConfig nettyClientConfig = new NettyClientConfig();
        nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
            String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
        // Default service port; the config file may override it.
        nettyServerConfig.setListenPort(10911);
        // Message store settings.
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

        // -c <file>: load the properties file into all four configuration objects.
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                configFile = file;
                // try-with-resources closes the stream even if loading/mapping throws.
                try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
                    properties = new Properties();
                    properties.load(in);
                    properties2SystemEnv(properties);
                    MixAll.properties2Object(properties, brokerConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
                    MixAll.properties2Object(properties, nettyClientConfig);
                    MixAll.properties2Object(properties, messageStoreConfig);
                    BrokerPathConfigHelper.setBrokerConfigPath(file);
                }
            }
        }

        // A slave keeps 10 percentage points less of accessMessageInMemoryMaxRatio than a
        // master (e.g. 40% -> 30%). The role is only known once the config file has been
        // applied, so this must run here; on a freshly constructed MessageStoreConfig the
        // check could never see a configured SLAVE role.
        if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
            int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
            messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

        // ROCKETMQ_HOME is mandatory; the logback configuration below is read from it.
        if (null == brokerConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        // Validate every ';'-separated name server address.
        String namesrvAddr = brokerConfig.getNamesrvAddr();
        if (null != namesrvAddr) {
            try {
                String[] addrArray = namesrvAddr.split(";");
                for (String addr : addrArray) {
                    RemotingUtil.string2SocketAddress(addr);
                }
            } catch (Exception e) {
                System.out.printf(
                    "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                    namesrvAddr);
                System.exit(-3);
            }
        }

        // Masters always get MixAll.MASTER_ID; a slave must be configured with brokerId > 0.
        switch (messageStoreConfig.getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                brokerConfig.setBrokerId(MixAll.MASTER_ID);
                break;
            case SLAVE:
                if (brokerConfig.getBrokerId() <= 0) {
                    System.out.printf("Slave's brokerId must be > 0%n");
                    System.exit(-3);
                }
                break;
            default:
                break;
        }

        // With the DLedger commit log enabled, the broker id is set to -1 here.
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            brokerConfig.setBrokerId(-1);
        }

        // Port on which the master listens for slave (HA) requests: always listenPort + 1.
        messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");

        if (commandLine.hasOption('p')) {
            // -p: print all configuration objects to the console logger, then exit.
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            MixAll.printObjectProperties(console, nettyClientConfig);
            MixAll.printObjectProperties(console, messageStoreConfig);
            System.exit(0);
        } else if (commandLine.hasOption('m')) {
            // -m: print the configuration objects using the (..., true) variant of
            // printObjectProperties, then exit.
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig, true);
            MixAll.printObjectProperties(console, nettyServerConfig, true);
            MixAll.printObjectProperties(console, nettyClientConfig, true);
            MixAll.printObjectProperties(console, messageStoreConfig, true);
            System.exit(0);
        }

        log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        MixAll.printObjectProperties(log, brokerConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
        MixAll.printObjectProperties(log, nettyClientConfig);
        MixAll.printObjectProperties(log, messageStoreConfig);

        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);
        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        // JVM shutdown hook: shut the controller down at most once and log the time taken.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            private volatile boolean hasShutdown = false;
            private AtomicInteger shutdownTimes = new AtomicInteger(0);

            @Override
            public void run() {
                synchronized (this) {
                    log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                    if (!this.hasShutdown) {
                        this.hasShutdown = true;
                        long beginTime = System.currentTimeMillis();
                        controller.shutdown();
                        long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                        log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                    }
                }
            }
        }, "ShutdownHook"));

        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}
上面这段代码主要可以分为下边几个部分:
- 配置文件的解析及加载:一共生成了brokerConfig、nettyServerConfig、nettyClientConfig、messageStoreConfig四个对象来保存配置信息,并据此生成BrokerController对象;其中brokerConfig
对象对应的配置文件如下(注:文件中的配置项会通过MixAll.properties2Object分别填充到上述四个配置对象中,例如listenPort属于nettyServerConfig,存储路径属于messageStoreConfig):
# 所属集群名字
brokerClusterName = DefaultCluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName = broker-a
# 0代表master,>0代表slave
brokerId = 0
# nameServer地址,分号分隔
namesrvAddr=rocketmq-namesrv1:9876;rocketmq-namesrv2:9876
# 在发送消息时,自动创建服务器不存在的Topic,默认创建队列数
defaultTopicQueueNums=4
# 是否允许broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# broker 对外提供服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨四点
deleteWhen = 04
# 文件保留时间,默认48小时
fileReservedTime = 48
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条, 根据业务情况调整
mapedFileSizeConsumeQueue=30000
# destroyMapedFileIntervalForcibly=12000
# redeleteHangedFileInterval=12000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/server/mq/rocketmq/store
# commitLog存储路径
storePathCommitLog=/usr/local/server/mq/rocketmq/store/commitlog
# 消息队列存储路径
storePathConsumeQueue=/usr/local/server/mq/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/server/mq/rocketmq/store/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/server/mq/rocketmq/store/checkpoint
# abort 文件存储路径
abortFile=/usr/local/server/mq/rocketmq/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# checkTransactionMessageEnable=false
# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128
其余配置不做重点学习。
- 创建BrokerController对象并初始化BrokerController
构造函数:
public BrokerController(
    final BrokerConfig brokerConfig,
    final NettyServerConfig nettyServerConfig,
    final NettyClientConfig nettyClientConfig,
    final MessageStoreConfig messageStoreConfig
) {
    // Keep the four configuration objects built by the startup code.
    this.brokerConfig = brokerConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.nettyClientConfig = nettyClientConfig;
    this.messageStoreConfig = messageStoreConfig;
    // Consumption progress (offsets) per consumer group. Per the original notes, clustering
    // consumers sync their offsets to the broker after consuming successfully.
    this.consumerOffsetManager = new ConsumerOffsetManager(this);
    // Topic/queue configuration of this broker. Per the original notes it is periodically
    // synced with the name servers to refresh their topic routes.
    this.topicConfigManager = new TopicConfigManager(this);
    // Handles consumer pull requests.
    this.pullMessageProcessor = new PullMessageProcessor(this);
    // Keeps client pull requests pending; per the original notes it runs as a thread and
    // notifies waiting clients to pull when new messages arrive.
    this.pullRequestHoldService = new PullRequestHoldService(this);
    // Message-arrival listener; wraps pullRequestHoldService, so it is created after it.
    this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
    // Consumer-id change listener; handed to ConsumerManager on the next line.
    this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
    // Consumer registry (register / unregister / close, per the original notes).
    this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
    // Consumer-side filter data (topic, consumer, filter expression).
    this.consumerFilterManager = new ConsumerFilterManager(this);
    // Producer registry (register / unregister / close, per the original notes).
    this.producerManager = new ProducerManager();
    // Per the original notes: reacts to client channel events (connect, close, exception, idle).
    this.clientHousekeepingService = new ClientHousekeepingService(this);
    // Broker-initiated requests to clients (notify, reset, status, ... per the original notes).
    this.broker2Client = new Broker2Client(this);
    this.subscriptionGroupManager = new SubscriptionGroupManager(this);
    // Outbound API built on the Netty client config (per the original notes: talks to the name servers).
    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
    // Filter-server management (described as shell-based in the original notes).
    this.filterServerManager = new FilterServerManager(this);
    this.slaveSynchronize = new SlaveSynchronize(this);
    // Bounded task queues for the request thread pools; capacities come from brokerConfig.
    this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
    this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
    this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
    this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
    this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
    this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
    this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
    this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
    // Runtime statistics, scoped by cluster name.
    this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
    // Store host address = brokerIP1 : listen port.
    this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
    this.brokerFastFailure = new BrokerFastFailure(this);
    // Aggregated view of all four configs plus the broker config file path.
    this.configuration = new Configuration(
        log,
        BrokerPathConfigHelper.getBrokerConfigPath(),
        this.brokerConfig,
        this.nettyServerConfig,
        this.nettyClientConfig,
        this.messageStoreConfig
    );
}
ProducerManager:producer的管理和维护,提供producer的注册,取消,关闭等:
/**
 * Registry of producer connections (excerpt: fields and constructor only; the remaining
 * members are elided as {@code ...}).
 */
public class ProducerManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    // 120 000 ms; presumably the inactivity time after which a producer channel counts as expired — TODO confirm.
    private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
    // Presumably a retry bound when looking up an available channel — TODO confirm.
    private static final int GET_AVAILABLE_CHANNEL_RETRY_COUNT = 3;
    // Producer connections: producer group name -> (channel -> client channel info).
    private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =
        new ConcurrentHashMap<>();
    // String key -> channel; presumably keyed by client id — verify against callers.
    private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
    private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();

    public ProducerManager() {
    }
    ...
}
ConsumerManager:consumer的管理和维护,提供consumer的注册,取消,关闭等:
/**
 * Registry of consumer groups (excerpt: fields and constructor only; the remaining
 * members are elided as {@code ...}).
 */
public class ConsumerManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    // 120 000 ms; presumably the inactivity time after which a consumer channel counts as expired — TODO confirm.
    private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
    // Consumer group name -> group details (subscriptions, channels, consume mode, ...).
    private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
        new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
    // Listener for consumer connect / disconnect / change events (per the original notes).
    private final ConsumerIdsChangeListener consumerIdsChangeListener;

    public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
        this.consumerIdsChangeListener = consumerIdsChangeListener;
    }
    ...
}
通过Map维护消费者组名及具体内容,其中ConsumerGroupInfo
的数据结构如下:
/**
 * Broker-side state of one consumer group (excerpt: fields only).
 */
public class ConsumerGroupInfo {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    // Consumer group name.
    private final String groupName;
    // Topic -> subscription; per the original notes SubscriptionData mainly records the topic's tags.
    private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
        new ConcurrentHashMap<String, SubscriptionData>();
    // Connected client channels of this group.
    private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
        new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
    // Consume type: PULL (consumer pulls) or PUSH (messages are pushed to the consumer).
    private volatile ConsumeType consumeType;
    // Message model: BROADCASTING or CLUSTERING.
    private volatile MessageModel messageModel;
    // Where consumption starts (ConsumeFromWhere policy).
    private volatile ConsumeFromWhere consumeFromWhere;
    // Time of the last update; initialized to the creation time.
    private volatile long lastUpdateTimestamp = System.currentTimeMillis();
}
ConsumerOffsetManager:管理每个consumer group消息消费的进度。Cluster的consumer会在消息消费成功后把offset信息同步给broker:
public class ConsumerOffsetManager extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final String TOPIC_GROUP_SEPARATOR = "@"; private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512); private transient BrokerController brokerController; public ConsumerOffsetManager() { } public ConsumerOffsetManager(BrokerController brokerController) { this.brokerController = brokerController; } ... public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) { // topic@group String key = topic + TOPIC_GROUP_SEPARATOR + group; this.commitOffset(clientHost, key, queueId, offset); } //更新偏移量 private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) { ConcurrentMap<Integer, Long> map = this.offsetTable.get(key); if (null == map) { map = new ConcurrentHashMap<Integer, Long>(32); map.put(queueId, offset); this.offsetTable.put(key, map); } else { Long storeOffset = map.put(queueId, offset); if (storeOffset != null && offset < storeOffset) { log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset); } } } //查询偏移量 public long queryOffset(final String group, final String topic, final int queueId) { // topic@group String key = topic + TOPIC_GROUP_SEPARATOR + group; ConcurrentMap<Integer, Long> map = this.offsetTable.get(key); if (null != map) { Long offset = map.get(queueId); if (offset != null) return offset; } return -1; } }
可以清晰地看到:第一个Map的键为topic@group,值是一个Map;第二个Map的键是queueId,值是offset。
TopicConfigManager:管理所有broker上存在的topic以及queue的信息。topic的数据会定时和nameserv做同步,以更新Nameserv上的topic路由信息:
public class TopicConfigManager extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long LOCK_TIMEOUT_MILLIS = 3000; private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18; private transient final Lock lockTopicConfigTable = new ReentrantLock(); //key:topic value:TopicConfig ==> 读写队列数量等信息 private final ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(1024); private final DataVersion dataVersion = new DataVersion(); private transient BrokerController brokerController; public TopicConfigManager() { } public TopicConfigManager(BrokerController brokerController) { this.brokerController = brokerController; { String topic = TopicValidator.RMQ_SYS_SELF_TEST_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } { if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { String topic = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE; topicConfig.setPerm(perm); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } } { String topic = TopicValidator.RMQ_SYS_BENCHMARK_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1024); topicConfig.setWriteQueueNums(1024); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } { String topic = 
this.brokerController.getBrokerConfig().getBrokerClusterName(); TopicConfig topicConfig = new TopicConfig(topic); TopicValidator.addSystemTopic(topic); int perm = PermName.PERM_INHERIT; if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) { perm |= PermName.PERM_READ | PermName.PERM_WRITE; } topicConfig.setPerm(perm); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } { String topic = this.brokerController.getBrokerConfig().getBrokerName(); TopicConfig topicConfig = new TopicConfig(topic); TopicValidator.addSystemTopic(topic); int perm = PermName.PERM_INHERIT; if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) { perm |= PermName.PERM_READ | PermName.PERM_WRITE; } topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); topicConfig.setPerm(perm); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } { String topic = TopicValidator.RMQ_SYS_OFFSET_MOVED_EVENT; TopicConfig topicConfig = new TopicConfig(topic); TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } { String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(SCHEDULE_TOPIC_QUEUE_NUM); topicConfig.setWriteQueueNums(SCHEDULE_TOPIC_QUEUE_NUM); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } { if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) { String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName(); TopicConfig topicConfig = new TopicConfig(topic); TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } } { String topic = this.brokerController.getBrokerConfig().getBrokerClusterName() + "_" + MixAll.REPLY_TOPIC_POSTFIX; 
TopicConfig topicConfig = new TopicConfig(topic); TopicValidator.addSystemTopic(topic); topicConfig.setReadQueueNums(1); topicConfig.setWriteQueueNums(1); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } } }
TopicConfigManager的Map中存储的是topicName及其对应的TopicConfig。
- 初始化initialize
方法:
/**
 * Loads persisted broker metadata and the message store, then builds the remoting
 * servers, request thread pools and periodic maintenance tasks.
 *
 * @return {@code true} if every load step succeeded and the components were built;
 *         {@code false} if a metadata/store load failed or the message store could not be
 *         created (in which case no servers, pools or tasks are set up)
 * @throws CloneNotSupportedException if cloning the Netty server config fails
 */
public boolean initialize() throws CloneNotSupportedException {
    // Load persisted metadata; each step runs only if all previous ones succeeded.
    boolean result = this.topicConfigManager.load();
    result = result && this.consumerOffsetManager.load();
    result = result && this.subscriptionGroupManager.load();
    result = result && this.consumerFilterManager.load();

    if (result) {
        try {
            // Message store (DefaultMessageStore; it picks CommitLog or DLedgerCommitLog itself).
            this.messageStore =
                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                    this.brokerConfig);
            // DLedger only: react to leader/follower role changes.
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                DLedgerRoleChangeHandler roleChangeHandler =
                    new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog())
                    .getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
            }
            // Statistics bound to the raw DefaultMessageStore (before plugin wrapping below).
            this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
            // load plugin: let MessageStoreFactory wrap the store.
            MessageStorePluginContext context =
                new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
            this.messageStore = MessageStoreFactory.build(context, this.messageStore);
            // The filter bitmap dispatcher runs before all other dispatchers.
            this.messageStore.getDispatcherList()
                .addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }

    // Load CommitLog / ConsumeQueue / index files from disk and recover their state.
    result = result && this.messageStore.load();

    if (result) {
        // Main Netty server; clientHousekeepingService watches Producer/Consumer connections.
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        // Second ("fast"/VIP) server on listenPort - 2. Per the original notes it does not
        // handle consumer pull requests; processors are registered in registerProcessor().
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);

        // Producer send requests.
        this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getSendMessageThreadPoolNums(),
            this.brokerConfig.getSendMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.sendThreadPoolQueue,
            new ThreadFactoryImpl("SendMessageThread_"));

        // Consumer pull requests.
        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPullMessageThreadPoolNums(),
            this.brokerConfig.getPullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue,
            new ThreadFactoryImpl("PullMessageThread_"));

        // Reply messages.
        this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
            this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.replyThreadPoolQueue,
            new ThreadFactoryImpl("ProcessReplyMessageThread_"));

        // Message queries.
        this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.queryThreadPoolQueue,
            new ThreadFactoryImpl("QueryMessageThread_"));

        // Broker administration.
        this.adminBrokerExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                "AdminBrokerThread_"));

        // Client management.
        this.clientManageExecutor = new ThreadPoolExecutor(
            this.brokerConfig.getClientManageThreadPoolNums(),
            this.brokerConfig.getClientManageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.clientManagerThreadPoolQueue,
            new ThreadFactoryImpl("ClientManageThread_"));

        // Heartbeats.
        this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.heartbeatThreadPoolQueue,
            new ThreadFactoryImpl("HeartbeatThread_", true));

        // End-transaction requests.
        this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.endTransactionThreadPoolQueue,
            new ThreadFactoryImpl("EndTransactionThread_"));

        // Consumer management.
        this.consumerManageExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                "ConsumerManageThread_"));

        // Register request processors together with the executors above.
        this.registerProcessor();

        // Broker stats record: first run at computeNextMorningTimeMillis(), then every 24 h.
        final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
        final long period = 1000 * 60 * 60 * 24;
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.getBrokerStats().record();
                } catch (Throwable e) {
                    log.error("schedule record error.", e);
                }
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);

        // Persist consumer offsets: first after 10 s, then every flushConsumerOffsetInterval ms.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerOffsetManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumerOffset error.", e);
                }
            }
        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        // Persist consumer filters every 10 s.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerFilterManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumer filter error.", e);
                }
            }
        }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

        // Every 3 minutes: protectBroker() (per the original notes, disables consumers that are too slow).
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.protectBroker();
                } catch (Throwable e) {
                    log.error("protectBroker error.", e);
                }
            }
        }, 3, 3, TimeUnit.MINUTES);

        // Every second, after a 10 s delay: printWaterMark().
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.printWaterMark();
                } catch (Throwable e) {
                    log.error("printWaterMark error.", e);
                }
            }
        }, 10, 1, TimeUnit.SECONDS);

        // Every minute, after a 10 s delay: log how many bytes dispatching lags behind the commit log.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                } catch (Throwable e) {
                    log.error("schedule dispatchBehindBytes error.", e);
                }
            }
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

        // Name server list: use the configured addresses if present; otherwise, if enabled,
        // fetch them from the address server every 2 minutes.
        if (this.brokerConfig.getNamesrvAddr() != null) {
            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
            log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
        } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                    } catch (Throwable e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        // Master/slave HA setup; only applies when the DLedger commit log is NOT enabled.
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                // Use a statically configured HA master address if one looks set (length >= 6);
                // otherwise flag it for periodic update.
                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }
            } else {
                // Master: every minute, after a 10 s delay, printMasterAndSlaveDiff()
                // (per the original notes, logs how far the slave lags behind).
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        } catch (Throwable e) {
                            log.error("schedule printMasterAndSlaveDiff error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }

        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;

                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            // Reload only after both the certificate and its key have changed.
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }

                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                            ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                // Pass the cause along so the failure can actually be diagnosed.
                log.warn("FileWatchService created error, can't load the certificate dynamically", e);
            }
        }
        // Transaction support.
        initialTransaction();
        // Access control list.
        initialAcl();
        // RPC hooks.
        initialRpcHooks();
    }
    return result;
}
首先创建DefaultMessageStore
:
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
    final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
    this.messageArrivingListener = messageArrivingListener;
    this.brokerConfig = brokerConfig;
    this.messageStoreConfig = messageStoreConfig;
    this.brokerStatsManager = brokerStatsManager;
    // Mapped-file allocation service; started further down in this constructor.
    this.allocateMappedFileService = new AllocateMappedFileService(this);
    // DLedger-based commit log when enabled, otherwise the standard CommitLog.
    if (messageStoreConfig.isEnableDLegerCommitLog()) {
        this.commitLog = new DLedgerCommitLog(this);
    } else {
        this.commitLog = new CommitLog(this);
    }
    // Consume queues; per the original notes the layout is {topic: {queueId: consumeQueue}}.
    this.consumeQueueTable = new ConcurrentHashMap<>(32);
    this.flushConsumeQueueService = new FlushConsumeQueueService();
    // Per the original notes: cleans CommitLog files, mainly when disk usage is too high.
    this.cleanCommitLogService = new CleanCommitLogService();
    this.cleanConsumeQueueService = new CleanConsumeQueueService();
    // Store statistics.
    this.storeStatsService = new StoreStatsService();
    // Index service; started further down in this constructor.
    this.indexService = new IndexService(this);
    // HA service (master/slave replication) exists only without DLedger.
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        this.haService = new HAService(this);
    } else {
        this.haService = null;
    }
    // Presumably re-reads newly appended commit-log entries and feeds them to dispatcherList — TODO confirm.
    this.reputMessageService = new ReputMessageService();
    // Delayed (scheduled) message service.
    this.scheduleMessageService = new ScheduleMessageService(this);
    // Transient store pool; init() only when enabled in the config.
    this.transientStorePool = new TransientStorePool(messageStoreConfig);
    if (messageStoreConfig.isTransientStorePoolEnable()) {
        this.transientStorePool.init();
    }
    // Of the services above, only these two are start()ed in this constructor.
    this.allocateMappedFileService.start();
    this.indexService.start();
    // Commit-log dispatchers, in order: build consume queue, then build index.
    this.dispatcherList = new LinkedList<>();
    this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
    this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
    // Lock file under the store root directory: ensure its parent directory via
    // MappedFile.ensureDirOK, then open it read-write ("rw" creates it if missing).
    File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
    MappedFile.ensureDirOK(file.getParent());
    lockFile = new RandomAccessFile(file, "rw");
}
DefaultMessageStore的构造函数创建了多个服务用来管理broker的存储(其中allocateMappedFileService和indexService在构造时即被启动);对象创建完毕之后调用load
方法:
/**
 * Loads the store from disk: delayed-message progress, CommitLog and ConsumeQueues. If all
 * three load, it then loads the checkpoint and index and recovers state, taking into account
 * whether the previous shutdown was clean.
 *
 * @return {@code true} on success; on failure (including any exception) the mapped-file
 *         allocation service is shut down and {@code false} is returned
 */
public boolean load() {
    boolean ok;
    try {
        // An existing temp file is what marks the previous shutdown as abnormal.
        final boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

        // Short-circuit chain: delayed-message progress (if that service exists),
        // then the CommitLog, then the ConsumeQueues.
        ok = (this.scheduleMessageService == null || this.scheduleMessageService.load())
            && this.commitLog.load()
            && this.loadConsumeQueue();

        if (ok) {
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            // Index files and CommitLog/ConsumeQueue pointers are restored differently
            // depending on whether the last exit was clean.
            this.indexService.load(lastExitOK);
            this.recover(lastExitOK);
            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        ok = false;
    }

    if (!ok) {
        this.allocateMappedFileService.shutdown();
    }
    return ok;
}
CommitLog
:public CommitLog(final DefaultMessageStore defaultMessageStore) { //目录 ${user.home}/store/commitLog //单文件默认大小1024*1024*1024 this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); this.defaultMessageStore = defaultMessageStore; //同步刷盘 if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { this.flushCommitLogService = new GroupCommitService(); //异步刷盘 } else { this.flushCommitLogService = new FlushRealTimeService(); } //异步刷盘 this.commitLogService = new CommitRealTimeService(); //在回调中执行 this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() { @Override protected MessageExtBatchEncoder initialValue() { return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); } }; this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); }
CommitLog由MappedFile文件组成,每个文件默认大小为1G,初始化时会通过MappedFile对象将相关信息加载进内存:
CommitLog.load
/**
 * Loads the CommitLog's mapped files from disk.
 *
 * @return whether the underlying MappedFileQueue loaded successfully
 */
public boolean load() {
    final boolean loaded = this.mappedFileQueue.load();
    final String outcome = loaded ? "OK" : "Failed";
    log.info("load commit log " + outcome);
    return loaded;
}
MappedFileQueue.load
/**
 * Memory-maps every file in the store directory, in sorted order.
 *
 * @return false if any file has an unexpected size or cannot be mapped; true otherwise
 *         (including when the directory does not exist)
 */
public boolean load() {
    // Read every file in the store directory
    File dir = new File(this.storePath);
    File[] files = dir.listFiles();
    if (files != null) {
        // ascending order
        // NOTE(review): relies on file names sorting in the same order as their starting
        // physical offsets (presumably fixed-width offset names, not message ids) — confirm
        Arrays.sort(files);
        for (File file : files) {

            // Every file must have exactly the configured size; otherwise loading is aborted
            if (file.length() != this.mappedFileSize) {
                log.warn(file + "\t" + file.length() + " length not matched message store config value, please check it manually");
                return false;
            }

            try {
                // Create the memory mapping for this file
                MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

                // Every loaded file starts out treated as full: the wrote/flushed/committed positions
                // are all set to the file size (presumably corrected later during recovery)
                mappedFile.setWrotePosition(this.mappedFileSize);
                mappedFile.setFlushedPosition(this.mappedFileSize);
                mappedFile.setCommittedPosition(this.mappedFileSize);
                this.mappedFiles.add(mappedFile);
                log.info("load " + file.getPath() + " OK");
            } catch (IOException e) {
                log.error("load file " + file + " error", e);
                return false;
            }
        }
    }

    return true;
}
DefaultMessageStore.loadConsumeQueue
/**
 * Creates and loads a ConsumeQueue for every {topic}/{queueId} directory under the
 * consume-queue root and registers it in consumeQueueTable.
 *
 * @return false as soon as any ConsumeQueue fails to load; true otherwise
 */
private boolean loadConsumeQueue() {
    // Root directory derived from storePathRootDir (typically ${user.home}/store/consumequeue)
    // Layout: <root>/{topic}/{queueId}/{filename}
    File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
    // One sub-directory per topic
    File[] fileTopicList = dirLogic.listFiles();
    if (fileTopicList != null) {

        for (File fileTopic : fileTopicList) {
            // The directory name is the topic name
            String topic = fileTopic.getName();

            // One sub-directory per queue of this topic
            File[] fileQueueIdList = fileTopic.listFiles();
            if (fileQueueIdList != null) {
                for (File fileQueueId : fileQueueIdList) {
                    int queueId;
                    try {
                        // The directory name is the queue id; non-numeric names are skipped
                        queueId = Integer.parseInt(fileQueueId.getName());
                    } catch (NumberFormatException e) {
                        continue;
                    }
                    // Create the ConsumeQueue for (topic, queueId)
                    ConsumeQueue logic = new ConsumeQueue(
                        topic,
                        queueId,
                        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                        this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                        this);
                    // Register it in consumeQueueTable
                    // (ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>>)
                    this.putConsumeQueue(topic, queueId, logic);
                    // Load its mapped files; any failure aborts the whole load
                    if (!logic.load()) {
                        return false;
                    }
                }
            }
        }
    }

    log.info("load logics queue all over, OK");

    return true;
}
indexService.load
/**
 * Loads all IndexFiles from the index directory in sorted order.
 *
 * @param lastExitOK whether the previous run shut down cleanly; if not, index files newer than
 *                   the checkpoint's index timestamp are destroyed instead of loaded
 * @return false on an IOException; true otherwise
 */
public boolean load(final boolean lastExitOK) {
    // Index directory (the article gives ${user.home}/store/index)
    File dir = new File(this.storePath);
    // IndexFiles in this directory
    File[] files = dir.listFiles();
    if (files != null) {
        // ascending order
        Arrays.sort(files);
        for (File file : files) {
            try {
                IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
                f.load();

                if (!lastExitOK) {
                    // After an abnormal exit, an index file whose last entry is newer than the
                    // checkpoint's index timestamp is not trusted: destroy it and skip it
                    if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
                        .getIndexMsgTimestamp()) {
                        f.destroy(0);
                        continue;
                    }
                }

                log.info("load index file OK, " + f.getFileName());
                this.indexFileList.add(f);
            } catch (IOException e) {
                log.error("load file {} error", file, e);
                return false;
            } catch (NumberFormatException e) {
                // NOTE(review): unlike IOException this does not fail the load; the file is logged and skipped
                log.error("load file {} error", file, e);
            }
        }
    }

    return true;
}
IndexFile
的功能是通过Key
或时间来查询消息:
- 默认位置
${user.home}/store/index/{fileName}
,文件名为创建时间戳
- 一个
IndexFile
(默认配置下)大小为40 + 5000000 * 4 + 20000000 * 20 = 420000040
字节,依次包括:
-
IndexHeader:共40字节
-
BeginTimestamp:第一个
Index
的时间戳,8字节 -
EndTimestamp:最后一个
Index
的时间戳,8字节 -
BeginPhyOffset:第一个
Index
对应的消息在CommitLog的物理偏移,8字节 -
EndPhyOffset:最后一个
Index
对应的消息在CommitLog的物理偏移,8字节 -
HashSlotCount:当前
Slot
的数量,4字节 -
IndexCount:当前
Index
的数量,4字节
-
SlotTable:
Slot
列表,存放该Slot
对应的最新的Index的序号,每个Slot
共4字节,默认共5000000个Slot
-
IndexLinkedList:
Index
列表,每个Index
都有同Slot
的前一个Index
的位置偏移,形成一个链表,每个Index共20字节,默认共20000000个Index
DefaultMessageStore.recover
:private void recover(final boolean lastExitOK) { // 恢复ConsumeQueue的刷盘指针,返回最大的物理偏移 long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue(); //正常退出 if (lastExitOK) { //正常恢复CommitLog的刷盘指针 this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue); //非正常退出 } else { this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue); } this.recoverTopicQueueTable(); }
DefaultMessageStore#recoverConsumeQueue
:
/**
 * Runs recover() on every ConsumeQueue of every topic.
 *
 * @return the largest max physical offset across all ConsumeQueues, or -1 if there are none
 */
private long recoverConsumeQueue() {
    long maxPhysicOffset = -1;
    for (ConcurrentMap<Integer, ConsumeQueue> queuesOfTopic : this.consumeQueueTable.values()) {
        for (ConsumeQueue cq : queuesOfTopic.values()) {
            // Validate the queue's content and restore its flush pointer
            cq.recover();
            maxPhysicOffset = Math.max(maxPhysicOffset, cq.getMaxPhysicOffset());
        }
    }
    return maxPhysicOffset;
}
ConsumeQueue#recover
:更新MappedFileQueue的偏移量
/**
 * Re-validates the tail of this ConsumeQueue and resets the MappedFileQueue's flushed/committed
 * positions to the end of valid data. Data after that point is truncated. When ext reading is
 * enabled, the ConsumeQueueExt is recovered and truncated as well.
 */
public void recover() {
    // All mapped files of this ConsumeQueue
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {

        // Only the last three files (or all of them, if fewer) are re-validated
        int index = mappedFiles.size() - 3;
        if (index < 0) index = 0;

        // Size of one ConsumeQueue mapped file
        int mappedFileSizeLogics = this.mappedFileSize;
        MappedFile mappedFile = mappedFiles.get(index);
        // Separate buffer view over the file's content
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        // Offset of this file's first byte within the whole queue
        long processOffset = mappedFile.getFileFromOffset();
        // Bytes of valid units found so far in the current file
        long mappedFileOffset = 0;
        // Last ext address seen. NOTE(review): the initial value 1 is kept as-is — confirm its meaning in ConsumeQueueExt
        long maxExtAddr = 1;
        while (true) {
            // Scan unit by unit; each unit is CQ_STORE_UNIT_SIZE bytes:
            // commitlog offset (8) + message size (4) + tags code (8)
            for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                long offset = byteBuffer.getLong();
                int size = byteBuffer.getInt();
                long tagsCode = byteBuffer.getLong();

                if (offset >= 0 && size > 0) {
                    // Valid unit: extend the valid region and record offset + size as maxPhysicOffset
                    mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                    this.maxPhysicOffset = offset + size;
                    if (isExtAddr(tagsCode)) {
                        maxExtAddr = tagsCode;
                    }
                } else {
                    // The first invalid unit marks the end of written data in this file
                    log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
                        + offset + " " + size + " " + tagsCode);
                    break;
                }
            }

            if (mappedFileOffset == mappedFileSizeLogics) {
                // Every unit in this file was valid: continue with the next file, if any
                index++;
                if (index >= mappedFiles.size()) {

                    log.info("recover last consume queue file over, last mapped file "
                        + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next consume queue file, " + mappedFile.getFileName());
                }
            } else {
                // This file ended early (an invalid unit was found): stop scanning
                log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                    + (processOffset + mappedFileOffset));
                break;
            }
        }

        // End of valid data across the queue
        processOffset += mappedFileOffset;
        // Flush and commit positions both point at the end of valid data
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        // Drop whatever lies beyond the end of valid data
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        if (isExtReadEnable()) {
            this.consumeQueueExt.recover();
            log.info("Truncate consume queue extend file by max {}", maxExtAddr);
            this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
        }
    }
}
DefaultMessageStore#recoverTopicQueueTable
:更新消费队列的最小偏移量
/**
 * Rebuilds the "topic-queueId" to max-queue-offset table from all ConsumeQueues and installs it
 * on the CommitLog. Each ConsumeQueue also gets a chance to correct its min offset against the
 * CommitLog's min physical offset.
 */
public void recoverTopicQueueTable() {
    HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
    final long minPhyOffset = this.commitLog.getMinOffset();
    for (ConcurrentMap<Integer, ConsumeQueue> queuesOfTopic : this.consumeQueueTable.values()) {
        for (ConsumeQueue cq : queuesOfTopic.values()) {
            table.put(cq.getTopic() + "-" + cq.getQueueId(), cq.getMaxOffsetInQueue());
            cq.correctMinOffset(minPhyOffset);
        }
    }

    // Install the per-queue offset table on the CommitLog
    this.commitLog.setTopicQueueTable(table);
}
//注册处理线程
BrokerController.registerProcessor
/**
 * Registers the broker's request processors on both Netty servers.
 * <p>
 * {@code remotingServer} is the main server. {@code fastRemotingServer} is the VIP channel: it
 * shares the processors and executors but does not serve {@code PULL_MESSAGE}. Each processor
 * instance is shared by both servers. {@link AdminBrokerProcessor} is registered as the default
 * processor on both.
 */
public void registerProcessor() {
    /*
     * SendMessageProcessor
     */
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

    // Single messages sent by producers
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    // Batch messages sent by producers
    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    // Messages sent back by consumers to be consumed again
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    // The VIP channel shares the processor and executor; only the server (listening port) differs
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);

    /*
     * PullMessageProcessor
     */
    // Consumer pull requests; intentionally not registered on the VIP channel
    this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
    this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

    /*
     * ReplyMessageProcessor: request-reply messages (SEND_REPLY_MESSAGE / SEND_REPLY_MESSAGE_V2)
     */
    ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
    replyMessageProcessor.registerSendMessageHook(sendMessageHookList);

    this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);

    /*
     * QueryMessageProcessor: query messages / view a message by id
     */
    NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

    /*
     * ClientManageProcessor: heartbeat, client unregistration, client config check
     */
    ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);

    /*
     * ConsumerManageProcessor: consumer list of a group, consumer offset update/query
     */
    ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

    /*
     * EndTransactionProcessor: END_TRANSACTION requests.
     * One instance is shared by both servers, like every other processor above.
     */
    EndTransactionProcessor endTransactionProcessor = new EndTransactionProcessor(this);
    this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);

    /*
     * Default
     */
    AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
    this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
}
-
注册
JVM
钩子函数并调用BrokerController.shutdown()
函数实现优雅关闭public void shutdown() { if (this.brokerStatsManager != null) { this.brokerStatsManager.shutdown(); } if (this.clientHousekeepingService != null) { this.clientHousekeepingService.shutdown(); } if (this.pullRequestHoldService != null) { this.pullRequestHoldService.shutdown(); } if (this.remotingServer != null) { this.remotingServer.shutdown(); } if (this.fastRemotingServer != null) { this.fastRemotingServer.shutdown(); } if (this.fileWatchService != null) { this.fileWatchService.shutdown(); } if (this.messageStore != null) { this.messageStore.shutdown(); } this.scheduledExecutorService.shutdown(); try { this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } this.unregisterBrokerAll(); if (this.sendMessageExecutor != null) { this.sendMessageExecutor.shutdown(); } if (this.pullMessageExecutor != null) { this.pullMessageExecutor.shutdown(); } if (this.replyMessageExecutor != null) { this.replyMessageExecutor.shutdown(); } if (this.adminBrokerExecutor != null) { this.adminBrokerExecutor.shutdown(); } if (this.brokerOuterAPI != null) { this.brokerOuterAPI.shutdown(); } this.consumerOffsetManager.persist(); if (this.filterServerManager != null) { this.filterServerManager.shutdown(); } if (this.brokerFastFailure != null) { this.brokerFastFailure.shutdown(); } if (this.consumerFilterManager != null) { this.consumerFilterManager.persist(); } if (this.clientManageExecutor != null) { this.clientManageExecutor.shutdown(); } if (this.queryMessageExecutor != null) { this.queryMessageExecutor.shutdown(); } if (this.consumerManageExecutor != null) { this.consumerManageExecutor.shutdown(); } if (this.fileWatchService != null) { this.fileWatchService.shutdown(); } if (this.transactionalMessageCheckService != null) { this.transactionalMessageCheckService.shutdown(false); } if (this.endTransactionExecutor != null) { this.endTransactionExecutor.shutdown(); } }
调用start
方法
资源加载完毕,调用start
方法真正启动:
/**
 * Starts the broker's components after loading has finished.
 * <p>
 * Order: message store, Netty servers, TLS file watcher, NameServer client and background
 * services. Then comes role-dependent setup plus an immediate NameServer registration (only
 * without DLedger), a periodic re-registration task, and finally the statistics and fast-failure
 * services.
 *
 * @throws Exception propagated from the message store or any component start
 */
public void start() throws Exception {
    // Start the message store
    if (this.messageStore != null) {
        this.messageStore.start();
    }
    // Start the main Netty server that handles client requests
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }
    // Start the VIP-channel Netty server
    if (this.fastRemotingServer != null) {
        this.fastRemotingServer.start();
    }
    // Start the TLS certificate file watcher
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
    // Start the Netty client used to talk to the NameServers
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }
    // Start the PullRequestHoldService (holds pull requests for push consumers)
    if (this.pullRequestHoldService != null) {
        this.pullRequestHoldService.start();
    }
    // Start client housekeeping. NOTE(review): the article says it checks producer/consumer/filter-server
    // connections every ten seconds — the interval is not visible here
    if (this.clientHousekeepingService != null) {
        this.clientHousekeepingService.start();
    }
    // Start the FilterServer manager. NOTE(review): per the article, it periodically runs the start
    // script to keep FilterServers alive, and FilterServers sit between Consumer and Broker to filter
    // messages — not verifiable from this block
    if (this.filterServerManager != null) {
        this.filterServerManager.start();
    }
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        // Start role-dependent processors (per the article: the transactional-message check service on a master)
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        // Per the article: on a slave, schedule synchronization of TopicConfig, ConsumerOffset,
        // DelayOffset and SubscriptionGroupConfig from the master. Only configuration is synced
        // here; the CommitLog itself is replicated by HAService.
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        // Register with the NameServers right away. NOTE(review): presumably the arguments are
        // (checkOrderConfig, oneway, forceRegister), i.e. NOT a one-way request — confirm the signature
        this.registerBrokerAll(true, false, true);
    }
    // Periodically re-register with the NameServers: first run after 10 s, then every
    // registerNameServerPeriod ms clamped to [10000, 60000]
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    // Start broker statistics
    if (this.brokerStatsManager != null) {
        this.brokerStatsManager.start();
    }
    // Start fast failure. NOTE(review): per the article, it periodically cleans up timed-out client requests
    if (this.brokerFastFailure != null) {
        this.brokerFastFailure.start();
    }
}
DefaultMessageStore#start
:读取配置,启动核心线程
/**
 * Starts the message store. The steps, in order:
 * <ol>
 *   <li>take the exclusive store lock;</li>
 *   <li>start ReputMessageService from the right CommitLog offset and wait until it has caught
 *       up;</li>
 *   <li>start HA / delayed-message services (without DLedger);</li>
 *   <li>start the flush, CommitLog and statistics services;</li>
 *   <li>create the temp (abort) file and register scheduled tasks.</li>
 * </ol>
 *
 * @throws RuntimeException if the lock file cannot be locked exclusively
 * @throws Exception        propagated from I/O or from the catch-up wait
 */
public void start() throws Exception {
    // Try to take an exclusive lock on the first byte of the lock file
    lock = lockFile.getChannel().tryLock(0, 1, false);
    // No lock (or a shared/invalid one): another process already holds it, i.e. MQ is already started
    if (lock == null || lock.isShared() || !lock.isValid()) {
        throw new RuntimeException("Lock failed,MQ already started");
    }
    // Write a marker into the lock file and force it to disk
    lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
    lockFile.getChannel().force(true);
    {
        // Decide where dispatching (reput) starts. Begin at the CommitLog's min offset and raise it to
        // the largest max physical offset already referenced by any ConsumeQueue.
        long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
        // Iterate over all topics and their queues
        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            for (ConsumeQueue logic : maps.values()) {
                if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                    maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
                }
            }
        }
        if (maxPhysicalPosInLogicQueue < 0) {
            maxPhysicalPosInLogicQueue = 0;
        }
        // Never start below the first offset still present in the CommitLog
        if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
            maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
            log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
        }
        log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
            maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
        // Offset from which messages are dispatched to ConsumeQueue and IndexService
        this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
        // Start the dispatch service
        this.reputMessageService.start();
        while (true) {
            // Poll once per second until dispatchBehindBytes() <= 0
            // (the CommitLog max offset is no longer ahead of the reput offset)
            if (dispatchBehindBytes() <= 0) {
                break;
            }
            Thread.sleep(1000);
            log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
        }
        // Rebuild the topic-queue offset table now that dispatching has caught up
        this.recoverTopicQueueTable();
    }
    // Default mode (no DLedger)
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        // Start the HA service
        this.haService.start();
        // Per the article: on a master start ScheduleMessageService (delayed messages), otherwise stop it
        this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
    }
    // Service that flushes ConsumeQueues to disk
    this.flushConsumeQueueService.start();
    // Start CommitLog services
    this.commitLog.start();
    // Start store statistics
    this.storeStatsService.start();
    // Create the temp (abort) file (${user.home}/store/abort per the article). Presumably its presence
    // at the next load() is what marks the previous exit as abnormal
    this.createTempFile();
    // Register scheduled maintenance tasks
    this.addScheduleTask();
    this.shutdown = false;
}
CommitLog#getMinOffset
/**
 * Returns the smallest physical offset still available in the CommitLog.
 *
 * @return the first mapped file's start offset if that file is available; otherwise the offset
 *         computed by rollNextFile from it; -1 when there are no mapped files
 */
public long getMinOffset() {
    final MappedFile first = this.mappedFileQueue.getFirstMappedFile();
    if (first == null) {
        return -1;
    }
    return first.isAvailable()
        ? first.getFileFromOffset()
        : this.rollNextFile(first.getFileFromOffset());
}
reputMessageService.start
启动的是ReputMessageService
类中定义好的run
方法,而其核心是调用doReput
方法:
从CommitLog读取消息,并交给各个CommitLogDispatcher分发(构建ConsumeQueue和IndexFile)
/**
 * Reads CommitLog data starting at reputFromOffset and dispatches each parsed message to the
 * registered dispatchers (doDispatch). It advances reputFromOffset as it goes and stops when no
 * undispatched data remains or an unrecoverable parse error occurs.
 */
private void doReput() {
    // If the dispatch position is below the CommitLog's min offset, jump forward to the min offset
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
        log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
            this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }
    // Loop while undispatched CommitLog data exists and no stop condition was hit
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

        // With duplication enabled, never dispatch beyond the confirm offset
        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
            && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
            break;
        }

        // Readable CommitLog data starting at reputFromOffset
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
        if (result != null) {
            try {
                // Continue from the start offset of the returned data
                this.reputFromOffset = result.getStartOffset();

                // Parse and dispatch messages one by one until the buffer is exhausted or doNext is cleared
                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            // Run every registered dispatcher (ConsumeQueue builder, index builder, ...)
                            DefaultMessageStore.this.doDispatch(dispatchRequest);

                            // On a non-slave broker with long polling enabled, notify the
                            // messageArrivingListener so waiting pull requests can be woken up
                            if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                            }

                            // Advance past this message
                            this.reputFromOffset += size;
                            readSize += size;
                            // On a slave, count the message in the per-topic put statistics
                            if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                    .addAndGet(dispatchRequest.getMsgSize());
                            }
                        } else if (size == 0) {
                            // BLANK marker: end of this mapped file, roll over to the next file
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                    } else if (!dispatchRequest.isSuccess()) {
                        // Parse failed (NOTE(review): this condition is always true in this branch)

                        if (size > 0) {
                            // Size known (declared total size != computed length): skip this message.
                            // NOTE(review): readSize is not advanced, and the buffer position is wherever
                            // parsing stopped, so the next iteration may read misaligned data — confirm
                            log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                            this.reputFromOffset += size;
                        } else {
                            // Size unknown (illegal magic code or parse exception): stop this round
                            doNext = false;
                            // If user open the dledger pattern or the broker is master node,
                            // it will not ignore the exception and fix the reputFromOffset variable
                            if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                    this.reputFromOffset);
                                // Skip the unread remainder of the current buffer
                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                }
            } finally {
                // Release the selected buffer (per the article: decrements the MappedFile reference count)
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}
首先看循环结束条件isCommitLogAvailable
方法:
/**
 * Tells whether the CommitLog still holds data that has not been dispatched yet.
 *
 * @return true if reputFromOffset is below the CommitLog's max offset
 */
private boolean isCommitLogAvailable() {
    final long dispatchedUpTo = this.reputFromOffset;
    return dispatchedUpTo < DefaultMessageStore.this.commitLog.getMaxOffset();
}
获取当前commitLog
对象的最大偏移量:
/**
 * Returns the CommitLog's max physical offset, as reported by its MappedFileQueue.
 */
public long getMaxOffset() {
    final MappedFileQueue fileQueue = this.mappedFileQueue;
    return fileQueue.getMaxOffset();
}
/**
 * Returns the max offset of the queue: the last mapped file's start offset plus its read position.
 *
 * @return that offset, or 0 when there are no mapped files
 */
public long getMaxOffset() {
    final MappedFile last = getLastMappedFile();
    return last == null ? 0 : last.getFileFromOffset() + last.getReadPosition();
}
CommitLog.getData
方法:
/**
 * Selects CommitLog data starting at {@code offset}. Only when the offset is 0 does it fall back
 * to the first file if no file contains the offset.
 */
public SelectMappedBufferResult getData(final long offset) {
    final boolean returnFirstOnNotFound = offset == 0;
    return this.getData(offset, returnFirstOnNotFound);
}
/**
 * Selects the data of the mapped file containing {@code offset}, starting at the offset's
 * position inside that file.
 *
 * @param offset                physical CommitLog offset
 * @param returnFirstOnNotFound whether to fall back to the first mapped file when none matches
 * @return the selected buffer, or null when no mapped file was found
 */
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
    // Size of one CommitLog file (the article gives 1024*1024*1024 = 1G as the default)
    final int fileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    final MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
    if (mappedFile == null) {
        return null;
    }
    // Position of the offset inside its file
    final int positionInFile = (int) (offset % fileSize);
    return mappedFile.selectMappedBuffer(positionInFile);
}
MappedFileQueue.findMappedFileByOffset
方法:
/**
 * Finds the mapped file whose range [fileFromOffset, fileFromOffset + mappedFileSize) contains
 * {@code offset}.
 *
 * @param offset                physical offset to look up
 * @param returnFirstOnNotFound whether to return the first mapped file when no file matches
 * @return the matching file; the first file if none matches and returnFirstOnNotFound is set;
 *         otherwise null (also null when there are no files or an exception occurs)
 */
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        MappedFile firstMappedFile = this.getFirstMappedFile();
        MappedFile lastMappedFile = this.getLastMappedFile();
        // First check that the offset lies inside [first file start, last file end)
        if (firstMappedFile != null && lastMappedFile != null) {
            if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                    offset,
                    firstMappedFile.getFileFromOffset(),
                    lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                    this.mappedFileSize,
                    this.mappedFiles.size());
            } else {
                // Fast path: compute the list index directly, assuming equally sized files laid out
                // contiguously from firstMappedFile
                int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                MappedFile targetFile = null;
                try {
                    targetFile = this.mappedFiles.get(index);
                } catch (Exception ignored) {
                    // An invalid index falls through to the linear scan below
                }

                // Verify that the computed file really contains the offset
                if (targetFile != null && offset >= targetFile.getFileFromOffset()
                    && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                    return targetFile;
                }

                // Slow path: scan all files when the computed one is not the target
                for (MappedFile tmpMappedFile : this.mappedFiles) {
                    if (offset >= tmpMappedFile.getFileFromOffset()
                        && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                        return tmpMappedFile;
                    }
                }
            }

            // Not found: optionally fall back to the first file
            if (returnFirstOnNotFound) {
                return firstMappedFile;
            }
        }
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }

    return null;
}
进入循环,调用CommitLog.checkMessageAndReturnSize
方法封装DispatchRequest
对象:
/**
 * Parses one message at the current position of {@code byteBuffer} and wraps it in a
 * {@link DispatchRequest}. The buffer position is advanced while reading.
 *
 * @param byteBuffer buffer positioned at the start of a message
 * @param checkCRC   whether to verify the body CRC (only effective when {@code readBody} is true)
 * @param readBody   whether to read the body; when false the body bytes are skipped
 * @return one of the following:
 *         <ul>
 *           <li>a successful request describing the message;</li>
 *           <li>{@code DispatchRequest(0, true)} for a BLANK end-of-file marker;</li>
 *           <li>{@code DispatchRequest(totalSize, false)} when the declared total size differs
 *               from the computed length;</li>
 *           <li>{@code DispatchRequest(-1, false)} for an illegal magic code, a CRC mismatch,
 *               or any exception while parsing.</li>
 *         </ul>
 */
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
    final boolean readBody) {
    try {
        // 1 TOTAL SIZE
        int totalSize = byteBuffer.getInt();

        // 2 MAGIC CODE
        int magicCode = byteBuffer.getInt();
        switch (magicCode) {
            case MESSAGE_MAGIC_CODE:
                break;
            case BLANK_MAGIC_CODE:
                // Blank filler marking the end of a mapped file
                return new DispatchRequest(0, true /* success */);
            default:
                log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));
                return new DispatchRequest(-1, false /* success */);
        }

        // Scratch buffer reused below for host addresses, body, topic and properties
        byte[] bytesContent = new byte[totalSize];

        int bodyCRC = byteBuffer.getInt();

        int queueId = byteBuffer.getInt();

        int flag = byteBuffer.getInt();

        long queueOffset = byteBuffer.getLong();

        long physicOffset = byteBuffer.getLong();

        int sysFlag = byteBuffer.getInt();

        long bornTimeStamp = byteBuffer.getLong();

        // Born host: 4-byte IPv4 or 16-byte IPv6 address, followed by a 4-byte port
        ByteBuffer byteBuffer1;
        if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
            byteBuffer1 = byteBuffer.get(bytesContent, 0, 4 + 4);
        } else {
            byteBuffer1 = byteBuffer.get(bytesContent, 0, 16 + 4);
        }

        long storeTimestamp = byteBuffer.getLong();

        // Store host: same layout as the born host
        ByteBuffer byteBuffer2;
        if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
            byteBuffer2 = byteBuffer.get(bytesContent, 0, 4 + 4);
        } else {
            byteBuffer2 = byteBuffer.get(bytesContent, 0, 16 + 4);
        }

        int reconsumeTimes = byteBuffer.getInt();

        long preparedTransactionOffset = byteBuffer.getLong();

        int bodyLen = byteBuffer.getInt();
        if (bodyLen > 0) {
            if (readBody) {
                byteBuffer.get(bytesContent, 0, bodyLen);

                if (checkCRC) {
                    int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
                    if (crc != bodyCRC) {
                        // Arguments follow the placeholder order: stored CRC first, computed CRC second
                        log.warn("CRC check failed. bodyCRC={}, currentCRC={}", bodyCRC, crc);
                        return new DispatchRequest(-1, false/* success */);
                    }
                }
            } else {
                // Skip the body without copying it
                byteBuffer.position(byteBuffer.position() + bodyLen);
            }
        }

        // Topic length is a single signed byte
        byte topicLen = byteBuffer.get();
        byteBuffer.get(bytesContent, 0, topicLen);
        String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);

        long tagsCode = 0;
        String keys = "";
        String uniqKey = null;

        // Properties length is a signed short
        short propertiesLength = byteBuffer.getShort();
        Map<String, String> propertiesMap = null;
        if (propertiesLength > 0) {
            byteBuffer.get(bytesContent, 0, propertiesLength);
            String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);
            propertiesMap = MessageDecoder.string2messageProperties(properties);

            keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);

            uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);

            String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
            if (tags != null && tags.length() > 0) {
                tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
            }

            // Timing message processing: for the schedule topic, tagsCode holds the delivery timestamp
            {
                String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
                    int delayLevel = Integer.parseInt(t);

                    // Clamp to the highest configured delay level
                    if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                        delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
                    }

                    if (delayLevel > 0) {
                        tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                            storeTimestamp);
                    }
                }
            }
        }

        // The declared total size must match the length implied by the parsed fields
        int readLength = calMsgLength(sysFlag, bodyLen, topicLen, propertiesLength);
        if (totalSize != readLength) {
            doNothingForDeadCode(reconsumeTimes);
            doNothingForDeadCode(flag);
            doNothingForDeadCode(bornTimeStamp);
            doNothingForDeadCode(byteBuffer1);
            doNothingForDeadCode(byteBuffer2);
            log.error(
                "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
                totalSize, readLength, bodyLen, topicLen, propertiesLength);
            return new DispatchRequest(totalSize, false/* success */);
        }

        return new DispatchRequest(
            topic,
            queueId,
            physicOffset,
            totalSize,
            tagsCode,
            storeTimestamp,
            queueOffset,
            keys,
            uniqKey,
            sysFlag,
            preparedTransactionOffset,
            propertiesMap
        );
    } catch (Exception e) {
        // Truncated or corrupted data (e.g. buffer underflow, negative length): report as a failed parse
        // instead of discarding the cause silently
        log.warn("checkMessageAndReturnSize exception", e);
    }

    return new DispatchRequest(-1, false /* success */);
}
上面这个方法的功能是从buffer
中读取一条消息,消息的结构如下:
序号 | 字段(字节数) | 说明 |
---|---|---|
1 | totalSize(4) | 消息总大小 |
2 | magicCode(4) | 设置为daa320a7 |
3 | bodyCRC(4) | 当broker重启recover时会校验 |
4 | queueId(4) | 消息对应的consumeQueueId |
5 | flag(4) | rocketmq不做处理,只存储后透传 |
6 | queueOffset(8) | 消息在consumeQueue中的偏移量 |
7 | physicalOffset(8) | 消息在commitlog中的偏移量 |
8 | sysFlag(4) | 系统标志位,包含事务类型(NOT_TYPE/PREPARED_TYPE/COMMIT_TYPE/ROLLBACK_TYPE)以及BORNHOST_V6_FLAG/STOREHOSTADDRESS_V6_FLAG等标志 |
9 | bornTimestamp(8) | 消息产生端(producer)的时间戳 |
10 | bornHost(8,IPv6为20) | 消息产生端(producer)地址(address:port) |
11 | storeTimestamp(8) | 消息在broker存储时间 |
12 | storeHostAddress(8,IPv6为20) | 消息存储到broker的地址(address:port) |
13 | reconsumeTimes(4) | 消息重试次数 |
14 | preparedTransactionOffset(8) | 事务消息的物理偏移量 |
15 | bodyLength(4) | 消息长度,最长不超过4MB |
16 | body(body length Bytes) | 消息体内容 |
17 | topicLength(1) | 主题长度,最长不超过255Byte |
18 | topic(topic length Bytes) | 主题内容 |
19 | propertiesLength(2) | 消息属性长度,最长不超过65535Bytes |
20 | properties(properties length Bytes) | 消息属性内容 |
然后调用DefaultMessageStore.doDispatch
方法:
/**
 * Passes a parsed message to every registered CommitLogDispatcher, in list order.
 *
 * @param req the parsed message to dispatch
 */
public void doDispatch(DispatchRequest req) {
    this.dispatcherList.forEach(dispatcher -> dispatcher.dispatch(req));
}
/**
 * Dispatcher that writes message position info into the ConsumeQueue. Only non-transactional
 * messages and committed transactional messages are written; prepared and rolled-back ones are
 * ignored.
 */
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        final boolean writeToConsumeQueue = tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE;
        if (writeToConsumeQueue) {
            DefaultMessageStore.this.putMessagePositionInfo(request);
        }
    }
}
dispatcherList
是在DefaultMessageStore
初始化时创建的:
// Dispatchers are invoked in list order for every dispatched message:
// first the ConsumeQueue builder, then the index builder
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
先看CommitLogDispatcherBuildConsumeQueue
中的实现,当满足TRANSACTION_NOT_TYPE
,TRANSACTION_COMMIT_TYPE
这两个条件时,调用DefaultMessageStore.putMessagePositionInfo
方法:
/**
 * Writes the position info of a dispatched message into the ConsumeQueue of its topic and queue.
 *
 * @param dispatchRequest the parsed message
 */
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    // NOTE(review): per the article, findConsumeQueue creates the queue when it does not exist yet — confirm
    this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId())
        .putMessagePositionInfoWrapper(dispatchRequest);
}
调用ConsumeQueue.putMessagePositionInfoWrapper
方法:
/**
 * Writes one ConsumeQueue unit for {@code request}. Retries up to 30 times, sleeping one second
 * after each failure.
 * <p>
 * On success, the store checkpoint timestamps are updated. If the queue is not writable or all
 * retries fail, the logic queues are marked as errored.
 *
 * @param request the parsed message to index
 */
public void putMessagePositionInfoWrapper(DispatchRequest request) {
    final int maxRetries = 30;
    // Writability is checked once, before the retry loop
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    for (int i = 0; i < maxRetries && canWrite; i++) {
        long tagsCode = request.getTagsCode();
        // Optionally also write an extension unit (filter bitmap, store time, tags code)
        if (isExtWriteEnable()) {
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            cqExtUnit.setFilterBitMap(request.getBitMap());
            cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
            cqExtUnit.setTagsCode(request.getTagsCode());

            long extAddr = this.consumeQueueExt.put(cqExtUnit);
            // If the ext write succeeded, the CQ unit stores the ext address in place of the tags code
            if (isExtAddr(extAddr)) {
                tagsCode = extAddr;
            } else {
                log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                    topic, queueId, request.getCommitLogOffset());
            }
        }
        // Write the unit: commitlog offset, size, tags code (or ext address), at the given queue offset
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
            request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
        if (result) {
            // On a slave or with DLedger, also record the physical message timestamp in the checkpoint
            if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
                this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
                this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
            }
            // Record the message store time as the checkpoint's logics timestamp
            this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
            return;
        } else {
            // Write failed: log and retry after one second
            // XXX: warn and notify me
            log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                + " failed, retry " + i + " times");

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt status is not restored here
                log.warn("", e);
            }
        }
    }

    // Not writable or all retries failed: log and mark the logic queues as errored
    // XXX: warn and notify me
    log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
    this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
ConsumeQueue.putMessagePositionInfo
:
/**
 * Appends one fixed-size ConsumeQueue entry: CommitLog offset (8 bytes), message size
 * (4 bytes) and tags code (8 bytes).
 *
 * @param offset   physical offset of the message in the CommitLog
 * @param size     message size in bytes
 * @param tagsCode tags hash code, or an extension-file address
 * @param cqOffset logical position of the entry in this queue, counted in entries
 * @return true if the entry was appended or is treated as already present; false if no
 *         mapped file could be obtained or the entry did not fit in the file
 */
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {
    // This CommitLog range is already covered by maxPhysicOffset: treat it as a duplicate.
    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }

    // Reset the reusable entry buffer and cap it at one unit (CQ_STORE_UNIT_SIZE bytes).
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);   // CommitLog offset, 8 bytes
    this.byteBufferIndex.putInt(size);      // message size, 4 bytes
    this.byteBufferIndex.putLong(tagsCode); // tags code or ext address, 8 bytes

    // Entries are fixed-size, so the byte position is entry index * unit size.
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile == null) {
        return false;
    }

    // First file of the queue, still empty, but the queue does not start at entry 0:
    // move the min/flush/commit markers to expectLogicOffset and pad the file head.
    if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
        this.minLogicOffset = expectLogicOffset;
        this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
        this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
        this.fillPreBlank(mappedFile, expectLogicOffset);
        log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
            + mappedFile.getWrotePosition());
    }

    if (cqOffset != 0) {
        // Byte position the queue has actually reached.
        long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
        if (expectLogicOffset < currentLogicOffset) {
            // Slot already written: duplicate build, report success without writing.
            log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
            return true;
        }
        if (expectLogicOffset != currentLogicOffset) {
            // Gap between expected and actual position: only logged, the write still proceeds.
            LOG_ERROR.warn(
                "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                expectLogicOffset,
                currentLogicOffset,
                this.topic,
                this.queueId,
                expectLogicOffset - currentLogicOffset
            );
        }
    }

    // Remember the indexed CommitLog high-water mark, then append the buffered entry.
    this.maxPhysicOffset = offset + size;
    return mappedFile.appendMessage(this.byteBufferIndex.array());
}
MappedFile.appendMessage
:
/**
 * Appends raw bytes at the current write position through the FileChannel.
 *
 * @param data bytes to append
 * @return false if the data does not fit in the remaining file space; true otherwise
 */
public boolean appendMessage(final byte[] data) {
    final int writeFrom = this.wrotePosition.get();
    if (writeFrom + data.length > this.fileSize) {
        return false;
    }
    try {
        this.fileChannel.position(writeFrom);
        this.fileChannel.write(ByteBuffer.wrap(data));
    } catch (Throwable e) {
        // NOTE(review): the failure is only logged. The write pointer still advances and true
        // is returned below, so callers cannot tell that this write failed.
        log.error("Error occurred when append message to mappedFile.", e);
    }
    this.wrotePosition.addAndGet(data.length);
    return true;
}
CommitLogDispatcherBuildIndex.dispatch
/**
 * Dispatcher that passes each dispatched message to the IndexService to build the IndexFile
 * (per the original notes, used to query messages by id and similar keys). It does nothing
 * when message indexing is disabled.
 */
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
    @Override
    public void dispatch(DispatchRequest request) {
        if (!DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            return;
        }
        DefaultMessageStore.this.indexService.buildIndex(request);
    }
}
回到DefaultMessageStore.start
方法,启动一个循环直到消息分发完毕:
/**
 * Returns how many CommitLog bytes the reput service has not dispatched yet.
 */
public long dispatchBehindBytes() {
    final long undispatchedBytes = this.reputMessageService.behind();
    return undispatchedBytes;
}
/**
 * Returns the dispatch lag: the CommitLog's max offset minus the next offset to reput from.
 */
public long behind() {
    long commitLogMaxOffset = DefaultMessageStore.this.commitLog.getMaxOffset();
    return commitLogMaxOffset - this.reputFromOffset;
}
接着调用DefaultMessageStore.recoverTopicQueueTable
方法:
// ConsumeQueues, keyed by topic and then by queue id.
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;

/**
 * Rebuilds the CommitLog's "topic-queueId" -> max-offset-in-queue table from the loaded
 * ConsumeQueues and corrects each queue's minimum offset against the CommitLog's min offset.
 */
public void recoverTopicQueueTable() {
    HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<>(1024);
    long minPhyOffset = this.commitLog.getMinOffset();
    this.consumeQueueTable.values().forEach(queuesById -> queuesById.values().forEach(logic -> {
        table.put(logic.getTopic() + "-" + logic.getQueueId(), logic.getMaxOffsetInQueue());
        logic.correctMinOffset(minPhyOffset);
    }));
    this.commitLog.setTopicQueueTable(table);
}
然后在非DLedger
模式下会启用高可用服务.
接着开始执行flushConsumeQueueService.start()
方法,启动ConsumeQueue的刷盘线程,刷盘逻辑在doFlush中:
/**
 * Flushes every ConsumeQueue. On a full flush it also persists the store checkpoint.
 *
 * @param retryTimes flush attempts per queue; RETRY_TIMES_OVER (the shutdown value) also
 *                   forces a full flush
 */
private void doFlush(int retryTimes) {
    // Minimum dirty pages before a queue file is flushed (default 2 per the original notes).
    int leastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
    if (retryTimes == RETRY_TIMES_OVER) {
        // Shutdown: flush everything regardless of the page threshold.
        leastPages = 0;
    }

    long logicsMsgTimestamp = 0;
    // Maximum time between full flushes (default 60 s per the original notes).
    int thoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
    long now = System.currentTimeMillis();
    if (now >= (this.lastFlushTimestamp + thoroughInterval)) {
        // Periodic full flush: also capture the logical checkpoint time to persist below.
        this.lastFlushTimestamp = now;
        leastPages = 0;
        logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
    }

    for (ConcurrentMap<Integer, ConsumeQueue> queuesById : DefaultMessageStore.this.consumeQueueTable.values()) {
        for (ConsumeQueue cq : queuesById.values()) {
            // Up to retryTimes attempts per queue, stopping once flush() returns true.
            boolean done = false;
            for (int attempt = 0; attempt < retryTimes && !done; attempt++) {
                done = cq.flush(leastPages);
            }
        }
    }

    if (leastPages == 0) {
        // Full flush: persist the checkpoint as well.
        if (logicsMsgTimestamp > 0) {
            DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
        }
        DefaultMessageStore.this.getStoreCheckpoint().flush();
    }
}
然后调用ConsumeQueue.flush
方法:
/**
 * Flushes this ConsumeQueue's mapped files and, when the extension file is enabled, the
 * extension file too. The extension file is flushed even if the queue flush returned false.
 *
 * @param flushLeastPages minimum dirty pages required to flush; 0 flushes any pending data
 * @return true only if every underlying flush returned true
 */
public boolean flush(final int flushLeastPages) {
    boolean queueResult = this.mappedFileQueue.flush(flushLeastPages);
    if (!isExtReadEnable()) {
        return queueResult;
    }
    boolean extResult = this.consumeQueueExt.flush(flushLeastPages);
    return queueResult && extResult;
}
MappedFileQueue.flush
方法:
/**
 * Flushes the mapped file containing flushedWhere and moves flushedWhere forward.
 *
 * @param flushLeastPages minimum dirty pages to flush; 0 flushes any pending data and also
 *                        records the file's store timestamp
 * @return true if flushedWhere did not change (nothing new was flushed) or no file was found
 */
public boolean flush(final int flushLeastPages) {
    MappedFile target = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (target == null) {
        return true;
    }
    // Read before flushing, matching the original ordering.
    long storeTimestampBeforeFlush = target.getStoreTimestamp();
    int flushedInFile = target.flush(flushLeastPages);
    long newFlushedWhere = target.getFileFromOffset() + flushedInFile;
    boolean unchanged = newFlushedWhere == this.flushedWhere;
    this.flushedWhere = newFlushedWhere;
    if (flushLeastPages == 0) {
        this.storeTimestamp = storeTimestampBeforeFlush;
    }
    return unchanged;
}
调用MappedFile.flush
:
/**
 * Forces this file's written data to disk when isAbleToFlush(flushLeastPages) allows it.
 *
 * @param flushLeastPages minimum dirty pages required; 0 flushes any unflushed data
 * @return the flushed position after this call
 */
public int flush(final int flushLeastPages) {
    if (!this.isAbleToFlush(flushLeastPages)) {
        return this.getFlushedPosition();
    }
    if (!this.hold()) {
        // Could not take a reference (presumably the file is being released): only move the
        // flushed position forward, without forcing.
        log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
        this.flushedPosition.set(getReadPosition());
        return this.getFlushedPosition();
    }

    // Snapshot the readable position before forcing; this becomes the new flushed position.
    int value = getReadPosition();
    try {
        //We only append data to fileChannel or mappedByteBuffer, never both.
        if (writeBuffer != null || this.fileChannel.position() != 0) {
            this.fileChannel.force(false);
        } else {
            this.mappedByteBuffer.force();
        }
    } catch (Throwable e) {
        log.error("Error occurred when force data to disk.", e);
    }
    this.flushedPosition.set(value);
    this.release();
    return this.getFlushedPosition();
}
刷盘机制
回到DefaultMessageStore.start()
方法,ConsumeQueue
刷盘启动后,调用CommitLog.start
方法,启动CommitLog
:
/**
 * Starts the CommitLog background services. The flush service always starts; the commit
 * service starts only when the transient store pool is enabled.
 */
public void start() {
    this.flushCommitLogService.start();
    boolean transientPoolEnabled = defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable();
    if (transientPoolEnabled) {
        this.commitLogService.start();
    }
}
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式:同步刷盘和异步刷盘。
[图片缺失:同步刷盘和异步刷盘.png]
FlushCommitLogService
拥有两个实现类GroupCommitService
和FlushRealTimeService
分别对应了同步刷盘和异步刷盘.
提交刷盘请求
GroupCommitService
中有两个List
,用于刷盘时解耦:
// putRequest() appends new requests to requestsWrite. swapRequests() presumably exchanges the
// two lists so doCommit() can process requestsRead without blocking producers — confirm.
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<>();
/**
 * A synchronous-flush request. Its future is completed by wakeupCustomer() with the flush
 * outcome.
 */
public static class GroupCommitRequest {
    // CommitLog offset to flush up to (handleDiskFlush passes wroteOffset + wroteBytes).
    private final long nextOffset;
    // Completed by wakeupCustomer(); the submitting thread waits on it via future().
    private final CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
    // Creation time of this request.
    private final long startTimestamp = System.currentTimeMillis();
    private long timeoutMillis = Long.MAX_VALUE;

    public GroupCommitRequest(long nextOffset, long timeoutMillis) {
        this(nextOffset);
        this.timeoutMillis = timeoutMillis;
    }

    public GroupCommitRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    public long getNextOffset() {
        return nextOffset;
    }

    /** Completes the request with the given flush status. */
    public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
        this.flushOKFuture.complete(putMessageStatus);
    }

    /** Returns the future that is completed when the flush finishes. */
    public CompletableFuture<PutMessageStatus> future() {
        return flushOKFuture;
    }
}
GroupCommitRequest
对象在CommitLog.handleDiskFlush
方法中被创建,当Broker
接收到Producer
的消息后会调用此方法:
/**
 * Triggers the flush of a just-appended message according to the configured flush mode.
 *
 * <p>SYNC_FLUSH: if the message asks to wait for the store result, a GroupCommitRequest for
 * {@code wroteOffset + wroteBytes} is submitted, and this thread blocks for at most
 * syncFlushTimeout ms (5 s per the original notes). If the flush does not report PUT_OK in
 * time, the put status is downgraded to FLUSH_DISK_TIMEOUT. Otherwise the flush thread is
 * just woken. ASYNC_FLUSH: wakes the flush service, or the commit service when the transient
 * store pool is enabled.
 *
 * @param result           append result providing the wrote offset and byte count
 * @param putMessageResult result returned to the producer; its status may be changed
 * @param messageExt       the stored message
 */
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronous flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            // Request a flush up to the end of this message.
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
            PutMessageStatus flushStatus = null;
            try {
                flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                    TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag for the caller; the null status is reported as a
                // flush timeout below.
                Thread.currentThread().interrupt();
            } catch (ExecutionException | TimeoutException e) {
                // Flush failed or did not finish in time; reported as a flush timeout below.
            }
            if (flushStatus != PutMessageStatus.PUT_OK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            // Transient store pool (off-heap write buffer) disabled: wake the flush service.
            flushCommitLogService.wakeup();
        } else {
            // Transient store pool enabled: wake the commit service (presumably
            // CommitRealTimeService, which wakes the flush service after committing).
            commitLogService.wakeup();
        }
    }
}
调用GroupCommitService.putRequest
方法:
/**
 * Queues a synchronous-flush request and wakes the flush thread.
 *
 * <p>NOTE(review): this locks both {@code this} and the current requestsWrite list, and it
 * re-reads the volatile field inside the list lock. That presumably matters for the
 * interaction with swapRequests(), so keep the exact form.
 */
public synchronized void putRequest(final GroupCommitRequest request) {
// Add the request while holding the list's monitor
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
// Wake the flush thread
this.wakeup();
}
/**
 * Wakes the service thread. Only the caller that flips hasNotified from false to true counts
 * the latch down, so repeated wakeups do nothing until hasNotified is reset (presumably by
 * waitForRunning) — confirm.
 */
public void wakeup() {
    if (!hasNotified.compareAndSet(false, true)) {
        return;
    }
    // waitPoint is a CountDownLatch2 created with a count of 1 (per the original notes).
    waitPoint.countDown(); // notify
}
同步刷盘
[图片缺失:同步刷盘流程.png]
在返回写成功状态时,消息已经被写入磁盘。具体流程是:消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成;刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。
GroupCommitService.run()
:
/**
 * Synchronous-flush worker loop. Each round calls waitForRunning(10) and then doCommit().
 * After the service is stopped, pending requests are drained once more.
 */
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            this.waitForRunning(10);
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    this.drainPendingRequests();
    CommitLog.log.info(this.getServiceName() + " service end");
}

/**
 * Shutdown path: sleeps briefly so requests still arriving can be queued, swaps the request
 * lists under this service's monitor, and commits what was collected.
 */
private void drainPendingRequests() {
    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }
    synchronized (this) {
        this.swapRequests();
    }
    this.doCommit();
}
异步刷盘
[图片缺失:异步刷盘流程.png]
开启transientStorePoolEnable后异步刷盘步骤:
- 将消息直接追加到ByteBuffer(堆外内存)
- CommitRealTimeService线程每隔200ms将ByteBuffer中新追加的内容提交(commit)到FileChannel,即写入文件对应的PageCache
- 提交时从committedPosition开始写入,一直写到wrotePosition为止
- commit操作成功后,committedPosition前移到wrotePosition的位置
- FlushRealTimeService线程默认每500ms将MappedByteBuffer中新追加的内存刷写到磁盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
-
未开启堆外内存
FlushRealTimeService.run
:
/**
 * Asynchronous CommitLog flush loop (FlushRealTimeService). Every interval it flushes at
 * least flushCommitLogLeastPages dirty pages. At least every flushCommitLogThoroughInterval
 * it flushes everything. On shutdown it keeps retrying a full flush up to RETRY_TIMES_OVER
 * times.
 */
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        // true: sleep a fixed interval; false: use waitForRunning(interval), which can be woken
        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
        // Flush interval in ms (default 500)
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        // Minimum dirty pages per flush (default 4)
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
        // Maximum time between full flushes in ms (default 10 s)
        int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

        boolean printFlushProgress = false;

        // Print flush progress
        long currentTimeMillis = System.currentTimeMillis();
        // Full-flush interval elapsed: flush regardless of the page threshold
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushPhysicQueueLeastPages = 0;
            printFlushProgress = (printTimes++ % 10) == 0;
        }

        try {
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            } else {
                this.waitForRunning(interval);
            }

            if (printFlushProgress) {
                this.printFlushProgress();
            }

            long begin = System.currentTimeMillis();
            CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                // Record the physical checkpoint time
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            long past = System.currentTimeMillis() - begin;
            if (past > 500) {
                log.info("Flush data to disk costs {} ms", past);
            }
        } catch (Throwable e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }
    }

    // Normal shutdown, to ensure that all the flush before exit
    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.flush(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }

    this.printFlushProgress();

    CommitLog.log.info(this.getServiceName() + " service end");
}
与同步刷盘不同的是,异步刷盘会传入页数:
/**
 * Flushes the mapped file containing flushedWhere and moves flushedWhere forward.
 *
 * @param flushLeastPages minimum dirty pages; 0 flushes any pending data and updates storeTimestamp
 * @return true if flushedWhere did not change or no mapped file was found
 */
public boolean flush(final int flushLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        // Read the store timestamp before flushing
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        int offset = mappedFile.flush(flushLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }
    return result;
}
MappedFile.flush
方法中:public int flush(final int flushLeastPages) { if (this.isAbleToFlush(flushLeastPages)) { ... }
调用
isAbleToFlush
方法,判断是否进行刷盘:
/**
 * Decides whether flush() should force data to disk.
 *
 * @param flushLeastPages minimum number of whole OS pages (OS_PAGE_SIZE bytes, 4 KB per the
 *                        original notes) that must be unflushed; 0 means any unflushed data
 * @return true if the file is full, enough pages are pending, or (with 0) anything is pending
 */
private boolean isAbleToFlush(final int flushLeastPages) {
    int flushed = this.flushedPosition.get();
    int readable = getReadPosition();

    // A full file is always flushed.
    if (this.isFull()) {
        return true;
    }
    // Per the original notes, synchronous flush passes 0, so this branch serves asynchronous
    // flush. With the default of 4 pages, at least 16 KB must be pending.
    if (flushLeastPages > 0) {
        int pendingPages = (readable / OS_PAGE_SIZE) - (flushed / OS_PAGE_SIZE);
        return pendingPages >= flushLeastPages;
    }
    return readable > flushed;
}
-
开启堆外内存
CommitRealTimeService.run
public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { //间隔时间,默认200ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); //一次提交的至少页数 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); //两次真实提交的最大间隔,默认200ms int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); //上次提交间隔超过commitDataThoroughInterval,则忽略提交commitDataThoroughInterval参数,直接提交 long begin = System.currentTimeMillis(); if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; commitDataLeastPages = 0; } try { //执行提交操作,将待提交数据提交到物理文件的内存映射区 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); if (!result) { this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. //唤醒刷盘线程 flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } //调用waitForRunning方法 this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); }
提交操作(commit)
调用MappedFile.commit
方法:
/**
 * Copies pending data from writeBuffer into the FileChannel when isAbleToCommit() allows it,
 * and returns writeBuffer to the transient store pool once the whole file is committed.
 *
 * <p>Without a writeBuffer (transient store pool disabled), data is written to the file
 * directly, so the write position is reported as the committed position.
 *
 * @param commitLeastPages page threshold passed to isAbleToCommit() and commit0()
 * @return the committed position after this call
 */
public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }

    if (this.isAbleToCommit(commitLeastPages)) {
        if (!this.hold()) {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        } else {
            commit0(commitLeastPages);
            this.release();
        }
    }

    // All dirty data has been committed to FileChannel: give the write buffer back to the pool.
    boolean fullyCommitted = writeBuffer != null && this.transientStorePool != null
        && this.fileSize == this.committedPosition.get();
    if (fullyCommitted) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }
    return this.committedPosition.get();
}
MappedFile.isAbleToCommit
/**
 * Decides whether commit() should move data from writeBuffer into the file. This is the
 * method commit() calls, and the one this section is about. It uses the same rule as
 * isAbleToFlush(), but measured between committedPosition and wrotePosition.
 *
 * @param commitLeastPages minimum number of whole OS pages that must be pending; 0 means any
 *                         pending data is enough
 * @return true if the file is full, enough pages are pending, or (with 0) anything is pending
 */
protected boolean isAbleToCommit(final int commitLeastPages) {
    // Position up to which data has been committed to the file
    int committed = this.committedPosition.get();
    // Position up to which data has been written into writeBuffer
    int write = this.wrotePosition.get();
    if (this.isFull()) {
        return true;
    }
    if (commitLeastPages > 0) {
        return ((write / OS_PAGE_SIZE) - (committed / OS_PAGE_SIZE)) >= commitLeastPages;
    }
    return write > committed;
}

/**
 * Decides whether flush() should force data to disk. The snippet originally shown under this
 * heading; kept unchanged for reference.
 *
 * @param flushLeastPages minimum number of whole OS pages that must be unflushed; 0 means any
 *                        unflushed data is enough
 * @return true if the file is full, enough pages are pending, or (with 0) anything is pending
 */
private boolean isAbleToFlush(final int flushLeastPages) {
    // Position up to which data has been flushed
    int flush = this.flushedPosition.get();
    // Highest position holding valid (readable) data
    int write = getReadPosition();
    if (this.isFull()) {
        return true;
    }
    if (flushLeastPages > 0) {
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
    }
    return write > flush;
}
MappedFile.commit0
:
/**
 * Writes the uncommitted part of writeBuffer, [committedPosition, wrotePosition), to the
 * FileChannel at the same position, then advances committedPosition to wrotePosition.
 *
 * @param commitLeastPages page threshold. commit() already checks it, in pages, via
 *                         isAbleToCommit() before calling this method, so it is not compared
 *                         against byte counts here.
 */
protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    // Commit whenever at least one byte is pending. The previous check,
    // `writePos - lastCommittedPosition > commitLeastPages`, compared a byte count with a page
    // count. It could skip a few pending bytes, e.g. when isAbleToCommit() allowed the commit
    // only because the file is full.
    if (writePos - lastCommittedPosition > 0) {
        try {
            // slice() gives an independent position/limit over writeBuffer's content.
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}
主从复制HA
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。
(1)同步复制
同步复制方式是等Master和Slave均写成功后才反馈给客户端写成功状态;
在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
(2)异步复制
异步复制方式是只要Master写成功即可反馈给客户端写成功状态。
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失;
(3)配置
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、SYNC_MASTER、SLAVE三个值中的一个。
Master
HAService#start
服务启动:
/**
 * Starts the HA services in order: the slave-accepting server socket, the
 * synchronous-replication notifier, and the HA client.
 *
 * @throws Exception if opening or binding the listening socket fails
 */
public void start() throws Exception {
// Open the ServerSocketChannel, bind the listen port and register OP_ACCEPT (must precede the accept thread)
this.acceptSocketService.beginAccept();
// Start the AcceptSocketService thread that accepts slave connections
this.acceptSocketService.start();
// Start the GroupTransferService; per the original notes it only checks/notifies replication progress for SYNC_MASTER
this.groupTransferService.start();
// Start the HAClient; connectMaster() only succeeds when a master address is set, so it idles on a master
this.haClient.start();
}
HAService$AcceptSocketService#beginAccept
:初始化Java NIO服务端(ServerSocketChannel):绑定监听端口,设置为非阻塞模式并注册OP_ACCEPT事件
/**
 * Opens a non-blocking listening socket for slave connections on socketAddressListen and
 * registers it with a new selector for OP_ACCEPT.
 *
 * @throws Exception if opening, binding or registering fails
 */
public void beginAccept() throws Exception {
    final ServerSocketChannel channel = ServerSocketChannel.open();
    this.serverSocketChannel = channel;
    this.selector = RemotingUtil.openSelector();
    // SO_REUSEADDR has to be set before bind() to take effect.
    channel.socket().setReuseAddress(true);
    channel.socket().bind(this.socketAddressListen);
    channel.configureBlocking(false);
    channel.register(this.selector, SelectionKey.OP_ACCEPT);
}
HAService$AcceptSocketService#run
启动服务端监听线程
/**
 * Accept loop for slave connections. It selects with a 1 s timeout, accepts every pending
 * connection, and starts an HAConnection for each. Readiness other than OP_ACCEPT is only
 * logged.
 */
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            Set<SelectionKey> readyKeys = this.selector.selectedKeys();
            if (readyKeys == null) {
                continue;
            }
            for (SelectionKey key : readyKeys) {
                if ((key.readyOps() & SelectionKey.OP_ACCEPT) == 0) {
                    log.warn("Unexpected ops in select " + key.readyOps());
                    continue;
                }
                this.acceptSlave((ServerSocketChannel) key.channel());
            }
            // Clear the handled keys (skipped if an exception aborted the loop above).
            readyKeys.clear();
        } catch (Exception e) {
            log.error(this.getServiceName() + " service has exception.", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

/**
 * Accepts one pending connection on server. If there is one, it is wrapped in a started
 * HAConnection registered with the HAService; the socket is closed if that setup fails.
 */
private void acceptSlave(final ServerSocketChannel server) throws Exception {
    SocketChannel sc = server.accept();
    if (sc == null) {
        return;
    }
    HAService.log.info("HAService receive new connection, "
        + sc.socket().getRemoteSocketAddress());
    try {
        HAConnection conn = new HAConnection(HAService.this, sc);
        conn.start();
        HAService.this.addConnection(conn);
    } catch (Exception e) {
        log.error("new HAConnection exception", e);
        sc.close();
    }
}
HAConnection#start
高可用连接启动
/**
 * Starts both halves of this slave connection.
 */
public void start() {
// Reader: consumes the 8-byte offset reports (ACKs) sent by the slave
this.readSocketService.start();
// Writer: pushes CommitLog data and heartbeats to the slave
this.writeSocketService.start();
}
HAConnection$ReadSocketService#run:
/**
 * Reader loop for one slave connection. It consumes the slave's offset reports and stops on a
 * read error, on an exception, or when nothing has been read for longer than
 * haHousekeepingInterval. On exit it stops both services and releases the connection.
 */
public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            if (!this.processReadEvent()) {
                HAConnection.log.error("processReadEvent error");
                break;
            }
            // Nothing heard from the slave for too long: treat the connection as expired.
            long idleMillis = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
            if (idleMillis > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + idleMillis);
                break;
            }
        } catch (Exception e) {
            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }

    this.shutdownConnection();
    HAConnection.log.info(this.getServiceName() + " service end");
}

/**
 * Stops both services, unregisters this connection from the HAService, decrements the
 * connection count, and closes the selector and socket.
 */
private void shutdownConnection() {
    this.makeStop();
    writeSocketService.makeStop();
    haService.removeConnection(HAConnection.this);
    HAConnection.this.haService.getConnectionCount().decrementAndGet();

    SelectionKey key = this.socketChannel.keyFor(this.selector);
    if (key != null) {
        key.cancel();
    }
    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        HAConnection.log.error("", e);
    }
}
读请求处理
HAConnection$ReadSocketService#processReadEvent
处理读事件:
/**
 * Reads the slave's 8-byte offset reports from the socket into byteBufferRead and applies
 * the newest complete report as the slave's acknowledged offset.
 *
 * @return false if the channel reached end-of-stream or threw an IOException; true otherwise
 */
private boolean processReadEvent() {
    int zeroReads = 0;
    // Buffer full: flip it (position back to 0) and restart parsing from the beginning.
    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip();
        this.processPosition = 0;
    }

    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize < 0) {
                log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                return false;
            }
            if (readSize == 0) {
                // Give up for this round after three consecutive empty reads.
                if (++zeroReads >= 3) {
                    break;
                }
                continue;
            }

            zeroReads = 0;
            this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
            // At least 8 unprocessed bytes, i.e. one complete offset report.
            if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                // End of the last complete 8-byte report; only that newest value is used.
                int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                long readOffset = this.byteBufferRead.getLong(pos - 8);
                this.processPosition = pos;

                HAConnection.this.slaveAckOffset = readOffset;
                // First report from this slave: remember the offset it asked to start from.
                if (HAConnection.this.slaveRequestOffset < 0) {
                    HAConnection.this.slaveRequestOffset = readOffset;
                    log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                }
                // Propagate the new acknowledged offset to the HAService.
                HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
            }
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }
    return true;
}
HAService#notifyTransferSome
: 更新已同步的偏移量
/**
 * Raises push2SlaveMaxOffset to offset if offset is larger, using a compare-and-set retry
 * loop. On a successful update it notifies the GroupTransferService.
 *
 * @param offset CommitLog offset acknowledged by a slave
 */
public void notifyTransferSome(final long offset) {
    long current = this.push2SlaveMaxOffset.get();
    while (offset > current) {
        if (this.push2SlaveMaxOffset.compareAndSet(current, offset)) {
            this.groupTransferService.notifyTransferSome();
            break;
        }
        // Another thread changed the value: re-read and retry while offset is still larger.
        current = this.push2SlaveMaxOffset.get();
    }
}
写请求处理
HAConnection$WriteSocketService#run
:
/**
 * Master-side push loop for one slave connection.
 *
 * <p>It waits until the slave's first offset report sets slaveRequestOffset, then chooses the
 * starting offset: the master's max offset rounded down to a CommitLog file boundary if the
 * slave reported 0, otherwise the slave's reported offset. After that it repeatedly sends a
 * 12-byte header (8-byte offset + 4-byte size) followed by CommitLog data. When there is no
 * data and the connection has been idle longer than haSendHeartbeatInterval, it sends a
 * header-only heartbeat. On exit it releases the pending buffer and tears down the connection.
 */
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
// slaveRequestOffset is -1 until the slave's first offset report arrives; poll every 10 ms
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
// nextTransferFromWhere == -1: the starting offset for this slave has not been chosen yet
if (-1 == this.nextTransferFromWhere) {
// The slave reported offset 0 (presumably it has no CommitLog data yet)
if (0 == HAConnection.this.slaveRequestOffset) {
// Current max physical offset of the master's CommitLog
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
// Round down to a multiple of the CommitLog file size (should be the start of the last CommitLog file)
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMappedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
// Start replicating from there
this.nextTransferFromWhere = masterOffset;
} else {
// Resume from the offset the slave reported
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
// The previous transfer was written completely
if (this.lastWriteOver) {
// Time since the last successful write to the slave
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
// Idle longer than haSendHeartbeatInterval: send a header-only heartbeat
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {
// Build Header
// Heartbeat header: next transfer offset + body size 0
// Rewind the header buffer
this.byteBufferHeader.position(0);
// Header is 12 bytes: 8-byte offset + 4-byte size
this.byteBufferHeader.limit(headerSize);
// Offset, 8 bytes
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
// Body size, 4 bytes (0 for a heartbeat)
this.byteBufferHeader.putInt(0);
// Switch the buffer to read mode for the socket write
this.byteBufferHeader.flip();
// Send the heartbeat; if it was not fully written, retry on the next round
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
} else {
// Previous transfer incomplete: finish it before fetching new data
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
// Fetch CommitLog data starting at nextTransferFromWhere
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
// Size of the available CommitLog data
int size = selectResult.getSize();
// Cap one batch at haTransferBatchSize bytes
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
// Start offset of this batch
long thisOffset = this.nextTransferFromWhere;
// Start offset of the next batch
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);
this.selectMappedBufferResult = selectResult;
// Build Header: batch offset + batch size
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
// Send header and data to the slave
this.lastWriteOver = this.transferData();
} else {
// No new CommitLog data: wait up to 100 ms (presumably woken earlier when new data arrives)
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
// Service stopping: release the pending buffer and tear down the connection
HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
if (this.selectMappedBufferResult != null) {
this.selectMappedBufferResult.release();
}
this.makeStop();
readSocketService.makeStop();
haService.removeConnection(HAConnection.this);
SelectionKey sk = this.socketChannel.keyFor(this.selector);
if (sk != null) {
sk.cancel();
}
try {
this.selector.close();
this.socketChannel.close();
} catch (IOException e) {
HAConnection.log.error("", e);
}
HAConnection.log.info(this.getServiceName() + " service end");
}
HAConnection$WriteSocketService#transferData
向slave传输数据:
/**
 * Writes the pending header to the slave and, once the header is fully sent, the pending
 * CommitLog body (selectMappedBufferResult). Each part is abandoned for this round after
 * three consecutive zero-byte writes.
 *
 * @return true if everything pending was sent (just the header for a heartbeat)
 * @throws Exception if the channel reports a negative write size
 */
private boolean transferData() throws Exception {
    this.writeWithRetries(this.byteBufferHeader, "ha master write header error < 0");

    // Heartbeat: header only, no body.
    if (null == this.selectMappedBufferResult) {
        return !this.byteBufferHeader.hasRemaining();
    }

    // The body may only follow a completely written header.
    if (!this.byteBufferHeader.hasRemaining()) {
        this.writeWithRetries(this.selectMappedBufferResult.getByteBuffer(), "ha master write body error < 0");
    }

    boolean bodySent = !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
    boolean result = !this.byteBufferHeader.hasRemaining() && bodySent;
    if (bodySent) {
        // Body fully written: release the mapped CommitLog region.
        this.selectMappedBufferResult.release();
        this.selectMappedBufferResult = null;
    }
    return result;
}

/**
 * Writes buffer to the slave until it is drained or three consecutive writes return 0,
 * refreshing lastWriteTimestamp after every successful write.
 *
 * @throws Exception with negativeWriteMessage if the channel reports a negative write size
 */
private void writeWithRetries(final java.nio.ByteBuffer buffer, final String negativeWriteMessage) throws Exception {
    int zeroWrites = 0;
    while (buffer.hasRemaining()) {
        int written = this.socketChannel.write(buffer);
        if (written > 0) {
            zeroWrites = 0;
            this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
        } else if (written == 0) {
            if (++zeroWrites >= 3) {
                break;
            }
        } else {
            throw new Exception(negativeWriteMessage);
        }
    }
}
Slave
HAService$HAClient#run
客户端启动
/**
 * Slave-side replication loop. It connects to the master, periodically reports the local
 * CommitLog max offset (which doubles as a heartbeat), and reads CommitLog data pushed by the
 * master. It drops the connection when a report or read fails, or when nothing has been
 * written for longer than haHousekeepingInterval.
 */
public void run() {
    while (!this.isStopped()) {
        try {
            if (this.connectMaster()) {
                // Reporting interval elapsed (default 5 s per the original notes): send the offset.
                if (this.isTimeToReportOffset()) {
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        this.closeMaster();
                    }
                }
                // Wait up to 1 s for data from the master.
                this.selector.select(1000);
                // Consume CommitLog data pushed by the master.
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }
                // NOTE(review): presumably reports the new local max offset if it advanced, and
                // returns false when that report failed — confirm. On false, the idle check is skipped.
                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }
                // Time since the last write to the master.
                long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                    // Connection considered expired: log it so the reconnect is diagnosable.
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress.get()
                        + "] expired, " + interval);
                    this.closeMaster();
                }
            } else {
                // Not connected: wait (up to 5 s) before trying again.
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            // Log instead of swallowing silently, then back off before retrying.
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }
}
HAService$HAClient#connectMaster
连接master
/**
 * Ensures there is a channel to the master, opening one if none exists.
 *
 * <p>When no channel is open, resolves the current master address, connects to it and registers
 * the new channel for {@link SelectionKey#OP_READ} on this client's selector. The offset to report
 * is reset to the slave's current CommitLog max offset and the write timestamp is restarted.
 *
 * @return {@code true} if a channel to the master is available after this call
 * @throws ClosedChannelException if the new channel is closed before it can be registered
 */
private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                // Connect to the master; the result may be null (checked below).
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    // Only reads are selected: the master pushes CommitLog data to the slave.
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        // Start reporting from the slave's current CommitLog max physical offset.
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
        // Use the store's clock, as run() and reportSlaveMaxOffset do, so the housekeeping
        // interval in run() is measured against a single time source.
        this.lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
    }
    return this.socketChannel != null;
}
写请求处理
HAService$HAClient#reportSlaveMaxOffset
上报当前CommitLog最大偏移量:
/**
 * Sends the slave's CommitLog max offset to the master as a single 8-byte value.
 *
 * <p>The channel may accept only part of the buffer per write, so up to three write attempts are
 * made. The write timestamp is refreshed whether or not the whole value went out.
 *
 * @param maxOffset the CommitLog max physical offset to report
 * @return {@code true} if all 8 bytes were written; {@code false} if a write threw an
 *     {@link IOException} or the value was not fully written within three attempts
 */
private boolean reportSlaveMaxOffset(final long maxOffset) {
    // Refill the 8-byte report buffer: clear for writing, put the offset, flip for sending.
    this.reportOffset.clear();
    this.reportOffset.putLong(maxOffset);
    this.reportOffset.flip();
    for (int attempt = 0; attempt < 3 && this.reportOffset.hasRemaining(); attempt++) {
        try {
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            // Separate the service name from the message text.
            log.error(this.getServiceName()
                + " reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }
    // Refresh the write timestamp checked by run()'s housekeeping.
    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
    // Success only if the whole 8-byte value was sent.
    return !this.reportOffset.hasRemaining();
}
读请求处理
HAService$HAClient#processReadEvent
/**
 * Reads what the master has sent into the read buffer and dispatches it.
 *
 * <p>Keeps reading while the read buffer has free space, dispatching after every successful
 * read, and stops after three consecutive zero-byte reads.
 *
 * @return {@code false} if the connection should be closed: dispatching failed, the master
 *     closed the stream (read returned a negative count), or the read threw an
 *     {@link IOException}; {@code true} otherwise
 */
private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // Move data from the SocketChannel into the read buffer.
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                // Parse the buffered frames and append them to the local CommitLog.
                if (!this.dispatchReadRequest()) {
                    log.error("HAClient, dispatchReadRequest error");
                    return false;
                }
            } else if (readSize == 0) {
                // Nothing available right now: give up after three consecutive empty reads.
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                // End of stream: the master closed the connection.
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }
        } catch (IOException e) {
            // Keep the cause instead of silently dropping it; the caller closes the connection.
            log.error("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }
    return true;
}
HAService$HAClient#dispatchReadRequest
分发读请求:
private boolean dispatchReadRequest() {
//消息头大小 12个字节
final int msgHeaderSize = 8 + 4; // phyoffset + size
//起始偏移量
int readSocketPos = this.byteBufferRead.position();
while (true) {
dispatchPosition初始为0,可理解为当前消息包的在读缓冲的起始偏移,因为多条消息包是累加到读缓冲上的,而不是读完一条清一条
// 上条消息处理后,读缓冲有新的数据进来
int diff = this.byteBufferRead.position() - this.dispatchPosition;
// 包含了一个完整的Header
if (diff >= msgHeaderSize) {
// 从读缓冲的指定位置读取8字节的Master传输的CommitLog片段的起始物理偏移
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
//获取消息体大小
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
// Slave当前CommitLog的最大物理偏移
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
if (slavePhyOffset != 0) {
// 增量同步,如果Slave的CommitLog最大物理偏移和本次Master同步的CommitLog片段起始偏移不一致
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
//发生异常 关闭连接
return false;
}
}
// 包含了一个完整的消息头和消息体
if (diff >= (msgHeaderSize + bodySize)) {
//初始化数组 用于存储消息体
byte[] bodyData = new byte[bodySize];
// 设置读索引
this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
//获取消息体
this.byteBufferRead.get(bodyData);
// 把Master同步过来的CommitLog添加到Slave的CommitLog
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
// 恢复读缓冲的写索引
this.byteBufferRead.position(readSocketPos);
// 更新处理偏移
this.dispatchPosition += msgHeaderSize + bodySize;
// 如果当前Slave的CommitLog最大物理偏移大于上次上报的偏移,说明本次同步成功,继续向Master上报
if (!reportSlaveMaxOffsetPlus()) {
//发送失败 关闭连接
return false;
}
continue;
}
}
// 读缓冲写满了
if (!this.byteBufferRead.hasRemaining()) {
//重新分配空间
this.reallocateByteBuffer();
}
break;
}
return true;
}
HAService$HAClient#reportSlaveMaxOffsetPlus
同步成功后向Master上报:
/**
 * Reports progress to the master after a successful sync.
 *
 * <p>If the slave's CommitLog max offset has moved past the last reported offset, records it as
 * the new reported offset and sends it to the master; the connection is closed if that send fails.
 *
 * @return {@code false} only if a report was attempted and failed; {@code true} otherwise
 */
private boolean reportSlaveMaxOffsetPlus() {
    final long maxPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
    // Nothing new since the last report: nothing to send.
    if (maxPhyOffset <= this.currentReportedOffset) {
        return true;
    }
    // The sync advanced the CommitLog: remember the new offset and report it.
    this.currentReportedOffset = maxPhyOffset;
    final boolean reported = this.reportSlaveMaxOffset(maxPhyOffset);
    if (!reported) {
        this.closeMaster();
    }
    return reported;
}
主从同步复制
CommitLog#putMessage
消息接收
/**
 * Stores a message in the CommitLog, then applies the disk-flush and HA policies (excerpt).
 *
 * <p>Excerpt only: the MappedFile append that produces {@code result} and
 * {@code putMessageResult} is omitted.
 */
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Write the message into the CommitLog's MappedFile (omitted in this excerpt)
// Handle disk flushing
handleDiskFlush(result, putMessageResult, msg);
// Handle high availability: for a SYNC_MASTER broker, return only after the slave has received the data;
// for ASYNC_MASTER, handleHA does nothing and replication is left to the HAService threads
handleHA(result, putMessageResult, msg);
return putMessageResult;
}
CommitLog#handleHA
同步情况下处理高可用
/**
 * Applies the synchronous-replication policy to a message that was just appended to the CommitLog.
 *
 * <p>Only acts when this broker is a {@code SYNC_MASTER} and the message asks to wait for the
 * store result. If the HA service reports a usable slave for the written range, a replication
 * request for the message's end offset is queued and this thread blocks (up to the configured
 * sync-flush timeout) for its outcome. The outcome is reported through {@code putMessageResult}:
 * {@code FLUSH_SLAVE_TIMEOUT} when replication was not confirmed, {@code SLAVE_NOT_AVAILABLE}
 * when no usable slave exists.
 *
 * @param result           the append result; wrote offset + wrote bytes is the message's end offset
 * @param putMessageResult the result returned to the producer; its status may be overwritten
 * @param messageExt       the stored message
 */
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Only synchronous masters wait for replication.
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        // Only wait when the producer asked to wait for the store result.
        if (messageExt.isWaitStoreMsgOK()) {
            // End offset of this message in the CommitLog.
            long nextOffset = result.getWroteOffset() + result.getWroteBytes();
            // Determine whether to wait: isSlaveOK presumably requires a connected slave whose lag
            // is within the configured limit (256MB per the original notes) — confirm in HAService.
            if (service.isSlaveOK(nextOffset)) {
                GroupCommitRequest request = new GroupCommitRequest(nextOffset);
                // Queue the request with HAService's GroupTransferService.
                service.putRequest(request);
                // Wake all waiters on the HA notify object (per the original notes, HAConnection's
                // WriteSocketService) so the new CommitLog data is pushed to the slave.
                service.getWaitNotifyObject().wakeupAll();
                PutMessageStatus replicaStatus = null;
                try {
                    // Block until GroupTransferService completes the request or the timeout expires.
                    replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                        TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so code further up can still observe it; the
                    // write is reported as a slave timeout below.
                    Thread.currentThread().interrupt();
                } catch (ExecutionException | TimeoutException ignored) {
                    // replicaStatus stays null and is reported as FLUSH_SLAVE_TIMEOUT below.
                }
                // Replication not confirmed: tell the producer the slave sync timed out.
                if (replicaStatus != PutMessageStatus.PUT_OK) {
                    log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                        + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            } else {
                // Slave problem: tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }
}
HAService$GroupTransferService#putRequest
提交同步复制请求
/**
 * Queues a synchronous-replication request and wakes the GroupTransferService thread.
 *
 * <p>Requests go into the write queue. Before each processing round the service swaps the write
 * and read queues, handles the read queue and then clears it, so submitters and the service
 * thread never operate on the same list. CommitLog's GroupCommitService uses the same scheme.
 *
 * @param request the replication request for a message's end offset
 */
public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
    // Lock the write queue itself: swapping the write/read queues also touches it.
    synchronized (requestsWrite) {
        requestsWrite.add(request);
    }
    // Only the first request since the last wake-up has to release the waiting thread.
    final boolean firstSinceWakeup = hasNotified.compareAndSet(false, true);
    if (firstSinceWakeup) {
        waitPoint.countDown(); // notify
    }
}
HAService$GroupTransferService#run
/**
 * Main loop of the group-transfer service: waits briefly for new replication requests, then
 * completes the pending ones, until the service is stopped.
 */
public void run() {
    while (!this.isStopped()) {
        try {
            // Wait up to 10ms; putRequest counts down waitPoint to end the wait early. Per the
            // class's design notes, the write and read request queues are swapped before each
            // round — presumably as part of waitForRunning (confirm in the base service class).
            this.waitForRunning(10);
            // Wake each pending request once the slave has acknowledged its offset, or give up.
            this.doWaitTransfer();
        } catch (Exception e) {
            // Keep the service alive, but do not lose the failure.
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
}
HAService$GroupTransferService#doWaitTransfer
等待Slave确认同步进度:
/**
 * Completes the replication requests taken from the read queue.
 *
 * <p>For each request, checks whether the slave's acknowledged offset has reached the request's
 * next offset ({@code push2SlaveMaxOffset >= nextOffset}). If not, waits on
 * {@code notifyTransferObject} for up to 1s at a time, re-checking after each wait, for at most
 * five waits. The request is then woken with the outcome. The read queue is cleared afterwards.
 */
private void doWaitTransfer() {
    // Lock the read queue while iterating and clearing it.
    synchronized (this.requestsRead) {
        if (this.requestsRead.isEmpty()) {
            return;
        }
        for (CommitLog.GroupCommitRequest pending : this.requestsRead) {
            // Acknowledged once the slave's confirmed offset reaches the end of this message.
            boolean acked = HAService.this.push2SlaveMaxOffset.get() >= pending.getNextOffset();
            int waits = 0;
            while (!acked && waits < 5) {
                // Sleep up to 1s, or until notifyTransferSome reports new slave progress.
                this.notifyTransferObject.waitForRunning(1000);
                acked = HAService.this.push2SlaveMaxOffset.get() >= pending.getNextOffset();
                waits++;
            }
            // (further handling elided in this excerpt)
            // Wake the broker thread that submitted this request with the outcome.
            pending.wakeupCustomer(acked);
        }
        // Every request has been handled: empty the read queue.
        this.requestsRead.clear();
    }
}
HAConnection$ReadSocketService#processReadEvent
/**
 * Reads the slave's acknowledged CommitLog offsets from the socket (excerpt; parts elided).
 *
 * <p>Each acknowledgement is an 8-byte offset. When at least 8 unprocessed bytes are buffered, the
 * offset ending at {@code pos} is recorded as the slave's ack offset and handed to the HA service,
 * so waiting replication requests can re-check their progress.
 *
 * @return {@code false} on end of stream or an {@link IOException}; {@code true} otherwise
 */
private boolean processReadEvent() {
// ...
// Keep reading while the read buffer still has free space
while (this.byteBufferRead.hasRemaining()) {
try {
// Read from the SocketChannel into the read buffer
int readSize = this.socketChannel.read(this.byteBufferRead);
// Some data was read
if (readSize > 0) {
// ...
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
// ...
// Read the slave's CommitLog sync offset (8 bytes) ending at pos.
// NOTE(review): pos is computed in the elided code — presumably the end of the last complete
// 8-byte offset in the buffer; confirm against the full source.
long readOffset = this.byteBufferRead.getLong(pos - 8);
// ...
HAConnection.this.slaveAckOffset = readOffset;
// ...
// Raise the HA service's acknowledged offset; it wakes GroupTransferService if it moved forward.
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0) {
// ...
} else {
return false;
}
} catch (IOException e) {
return false;
}
}
return true;
}
HAService#notifyTransferSome
/**
 * Raises the highest slave-acknowledged CommitLog offset to {@code offset} if it is larger, and
 * wakes the GroupTransferService when this call performed the increase.
 *
 * @param offset the CommitLog offset the slave has just acknowledged
 */
public void notifyTransferSome(final long offset) {
    long current = this.push2SlaveMaxOffset.get();
    while (offset > current) {
        if (this.push2SlaveMaxOffset.compareAndSet(current, offset)) {
            // This update won: let waiting replication requests re-check their offsets.
            this.groupTransferService.notifyTransferSome();
            return;
        }
        // Another thread changed the value first; reload and retry while still ahead of it.
        current = this.push2SlaveMaxOffset.get();
    }
}
HAService$GroupTransferService#notifyTransferSome
/**
 * Wakes the thread waiting on {@code notifyTransferObject} in doWaitTransfer so it re-checks the
 * slave's acknowledged offset immediately instead of waiting out its 1s timeout.
 */
public void notifyTransferSome() {
    notifyTransferObject.wakeup();
}