CLIENT
之前讲过tools里面有大量调用client的东西。为了从源码层面了解rocket,决定啃下client这块骨头。
- pom
先看pom,看看CLIENT依赖谁。看完后原来是依赖common的。common又依赖remoting的 翻开remoting看了看,都是一些接口定义。明白了,remoting应该是通信协议,公共接口。看来如果讲框架的时候,应该从remoting开始讲。 而common,看名字就知道是个公共包,里面提供了各种公用的东西。先不讲了,用到再看吧。
好的,开始分析源码。。。。。。 . . . . .
十分钟过后,撞墙了。这特么是个类库啊从哪里下手呢? 没办法已经答应你们要讲client了。硬着头皮讲吧,因为是盲人摸象,所以讲的乱大家不要拍我(画外音:好像你丫其他的就讲得好似的)
好吧,没办法,硬着头皮讲吧。 下面,我按照包的名称讲 先讲com.alibaba.rocketmq.client包下面的
- com.alibaba.rocketmq.client
- ClientConfig 就是一个配置类,里面需要配置nameserveraddress,clientip,实例名称,可以用于做callback的线程。 有几个默认的值比较重要
- pollNameServerInteval 默认是30000毫秒,看名称意思是从nameserver进行访问的间隔时间。被用在com.alibaba.rocketmq.client.impl.factory.MQClientInstance 这个类中,用于生产实例!
- heartbeatBrokerInterval 默认也是30000毫秒,从名称上推敲,应该是保证到broker心跳的时间间隔。30秒一次。
- persistConsumerOffsetInterval 看用法貌似是消费者重新计算offset的时间间隔,默认是5秒。翻看相关代码的时候,发现pull/push 的consumer都使用了这个。也就是说,这两种consumer对会对offset进行记录。 而记录的方式分为记录到本地文件,和远程地址两种。等讲到的时候再讲,要是忘了,你们提醒我哦。
这里顺便讲一下clientid的规律,就是client的ip+@+实例名称。 看看他们的源码
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
return sb.toString();
呵呵,原来阿里的员工也写sb,记得在海关总署的时候,当时带我的高架,在codereview的时候,说,在代码中别骂人,这是道义。哎,现在的人啊。
剩下的方法就是重置配置啊,克隆啊这些什么的。对,这个类还重写了toString 方法,想看config信息的,直接tostring就ok了。
- MQAdmin 就是个接口,之前在讲tools的时候讲过了。主要是创建话题,查找offset,以及查找消息的一类东西。
- MQHelper 工具类 resetOffsetByTimestamp
- 其余不重要的代码略过 比如QueryResult,Validators,VirtualEnvUtil
- com.alibaba.rocketmq.client.admin
里面只有一个接口,MQAdminExtInner 这个只有在client中用过的。应该是一个标记型接口。
- consumer
重点来了,最重要的两个两个包之一。
先看看有几个包,有listener,rebalance,store。还好不是很多。我就顺着往下看了
- consumer包内部
- AllocateMessageQueueStrategy是用来平衡consumer收到消息的算法接口,典型的strategy模式。具体算法后面会讲到
- DefaultMQPullConsumer 最重要的类,默认消费者。里面封装了DefaultMQPullConsumerImpl 这个对象。注意类默认的消息策略是AllocateMessageQueueAveragely。
- DefaultMQPushConsumer 是推送消息的消费者,注意,默认的消息策略也是AllocateMessageQueueAveragely
- MessageQueueListener 消息队列发生变化的时候的listener。
- MQConsumer 是消费者的接口,主要是延时消费和获取订阅消息的功能。
- MQPullConsumerScheduleService 是拉消息的后台线程service,从start方法开始看。其中 this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener);很重要。
- PullBack,PullResult,PullStatus,PullTaskCallback,PullTaskContext等都是一些辅助类,不细讲了。
- com.alibaba.rocketmq.client.consumer.listener
这个包里面主要是MessageListner相关的东西,分为并发消费和顺序消费两种。 MessageListener是基础接口,MessageListenerConcurrently和MessageListenerOrderly是他的两个继承接口 剩下的东西,都是这个接口中需要的一些辅助类,比如context,status等。
- com.alibaba.rocketmq.client.consumer.rebalance
这下面是四个类,都实现了AllocateMessageQueueStrategy 这个接口,看了看典型的strategy模式 这个包里面的几个类是具体的算法实现。
- com.alibaba.rocketmq.client.consumer.store
这个包下面是用来存储offset的。主要分为localfileoffset和remotebrokeroffset。至于什么是offset,我的理解就是所谓的刷盘机制。讲消费进度和内容保存为文件。必要的时候可以读取。这里粗略的看了一下。
localoffset用的是fastjson来存储的。文件位于系统变量'rocketmq.client.localOffsetStoreDir'所指向的位置,如果没有的话,会放在user.home系统变量的.rocketmq_offsets文件中。计算offset变量的时候用的是atomicLong,所以不用担心异步问题。
remotebrokeroffset
中用到了大量的netty和java concurrent的内容。很有必要研究一下。
这里给大家两个传送门
http://47966392.blog.163.com/blog/static/13070442320091169152773/
http://www.cnblogs.com/whgw/archive/2011/09/29/2195555.html
当然,最好还是研读专业书籍。
其中invokeOnewayImpl 这个方法,使用semahore来获取资源锁,netty写完后,再释放。invokeAsyncImpl也是类似的套路。所以读懂了其中一段代码,就读懂了其他的代码
至此,consumer讲完了。
- com.alibaba.rocketmq.client.exception
这里面主要是MQ的一些异常类,不细讲了
- com.alibaba.rocketmq.client.hook
这里面都是一些Context和hook。大家不要觉得陌生,其实这就是listener。只不过叫法不一样。具体用法是在一些producer,processor中会有一个借口叫做registerXXXHoook的这个方法。这个方法的作用就是把这些hook注册到系统中,然后当系统流转到某个步骤的时候会触发这些hook。这些hook都是接口。看来要自己实现了。
其实这就是一个观察者模式,写成context和hook,看来作者manhong.yqd<jodie.yqd@gmail.com> 应该是从c转过来做java的。个人感觉,不喜勿喷。
- com.alibaba.rocketmq.client.log
里面只有一个logger,没什么可讲的。
- com.alibaba.rocketmq.client.producer
又是一个大部头。咦中间为什么把impl这个包跳过呢?因为impl包是具体的实现。最后在写。
- MQProducer
是一个继承了 MQAdmin 的接口,消费者的主要接口,主要是有几个方法:start、shutdown、fetchPublishMessageQueues、send(多态)、TransactionSendResult(支持事务的)
这个方法是生产者的总接口
- DefaultMQProducer
是 mq 默认的生产者,注意代码实现中的
private volatile int defaultTopicQueueNums =
4;
这个表示默认的 topic 队列个数,这里的关键字 volatile。关于这个关键字的用法,请移步这里
发送消息超时时间默认是3000毫秒,也就是3秒。要注意,其中有一个变量叫做compressMsgBodyOverHowmuch 。这个名字真是。。。。。看名字意思是如果消息体超过了4096,就会进行压缩
retryTimesWhenSendFailed 名字很诡异,大意是如果发送失败会重试多少次。
retryAnotherBrokerWhenNotStoreOK 这个名字很牛逼,是否在 Store 挂掉的情况下,尝试其他 broker。
maxMessageSize 最大消息体,1024*128.
这个类里面,最重要的就是这些默认值和意义。因为具体的实现都在 impl 中了。
- LocalTransactionExecuter
是个接口,应该是本地交易执行后,用于设置标志位的一个接口。参看
DefaultMqProducerImpl 中的 TransactionSendResult 方法。如果返回
COMMIT,那么后续结束。如果这个接口的实现类返回的结果是失败,会设置一个标志位 TransactionRollbackType。结束时impl
中会调用一个私有方法endTransaction 来做统一的处理。这个方法中,会针对
LocalTransactionExecuter 返回的结果"MessageSysFlag.TransactionRollbackType"做具体的处理。主要是在
broker 中的 EndTransactionProcessor 类中和store 项目中做处理(store 项目中的 IndesService中会有对 RollBack 的处理)
- MessageQueueSelector
也是一个接口,用于在
producer 发送消息时,对消息进行选择的。是 producer.send 方法的过滤器。具体使用方法可以参考 example 项目。源码如下
SendResult
sendResult = producer.send(msg, new
MessageQueueSelector() {
@Override
public MessageQueue
select(List<MessageQueue> mqs, Message
msg, Object arg)
{
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
- SendCallback、SendResult、SendStatus
主要是一些状态,cbk 接口
- TransactionCheckListener
这个是需要开发人员自己实现的,用于检查事务的检查器,请参考
example 项目的 transaction。具体用处请参考 DefaultMQProducerImpl 类中的checkTransactionState 方法
- TransactionMQProducer
主要是对 defaultMQProducerImpl 的封装,里面封装了关于 impl 中使用事务的封装。这个和
DefaultMQProducer 的区别在于,start 的时候,需要初始化以下。称之为 initTransactionEnv(),shutdown 的时候,需要做一下 destoryTransactionEnv,对环境进行处理
- com.alibaba.rocketmq.client.impl
这个是 consumer、producer、factory、admin 的具体实现细节
整个 client 中最重要的包了。看懂了这个部分,就了解了整个 client 的逻辑。而 client 是整个 mq
中面对开发者来说最重要的组成模块。
- CommunicationMode 类
通信类型,分为 SYNC 同步,ASYNC 异步,ONEWAY模式(我称之为有去无回模式,具体请看这里2-3)
- FindBorkerResult 类
查找 broker 的结果 bean。
- MQAdminImpl 类
管理员 api 的实现,在上一章tools 中,有讲到。具体来分析以下这个类都干了什么
admin 管理的时候,也是要创建一个 topic,这里使用的设计思路跟 redis 的 admin 是一个思路。
主要将 createTopic 方法,这个方法先去根据 MQClientFactory 找路由信息。然后根据路由信息,去找 Broker 的 MASTER 的地址,然后通过
MQClientFactory 创建一个 topic
fetchPublicMessageQueue 这个方法是获取广播信息
fetchSubscribeMessageQueues 这个方法是获取订阅信息,以上两个方法超时时间都是3秒
queryMessage 方法是先查找路由信息,然后使用 MQClientFactory 查询,超时时间是15喵。查询成功后,会将返回的结果序列化为消息体,MessageExt。然后将 MessageExt 封装为 QueryResult。然后根据要查找的 key 对 message 进行过滤。并且会根据最后一次更新的时间作为标记。将这些时间之后的数据返回。
中间使用了CountDownLatch 这个类,用于多线程计数器。关于 CountDownLatch 请参考这里。其实 java 本身已经开源了,他的孪生兄弟 CyclicBarrier 也是一个很好玩的家伙。CountDownLatch 计数器使用了2秒的超时时间。也就是说,如果2秒内没有得到查到消息也会继续后续的操作。而且不会报错。
searchOffset 方法
这个方法是查询进度的,调用的是 MQClientAPIImpl的实现方法
- MQClientAPIImpl 类
ROCKETMQ 核心方法类,没有之一
- MQClientManager 类
主要是包含了创建实例,删除客户端工厂的方法。创建后会放到一个
ConcurrentHashMap中。关于 ConcurrentHashMap,可以参考这篇文章。
- ClientRemotingProcessor 类
- com.alibaba.rocketmq.client.impl.consumer
- ConsumeMessageService
- ConsumeMessageConcurrentlyService
- ConsumeMessageOrderlyService
- MQConsumerInner
- DefaultMQPullConsumerImpl
- DefaultMQPushConsumerImpl
- MessageQueueLock
- ProcessQueue
- PullAPIWrapper
- PullMessageService
- PullRequest,PullResultExt
- RebalanceImpl
- RebalancePullImpl
- RebalancePushImpl
- RebalanceService
- com.alibaba.rocketmq.client.impl.factory
- MQClientInstance
- com.alibaba.rocketmq.client.impl.producer
- MQProducerInner
- DefaultMQProducerImpl
- TopicPublishInfo