RocketMQ——NameServer和Broker
文章目录
NameServer
NameServer功能
NameServer负责维护Producer和Consumer的配置信息、状态信息,并且协调各个角色的协同执行。通过NameServer各个角色可以了解到集群的整体信息,并且他们会定期向NameServer上报状态。
在 org.apache.rocketmq.namesrv.routeinfo 包下的RouteInfoManager类中,定义了许多变量,通过5个HashMap变量存储和维护集群的状态信息
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
- BROKER_CHANNEL_EXPIRED_TIME定义了Broker向NameServer的汇报超时时长,默认是两分钟,如果超过两分钟则关闭Broker和NameServer连接通道,并将该Broker从brokerLiveTable中移除
- HashMap<String/* topic */, List> topicQueueTable。该变量存储了所有的Topic,并以Topic的名字作为key,value是这个Topic下的消息列表,列表的长度即Master Broker的数量
public class QueueData implements Comparable<QueueData> {
private String brokerName;//broker名
private int readQueueNums;//读取queue数量
private int writeQueueNums;//写入queue数量
private int perm;
private int topicSynFlag;//同步标识
}
- HashMap<String/* brokerName */, BrokerData> brokerAddrTable。变量存储同一个Broker的数据信息。以brokerName作为key,BrokerData作为value。在BrokerData类中包含了cluster属性、brokerName属性和brokerAddrs属性。由于在一个集群中,同一个brokerName中可能包含多台机器(一个Master和多个Slave),因此在BrokerData中用HashMap存储这些机器的brokerId和地址
public class BrokerData implements Comparable<BrokerData> {
private String cluster;
private String brokerName;
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}
- HashMap<String/* clusterName /, Set<String/ brokerName */>> clusterAddrTable。这个变量存储了在集群下对应的brokerName有哪些
- HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable。该变量存储每台broker机器对应的实时状态,通过上次更新时间戳(lastUpdateTimestamp)来计算Broker更新是否超时,判断该Broker是否失效。
class BrokerLiveInfo {
private long lastUpdateTimestamp;
private DataVersion dataVersion;
private Channel channel;
private String haServerAddr;
}
- HashMap<String/* brokerAddr /, List/ Filter Server */> filterServerTable。该变量存储每台Broker关联的过滤服务器。
为什么不用zookeeper?
zookeeper是apache旗下用于分布式服务协调的开源软件,并且拥有选举机制,能够在master宕机时从slave中通过选举机制选出一台slave变成master。但是在NameServer的设计中,MasterBroker中没有一台拥有全部的Topic信息,消息分布平均,失去选举机制意义。其次,NameServer仅仅被用于存储集群的配置信息、元数据信息,不需要太复杂的功能,所以放弃重量级的zookeeper选择轻量级的NameServer。
Broker
Broker消息存储
在Broker中存在着两个角色:CommitLog和ConsumeQueue。
- CommitLog:实际储存消息的物理文件
- ConsumeQueue:消费队列,队列中存储的是待消费消息的地址偏移量,类似于索引
通过CommitLog和ConsumeQueue相互配合完成消息的存储。
消息的存储地址在配置文件中通过 storePathRootDir 进行配置。在存储路径下的存储目录结构如下
/home
└── rocketmq
├── store-a
│ ├── abort
│ ├── checkpoint
│ ├── commitlog
│ │ └── 00000000000000000000
│ ├── config
│ │ ├── consumerFilter.json
│ │ ├── consumerFilter.json.bak
│ │ ├── consumerOffset.json
│ │ ├── consumerOffset.json.bak
│ │ ├── delayOffset.json
│ │ ├── delayOffset.json.bak
│ │ ├── subscriptionGroup.json
│ │ ├── topics.json
│ │ └── topics.json.bak
│ ├── consumequeue
│ │ ├── OFFSET_MOVED_EVENT
│ │ │ └── 0
│ │ │ └── 00000000000000000000
│ │ ├── OrderTest
│ │ │ ├── 0
│ │ │ │ └── 00000000000000000000
│ │ │ ├── 1
│ │ │ │ └── 00000000000000000000
│ │ │ ├── 2
│ │ │ │ └── 00000000000000000000
│ │ │ └── 3
│ │ │ └── 00000000000000000000
│ ├── index
│ │ └── 20190722164917530
│ └── lock
可以看到在存储目录中有Commit Log、ConsumeQueue、index等, ConsumeQueue 目录中存储了所有的Topic信息,以及index索引文件。CommitLog采用顺序写、随机读的方式加快写入效率,并且由于ConsumeQueue中仅存储20字节的偏移量、Tags等,所以能够存进内存当中,读取速度也很快。
从生产者读取到消息后会将消息存储到本地磁盘,RocketMQ提供两个存储方式:同步刷盘和异步刷盘。
- 同步刷盘:同步刷盘方式在Broker读取到生产者发送的消息后立即从内存写入磁盘,并且返回成功状态,生产者接收到成功状态时消息已经被写入磁盘。
- 异步刷盘:异步刷盘的方式将收到的信息在内存中存储,并且返回成功状态,生产者收到成功状态时消息还在内存中并未写入磁盘。当内存中消息堆叠到阈值时,批量将消息写入磁盘。
Broker的HA
Broker的集群有master和slave两种角色,通过在配置文件中配置实现。Broker集群中master和slave区别如下:
- master配置文件brokerId为0;slave为其他值
- master支持读和写;slave只能读
- 生产者只能和master连接写消息;消费者可以连接master也可以链接slave读消息
RocketMQ并没有自动将slave转成master的机制,因此当master资源不足或失效时,需要手动修改配置文件,利用新的配置文件重新启动Broker
在Broker组中,需要将master中的消息复制各个slave中以达到消息同步的目的。RocketMQ提供两种消息复制的机制:同步复制和异步复制。
- 同步复制:等到master和slave均写入成功才返回成功状态。该方式如果master出现故障数据易恢复,但是吞吐量低
- 异步复制:只要master写入成功就返回成功状态。该方式延迟低、吞吐量高,但是不知道slave是否成功写入master故障消息就可能会丢失