EventBus

EventBus配置

android {
    defaultConfig {
        javaCompileOptions {
            annotationProcessorOptions {
                // 生成的Index类的名称
                arguments = [ eventBusIndex : 'com.example.myapp.MyEventBusIndex' ]
            }
        }
    }
}

dependencies {
    def eventbus_version = '3.2.0'
    implementation "org.greenrobot:eventbus:$eventbus_version"
    annotationProcessor "org.greenrobot:eventbus-annotation-processor:$eventbus_version"
}

EventBus使用流程

  • 定义Event事件。类型对应下文的eventType
  • 使用@Subscribe注解方法。方法的入参是上一步中定义的Event。方法只允许有一个Event入参。
  • register注册。
  • post发送事件消息,入参是Event对象。@Subscribe注解过的方法就会接收到事件了。
  • unregister解除注册。

EventBus中的数据结构

Map变量:subscriptionsByEventType

  • Map 的key值是class类型,value是一个list同步写列表。
  • 一个key代表一个class类型,是Subscribe注解的函数的参数class类型,是event类型,即eventType。
  • value是一个list,包含了该class中的所有Subscribe注解的方法。这个list是按照优先级priority顺序排列的,priority越大,优先级高,在列表的前面。
  • 一个key-value键值对 表示监听一个特定event类型事件的所有method方法。post方法传入的也是event类型。
//eventType : Subscription
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;

Map变量:typesBySubscriber

  • Map的key值是subscriber对象实例,value值是一个普通list列表,存储class对象实例。list列表中存储的是eventType的类型。
  • key-value键值对表示的是一个subscriber对象对应的所有的eventType类型。
private final Map<Object, List<Class<?>>> typesBySubscriber;

Subscription.java

  • Subscription 单词是订阅,观察的意思,是一个观察者。
  • 这个类代表一个观察者。该类持有一个对象实例subscriber 和 实例所在类的一个消息处理的方法(用Subscribe注解的方法)。
final class Subscription {
    final Object subscriber; // Object对象,反射invoke方法时传入
    final SubscriberMethod subscriberMethod; // 一个使用Subscribe注解标记的方法
}

SubscriberMethod.java

  • 是一个bean类型的方法。
  • 注解方法解析后的封装对象。封装了一个使用 Subscribe注解的方法。可以用来处理消息。
final Method method; //经过注册解析得到的函数方法,用于处理post的事件。
final ThreadMode threadMode; // 线程模式
final Class<?> eventType; // event 事件类型。注解方法的入参类型,post函数的入参类型
final int priority; // 优先级,优先级越高,越先收到消息通知。
final boolean sticky; // 是否支持粘性方法。

PendingPost.java

  • Post事件对象,采用了对象池。

PendingPostQueue.java

  • 链表实现的一个简单的事件队列

注解相关

@Subscribe注解

  • Subscribe注解 表示一个方法注册监听消息事件

ThreadMode

  • POSTING 默认模式。直接在本线程调用执行其他对象注册的函数方法。
  • MAIN APP主函数调用,应该避免阻塞操作。如果当前线程是主函数,那么会直接调用。如果不是主线程,那么会先通过handler抛到主函数的事件队列中处理。
  • MAIN_ORDERED 无论当前是否是主线程,直接将事件抛到主函数事件队列,通过队列去执行。
  • BACKGROUND 如果当前是主线程,直接抛到一个后台线程队列去处理;如果当前不是主线程,那么直接在当前线程,调用注册的函数方法。 注意,同一时间,只会有一个后台线程出来
  • ASYNC 所有消息都会抛到一个独立的线程处理。适合耗时操作。执行线程从线程池中获取。EventBus会采用线程池,动态的扩展线程。默认采用的线程池是newCachedThreadPool.
    避免同时提交过多的耗时操作,导致线程池创建过多的线程。
    Avoid triggering a large number of long running asynchronous subscriber methods at the same time to limit the number of concurrent threads
  • BACKGROUND 和 ASYNC的区别:BACKGROUND 的事件,同一时刻,所有事件会发送到一个线程队列中,串行处理。ASYNC则不同,每个事件都会单独占用一个线程处理。
    org.greenrobot.eventbus.EventBusBuilder
    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();

/**
 * Each subscriber method has a thread mode, which determines in which thread the method is to be called by EventBus.
 * EventBus takes care of threading independently from the posting thread.
 *
 * @see EventBus#register(Object)
 * @author Markus
 */
public enum ThreadMode {
    /**
     * Subscriber will be called directly in the same thread, which is posting the event. This is the default. Event delivery
     * implies the least overhead because it avoids thread switching completely. Thus this is the recommended mode for
     * simple tasks that are known to complete in a very short time without requiring the main thread. Event handlers
     * using this mode must return quickly to avoid blocking the posting thread, which may be the main thread.
     */
    POSTING,

