RocketMQ——NameServer和Broker

RocketMQ——NameServer和Broker

文章目录


NameServer

NameServer功能

NameServer负责维护Producer和Consumer的配置信息、状态信息,并且协调各个角色的协同执行。通过NameServer各个角色可以了解到集群的整体信息,并且他们会定期向NameServer上报状态。
在 org.apache.rocketmq.namesrv.routeinfo 包下的RouteInfoManager类中,定义了许多变量,通过5个HashMap变量存储和维护集群的状态信息

    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
  • BROKER_CHANNEL_EXPIRED_TIME定义了Broker向NameServer的汇报超时时长,默认是两分钟,如果超过两分钟则关闭Broker和NameServer连接通道,并将该Broker从brokerLiveTable中移除
  • HashMap<String/* topic */, List> topicQueueTable。该变量存储了所有的Topic,并以Topic的名字作为key,value是这个Topic下的消息列表,列表的长度即Master Broker的数量
public class QueueData implements Comparable<QueueData> {
    private String brokerName;//broker名
    private int readQueueNums;//读取queue数量
    private int writeQueueNums;//写入queue数量
    private int perm;
    private int topicSynFlag;//同步标识
}
  • HashMap<String/* brokerName */, BrokerData> brokerAddrTable。变量存储同一个Broker的数据信息。以brokerName作为key,BrokerData作为value。在BrokerData类中包含了cluster属性、brokerName属性和brokerAddrs属性。由于在一个集群中,同一个brokerName中可能包含多台机器(一个Master和多个Slave),因此在BrokerData中用HashMap存储这些机器的brokerId和地址
public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    }
  • HashMap<String/* clusterName /, Set<String/ brokerName */>> clusterAddrTable。这个变量存储了在集群下对应的brokerName有哪些
  • HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable。该变量存储每台broker机器对应的实时状态,通过上次更新时间戳(lastUpdateTimestamp)来计算Broker更新是否超时,判断该Broker是否失效。
class BrokerLiveInfo {
    private long lastUpdateTimestamp;
    private DataVersion dataVersion;
    private Channel channel;
    private String haServerAddr;
    }
  • HashMap<String/* brokerAddr /, List/ Filter Server */> filterServerTable。该变量存储每台Broker关联的过滤服务器。

为什么不用zookeeper?

zookeeper是apache旗下用于分布式服务协调的开源软件,并且拥有选举机制,能够在master宕机时从slave中通过选举机制选出一台slave变成master。但是在NameServer的设计中,MasterBroker中没有一台拥有全部的Topic信息,消息分布平均,失去选举机制意义。其次,NameServer仅仅被用于存储集群的配置信息、元数据信息,不需要太复杂的功能,所以放弃重量级的zookeeper选择轻量级的NameServer。

Broker

Broker消息存储

在Broker中存在着两个角色:CommitLog和ConsumeQueue。

  • CommitLog:实际储存消息的物理文件
  • ConsumeQueue:消费队列,队列中存储的是待消费消息的地址偏移量,类似于索引

通过CommitLog和ConsumeQueue相互配合完成消息的存储。
RocketMQ——NameServer和Broker

消息的存储地址在配置文件中通过 storePathRootDir 进行配置。在存储路径下的存储目录结构如下

/home
└── rocketmq
    ├── store-a
    │   ├── abort
    │   ├── checkpoint
    │   ├── commitlog
    │   │   └── 00000000000000000000
    │   ├── config
    │   │   ├── consumerFilter.json
    │   │   ├── consumerFilter.json.bak
    │   │   ├── consumerOffset.json
    │   │   ├── consumerOffset.json.bak
    │   │   ├── delayOffset.json
    │   │   ├── delayOffset.json.bak
    │   │   ├── subscriptionGroup.json
    │   │   ├── topics.json
    │   │   └── topics.json.bak
    │   ├── consumequeue
    │   │   ├── OFFSET_MOVED_EVENT
    │   │   │   └── 0
    │   │   │       └── 00000000000000000000
    │   │   ├── OrderTest
    │   │   │   ├── 0
    │   │   │   │   └── 00000000000000000000
    │   │   │   ├── 1
    │   │   │   │   └── 00000000000000000000
    │   │   │   ├── 2
    │   │   │   │   └── 00000000000000000000
    │   │   │   └── 3
    │   │   │       └── 00000000000000000000
    │   ├── index
    │   │   └── 20190722164917530
    │   └── lock

可以看到在存储目录中有Commit Log、ConsumeQueue、index等, ConsumeQueue 目录中存储了所有的Topic信息,以及index索引文件。CommitLog采用顺序写、随机读的方式加快写入效率,并且由于ConsumeQueue中仅存储20字节的偏移量、Tags等,所以能够存进内存当中,读取速度也很快。
从生产者读取到消息后会将消息存储到本地磁盘,RocketMQ提供两个存储方式:同步刷盘和异步刷盘。

  • 同步刷盘:同步刷盘方式在Broker读取到生产者发送的消息后立即从内存写入磁盘,并且返回成功状态,生产者接收到成功状态时消息已经被写入磁盘。
  • 异步刷盘:异步刷盘的方式将收到的信息在内存中存储,并且返回成功状态,生产者收到成功状态时消息还在内存中并未写入磁盘。当内存中消息堆叠到阈值时,批量将消息写入磁盘。

Broker的HA

Broker的集群有master和slave两种角色,通过在配置文件中配置实现。Broker集群中master和slave区别如下:

  • master配置文件brokerId为0;slave为其他值
  • master支持读和写;slave只能读
  • 生产者只能和master连接写消息;消费者可以连接master也可以链接slave读消息
    RocketMQ并没有自动将slave转成master的机制,因此当master资源不足或失效时,需要手动修改配置文件,利用新的配置文件重新启动Broker

在Broker组中,需要将master中的消息复制各个slave中以达到消息同步的目的。RocketMQ提供两种消息复制的机制:同步复制和异步复制。

  • 同步复制:等到master和slave均写入成功才返回成功状态。该方式如果master出现故障数据易恢复,但是吞吐量低
  • 异步复制:只要master写入成功就返回成功状态。该方式延迟低、吞吐量高,但是不知道slave是否成功写入master故障消息就可能会丢失
上一篇:分布式文件系统之Tfs是什么?


下一篇:js中自己实现bind函数的方式