?
- Disruptor
需要:消息,生产者,消费者,事件处理器
在SOUL中,使用的是多个生产者,多个消费者,即多写多读的模式
- Disruptor有消费者,生产者,Disrutpor,生产者生产消息,放置到disruptor ringbuffer上,消费者到ringbuffer上去消费。
- soul中,Disruptor在客户机接入时用来同步数据的,用以进行网关服务器数据的更新操作
- 先跟着运行的流程走一遍,过程是,客户端启动,搜索controller类中的注解,将扫描到的接口使用disruptor发布通知消息,由消费值进行远程发送,发送到souladmin服务器当中,到了服务器中,服务器将接收到的信息,也经过disruptor发布消息,由消费者进行数据的更新入库。
- 客户端发送消息过程:
- 首先,要启动disruptor,设定好消费者,找到soul-spring-boot-starter-client-springmvc jar包,看它的配置类
- 在SoulSpringMvcClientConfiguration中注册了SpringMvcClientBeanPostProcessor
- 进入到SpringMvcClientBeanPostProcessor类中:在spring容器启动时,先进行Processor的对象创建
config是配置类,在yml文件中设定注册的类型,以及服务器列表。
soulClientRegisterRepository,是根据配置文件的注册类型,由SPI获取到的具体的注册处理类
在创建SpringMvcClientBeanPostProcessor的时候,初始化了一个此线程大小为1的线程池,并调用了SoulClientRegisterEventPublisher.getInstance()获取的publisher的start方法。 - 进入start方法
初始化了一个客户端注册执行器工厂,并将订阅者(消费者)设置到对象当中了,初始化了DisrutporProviderManager对象
设置了客户端注册执行器工厂,ringBuffer的大小,消费者的大小,以及初始化一个线程池。随后调用providerManage.startup(); - 在providerManage.startup();方法中,
初始化一个Disruptor,初始化消费者,消费者对象设置消费者线程池和消费者工厂(客户端注册执行器工厂),将消费者绑定到disrutpor中,并启动disrutpor,最后将ringbuffer和disruptor设置到provider成员变量当中。至此,disruptor的初始化启动已经完成了,开启了disruptor,指定了发布的消息类型,设定了ringbuffer的大小,绑定消息执行处理器,最后开启,并获取到ringbuffer,设定到disruptor管理器的成员变量中。 - 那么,processor的构造器执行完毕,在每个交给Spring托管的Bean初始化之后会执行该processer中的postProcessAfterInitialization方法
- 执行到postProcessAfterInitialization方法
在每个Bean初始化之后检索Controller注解,并检索SoulSpringMvcClient注解,如果存在,则进行一步的disruptor消息发布,publist.publistEvent - 在publishEvent方法中
获取到了provider,并调用生产者的onData方法,参数是一个消费者类型的函数式接口 - 进入生产者的onData方法
获取到了ringbuffer当前写操作的游标,并将消息写到该位置。 - 接下来将由消费者进行消费。消费者为QueueConsumer的onEvent方法
- 在消费者的onEvent方法中
factory为:RegisterClientExecutorFactory,create方法创建出了RegisterClientConsumerExecutor对象,该对象实现了Runnable接口
线程池启动run方法
获取到了消息,并将消息,交由executorSubscriber.executor执行 - executorSubscriber.executor
至此,开始调用具体的逻辑执行方法,将数据发送至souladmin端
- 接下来是souladmin数据的接收
- 由于souladmin接收数据之后,也是使用disruptor进行后续逻辑处理的,所以先看下disruptor的初始化启动过程
- 在配置类RegisterCenterConfiguration类中
由SPI获取了具体的业务逻辑处理类,在此生成了一个RegisterServerDisruptorPublisher对象,并调用starter方法 - start方法
初始化一个注册服务器逻辑处理的工厂类,并加入订阅者,最后初始化一个DisruptorProviderManage对象,并调用startup方法 - 由于该方法,是soul-disrutpor module当中的,与客户端逻辑一致,所以,factory的create方法
由于发送过来的消息类型不只一种,所以订阅者都实现了ExecutorTypeSubscriber接口,将消费者按type进行分组,存到map当中,type为key,之后由消费者线程池进行调度,调用RegisterServerConsumerExecutor的run方法 - RegisterServerConsumerExecutor run方法
从map中获取对应的消息处理器,进行数据处理 - executor处理
这是其中一种处理器,之后进行数据入库,同步到网关等操作。 - 至此,服务端的disruptor也完成工作。
- 看一下整个的设计,
- QueueConsumerFactory<T> 接口,消费者工厂,用来创建消费者
- QueueConsumerExecutor<T> 消费者执行器,实现了Runnable接口,为消息指定具体的处理器
- QueueConsumer<T> 消费者 侦听消息的发布,并进行消息处理
- DataEvent<T> 侦听的消息类型
- DisruptorEventFactory<T> disruptor需要的消息工厂
- DisruptorProviderManage<T> disruptor的具体启动过程,初始化并管理disrutpor的生产者,消费者,数据工厂等
- DisruptorProvider<T> 生产者
- DisruptorThreadFactory 生产者线程池工厂
- souladmin、soulclient:DataTypeParent 通用消息接口,包含type,用来区别消息类型
- Souladmin:SoulServerRegisterPublisher 生产者,发布消息,关闭disruptor,还有案例中都用来开启disruptor
- Souladmin: ExecutorSubscriber<T> 消息执行器,具体的消息执行
- 模块封装
- soul-disruptor模块
- 定义了DisruptorProvider、DisruptorProviderManage、DataEvent、QueueConsumerFactory、DisrutporThreadFactory等一系列通用接口
- 该模块的搭建了一个disruptor的初始化框架
- DisruptorProviderManage提供Disruptor的初始化,可以在初始化是自定义参数,而初始化参数中,包含消费者工厂,初始化会将消费者工厂放置到QueueConsumer的成员变量当中,有QueueConsumer进行消息的侦听,一旦有消息,则由消费者工厂QueueConsumerFactory创建QueueConsumerExecutor进行消息的处理,QueueConsumerExecutor可以拿到消息,是具体的操作。而在DisruptorProviderManage对象中,成员变量provide是此次初始化的disruptor的生产者,由此provider进行消息的发布。
- 所以,这个模块是对disruptor的通用封装,可以使用任何类型的数据,外界使用该模块需要进行的操作是,继承QueueConsumerExecutor其executor方法用来写具体的逻辑操作,实现QueueConsumerFactory接口,用来创建自己的实现的QueueConsumerExecutor,将工厂类用做DisruptorProviderManage的构造参数,获得对象,之后调用DisruptorProviderManage对象的start方法进行disruptor的初始化,disruptor便启动了,启动之后,就可以正常使用disruptor了,之后发布消息,则使用DisruptorProviderManage对象获取provider,进行消息的发布和disruptor的关闭。
- soul-disruptor模块
- 传递的消息都实现了DataTypeParent接口,在客户端上传时,标记自己的消息是什么类型的,服务端接收的时候,在disruptor消息分发的时候,用来决定该分发给那个具体的消息执行器。
?