Guava的EventBus

一、用来干啥

EventBus是Guava包内的一个 实现订阅/通知的一个组件,可以用来实现进程内的消息通知。(分布式系统要使用MQ消息队列进行通信)

二、怎么做

引入依赖:

       <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>19.0</version>
        </dependency>

创造观察者:

//	观察者1
public class Observer1 {

    @Subscribe
    public void toDo(String msg){
        System.out.println("Observer1 获得消息" + msg);
    }
}

// 观察者2
public class Observer2 {

    @Subscribe
    public void toDo(String msg){
        System.out.println("Observer2 获得消息:" + msg);
    }
}

创建观察者特别方便,只需要通过注解@Subscribe标识即可。

创造通知者:

public class DemoMain {
    public static void main(String[] args) {
        EventBus eventBus = new EventBus();
        Observer1 Observer1 = new Observer1();
        Observer2 Observer2 = new Observer2();
		
        // 进行消息订阅
        eventBus.register(Observer1);
        eventBus.register(Observer2);
		
        // 通知者发送消息
        eventBus.post("发送消息 啦啦啦.....");
    }
}

运行结果:
Guava的EventBus

三、why(底层如何实现的)

3.1 观察者注册

 // 注册观察者
eventBus.register(Observer1);
  eventBus.register(Observer2);

底层原理:

void register(Object listener) {
    //  获得该订阅者所有的 订阅方法
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
	
    // 合并
    for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
        eventSubscribers = MoreObjects.firstNonNull(
            subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }

可以发现 EventBus内部是通过 一个Key为 Class , Value为Subcriber的对象的 Map 来维护观察者的信息。

3.2 消息发送

eventBus.post("发送消息 啦啦啦.....");

底层原理:

  public void post(Object event) {
     // 获得所有的订阅者 
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      // 通过分发器进行分发 处理事件
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }

分发处理逻辑:
PerThreadQueuedDispatcher,这个分发器是在每个线程用一个队列存储 事件对象,用ThreadLocal进行管理,内部的队列,保证了发布处理的顺序性。

@Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      checkNotNull(subscribers);
        
       // 事件存储到ThreadLocal 管理的 队列当中 
      Queue<Event> queueForThread = queue.get();
      queueForThread.offer(new Event(event, subscribers));

      if (!dispatching.get()) {
        dispatching.set(true);
        try {
          Event nextEvent;
            // 按顺序进行处理
          while ((nextEvent = queueForThread.poll()) != null) {
            while (nextEvent.subscribers.hasNext()) {
                
              // 内部逻辑 就是调用目标的订阅方法 
              // method.invoke(target, checkNotNull(event));  
              nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
            }
          }
        } finally {
          dispatching.remove();
          queue.remove();
        }
      }
    }


上一篇:EventBus


下一篇:Angular - 跨组件间的数据广播通信