在上一篇文章中可能你的EventBus使用并不正确,是时候真正搞懂EventBus了(上),我们说了使用apt方式进行优化使用eventbus
,并对EventBus#register
和EventBus#unRegister
方法进行了说明。对这块内容不熟悉的,可以先看下在继续看下面的内容。
本节内容将讲解剩下的内容,包括了EventBus#post
、粘性事件
等。
1、EventBus#post(Object event)
之前说了,register的时候,会将查找到的添加的注解的方法进行查找,并保存。这节将讲解使用。
public void post(Object event) {
//获取PostingThreadState对象
PostingThreadState postingState = currentPostingThreadState.get();
//获取eventQueue,并将event设置进去
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
//判断当前的event是否被post
if (!postingState.isPosting) {
//设置当前post是否主线程
postingState.isMainThread = isMainThread();
//将任务标志为已经post
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
//这里的eventQueue是一个List,
// 而我们知道List#remove方法是从队列中移除对象,并返回这个移除的对象
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
其实这里面核心代码主要就是postSingleEvent(eventQueue.remove(0), postingState)
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
//eventInheritance 默认为true,表示会检索继承关系
if (eventInheritance) {
//根据event类对象查找所有的eventType类型(包含了所有的event的父类对象)
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//判断是否检索到eventType, 并发送post出去
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
//如果没有找到的话 && logNoSubscriberMessages == true, 则抛出异常,否则发送NoSubscriberEvent
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
代码逻辑很简单,使用postSingleEventForEventType
发送事件,并且发挥发送结果,如果发送结果返回false,则判断logNoSubscriberMessages
是否为true,true的话跑出没有找到注册方法的异常
接着看postSingleEventForEventType
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//还记得之前在EventBus.getDefault().register()执行到最后的subscribe方法吗,
// 将注册方法的event(也就是我们设置@Subscribe注解的方法参数)作为key,
// 将有优先级顺序的方法结合作为value,这边直接遍历执行,
//subscription里面之前就已经包含了method对象,直接invoke执行,就达到了调用消息传递的目标了
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted;
try {
//这里是重点!!,还记得之前@Subscribe注解中定义了threadMode吗,这里就派上用场了
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
可以看到使用到了之前register
的时候的subscriptionsByEventType
,如果看过上篇文章,就知道它是一个以event
类方法为key,所有添加了注解参数方法中参数为event的方法集合为value的map集合。
然后进行遍历,调用postToSubscription(subscription, event, postingState.isMainThread);
触达事件发送。
再看postToSubscription(subscription, event, postingState.isMainThread);
方法实现。
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//根据注解方法上的threadMode来区分使用那种poster发送event出去
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
//invokeSubscriber本身不关心线程,在当前线程直接发送
invokeSubscriber(subscription, event);
} else {
//使用handler切换到主线程执行
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
这里其实核心用到了三个poster
和5种ThreadMode
首先看三种poster
-
mainThreadPoster
这个poster本身就比较简单了,它的createPoster(EventBus eventBus)
方法实际上就是HandlerPoster
,并且在创建HandlerPoster
的时候,使用的是MainLooper
,是不是秒懂了,其实就是不管你是在哪个线程发消息,最后都是从主线程出去的。 -
backgroundPoster
这个poster
是继承自Runnable
,在你每次send
的时候,将event
会添加到一个队列中,然后丢到线程池中调用执行。简单列一下核心代码。
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
//调用的时候,会塞到队列中
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
-
asyncPoster
这个poster
和上面的backgroudPoster
,核心思路是一样的,也都是通过线程池,只是区别是backgroundPoster
每次发送一条,会做线程同步和标志位判断,确保执行顺序。
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
其实除了上面的三种poster
,你可能还看到有这么一个方法invokeSubscriber(Subscription subscription, Object event)
void invokeSubscriber(Subscription subscription, Object event) {
try {
//有没有发现很简单,直接使用反射调用对应的方法,来实现通知的作用
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
其实上面的三种不管是同步、异步、还是主线程,最终执行都会是这个方法。
当搞明白了三种poster
和最终的通知方式之后,对于5中ThreadMode
就不难理解了。
5种ThreadMode
分别是
-
POSTING
在哪个线程发送,直接哪个线程接收 -
MAIN
不管是从哪个线程发送,最终都回到主线程使用 -
MAIN_ORDERED
主线程使用Handler顺序发送 -
BACKGROUND
不管是主线程还是子线程发送,最终都回到子线程使用 -
ASYNC
根本不判断线程,直接丢给子线程发送给,最终回到子线程使用
至此,相信你对于EventBus
中的从注册、反注册、发送的逻辑使用应该都很清除了吧。下面还有一个(postSticky(Object event)
)没有说到,当然工作中好像不是太常用,但是理解学习下并没有啥坏处是不是。