Dispatcher

Dispatcher is the event dispatcher used by Guava's EventBus.

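Dispatcher is an abstract class. Its abstract method is: abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
It defines three nested classes: ImmediateDispatcher, LegacyAsyncDispatcher and PerThreadQueuedDispatcher.
These three classes are the implementations of Dispatcher.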

LegacyAsyncDispatcher is kept only for the legacy behavior of AsyncEventBus; a plain EventBus now uses PerThreadQueuedDispatcher instead.

A Subscriber holds four things: EventBus bus, Object target, Method method and Executor executor.

  final void dispatchEvent(final Object event) {
    executor.execute(new Runnable() {
      @Override
      public void run() {
        try {
          // Calls the handler reflectively: method.invoke(target, event)
          invokeSubscriberMethod(event);
        } catch (InvocationTargetException e) {
          // The handler threw; hand the real cause to the bus
          bus.handleSubscriberException(e.getCause(), context(event));
        }
      }
    });
  }

The code above is Subscriber's dispatchEvent().
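To make the reflective call concrete, here is a minimal sketch of the same idea without Guava. SimpleSubscriber, Listener and demo() are hypothetical names made up for illustration. The sketch also rethrows handler failures instead of routing them to a bus, which is a simplification and not what Guava does.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

public class SubscriberSketch {
    /** Hypothetical listener; Guava would find handle() through @Subscribe. */
    public static class Listener {
        public final List<Object> received = new ArrayList<>();

        public void handle(String event) {
            received.add(event);
        }
    }

    /** Simplified stand-in for Subscriber: target + method + executor, no bus. */
    static final class SimpleSubscriber {
        private final Object target;
        private final Method method;
        private final Executor executor;

        SimpleSubscriber(Object target, Method method, Executor executor) {
            this.target = target;
            this.method = method;
            this.executor = executor;
        }

        void dispatchEvent(Object event) {
            executor.execute(() -> {
                try {
                    method.invoke(target, event);
                } catch (InvocationTargetException e) {
                    // Assumption: rethrow here; Guava passes e.getCause() to the bus.
                    throw new RuntimeException(e.getCause());
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("Handler not accessible: " + method, e);
                }
            });
        }
    }

    public static List<Object> demo() {
        try {
            Listener listener = new Listener();
            Method handle = Listener.class.getMethod("handle", String.class);
            Executor direct = Runnable::run; // runs the task on the calling thread
            new SimpleSubscriber(listener, handle, direct).dispatchEvent("hello");
            return listener.received;
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [hello]
    }
}
```

Because the executor decides where the task runs, the same dispatchEvent() works for synchronous delivery (a direct executor, as here) and for a thread pool.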

  @Override
  void dispatch(Object event, Iterator<Subscriber> subscribers) {
    checkNotNull(event);
    while (subscribers.hasNext()) {
      subscribers.next().dispatchEvent(event);
    }
  }

The code above is ImmediateDispatcher's dispatch().
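One consequence of dispatching immediately is easy to miss: if a handler posts another event, that nested event is delivered in full before the remaining handlers see the original one. The sketch below models this with plain Consumer handlers instead of Guava subscribers; ImmediateDispatchSketch, post() and demo() are hypothetical names for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ImmediateDispatchSketch {
    static final List<String> log = new ArrayList<>();
    static final List<Consumer<String>> handlers = new ArrayList<>();

    // Immediate dispatch: deliver to every handler right away, on the caller's stack.
    static void post(String event) {
        for (Consumer<String> handler : new ArrayList<>(handlers)) {
            handler.accept(event);
        }
    }

    public static List<String> demo() {
        log.clear();
        handlers.clear();
        // Handler A reacts to "first" by posting "second" from inside the handler.
        handlers.add(event -> {
            log.add("A:" + event);
            if (event.equals("first")) {
                post("second");
            }
        });
        handlers.add(event -> log.add("B:" + event));
        post("first");
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [A:first, A:second, B:second, B:first]
    }
}
```

Handler B receives "second" before "first", so delivery order is depth-first rather than posting order.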

  void dispatch(Object event, Iterator<Subscriber> subscribers) {
    checkNotNull(event);
    checkNotNull(subscribers);
    Queue<Event> queueForThread = queue.get(); // initial value: Queues.newArrayDeque()
    queueForThread.offer(new Event(event, subscribers));
    if (!dispatching.get()) {
      dispatching.set(true); // thread-safe, because dispatching is a ThreadLocal
      try {
        Event nextEvent;
        while ((nextEvent = queueForThread.poll()) != null) {
          while (nextEvent.subscribers.hasNext()) {
            nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
          }
        }
      } finally {
        // Clean up the thread-local state
        dispatching.remove();
        queue.remove();
      }
    }
  }
The code above is PerThreadQueuedDispatcher's dispatch().

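PerThreadQueuedDispatcher holds two fields: ThreadLocal<Queue<Event>> queue and ThreadLocal<Boolean> dispatching.

dispatching records whether the current thread is already inside dispatch(), and queue gives each thread its own independent event queue. When a handler posts a new event while dispatching is true, the event is only enqueued; the outer loop delivers it after the current event has reached all of its subscribers.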
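The effect of the queue and the flag can be seen in a small model that does not depend on Guava. PerThreadQueueSketch, post() and demo() are hypothetical names. Unlike the real dispatcher, which stores each event together with its own subscriber iterator, this sketch queues only the event and looks up the handlers when draining.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class PerThreadQueueSketch {
    static final List<String> log = new ArrayList<>();
    static final List<Consumer<String>> handlers = new ArrayList<>();

    // One queue and one "am I already dispatching?" flag per thread.
    static final ThreadLocal<Queue<String>> queue =
            ThreadLocal.<Queue<String>>withInitial(ArrayDeque::new);
    static final ThreadLocal<Boolean> dispatching = ThreadLocal.withInitial(() -> false);

    static void post(String event) {
        Queue<String> queueForThread = queue.get();
        queueForThread.offer(event);
        if (!dispatching.get()) {
            dispatching.set(true);
            try {
                String next;
                // Drain the queue; reentrant posts only enqueue and return.
                while ((next = queueForThread.poll()) != null) {
                    for (Consumer<String> handler : new ArrayList<>(handlers)) {
                        handler.accept(next);
                    }
                }
            } finally {
                dispatching.remove();
                queue.remove();
            }
        }
    }

    public static List<String> demo() {
        log.clear();
        handlers.clear();
        // Handler A reacts to "first" by posting "second" from inside the handler.
        handlers.add(event -> {
            log.add("A:" + event);
            if (event.equals("first")) {
                post("second");
            }
        });
        handlers.add(event -> log.add("B:" + event));
        post("first");
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [A:first, B:first, A:second, B:second]
    }
}
```

Here every handler sees "first" before any handler sees "second", so events posted on one thread are delivered in the order they were posted.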