ROCKETMQ源码分析笔记2:client

CLIENT

之前讲过tools里面有大量调用client的东西。为了从源码层面了解rocket,决定啃下client这块骨头。

  • pom

先看pom,看看CLIENT依赖谁。看完后原来是依赖common的。common又依赖remoting的 翻开remoting看了看,都是一些接口定义。明白了,remoting应该是通信协议,公共接口。看来如果讲框架的时候,应该从remoting开始讲。 而common,看名字就知道是个公共包,里面提供了各种公用的东西。先不讲了,用到再看吧。

好的,开始分析源码。。。。。。 . . . . .

十分钟过后,撞墙了。这特么是个类库啊从哪里下手呢? 没办法已经答应你们要讲client了。硬着头皮讲吧,因为是盲人摸象,所以讲的乱大家不要拍我(画外音:好像你丫其他的就讲得好似的)

好吧,没办法,硬着头皮讲吧。 下面,我按照包的名称讲 先讲com.alibaba.rocketmq.client包下面的

  • com.alibaba.rocketmq.client
  1. ClientConfig 就是一个配置类,里面需要配置nameserveraddress,clientip,实例名称,可以用于做callback的线程。 有几个默认的值比较重要
  • pollNameServerInteval 默认是30000毫秒,看名称意思是从nameserver进行访问的间隔时间。被用在com.alibaba.rocketmq.client.impl.factory.MQClientInstance 这个类中,用于生产实例!
  • heartbeatBrokerInterval 默认也是30000毫秒,从名称上推敲,应该是保证到broker心跳的时间间隔。30秒一次。
  • persistConsumerOffsetInterval 看用法貌似是消费者重新计算offset的时间间隔,默认是5秒。翻看相关代码的时候,发现pull/push 的consumer都使用了这个。也就是说,这两种consumer对会对offset进行记录。 而记录的方式分为记录到本地文件,和远程地址两种。等讲到的时候再讲,要是忘了,你们提醒我哦。

这里顺便讲一下clientid的规律,就是client的ip+@+实例名称。 看看他们的源码

StringBuilder sb = new StringBuilder();
   
sb.append(this.getClientIP());
    sb.append("@");
   
sb.append(this.getInstanceName());
    return sb.toString();

呵呵,原来阿里的员工也写sb,记得在海关总署的时候,当时带我的高架,在codereview的时候,说,在代码中别骂人,这是道义。哎,现在的人啊。

剩下的方法就是重置配置啊,克隆啊这些什么的。对,这个类还重写了toString 方法,想看config信息的,直接tostring就ok了。

  1. MQAdmin 就是个接口,之前在讲tools的时候讲过了。主要是创建话题,查找offset,以及查找消息的一类东西。
  2. MQHelper 工具类 resetOffsetByTimestamp
  3. 其余不重要的代码略过 比如QueryResult,Validators,VirtualEnvUtil
  • com.alibaba.rocketmq.client.admin

里面只有一个接口,MQAdminExtInner 这个只有在client中用过的。应该是一个标记型接口。

  • consumer

重点来了,最重要的两个两个包之一。

先看看有几个包,有listener,rebalance,store。还好不是很多。我就顺着往下看了

  • consumer包内部
  1. AllocateMessageQueueStrategy是用来平衡consumer收到消息的算法接口,典型的strategy模式。具体算法后面会讲到
  2. DefaultMQPullConsumer 最重要的类,默认消费者。里面封装了DefaultMQPullConsumerImpl 这个对象。注意类默认的消息策略是AllocateMessageQueueAveragely。
  3. DefaultMQPushConsumer 是推送消息的消费者,注意,默认的消息策略也是AllocateMessageQueueAveragely
  4. MessageQueueListener 消息队列发生变化的时候的listener。
  5. MQConsumer 是消费者的接口,主要是延时消费和获取订阅消息的功能。
  6. MQPullConsumerScheduleService 是拉消息的后台线程service,从start方法开始看。其中 this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener);很重要。
  7. PullBack,PullResult,PullStatus,PullTaskCallback,PullTaskContext等都是一些辅助类,不细讲了。
  • com.alibaba.rocketmq.client.consumer.listener