    /**
     * On Android, subscriber will be called in Android's main thread (UI thread). If the posting thread is
     * the main thread, subscriber methods will be called directly, blocking the posting thread. Otherwise the event
     * is queued for delivery (non-blocking). Subscribers using this mode must return quickly to avoid blocking the main thread.
     * If not on Android, behaves the same as {@link #POSTING}.
     */
    MAIN,

    /**
     * On Android, subscriber will be called in Android's main thread (UI thread). Different from {@link #MAIN},
     * the event will always be queued for delivery. This ensures that the post call is non-blocking.
     */
    MAIN_ORDERED,

    /**
     * On Android, subscriber will be called in a background thread. If posting thread is not the main thread, subscriber methods
     * will be called directly in the posting thread. If the posting thread is the main thread, EventBus uses a single
     * background thread, that will deliver all its events sequentially. Subscribers using this mode should try to
     * return quickly to avoid blocking the background thread. If not on Android, always uses a background thread.
     */
    BACKGROUND,

    /**
     * Subscriber will be called in a separate thread. This is always independent from the posting thread and the
     * main thread. Posting events never wait for subscriber methods using this mode. Subscriber methods should
     * use this mode if their execution might take some time, e.g. for network access. Avoid triggering a large number
     * of long running asynchronous subscriber methods at the same time to limit the number of concurrent threads. EventBus
     * uses a thread pool to efficiently reuse threads from completed asynchronous subscriber notifications.
     */
    ASYNC
}

初始化注册流程

  • register注册。 传入一个subscriber对象实例。
  • findSubscriberMethods 解析注解,寻找所有 subscriber对象所有带有@Subscribe的方法。并封装成为SubscriberMethod对象,返回method列表。
  • subscribe注册监听。把解析出的方法添加到Map数据结构里面,方便后续post消息时,调用处理方法。

register 方法

  • 相当于观察者模式的注册监听。主要是用来触发对Subscribe注解的函数的解析。
  • 入参subscriber表示的class类需要含有@Subscribe注解方法。表示该对象监听和处理post消息。
/**
 * Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they
 * are no longer interested in receiving events.
 * <p/>
 * Subscribers have event handling methods that must be annotated by {@link Subscribe}.
 * The {@link Subscribe} annotation also allows configuration like {@link
 * ThreadMode} and priority.
 */
public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    // 解析类的所有方法,找到有注解的方法
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod); //将解析的方法,添加到map数据结构中去
        }
    }
}

findSubscriberMethods,解析@Subscribe解方法

  • 入参是register注册函数传入的subscriber的class对象类型。
  • 如果找不到注解函数,会抛出异常。
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass); //查缓存
    if (subscriberMethods != null) {
        return subscriberMethods;
    }

    if (ignoreGeneratedIndex) { // 是否忽略通过预编译生成的Index查找Method额方式,默认值false
        subscriberMethods = findUsingReflection(subscriberClass); //直接通过反射查找类
    } else {
        subscriberMethods = findUsingInfo(subscriberClass); // 查找注解方法
    }
    if (subscriberMethods.isEmpty()) {
        throw new EventBusException("Subscriber " + subscriberClass
                + " and its super classes have no public methods with the @Subscribe annotation");
    } else {
        METHOD_CACHE.put(subscriberClass, subscriberMethods);
        return subscriberMethods;
    }
}
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
    FindState findState = prepareFindState(); // 初始化了一个对象池,对象池大小4
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) { // while循环遍历父类
        // 通过注解解析器在编译期间生成注解,避免通过反射查找,来加快查找速度。
        // 可以参考 SimpleSubscriberInfo
        findState.subscriberInfo = getSubscriberInfo(findState);
        if (findState.subscriberInfo != null) {
            SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
            for (SubscriberMethod subscriberMethod : array) {
                // 校验正确性。这个函数里面限制了:如果父类和子类同时注解了一个同一个方法,那么只有子类生效
                if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                    findState.subscriberMethods.add(subscriberMethod);
                }
            }
        } else {
            findUsingReflectionInSingleClass(findState); // 通过反射来查找Method
        }
        findState.moveToSuperclass(); //向上查找父类。此处通过类的包名过滤,忽略了Android的系统类,java的系统类,提升查找速度。
    }
    return getMethodsAndRelease(findState);
}
private static final int BRIDGE = 0x40; //桥接方法
private static final int SYNTHETIC = 0x1000; // 编译合成方法,编译器生成的方法
private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC;

