描述
RockeMQ作为一个消息队列被用来解耦,异步,削峰。
模块
RocketMQ主要涉及到了4个角色。
- producer消息发送者
- consumer消息消费者
- nameserver路由中心
- broker运行服务
broker里面的角色
- commitlog
- indexFile:发送的key的hash值,offset偏移量,timestamp,next index offset
- consumeQueue:存储内容是1.offset偏移量,2.字节长度,3.tag的hashCode
工作原理
- MQ启动时将,borker服务将自己里面负责的topic以及自己的信息推送给nameserver注册中心集群
- 当producer集群和consumer集群启动的时候,分别从注册中心拉取注册中心信息,存储在一个concurrentHashMap对象(topicPublishInfoTable)中
- 当用户需要发送信息的时候,producer通过用户的topic,从topicPublishInfoTable对象中取出这个topic对应的TopicPublishInfo(里面包含发送的消息队列messageQueueList,和服务器topic相关信息TopicRouteData(这里面包含队列List信息,broker的List信息,broker对应的filterServerTable筛选))
- 通过各个条件(*投递(默认消息投递延迟最小和队列轮询)/顺序投递(默认通过hash))取出topic对应的地址信息(这里面由对应的broker地址,以及broker其中的consumeQueue队列的id)
- 通过这个地址发送消息给对应的broker进程
- broker首先将这个数据写入电脑缓存(此时可以拿到数据在文件中偏移量),通过异步刷盘的方式写入磁盘上的commitlog文件中,之后将这些信息写入到consumeQueue消费队列(异步持久化)和IndexFile Hash索引中(异步持久化)
- broker将consumeQueue队列中的数据分配推送到对应的消费端
- 消费端消费后返回一个ACK成功标志给RocketMQ,
- 如果失败或者没有返回,rocketMq会把消息再次发给broker,broker在延迟后的时间点将这个数据再次投递到这个消费端,16次失败就会把数据投递到死信队列。
- 如果返回成功,则consumer 的offset会记录这个consumequeue队列的消费进度。
深入理解
生产者消息投递方式
无序投递
消息投递延迟最小加上Queue队列轮询。
有序投递
MessageQueueSelector消息投递选择器
- SelectMessageQueueByRandom随机分配策略
- SelectMessageQueueByHash基于hash分配策略(默认)
- SelectMessageQueueByMachineRoom基于指定机房分配策略
消费者消费方式
消费者消费,其实是服务端分配好了那个消费端消费哪个broker哪个消费队列里面的数据
AllocateMessageQueueStrategy消费者消息分配策略(topic,brokeName,queueId)
- AllocateMessageQueueAveragely :平均分配算法(默认)
- AllocateMessageQueueAveragelyByCircle:环状分配消息队列
- AllocateMessageQueueByConfig:按照配置来分配队列: 根据用户指定的配置来进行负载
- AllocateMessageQueueByMachineRoom:按照指定机房来配置队列
- AllocateMachineRoomNearby:按照就近机房来配置队列
- AllocateMessageQueueConsistentHash:一致性hash,根据消费者的cid进行
NameServer注册机制
功能:路由管理,服务注册,服务发现
服务注册:Broker服务启动时向所有的NameServer发送心跳,每30s向所有NameServer发送一次。NameServer收到心跳更新缓存。NameServer每10s扫描一下存储的Broker信息,如果120s没有收到信息,则移除这个Broker路由和关闭Socket。
服务发现:RocketMQ的路由是非实时的。当Topic对应的路由发生变化的时候,不会主动去通知客户端。而是客户端定时去拉取Topic最新的路由。至于那些不实时导致的问题需要客户端自己解决。
消息订阅
rocketMQ有push和pull两种订阅方式,push:,MQServer主动向消费端推送;pulll:消费端需要的时候主动到MQServer拉取。具体的实现都是消费端主动去服务端拉取的方式(consumer轮询从broker拉取信息)。
消费端的负载均衡:消费端会通过Rebalance Service 线程,10s做一次基于topic下所有队列负载。
push模式
客户端和服务端建立连接,当有数据到服务端,服务端将数据推送过来。
实现原理:消费者把轮询的方法封装成了一个连接,并注册了一个监听器MessageListener,收到消息后通过监听器调用consumeMessage()来消费
pull模式
客户端不断的轮询请求客户端。
实现原理:首先根据自己缓存的topic信息找到对应的consumeQueue,一个个去对应的队列批量拉取消息。