RocketMq学习记录 (一) ----基础知识梳理

文章目录

引言

之前对Kafka有了一轮浅尝辄止的学习,但是Kafka相对薄弱的业务功能在业务开发中较为暗淡,更大的使用场景在于需要高吞吐的场景。恰巧目前任职公司jms选型有RocketMq,遂对此进行学习。

RocketMq的单机吞吐率、HA方案以及开发语言、提供的特异性功能均是非常适合在Java业务系统中进行架构使用的。


一)RocketMq架构概述

以下是RocketMq官方提供的文档表述,可以看到其中跟本体kafka确实有很多类似的设计理念在。

RocketMq学习记录 (一) ----基础知识梳理
RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:

    • Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
    • 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
      (与Kafka中对于zk的使用类似)
  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。

    1. Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
    2. Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
    3. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
    4. HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
    5. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

RocketMq学习记录 (一) ----基础知识梳理

结合部署架构图,描述集群工作流程:

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

二)RocketMq中的NameServer、BrokerServer说明:

1) NameServer

  • NameServer支持集群化部署 ,提供HA。
  • Broker将自身信息注册至NameServer

需要说明的是假如有10台broker , 两台NameServer ; 并不是不靠谱的5台broker注册至NameServer1 , 另外5台注册到NameServer2。
而是每台Broker启动都得向所有的NameServer进行注册。

  • 生产者、消费者会主动去NamsServer拉取Broker信息,类似于Kafka中获取MetaData。
  • NameServer依靠心跳机制与Broker进行探活 , Broker默认大概是30s进行一次心跳,告诉每一个NameServer自己还活着。
  • NameServer每隔10S,主动监测broker上次登记心跳的时间,如果某个broker超过120S没有新的心跳,呢么判该Broker失活。
  • broker宕机后的容错

120S NameServer未监听broker进行移除后,呢客户端又是如何感知到相关Broker挂掉的呢,此时并不是NameServer广播至客户端某太Broker宕机。
一方面这个时间间隔显然是超过业务容忍的 , 另一方面nameServer去播报这条broker宕机的消息给谁呢?
这些是依赖于生产的发送消息的容错机制,与这里的心跳探活没有必然关系。

呢么,NameServer宕机后消息能否被正常的投递与消费呢?

其实这里跟dubbo很像,本地都会有缓存的存根,将无法在感知新的注册信息,依赖于本地存根的数据。

2) BrokerServer

作为消息存储的单元一定是具备高可用性的。在Kafka中,是基于分区(Partition)下的Leader、Follower副本通过ISR等机制进行保证的,呢么RocketMq是怎么做到的呢?

  • Broker 具备 master 与 slave , 每次当master收取数据后,将会同步至Slave(本质是Slave向master进行拉取)。同样slave也会同步自己的信息向nameSever进行注册与心跳汇报。
  • 在Kafka中,follower副本是不会对外提供读能力的,所有的请求仅能由Leader副本进行处理;
    而在RocketMq中,是可以从slave中进行读取消息的,但是否从slave读取还是基于Master的判断进行决策判断。既每次读取到的消息,有可能来自于master,也有可能来自于slave。
  • Kafka当Leader副本宕机后,从ISR副本中进行新一轮leader选举,呢么RocketMq也是从slave进行选举吗?

这一点两款jms差异很大, 4.5之前是需要人肉该slave配置进行重启在可以复用;之后版本加入了Dledger机制实现自动切换 ;Dledger机制 暂时理解为从slave进行了一次基于raft算法的leader选举。


3) MessageQueue-基础知识:

RocketMq学习记录 (一) ----基础知识梳理
Kafka中,根据其三层架构 主题->分区->副本->消息 ,消息是被存储于topic层级下,若干个topic就会有N*若干个消息文件。

但是RocketMq是将所有主题的消息存储在同一个文件中。

要说清楚这里还是比较麻烦的,又需要引入一些新的概念进来。