这个包里面主要是MessageListner相关的东西,分为并发消费和顺序消费两种。 MessageListener是基础接口,MessageListenerConcurrently和MessageListenerOrderly是他的两个继承接口 剩下的东西,都是这个接口中需要的一些辅助类,比如context,status等。

  • com.alibaba.rocketmq.client.consumer.rebalance

这下面是四个类,都实现了AllocateMessageQueueStrategy 这个接口,看了看典型的strategy模式 这个包里面的几个类是具体的算法实现。

  • com.alibaba.rocketmq.client.consumer.store

这个包下面是用来存储offset的。主要分为localfileoffset和remotebrokeroffset。至于什么是offset,我的理解就是所谓的刷盘机制。讲消费进度和内容保存为文件。必要的时候可以读取。这里粗略的看了一下。

localoffset用的是fastjson来存储的。文件位于系统变量'rocketmq.client.localOffsetStoreDir'所指向的位置,如果没有的话,会放在user.home系统变量的.rocketmq_offsets文件中。计算offset变量的时候用的是atomicLong,所以不用担心异步问题。

remotebrokeroffset
中用到了大量的netty和java concurrent的内容。很有必要研究一下。

这里给大家两个传送门

http://47966392.blog.163.com/blog/static/13070442320091169152773/

http://www.cnblogs.com/whgw/archive/2011/09/29/2195555.html

当然,最好还是研读专业书籍。

其中invokeOnewayImpl 这个方法,使用semahore来获取资源锁,netty写完后,再释放。invokeAsyncImpl也是类似的套路。所以读懂了其中一段代码,就读懂了其他的代码

至此,consumer讲完了。

  • com.alibaba.rocketmq.client.exception

这里面主要是MQ的一些异常类,不细讲了

  • com.alibaba.rocketmq.client.hook

这里面都是一些Context和hook。大家不要觉得陌生,其实这就是listener。只不过叫法不一样。具体用法是在一些producer,processor中会有一个借口叫做registerXXXHoook的这个方法。这个方法的作用就是把这些hook注册到系统中,然后当系统流转到某个步骤的时候会触发这些hook。这些hook都是接口。看来要自己实现了。

其实这就是一个观察者模式,写成context和hook,看来作者manhong.yqd<jodie.yqd@gmail.com> 应该是从c转过来做java的。个人感觉,不喜勿喷。

  • com.alibaba.rocketmq.client.log

里面只有一个logger,没什么可讲的。

  • com.alibaba.rocketmq.client.producer

又是一个大部头。咦中间为什么把impl这个包跳过呢?因为impl包是具体的实现。最后在写。

  • MQProducer

是一个继承了 MQAdmin 的接口,消费者的主要接口,主要是有几个方法:start、shutdown、fetchPublishMessageQueues、send(多态)、TransactionSendResult(支持事务的)

这个方法是生产者的总接口

  • DefaultMQProducer

是 mq 默认的生产者,注意代码实现中的

private volatile int defaultTopicQueueNums =
4;

这个表示默认的 topic 队列个数,这里的关键字 volatile。关于这个关键字的用法,请移步这里

发送消息超时时间默认是3000毫秒,也就是3秒。要注意,其中有一个变量叫做compressMsgBodyOverHowmuch 。这个名字真是。。。。。看名字意思是如果消息体超过了4096,就会进行压缩

retryTimesWhenSendFailed 名字很诡异,大意是如果发送失败会重试多少次。

retryAnotherBrokerWhenNotStoreOK 这个名字很牛逼,是否在 Store 挂掉的情况下,尝试其他 broker。

maxMessageSize 最大消息体,1024*128.

这个类里面,最重要的就是这些默认值和意义。因为具体的实现都在 impl 中了。

  • LocalTransactionExecuter

是个接口,应该是本地交易执行后,用于设置标志位的一个接口。参看
DefaultMqProducerImpl 中的 TransactionSendResult 方法。如果返回
COMMIT,那么后续结束。如果这个接口的实现类返回的结果是失败,会设置一个标志位 TransactionRollbackType。结束时impl
中会调用一个私有方法endTransaction 来做统一的处理。这个方法中,会针对
LocalTransactionExecuter 返回的结果"MessageSysFlag.TransactionRollbackType"做具体的处理。主要是在
broker 中的 EndTransactionProcessor 类中和store 项目中做处理(store 项目中的 IndesService中会有对 RollBack 的处理)

  • MessageQueueSelector