// 使用反射解析一个类
private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
        // This is faster than getMethods, especially when subscribers are fat classes like Activities
        methods = findState.clazz.getDeclaredMethods();
    } catch (Throwable th) {
        // 省略 catch代码....
        findState.skipSuperClasses = true;
    }
    for (Method method : methods) {
        int modifiers = method.getModifiers();
        // 只解析public方法,并且忽略抽象、静态方法,忽略编译器生成的方法
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            Class<?>[] parameterTypes = method.getParameterTypes(); // 获取参数类型列表
            if (parameterTypes.length == 1) { // 只允许一个参数
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                if (subscribeAnnotation != null) {
                    Class<?> eventType = parameterTypes[0];
                    // 校验是否要添加这个方法。当子类和父类重写一个方法时,只添加子类的方法。
                    if (findState.checkAdd(method, eventType)) {
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        // 构造 SubscriberMethod,添加到list列表
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                    }
                }
            // 在严格校验模式下,如果方法具有多个参数,会抛出异常
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException("@Subscribe method " + methodName +
                        "must have exactly 1 parameter but has " + parameterTypes.length);
            }
        // strictMethodVerification 是否启用严格校验,默认值false。
        // 在严格校验的情况下,如果错误的使用Subscribe注解方法,就会抛出异常
        } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
            String methodName = method.getDeclaringClass().getName() + "." + method.getName();
            throw new EventBusException(methodName +
                    " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
        }
    }
}

post 发送消息

ThreadLocal对象: currentPostingThreadState

  • 每个线程内都会有一个ThreadLocal对象,类型PostingThreadState。每个线程内部都有一个事件队列。
    private final ThreadLocal<PostingThreadState> currentPostingThreadState = 
      new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };

post函数

/** Posts the given event to the event bus. */
public void post(Object event) {
    PostingThreadState postingState = currentPostingThreadState.get(); // ThreadLocal对象初始化
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event); //post的事件添加到队列中

    if (!postingState.isPosting) { // 如果当前正在对外发送消息。
        postingState.isMainThread = isMainThread();
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) { // 是否考虑父类, 默认为true
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        // 发送事件
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
      // 省略.... 没找到事件的异常处理
    }
}
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        subscriptions = subscriptionsByEventType.get(eventClass); // 从map中获取所有监听event type类型的订阅者
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) { // for循环进行遍历操作
            postingState.event = event;
            postingState.subscription = subscription; // 订阅者赋值
            boolean aborted;
            try {
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}
// 根据不同的threadMode,通过不同的线程进行处理
// 可以参考对ThreadMode的注解
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event); // 直接调用
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event); // 主线程直接调用
            } else {
                mainThreadPoster.enqueue(subscription, event); // 非主线程送入队列调用
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) { // Android场景下,这个变量一直非空
                mainThreadPoster.enqueue(subscription, event); // 送到主线程的队列调用
            } else {
                // temporary: technically not correct as poster not decoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event); //如果在主线程,需要切换后台线程执行
            } else {
                invokeSubscriber(subscription, event); //如果当前在后台,直接调用方法
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event); // 所有事件,异步执行
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}
/**
 * Invokes the subscriber if the subscriptions is still active. Skipping subscriptions prevents race conditions
 * between {@link #unregister(Object)} and event delivery. Otherwise the event might be delivered after the
 * subscriber unregistered. This is particularly important for main thread delivery and registrations bound to the
 * live cycle of an Activity or Fragment.
 */
void invokeSubscriber(PendingPost pendingPost) {
    Object event = pendingPost.event;
    Subscription subscription = pendingPost.subscription;
    PendingPost.releasePendingPost(pendingPost);
    if (subscription.active) {
        invokeSubscriber(subscription, event);
    }
}

void invokeSubscriber(Subscription subscription, Object event) {
    try {
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event); // 通过反射调用函数接口
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

几种poster

  • MainThreadSupport、HandlerPoster 主线程poster,传入主线程的looper作为参数。
  • BackgroundPoster 后台poster,所有事件顺序进入队列执行。
  • AsyncPoster 异步poster,每个事件在单独的线程中执行。

EventBusAnnotationProcessor 注解解析器

// Build the project at least once to generate the index class specified with eventBusIndex.
// Then, e.g. in your Application class, use EventBus.builder().addIndex(indexInstance) to pass an instance of the index class to EventBus.
EventBus eventBus = EventBus.builder().addIndex(new MyEventBusIndex()).build();
// Use EventBusBuilder.installDefaultEventBus() to set the EventBus with index as the instance returned by EventBus.getDefault().
EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();
// Now the default instance uses the given index. Use it like this:
EventBus eventBus = EventBus.getDefault();

参考资料

上一篇:Spring Data JPA 从入门到精通~@NamedQueries预定义查询


下一篇:Qt 按钮随机移动并触发信号