Google的Guava包下的EventBus源码解析

EventBus解析

1、EventBus的构造方法

  • 使用EventBus作为具体实现类
  • 使用AsyncEventBus作为实现类

(1)使用EventBus作为实现类,其构造方法有:

public EventBus() {
        this("default");
    }

public EventBus(String identifier) {
        this(identifier, MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), EventBus.LoggingHandler.INSTANCE);
}

public EventBus(SubscriberExceptionHandler exceptionHandler) {
        this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler);
}

(2)使用AsyncEventBus作为实现类,其构造方法为:

public AsyncEventBus(String identifier, Executor executor) {
        super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
    }
    public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
        super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
    }
    public AsyncEventBus(Executor executor) {
        super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
    }

统一调用的构造方法为:

EventBus(String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) {
        this.subscribers = new SubscriberRegistry(this);
        this.identifier = (String)Preconditions.checkNotNull(identifier);
        this.executor = (Executor)Preconditions.checkNotNull(executor);
        this.dispatcher = (Dispatcher)Preconditions.checkNotNull(dispatcher);
        this.exceptionHandler = (SubscriberExceptionHandler)Preconditions.checkNotNull(exceptionHandler);
    }

参数的意义分别是:

identifier:类似当前EventBus对象的别名,可以描述该EventBus的用途,默认为“default”

executor:使用异步执行时传入的自定义线程池

dispatcher:指定分发消息的模式

exceptionHandler:处理订阅消息异常的方法

subscribers:注册订阅者的类

2、两种实现方式创建对象时的区别

(1)EventBus实现:

  • 默认使用default作为identifier,
  • 执行器使用MoreExecutors.directExecutor()
  • dispatcher:使用Dispatcher.perThreadDispatchQueue()队列
  • exceptionHandler:默认使用EventBus提供的LoggingHandler.INSTANCE,如果有传入参数,就是用参数

(2)AsyncEventBus实现:

  • 默认使用default作为identifier
  • 执行器使用自定义的对象
  • dispatcher:使用Dispatcher.legacyAsync()类型
  • exceptionHandler:默认使用LoggingHandler.INSTANCE,如果有传入参数,就是用参数

(3)Dispatcher调度器:

eventbus包下的Dispatcher类提供了三种类型的调度器,分别为:

  • PerThreadQueuedDispatcher
  • LegacyAsyncDispatcher
  • ImmediateDispatcher

(4)Executor执行器

  • EventBus默认提供的是DirectExecutor,单线程的执行器

Google的Guava包下的EventBus源码解析

3、注册对象到EventBus

3.1、注册对象

Google的Guava包下的EventBus源码解析

使用this.subscribers对象的register方法注册,此处的subscribers对象为SubscriberRegistry类。

3.2、进入SubscriberRegistry类,register方法

Google的Guava包下的EventBus源码解析

3.2.1、查找所有订阅的方法

使用Muitmap集合存储一个对象下有哪些方法订阅了,具体实现findAllSubscribers方法,该方法内部如下:

Google的Guava包下的EventBus源码解析

(1)该方法内部使用getAnnotatedMethods方法获取clazz及其多级父类以及实现的接口中所有方法上有@Subscribe注解的方法。方法具体实现如下:

Google的Guava包下的EventBus源码解析

private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
    //获取到传递的class对象的类以及父类以及实现的接口
    Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
    //创建一个Map集合
    Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
    //遍历得到的class对象
    for (Class<?> supertype : supertypes) {
        //获取class对象的所有方法
      for (Method method : supertype.getDeclaredMethods()) {
          //如果方法上有Subscribe注解,并且isSynthetic表示方法不是由java编译器生成的
        if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
          // 获取该方法的参数类型
          Class<?>[] parameterTypes = method.getParameterTypes();
           //检查方法的参数只能是1个
          checkArgument(
              parameterTypes.length == 1,
              "Method %s has @Subscribe annotation but has %s parameters."
                  + "Subscriber methods must have exactly 1 parameter.",
              method,
              parameterTypes.length);
		//根据方法创建MethodIdentifier对象,其中包含方法名、方法的参数类型
          MethodIdentifier ident = new MethodIdentifier(method);
            //如果map集合中不包含该对象,就将ident和method对象存储到identifiers的map集合中
          if (!identifiers.containsKey(ident)) {
            identifiers.put(ident, method);
          }
        }
      }
    }
    //返回map集合中的方法
    return ImmutableList.copyOf(identifiers.values());
  }

