前言
相信大家已经非常熟练的使用EventBus了,简单的说EventBus是一个Android事件发布/订阅框架,通过解耦发布者和订阅者简化 Android 事件传递,是个典型的观察者模式,那么是什么是观察者模式,有个很形象的比喻:西游记中各路神仙一挥手,自己的坐骑就现出原形了,那么这些妖怪就是观察者,当他们观察到神仙挥手时就会现出原形。本文源码基于EventBus3.0。
一、获取EventBus实例
EventBus.getDefault().register(this);
使用前我们要进行注册,EventBus.getDefault().register(this);首先我们来看下getDefault方法
/** Convenience singleton for apps using a process-wide EventBus instance. */ public static EventBus getDefault() { if (defaultInstance == null) { synchronized (EventBus.class) { if (defaultInstance == null) { defaultInstance = new EventBus(); } } } return defaultInstance; }
我们可以看出这是一个典型的双重锁校验的单例模式,如果不存在则新建,接着我们来看EventBus的构造方法
/** * Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a * central bus, consider {@link #getDefault()}. */ public EventBus() { this(DEFAULT_BUILDER); }
看到这里有没有疑惑的地方?既然是单例模式为什么构造方法不是private而是public,这是因为EventBus可能有多条总线,订阅者注册到不同线上的 EventBus,通过不同的实例来发送数据,不同的 EventBus 是相互隔离开的,订阅者都只会收到注册到该线上事件。
构造方法中只有一个DEFAULT_BUILDER,接着我们来看DEFAULT_BUILDER
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
我们可以看出EventBus是通过一个EventBusBuilder的实例来构建的
EventBus(EventBusBuilder builder) { subscriptionsByEventType = new HashMap<>(); typesBySubscriber = new HashMap<>(); stickyEvents = new ConcurrentHashMap<>(); mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10); backgroundPoster = new BackgroundPoster(this); asyncPoster = new AsyncPoster(this); indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0; subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes, builder.strictMethodVerification, builder.ignoreGeneratedIndex); logSubscriberExceptions = builder.logSubscriberExceptions; logNoSubscriberMessages = builder.logNoSubscriberMessages; sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent; sendNoSubscriberEvent = builder.sendNoSubscriberEvent; throwSubscriberException = builder.throwSubscriberException; eventInheritance = builder.eventInheritance; executorService = builder.executorService; }
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
前三行是三个hashMap但有所不同:
subscriptionsByEventType 对应的hashMap是:
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
它的key是订阅事件,value是所有订阅者集合。当我们发送Event事件的时候都是从这个集合中去寻找,
typesBySubscriber 对应的hashMap是:
private final Map<Object, List<Class<?>>> typesBySubscriber;
它的key是订阅者对象,value是这个订阅者订阅的所有事件集合。当我们注册或者反注册的时候都是操作这个集合然后操作subscriptionsByEventType。
stickyEvents 对应的hashMap是:
private final Map<Class<?>, Object> stickyEvents;
它的key是粘性事件的class对象,value是粘性事件对象。接着我们看后面三行代码:
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
mainThreadPoster
我们知道,在此之前我们线程间通信或者传值的话 都是使用Hanlder,而mainThreadPoster 的本质就是一个Hanlder,我们点击mainThreadPoster 进去看
既然是hanlder所以我们主要来看它的hanleMessage方法
@Override public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); while (true) { PendingPost pendingPost = queue.poll(); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { handlerActive = false; return; } } } eventBus.invokeSubscriber(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } rescheduled = true; return; } } } finally { handlerActive = rescheduled; } }
从队列中不断的去处pendingPost,使用 eventBus.invokeSubscriber(pendingPost)进行事件分发,而pendingPost是一个可复用对象的复用池,通过obtainPendingPost方法进行复用,releasePendingPost方法进行回收,这个HandlerPoster是运行在主线程中的,因为初始化的时候有个 Looper.getMainLooper()。
pendingPost主要有三个对象:事件、订阅和一个节点
backgroundPoster
backgroundPoster本质是一个Runnable,主要在后台处理事件,所以我们主要看它的run方法
public void run() { try { try { while (true) { PendingPost pendingPost = queue.poll(1000); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { executorRunning = false; return; } } } eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { Log.w("Event", Thread.currentThread().getName() + " was interruppted", e); } } finally { executorRunning = false; } }
和mainThreadPoster是一样的,从复用池中取出,如果队列为空就间隔1秒再取,然后调用invokeSubscriber方法进行分发
asyncPoster
asyncPoster的本质也是一个Runnable
@Override public void run() { PendingPost pendingPost = queue.poll(); if(pendingPost == null) { throw new IllegalStateException("No pending post available"); } eventBus.invokeSubscriber(pendingPost); }
但是每次只取一个pendingPost。不论发布线程是否为主线程,都使用一个空闲线程来处理。Async类的所有线程是相互独立的,因此不会出现卡线程的问题。
说到这里这三个poster都是负责线程调度的,最后都调用invokeSubscriber进行事件分发,那么我们有必要来来看下这个invokeSubscriber方法。
invokeSubscriber
/** * Invokes the subscriber if the subscriptions is still active. Skipping subscriptions prevents race conditions * between {@link #unregister(Object)} and event delivery. Otherwise the event might be delivered after the * subscriber unregistered. This is particularly important for main thread delivery and registrations bound to the * live cycle of an Activity or Fragment. */ void invokeSubscriber(PendingPost pendingPost) { Object event = pendingPost.event; Subscription subscription = pendingPost.subscription; PendingPost.releasePendingPost(pendingPost); if (subscription.active) { invokeSubscriber(subscription, event); } }
void invokeSubscriber(Subscription subscription, Object event) { try { subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } }
这样就比较一目了然了,从peningPost中得到事件和订阅,通过反射调用了订阅者的订阅函数并把event对象作为参数传入。
二、注册事件
/** * Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they * are no longer interested in receiving events. * <p/> * Subscribers have event handling methods that must be annotated by {@link Subscribe}. * The {@link Subscribe} annotation also allows configuration like {@link * ThreadMode} and priority. */ public void register(Object subscriber) { Class<?> subscriberClass = subscriber.getClass(); List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); synchronized (this) { for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } } }
我们可以看到这个注册方法
首先获取订阅者的class对象,通过subscriberMethodFinder寻找这个class对象的所有订阅方法集合SubscriberMethod,SubscriberMethod,而SubscriberMethod中包含了相应的线程、Method对象、事件类型、优先级记忆是否是粘性事件等。
然后通过 subscribe(subscriber, subscriberMethod);订阅事件,而subscriberMethod对象是通过subscriberMethodFinder获取的,所以我们先来看下subscriberMethodFinder的实现
subscriberMethodFinder
subscriberMethodFinder类是用来查找和缓存订阅者响应函数的信息的类,那么我们如何获取订阅者响应函数的信息,这里就要提到APT运行时注解了,当然我们这里不对APT进行介绍,EventBus中是通过@Subscribe()注解来获取的,我们来看下@Subscribe()
我们可以看到默认粘性事件为false,retention生命是运行时注解,相比反射而言效率更高,如果你想了解APT注解是什么,这里推荐一篇不错的博文。【Android】APT - 泡在网上的日子,如果这篇博文作者看到可以联系我,如有侵权可删除。
言归正传我们来看subscriberMethodFinder方法
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) { List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass); if (subscriberMethods != null) { return subscriberMethods; } if (ignoreGeneratedIndex) { subscriberMethods = findUsingReflection(subscriberClass); } else { subscriberMethods = findUsingInfo(subscriberClass); } if (subscriberMethods.isEmpty()) { throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation"); } else { METHOD_CACHE.put(subscriberClass, subscriberMethods); return subscriberMethods; } }
从METHOD_CACHE取看是否有缓存,key是保存订阅类的类名,value是保存类中订阅的方法数据,如果忽略注解器生成的MyEventBusIndex类就调用反射来获取订阅类中的订阅方法信息,如果没有忽略就从注解生成的是否忽略注解器生成的MyEventBusIndex类中来获取订阅类中的订阅方法信息。
findUsingReflection
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) { FindState findState = prepareFindState(); findState.initForSubscriber(subscriberClass); while (findState.clazz != null) { //通过反射来获得订阅方法信息 findUsingReflectionInSingleClass(findState); //查找父类的订阅方法 findState.moveToSuperclass(); } //返回订阅方法集合 return getMethodsAndRelease(findState); }
findUsingReflectionInSingleClass
private void findUsingReflectionInSingleClass(FindState findState) { Method[] methods; //反射得到方法数组 try { // This is faster than getMethods, especially when subscribers are fat classes like Activities methods = findState.clazz.getDeclaredMethods(); } catch (Throwable th) { // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149 methods = findState.clazz.getMethods(); findState.skipSuperClasses = true; } //遍历Method for (Method method : methods) { int modifiers = method.getModifiers(); if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) { Class<?>[] parameterTypes = method.getParameterTypes(); //保证必须只有一个事件参数 if (parameterTypes.length == 1) { //得到注解 Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); if (subscribeAnnotation != null) { Class<?> eventType = parameterTypes[0]; //校验是否添加该方法 if (findState.checkAdd(method, eventType)) { ThreadMode threadMode = subscribeAnnotation.threadMode(); //实例化SubscriberMethod对象并添加 findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); } } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length); } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException(methodName + " is a illegal @Subscribe method: must be public, non-static, and non-abstract"); } } }
findUsingInfo
findUsingInfo是通过运行时注解生成的MyEventBusIndex类来获取订阅方法信息
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) { FindState findState = prepareFindState(); findState.initForSubscriber(subscriberClass); while (findState.clazz != null) { //得到订阅者信息 findState.subscriberInfo = getSubscriberInfo(findState); if (findState.subscriberInfo != null) { //获取方法数组 SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods(); for (SubscriberMethod subscriberMethod : array) { //检验是否添加 if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) { findState.subscriberMethods.add(subscriberMethod); } } } else { findUsingReflectionInSingleClass(findState); } //到父类中查找 findState.moveToSuperclass(); } return getMethodsAndRelease(findState); }
getSubscriberInfo
private SubscriberInfo getSubscriberInfo(FindState findState) { //判断FindState对象中是否有缓存的订阅方法 if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) { SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo(); if (findState.clazz == superclassInfo.getSubscriberClass()) { return superclassInfo; } } //从注解器生成的MyEventBusIndex类中获得订阅类的订阅方法信息 if (subscriberInfoIndexes != null) { for (SubscriberInfoIndex index : subscriberInfoIndexes) { SubscriberInfo info = index.getSubscriberInfo(findState.clazz); if (info != null) { return info; } } } return null; }
这样,订阅类的所有SubscriberMethod都已经被保存了,最后再通过getMethodsAndRelease()返回List<SubscriberMethod>。
然后我们再回到注册中的subscribe(subscriber, subscriberMethod);方法,方法代码如下:
//必须在同步代码块里调用 private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { //获取订阅的事件类型 Class<?> eventType = subscriberMethod.eventType; //创建Subscription对象 Subscription newSubscription = new Subscription(subscriber, subscriberMethod); //从subscriptionsByEventType里检查是否已经添加过该Subscription,如果添加过就抛出异常 CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } //根据优先级priority来添加Subscription对象 int size = subscriptions.size(); for (int i = 0; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break; } } //将订阅者对象以及订阅的事件保存到typesBySubscriber里. List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null) { subscribedEvents = new ArrayList<>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); //如果接收sticky事件,立即分发sticky事件 if (subscriberMethod.sticky) { //eventInheritance 表示是否分发订阅了响应事件类父类事件的方法 if (eventInheritance) { // Existing sticky events of all subclasses of eventType have to be considered. // Note: Iterating over all events may be inefficient with lots of sticky events, // thus data structure should be changed to allow a more efficient lookup // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>). Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet(); for (Map.Entry<Class<?>, Object> entry : entries) { Class<?> candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } }
三、发送事件
EventBus是通过post()方法进行事件发送的,接下来我们来看post方法
/** Posts the given event to the event bus. */ public void post(Object event) { PostingThreadState postingState = currentPostingThreadState.get(); List<Object> eventQueue = postingState.eventQueue; eventQueue.add(event); if (!postingState.isPosting) { postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper(); postingState.isPosting = true; if (postingState.canceled) { throw new EventBusException("Internal error. Abort state was not reset"); } try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0), postingState); } } finally { postingState.isPosting = false; postingState.isMainThread = false; } } }
首先通过 PostingThreadState postingState = currentPostingThreadState.get();来获取当前线程的状态
currentPostingThreadState代码如下:
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() { @Override protected PostingThreadState initialValue() { return new PostingThreadState(); } };
ThreadLocal概念引用:是一个线程内部的数据存储类,通过它可以在指定的线程中存储数据,而这段数据是不会与其他线程共享的。其内部原理是通过生成一个它包裹的泛型对象的数组,在不同的线程会有不同的数组索引值,通过这样就可以做到每个线程通过get() 方法获取的时候,取到的只能是自己线程所对应的数据。 所以这里取到的就是每个线程的PostingThreadState状态,然后得到当前事件的队列,最后调用postSingleEvent方法来发送事件
postSingleEvent
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error { Class<?> eventClass = event.getClass(); boolean subscriptionFound = false; if (eventInheritance) { List<Class<?>> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } if (!subscriptionFound) { if (logNoSubscriberMessages) { Log.d(TAG, "No subscribers registered for event " + eventClass); } if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { post(new NoSubscriberEvent(this, event)); } } }
首先得到事件类型对应的class类,查找eventClass类所有的父类以及接口如果没找到则会打印日志,存在最终是调用postSingleEventForEventType方法进行分发。
postSingleEventForEventType
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) { CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) { subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { break; } } return true; } return false; }
首先从subscriptionsByEventType里获得所有订阅了这个事件的Subscription列表,然后在通过postToSubscription()方法来分发
事件,在postToSubscription()通过不同的threadMode在不同的线程里invoke()订阅者的方法。
ThreadMode:
public enum ThreadMode { /** * Subscriber will be called in the same thread, which is posting the event. This is the default. Event delivery * implies the least overhead because it avoids thread switching completely. Thus this is the recommended mode for * simple tasks that are known to complete is a very short time without requiring the main thread. Event handlers * using this mode must return quickly to avoid blocking the posting thread, which may be the main thread. */ POSTING, /** * Subscriber will be called in Android's main thread (sometimes referred to as UI thread). If the posting thread is * the main thread, event handler methods will be called directly. Event handlers using this mode must return * quickly to avoid blocking the main thread. */ MAIN, /** * Subscriber will be called in a background thread. If posting thread is not the main thread, event handler methods * will be called directly in the posting thread. If the posting thread is the main thread, EventBus uses a single * background thread, that will deliver all its events sequentially. Event handlers using this mode should try to * return quickly to avoid blocking the background thread. */ BACKGROUND, /** * Event handler methods are called in a separate thread. This is always independent from the posting thread and the * main thread. Posting events never wait for event handler methods using this mode. Event handler methods should * use this mode if their execution might take some time, e.g. for network access. Avoid triggering a large number * of long running asynchronous handler methods at the same time to limit the number of concurrent threads. EventBus * uses a thread pool to efficiently reuse threads from completed asynchronous event handler notifications. */ ASYNC }
ThreadMode一共有四类(引用自EventBus 源码解析 EventBus 是一个 Android 事件发布/订阅框架,通过解耦发布者和订阅者简化 Android 事件传递,这里的事件可以理解 @codeKK 开源项目源码分析站)
PostThread:默认的 ThreadMode,表示在执行 Post 操作的线程直接调用订阅者的事件响应方法,不论该线程是否为主线程(UI 线程)。当该线程为主线程时,响应方法中不能有耗时操作,否则有卡主线程的风险。适用场景:对于是否在主线程执行无要求,但若 Post 线程为主线程,不能耗时的操作;
MainThread:在主线程中执行响应方法。如果发布线程就是主线程,则直接调用订阅者的事件响应方法,否则通过主线程的 Handler 发送消息在主线程中处理——调用订阅者的事件响应函数。显然,MainThread类的方法也不能有耗时操作,以避免卡主线程。适用场景:必须在主线程执行的操作;
BackgroundThread:在后台线程中执行响应方法。如果发布线程不是主线程,则直接调用订阅者的事件响应函数,否则启动唯一的后台线程去处理。由于后台线程是唯一的,当事件超过一个的时候,它们会被放在队列中依次执行,因此该类响应方法虽然没有PostThread类和MainThread类方法对性能敏感,但最好不要有重度耗时的操作或太频繁的轻度耗时操作,以造成其他操作等待。适用场景:操作轻微耗时且不会过于频繁,即一般的耗时操作都可以放在这里;
Async:不论发布线程是否为主线程,都使用一个空闲线程来处理。和BackgroundThread不同的是,Async类的所有线程是相互独立的,因此不会出现卡线程的问题。适用场景:长耗时操作,例如网络访问。
这篇文章参考了许多前辈的文章,可能没有一一指出,希望包含,另外,前段时间创建了和微信公众号同名的“代码男人QQ技术交流群”,欢迎大家加入,目前就我自己一个人,不要水军,致力做一个真正的技术交流群。微信公众号和QQ群二维码在博客左侧栏。