MessageQueue: rocketmq独有的一个属性,类似于Kafka中的Partition 。在创建topic的时候需要指定的一个关键参数。一个Topic对应了多少队列,也就是多少个MessageQueue。
比如现在有一个Topic,对应4个MessageQueue呢么其在Broker域中又是如何分布的呢?
RocketMq学习记录 (一) ----基础知识梳理
引入的MessageQueue本质也是做一次数据分片。 比如一个topic有4个messageQueue,呢么每个messageQueue将会有 1/4 条消息。当然这也不是绝对的,具体取决于写入MessageQueue的策略(也是类似于Kafka中的分区策略)。

RocketMq学习记录 (一) ----基础知识梳理

呢么,生产者当发送一条消息的时候,会写入到哪个MessageQueue中?

首先,生产者会跟NameServer进行通信获取Topic的路由信息,呢么生产者便从NameServer中得知该topic有几个messageQueue,哪些messageQueue在哪台Broker上。比如此时我们就考虑为是平摊写入到对应的messageQueue中,则如下;

RocketMq学习记录 (一) ----基础知识梳理
(通过该方式也进行分布式系统的伸缩性能力。)

在前文提到中,说如果一台Broker宕机了该怎么办,因为nameServer并不会广播某一个太Broker心跳失活。这一点也是在Producer发送消息时,做的分区容错。

如果Producer访问Broker发现网络延迟大于500ms,则会自动回避访问该Broker一段时间(sendLatencyFalultEnable)。这样的话,就从上面的均匀访问将流量切换至访问仍旧可提供服务的Broker中了。


4)CommitLog与ConsumerQueue:

消息引擎中有一个很重要的点就是如何存储消息。

当上述生产者将一条消息发送到Broker上的时候,收到消息后后续会做什么呢?

第一步便是把这条消息直接写入到磁盘上的一个日志文件,叫做CommitLog。为了保证消息存储不发生混乱,对 CommitLog 写之前会加锁,同时也可以使得消息能够被顺序写入到 CommitLog 。

RocketMq学习记录 (一) ----基础知识梳理
commitLog指代的是很多磁盘文件,每个文件额定大小事1GB,Broker收到消息之后就会在该文件上进行追加写入。若写满了1GB,就会创建一个新的commitLog。
RocketMq学习记录 (一) ----基础知识梳理
commitLog的写入速度绝对是影响吞吐率的重要因素,这种写入跟Kafka一样,也是基于 MMap 零拷贝顺序写异步刷盘进行优化的。

而所有的异步刷盘都会有一个问题,就是未刷盘前进行了宕机,会丢失数据(没有类似于mysql redo log的日志文件在)。同样,RocketMq也提供了同步刷盘的策略。


RocketMQ将所有主题的消息存储在同一个文件中,但由于消息中间件一般是基于主题进行消费,呢如何从一个文件中进行检索呢?以及跟之前提及的MessageQueue又有什么关系呢?

这里面又有一个新概念,叫做ConsumerQueue , 位于盘符如下:
RocketMq学习记录 (一) ----基础知识梳理
cd consumequeue = >
RocketMq学习记录 (一) ----基础知识梳理
cd TopicName = > 选取指定Topic
RocketMq学习记录 (一) ----基础知识梳理
cd 0 => 进入"MessageQueue" 0
RocketMq学习记录 (一) ----基础知识梳理
Topic下的每个MessageQueue都会有一系列的ConsumerQueue文件 ,盘符格式为:

$HOME/store/comsumequeue/{topic}/{queueId}/{fileName}

简单来说:

ConsumeQueue可以看成是Commitlog关于消息消费的“索引”。

ConsumeQueue 是消息消费的逻辑队列,消息达到 CommitLog 文件后将被异步转发到消息消费队列,供消息消费者消费。这里面包含 MessageQueue 在 CommitLog 中的【物理位置偏移量 Offset】,【消息实体内容的大小和 Message Tag 的 hash 值】。

每个文件默认大小约为 600W 个字节,如果文件满了后会也会生成一个新的文件。

每个Topic包含多个ConsumeQueue,每一个ConsumeQueue有一个文件。
RocketMq学习记录 (一) ----基础知识梳理

Producer to Broker

消息的写入大致是上述流程(Producer to Broker),呢消息的消费呢?