(2)遍历该对象中添加了@Subscribe注解的方法集合

Google的Guava包下的EventBus源码解析

(3)获取该方法的参数类型,将第0个参数类型赋值给eventType

再获取eventType时,会查找监听对象的父类以及接口,查看有没有订阅方法。

(4)将参数类型作为key,订阅者作为value,存储到Multimap集合中。value中包含被监听的bus,被监听的bus中的执行器,监听该bus的对象listener,以及监听参数类型的方法。

Google的Guava包下的EventBus源码解析

会去检查订阅方法上有没有注解AllowConcurrentEvents,如果有该注解,在使用create方法创建订阅者对象时,订阅者对象使用Subcriber,如果没有注解使用SynchronizedSubscriber对象。这两种对象在真正分发事件时区别才会体现出来。

示例如下:说明objA中的三个方法都有@Subcribe注解

Google的Guava包下的EventBus源码解析

对象继承情况:

Google的Guava包下的EventBus源码解析

对象实现接口,接口内使用JDK8提供的default来实现方法,并添加注解,也可以订阅。

Google的Guava包下的EventBus源码解析

(5)获取到该对象的所偶遇订阅者后,返回Multimap集合

3.2.2、将获取到的订阅方法进行缓存

Google的Guava包下的EventBus源码解析

(1)遍历得到的Multimap集合

  • 获取Key值,也就是该对象内部订阅方法的参数类型
  • 获取订阅者
  • 根据参数类型,获取全局变量subscribers中已有的所有订阅者

全局变量:

Google的Guava包下的EventBus源码解析

  • 判断订阅者set集合如果为空,创建CopyOnWriteArraySet集合,然后使用subscribers.putIfAbsent将订阅类型(参数类型)和set集合存储进去。使用MoreObjects的方法校验第一个参数返回如果是null,就是用第二个参数,如果第二个newSet参数还是null,就会报空指针异常,会返回一个空的set集合。
  • 将该订阅类型对应的订阅者保存到eventSubscribers中,也就是保存到了全局变量subscribers中。

4、取消对象注册到EventBus

4.1、取消注册对象

Google的Guava包下的EventBus源码解析

当前EventBus对象中的subscribers是SubscriberRegistry类的对象,执行该类中的unregister方法

4.2、进入SubscriberRegistry类,unregister方法

Google的Guava包下的EventBus源码解析

4.2.1、获取该对象中的订阅方法

使用findAllSubscribers方法,与注册对象中的方法相同,都是查找该类以及其多级父类中的订阅类型和方法。

4.2.2、遍历获取到的订阅方法

(1)获取到集合中的key,也就是该对象中订阅的类型(方法上的参数类型)

(2)获取到集合中key对应的value,也就是该对象中的订阅方法。

(3)获取到全局变量subscribers中缓存的数据,赋值给currentSubscribers

(4)如果currentSubscribers为null,则抛出异常,如果移除该对象中的所有订阅者返回结果为false,也抛出异常,如果为true,则正常移除。

5、发送消息

5.1、EventBus中的post方法

Google的Guava包下的EventBus源码解析

5.1.1、获取该消息关联的类型所有订阅者

通过subscribers(SubscriberRegistry类)中的全局变量获取订阅该事件以及该事件父类和接口的所有方法。

Google的Guava包下的EventBus源码解析

获取传递的event事件的类、父类以及实现的接口。

例如:如果发送的是MsgA这个消息,那么就会找到Msg类,获取到的集合中就包含了MsgA类型和Msg类型,所有订阅了这两个类型的都会接收到该消息。接口也是一样的(MsgA和MsgB都实现了Msg接口)。

Google的Guava包下的EventBus源码解析

下图中就描述了发送的消息在实现了接口的情况。

Google的Guava包下的EventBus源码解析

如图中所示,MsgA和MsgB实现了Msg接口,如果有订阅者订阅的类型(参数类型)是Msg,那么发送的时候不管发送MsgA还是MsgB,订阅Msg的方法都可以接收到。

下图中表示发送的消息有父类的情况:

Google的Guava包下的EventBus源码解析

如图中所示,MsgA和MsgB继承了Msg类,如果有订阅者订阅的类型(参数类型)是Msg,那么发送的时候不管发送MsgA还是MsgB,订阅Msg的方法都可以接收到。如果发送的是Msg类,那就只有订阅了Msg类的方法可以接收到。

总结:发送消息的时候,会查找该消息的父类以及接口,查看有没有订阅的方法,如果有就会都发送一次。

