Rocketmq-尝试理解

普通的信息发送和消费

首先要启动nameserver和broker,nameserver是一个几乎无状态节点。broker分为master和slave,master和slave的对应关系通过指定相同的BrokerName,不同的BorkerId来定义,BrokerId为0表示Master,其他为Slave。每个Broker和Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。

Producer在发送消息前,获取DefaultMQProducer,指定groupName,设置namesrvAddr。开始启动producer,用本身实例defaultMQProducer生成MQClinetManager的实例mQClinetFactory,向mQClinetFactory注册producer也就是向ConcurrentHashMap类型的producerTable中put。然后mQClientFactory.start()。
  获取并设置NameServer Addr。
  启动通讯服务。
  启动定时任务(1:定时获取NameServerAddr,2:定时更新路由信息(也就是更新DefaultMQProducerImpl中的topicPushlishInfoTable)。3:持久化消费记录,4:动态调整线程池)。
  pullMessageService.start()。
  负载均衡启动
  再次调用DefaultMQProducerImp.start(false)这次不走mQClientFactory.start()。
向所有Broker发送心跳,还有一个方法uploadFilterClassSource()(应该是加载过滤器的)
获取完producer(DefaultMQProducer)后,初始化要发送的消息,消息中设置topic,tag,key,消息体(body)。
发送消息。producer.send()
获取路由信息(从topicPublishInfoTable获取topic的TopicPushlishInfo,没有路由信息或 !topicPublishInfo.ok()从NameServer获取,构建请求头,remoteClinic调nameServer获取,看本地是否需要更新,如果需要,更新到本地,获取过来的TopicRouteData转成TopicPublishInfo(producer用的)跟新到本地,也就是向topicPublishInfoTable中put。转成subscribeInfo(consumer用的)向RebalanceImpl.topicSubscribeInfoTable中put)
获取重试次数(timesTotal),循环这么多次发送消息。获取上一次发送失败的lastBrokerName,第一次发送时,mq为空,如果是轮训,依次轮训topic下的MessageQueue,跳过上次失败的,选取一个mq。如果是顺序消息,就是自己根据算法选择一个MessageQueue列表的中的一个mq。
开始发送。根据上面获取的brokername从MQClientInstance.brokerAddrTable获取brokerAddr,如果为空重复上面的获取路由信息。继续获取。构建SendMessageRequestHeader发送。
=========================================================================================
broker端接受到消息,将消息写入到CommitLog中。
=========================================================================================
Consumer启动前,获取Consumer对象(这里是DefaultMQPushConsumer),指定groupName,设置NamesrvAddr,指定ConsumeFromWhere,指定订阅topic,push的方法指定MessageListener。consumer.start()启动
复制订阅关系。defaultMQPushConsumer.subscription复制到RebalanceImpl.subscriptionInner。初始化rebalanceImpl对象。构建offsetStore消费对象。consumeMessageService.start()启动消费消息服务。mQClientFactory注册这个消费者(也就是MQClientInstance.consumerTable中put)。mQClientFactory.start()和上面producer启动的过程一样。
在rebalanceService.start()中,进行负载均衡。大体是遍历所有的topic,构建PullRequest,经过多步调用,放在PullMessageService的pullRequestQueue队列中,让pullMessageService线程去获取。
pullMessageService.start()这个是在rebalanceService.start()上面一行启动的。而pullMessageService.start()才是正真的消费。在DefaultMQPushConsumerImpl.pullMessage()方法中,构造回调函数PullCallback,对拉取消息结果PullResult做处理,具体是,从PullResult中解码出拉取的消息列表,如果消息的订阅tag不为空且不是classFilter过滤模式,则进行tag过滤,然后把过滤后的消息列表装入PullResult,取出pullResult的nextBeginOffset装入当前的pullRequest的NextOffset中,更新统计数据,异步提交ConsumeRequest进行消息消费,接着提交pullRequest准备做下一次拉取消息的请求。PullMessageProcessor.processRequest()接收到拉消息的请求,做一些简单的判断,如检查Broker权限,确保订阅组存在,检查topic是否存在,然后去messageStore里取消息。详细说明:DefaultMessageStore根据请求的Topic和queueId获取对应的ConsumerQueue,根据传入的queueOffset从consumerQueue里取出目标buffer,然后以20个字节为单位循环从目标buffer里取,取出偏移量offsetPy(占8个字节),消息长度sizePy(占4个字节),过滤标识tagCode(占8个字节),判断如果订阅信息匹配tagCode,则以offsetPy和sizePy从commitLog中以取出消息体buffer,存入GetMessageResult,然后再进行下一次取,最后返回GetMessageResult。取出GetMessageResult的NextBeginoffset,minOffset,maxOffet3个属性,设置到responseHeader中,然后把GetMessageResult打包进response后发送到Consumer端。获取到brokeraddr并获取到结果。回到PullCallBack中的onSuccess方法。更新从哪个broker拉去消息,消息过滤,消息中放入最大最小offset。将消息加入正在处理队列ProcessQueue。提交消息到ConsumeMessageService。ConsumerRequest是ConsumeMessageConcurrentlyService的内部类,里面的run方法就是调用了listenner的consumeMessage方法,也就是开始时自己写的那个消费方法。返回提交结果(ConsumeOrderlyStatus),调用ConsumeMessageOrderlyService.this.processConsumeResult继续处理,如果是success,如果自动提交(非事务方式),调用processQueue.commit(),调用msgTreeMapTemp.lastKey()获取提交的offset,清空msgTreeMapTemp,成功消费。非事务提交是更新内存的offsetTable,让定时任务更新到broker上。
ConsumerRequest是一个内部类,分别在ConsumeMessageConcurrentlyService(并发)和ConsumeMessageOrderlyService(顺序)中是不一样的。

上一篇:Linux 平台GCC使用小结


下一篇:矩阵乘法的MapReduce实现