一、用来干啥
EventBus是Guava包内的一个 实现订阅/通知的一个组件,可以用来实现进程内的消息通知。(分布式系统要使用MQ消息队列进行通信)
二、怎么做
引入依赖:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
创造观察者:
// 观察者1
public class Observer1 {
@Subscribe
public void toDo(String msg){
System.out.println("Observer1 获得消息" + msg);
}
}
// 观察者2
public class Observer2 {
@Subscribe
public void toDo(String msg){
System.out.println("Observer2 获得消息:" + msg);
}
}
创建观察者特别方便,只需要通过注解@Subscribe
标识即可。
创造通知者:
public class DemoMain {
public static void main(String[] args) {
EventBus eventBus = new EventBus();
Observer1 Observer1 = new Observer1();
Observer2 Observer2 = new Observer2();
// 进行消息订阅
eventBus.register(Observer1);
eventBus.register(Observer2);
// 通知者发送消息
eventBus.post("发送消息 啦啦啦.....");
}
}
运行结果:
三、why(底层如何实现的)
3.1 观察者注册
// 注册观察者
eventBus.register(Observer1);
eventBus.register(Observer2);
底层原理:
void register(Object listener) {
// 获得该订阅者所有的 订阅方法
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
// 合并
for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
eventSubscribers = MoreObjects.firstNonNull(
subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
可以发现 EventBus内部是通过 一个Key为 Class , Value为Subcriber的对象的 Map 来维护观察者的信息。
3.2 消息发送
eventBus.post("发送消息 啦啦啦.....");
底层原理:
public void post(Object event) {
// 获得所有的订阅者
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
// 通过分发器进行分发 处理事件
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
分发处理逻辑:PerThreadQueuedDispatcher
,这个分发器是在每个线程用一个队列存储 事件对象,用ThreadLocal进行管理,内部的队列,保证了发布处理的顺序性。
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
// 事件存储到ThreadLocal 管理的 队列当中
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
// 按顺序进行处理
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
// 内部逻辑 就是调用目标的订阅方法
// method.invoke(target, checkNotNull(event));
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
