RocketMQ

描述

RockeMQ作为一个消息队列被用来解耦,异步,削峰。

模块

RocketMQ主要涉及到了4个角色。
  • producer消息发送者
  • consumer消息消费者
  • nameserver路由中心
  • broker运行服务
broker里面的角色
  • commitlog
  • indexFile:发送的key的hash值,offset偏移量,timestamp,next index offset
  • consumeQueue:存储内容是1.offset偏移量,2.字节长度,3.tag的hashCode

工作原理

  1. MQ启动时将,borker服务将自己里面负责的topic以及自己的信息推送给nameserver注册中心集群
  2. 当producer集群和consumer集群启动的时候,分别从注册中心拉取注册中心信息,存储在一个concurrentHashMap对象(topicPublishInfoTable)中
  3. 当用户需要发送信息的时候,producer通过用户的topic,从topicPublishInfoTable对象中取出这个topic对应的TopicPublishInfo(里面包含发送的消息队列messageQueueList,和服务器topic相关信息TopicRouteData(这里面包含队列List信息,broker的List信息,broker对应的filterServerTable筛选))
  4. 通过各个条件(*投递(默认消息投递延迟最小和队列轮询)/顺序投递(默认通过hash))取出topic对应的地址信息(这里面由对应的broker地址,以及broker其中的consumeQueue队列的id)
  5. 通过这个地址发送消息给对应的broker进程
  6. broker首先将这个数据写入电脑缓存(此时可以拿到数据在文件中偏移量),通过异步刷盘的方式写入磁盘上的commitlog文件中,之后将这些信息写入到consumeQueue消费队列(异步持久化)和IndexFile Hash索引中(异步持久化)
  7. broker将consumeQueue队列中的数据分配推送到对应的消费端
  8. 消费端消费后返回一个ACK成功标志给RocketMQ,
  9. 如果失败或者没有返回,rocketMq会把消息再次发给broker,broker在延迟后的时间点将这个数据再次投递到这个消费端,16次失败就会把数据投递到死信队列。
  10. 如果返回成功,则consumer 的offset会记录这个consumequeue队列的消费进度。

深入理解

生产者消息投递方式

无序投递

消息投递延迟最小加上Queue队列轮询。

有序投递

MessageQueueSelector消息投递选择器

  1. SelectMessageQueueByRandom随机分配策略
  2. SelectMessageQueueByHash基于hash分配策略(默认)
  3. SelectMessageQueueByMachineRoom基于指定机房分配策略

消费者消费方式

消费者消费,其实是服务端分配好了那个消费端消费哪个broker哪个消费队列里面的数据
AllocateMessageQueueStrategy消费者消息分配策略(topic,brokeName,queueId)
  1. AllocateMessageQueueAveragely :平均分配算法(默认)
  2. AllocateMessageQueueAveragelyByCircle:环状分配消息队列
  3. AllocateMessageQueueByConfig:按照配置来分配队列: 根据用户指定的配置来进行负载
  4. AllocateMessageQueueByMachineRoom:按照指定机房来配置队列
  5. AllocateMachineRoomNearby:按照就近机房来配置队列
  6. AllocateMessageQueueConsistentHash:一致性hash,根据消费者的cid进行

NameServer注册机制

功能:路由管理,服务注册,服务发现
服务注册:Broker服务启动时向所有的NameServer发送心跳,每30s向所有NameServer发送一次。NameServer收到心跳更新缓存。NameServer每10s扫描一下存储的Broker信息,如果120s没有收到信息,则移除这个Broker路由和关闭Socket。
服务发现:RocketMQ的路由是非实时的。当Topic对应的路由发生变化的时候,不会主动去通知客户端。而是客户端定时去拉取Topic最新的路由。至于那些不实时导致的问题需要客户端自己解决。

消息订阅

rocketMQ有push和pull两种订阅方式,push:,MQServer主动向消费端推送;pulll:消费端需要的时候主动到MQServer拉取。具体的实现都是消费端主动去服务端拉取的方式(consumer轮询从broker拉取信息)。
消费端的负载均衡:消费端会通过Rebalance Service 线程,10s做一次基于topic下所有队列负载。

push模式

客户端和服务端建立连接,当有数据到服务端,服务端将数据推送过来。
实现原理:消费者把轮询的方法封装成了一个连接,并注册了一个监听器MessageListener,收到消息后通过监听器调用consumeMessage()来消费

pull模式

客户端不断的轮询请求客户端。
实现原理:首先根据自己缓存的topic信息找到对应的consumeQueue,一个个去对应的队列批量拉取消息。

RocketMQ

上一篇:SE基础接口和抽象类的关系


下一篇:unicloud数据聚合处理