5.1.2、使用dispacther向订阅者发送

Google的Guava包下的EventBus源码解析

根据构造方法调度器使用了两种类型,分别是:

  • PerThreadQueuedDispatcher
  • LegacyAsyncDispatcher
(1)默认PerThreadQueuedDispatcher调度器的实现

Google的Guava包下的EventBus源码解析

  • 获取ThreadLocal中创建的Queue队列,如果已创建,就会获取当前线程对应的Queue队列,初始化一个ArrayDeque队列,ArrayDeque是一个双端队列,即可以实现队列的先进先出,也可以实现栈的先进后出。它是线程不安全的,而且不允许有null值。它是可以自动扩容的循环数组,每次扩容都是2的n次方,初始大小为16.

Google的Guava包下的EventBus源码解析

  • 通过offer将事件以及订阅者存储到队列尾部
  • 如果dispatching返回为false,说明没有在分发事件,将dispatching设置为true,表示正在分发事件
  • 循环获取队列头部的事件(先进先出),然后再循环获取事件对应的订阅者,通过订阅者Subscriber对象的dispatchEvent方法发送event事件

Google的Guava包下的EventBus源码解析

  • 使用executor执行器通过反射去执行监听的方法。默认执行器为MoreExecutors.directExecutor(),直接发送。
  • 订阅者对象中包含了(注册的对象实例target,对象实例中监听的方法method),通过反射去invoke,因为订阅者对象根据订阅方法上有没有添加AllowConcurrentEvents注解分为两种,SynchronizedSubscriber和Subscriber,前者是后者的子类,重写了方法invokeSubscriberMethod方法。
  • 在执行时,如果订阅方法标注了AllowConcurrentEvents注解,使用Subscriber中的方法,如果没有标注注解,则使用SynchronizedSubscriber中的invokeSubscriberMethod。下图为Subscriber的方法

Google的Guava包下的EventBus源码解析

下图为SynchronizedSubscriber中的方法,在原有基础上,使用synchronized锁住该对象,然后去执行。

Google的Guava包下的EventBus源码解析

两者的区别就是:如果添加了注解,那就直接使用Subscriber中的方法,如果没有添加注解,则使用加锁的方法去执行。

(2)LegacyAsyncDispatcher调度器的实现

Google的Guava包下的EventBus源码解析

  • 在初始化时,创建了一个ConcurrentLinkedQueue队列,内部存储EventWithSubscriber对象。
  • 在dispatch方法中,遍历传递的订阅者,使用event和订阅者构建EventWithSubscriber对象,存储到集合queue的尾部。
  • 循环取队列头部的对象,使用订阅者的dispatchEvent方法发送事件event。
  • 发送事件时,根据注册时检测到的是否添加注解,分为加锁执行方法和不加锁执行方法,内部逻辑与PerThreadQueuedDispatcher相同。

5.1.3、post->DeadEvent

在缓存中找不到订阅者并且它本身不是一个DeadEvent事件时,就会发送一个DeadEvent。如果找不到DeadEvent事件的订阅者,就会不进行处理。

6、流程总结

(1)两种创建方式对比

类型 EventBus AsyncEventBus
identifier 默认“default” 默认“default”
executor DirectExecutor 自定义
dispatcher PerThreadQueuedDispatcher LegacyAsyncDispatcher
subscribers SubscriberRegistry SubscriberRegistry
exceptionHandler 默认LoggingHandler 默认LoggingHandler

两种方式的相同点:

  • 注册订阅者和取消订阅者逻辑都是相同的
  • 根据订阅方法有没有添加注解决定执行订阅方法的方式(加锁或者不加锁)

两种方式的区别:

  • EventBus使用默认的DirectExecutor,内部使用单线程去执行任务
  • AsyncEventBus使用传递的Executor,去执行,如果传递的是Single线程,那和EventBus就没什么区别。
  • EventBus的分发模式使用的是ArrayDeque双端队列,先存入然后再取出执行,不是线程安全的集合,通过ThredLocal来在线程内部维护ArrayDeque队列。
  • AsyncEventBus使用的是ConcurrentLinkedQueue,同样是先存入然后取出,支持多线程同时访问。

(2)方法执行流程

Google的Guava包下的EventBus源码解析

上一篇:微服务-基于CAP,EventBus的分布式事务(7)


下一篇:企业级项目实战讲解!大厂经典高频面试题体系化集合,最强技术实现