1. 从方式而言

拉取(MQPullConsumer):

取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

这种方式听着都麻烦,确实也没在用。

MQPushConsumer :

consumer向Broker进行消息【拉取】

优点
拉模式主动权就在消费者身上了,消费者可以根据自身的情况来发起拉取消息的请求。假设当前消费者觉得自己消费不过来了,它可以根据一定的策略停止拉取,或者间隔拉取都行。

拉模式下 Broker 就相对轻松了,它只管存生产者发来的消息,至于消费的时候自然由消费者主动发起,来一个请求就给它消息呗,从哪开始拿消息,拿多少消费者都告诉它,它就是一个没有感情的工具人,消费者要是没来取也不关它的事。

缺点:
消息延迟,毕竟是消费者去拉取消息,但是消费者怎么知道消息到了呢?所以它只能不断地拉取,但是又不能很频繁地请求,太频繁了就变成消费者在攻击 Broker 了。因此需要降低请求的频率,比如隔个 2 秒请求一次,你看着消息就很有可能延迟 2 秒了。

消息忙请求,忙请求就是比如消息隔了几个小时才有,那么在几个小时之内消费者的请求都是无效的,在做无用功。

针对缺点的优化

RocketMQ 中的 PushConsumer 其实是披着拉模式的方法,只是看起来像推模式而已,因为 RocketMQ 在被背后偷偷的帮我们去 Broker 请求数据了。

当请求送达Broker的时候,发现并未有新的消息需要进行处理,则会挂起当前线程,默认是挂起15秒。在挂起的时间内,会有后台线程每隔一会检查是否有新的消息,如果在这个挂起过程中有信息的消息到达则会被唤醒。

2. 从consumerLog而言

假设一个消费者机器发送拉取请求到Broker ,本次需要拉取MessageQueue0中的消息,然后之前并没有拉取过消息,呢么此时便从这个MessageQueue中的第一条消息开始拉取。

=》 于是,Broker就会找到messageQueue0对应的ConsumerQueue0,并在里面找到第一条消息的offset。

RocketMq学习记录 (一) ----基础知识梳理

=》 紧接着,Broker就需要根据ConsumerQueue0中找到的第一条消息,去CommitLog中根据这个offset读取出来这条消息的数据,让后将消息返回给消费者机器。

本质还是根据要消费的messageQueue以及开始消费的位置,去ConsumerQueue中读取commitLog的offset,进而根据offset读取到消息数据。

ConsumerQueue 的os Cache:

从上面可以看出,ConsumerQueue一定也是一个高频访问的资源 , 而RocketMq对ConsumerQueue文件也是同样也是基于os cache进行优化的。 也就是说,对于Broker机器的磁盘上的大量ConsumerQueue文件,在写入的时候也是优先进入os Cache中。此外加上ConsumerQueue主要是存放消息的offset,30W条消息的情况下,也大概仅5.72MB。所以ConsumerQueue几乎可以完全被os cache进行存放。
RocketMq学习记录 (一) ----基础知识梳理
1.rocketmq读写分离读请求在master负载较高 consumer读取数据的offset较落后每次都可能需要从磁盘读取。
2.当读请求至Broker时,如果消费的内容处于osCache中,主节点会直接受理任务。

5 ) 网络构建

RocketMq学习记录 (一) ----基础知识梳理
跟Kafka基本一致, 实现区别Kafka是自己封装一套NIO,而RocketMq是基于Netty。

线程模型也都是独立的,最终效果为:

  • Reactor 主线程在端口上监听Producer建立连接的请求,形成长连接。
  • Reactor 线程池并发的监听多个连接的请求是否到达。
  • Worker请求并发的对多个请求进行预处理。
  • 业务线程池并发的对多个请求进行磁盘读写等业务操作。

回头结合代码认真看吧,netty都忘完了。


小总结, RocketMq与Kafka类似的地方还是很多的,二者概念接近的地方就不比较相似了,后续结合RocketMq相关代码,分析下主要功能的交互逻辑。

上一篇:httprunner3 log放到allure中显示


下一篇:java自己开发工作流