也是一个接口,用于在
producer 发送消息时,对消息进行选择的。是 producer.send 方法的过滤器。具体使用方法可以参考 example 项目。源码如下

SendResult
sendResult = producer.send(msg, new
MessageQueueSelector() {

@Override

public MessageQueue
select(List<MessageQueue> mqs, Message
msg, Object arg)
{

Integer id = (Integer) arg;

int index = id % mqs.size();

return mqs.get(index);

}

}, orderId);

  • SendCallback、SendResult、SendStatus

主要是一些状态,cbk 接口

  • TransactionCheckListener

这个是需要开发人员自己实现的,用于检查事务的检查器,请参考
example 项目的 transaction。具体用处请参考 DefaultMQProducerImpl 类中的checkTransactionState 方法

  • TransactionMQProducer

主要是对 defaultMQProducerImpl 的封装,里面封装了关于 impl 中使用事务的封装。这个和
DefaultMQProducer 的区别在于,start 的时候,需要初始化以下。称之为 initTransactionEnv(),shutdown 的时候,需要做一下 destoryTransactionEnv,对环境进行处理

  • com.alibaba.rocketmq.client.impl

这个是 consumer、producer、factory、admin 的具体实现细节

整个 client 中最重要的包了。看懂了这个部分,就了解了整个 client 的逻辑。而 client 是整个 mq
中面对开发者来说最重要的组成模块。

  • CommunicationMode 类

通信类型,分为 SYNC 同步,ASYNC 异步,ONEWAY模式(我称之为有去无回模式,具体请看这里2-3

  • FindBorkerResult 类

查找 broker 的结果 bean。

  • MQAdminImpl 类

管理员 api 的实现,在上一章tools 中,有讲到。具体来分析以下这个类都干了什么

admin 管理的时候,也是要创建一个 topic,这里使用的设计思路跟 redis 的 admin 是一个思路。

主要将 createTopic 方法,这个方法先去根据 MQClientFactory 找路由信息。然后根据路由信息,去找 Broker 的 MASTER 的地址,然后通过
MQClientFactory 创建一个 topic

fetchPublicMessageQueue 这个方法是获取广播信息

fetchSubscribeMessageQueues 这个方法是获取订阅信息,以上两个方法超时时间都是3秒

queryMessage 方法是先查找路由信息,然后使用 MQClientFactory 查询,超时时间是15喵。查询成功后,会将返回的结果序列化为消息体,MessageExt。然后将 MessageExt 封装为 QueryResult。然后根据要查找的 key 对 message 进行过滤。并且会根据最后一次更新的时间作为标记。将这些时间之后的数据返回。

中间使用了CountDownLatch 这个类,用于多线程计数器。关于 CountDownLatch 请参考这里。其实 java 本身已经开源了,他的孪生兄弟 CyclicBarrier 也是一个很好玩的家伙。CountDownLatch 计数器使用了2秒的超时时间。也就是说,如果2秒内没有得到查到消息也会继续后续的操作。而且不会报错。

searchOffset 方法

这个方法是查询进度的,调用的是 MQClientAPIImpl的实现方法

  • MQClientAPIImpl 类

ROCKETMQ 核心方法类,没有之一

  • MQClientManager 类

主要是包含了创建实例,删除客户端工厂的方法。创建后会放到一个
ConcurrentHashMap中。关于 ConcurrentHashMap,可以参考这篇文章

  • ClientRemotingProcessor 类
  • com.alibaba.rocketmq.client.impl.consumer
    • ConsumeMessageService
    • ConsumeMessageConcurrentlyService
    • ConsumeMessageOrderlyService
    • MQConsumerInner
    • DefaultMQPullConsumerImpl
    • DefaultMQPushConsumerImpl
    • MessageQueueLock
    • ProcessQueue
    • PullAPIWrapper
    • PullMessageService
    • PullRequest,PullResultExt
    • RebalanceImpl
    • RebalancePullImpl
    • RebalancePushImpl
    • RebalanceService
    • com.alibaba.rocketmq.client.impl.factory
      • MQClientInstance
    • com.alibaba.rocketmq.client.impl.producer
      • MQProducerInner
      • DefaultMQProducerImpl
      • TopicPublishInfo
上一篇:【分布式】Zookeeper序列化及通信协议


下一篇:Dynamics AX 2012 R2 无法创建类"Excel.Application"的COM对象