EventBus是Guava框架对观察者模式的一种实现,使用EventBus可以很简洁的实现事件注册监听和消费。
package com.boot.demo.test.eventbus; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; /** * @author braska * @date 2020/3/26 **/ public class EventBusTest { // 定义监听事件 static class TestEvent { String word; public TestEvent(String word) { this.word = word; } } // 定义监听者,可以监听多种类型的事件。 static class EventListener { // @Subscribe注解必写,入参为具体的事件,如果入参事件B是另一个事件A的子类。则A、B事件会同时出触发。 @Subscribe public void run(TestEvent event) { System.out.println(String.format("hello %s.", event.word)); } // 事件可以是一个简单的引用类型。 @Subscribe public void run(String event) { System.out.println(event); } } public static void main(String[] args) throws Throwable { // 实例化EventBus EventBus bus = new EventBus("id"); // 注册一个监听者 bus.register(new EventListener()); // 事件发布 bus.post(new TestEvent("world")); bus.post("hello java."); } }
Guava框架里面提供了两种相关的实现,一种是单线程同步事件消费(EventBus),另外一直是多线程异步事件消费(AsyncEventBus)。我们来看看demo。
定义事件
package com.boot.demo.test.eventbus.event; /** * @author braska * @date 2020/3/26 **/ public interface Event { }
package com.boot.demo.test.eventbus.event; /** * @author braska * @date 2020/3/26 **/ public class PrintEvent implements Event{ private String content; public String getContent() { return content; } public PrintEvent(String content) { this.content = content; } }
package com.boot.demo.test.eventbus.event; /** * @author braska * @date 2020/3/26 **/ public class ErrorEvent implements Event { private String message; private Exception exception; public String getMessage() { return message; } public Exception getException() { return exception; } public ErrorEvent(String messsage, Exception exception) { this.message = messsage; this.exception = exception; } }
定义事件消费管理器。
package com.boot.demo.test.eventbus; import com.boot.demo.test.eventbus.event.Event; import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; import java.util.concurrent.Executors; /** * 使用单例模式管理事件 * @author braska * @date 2020/3/26 **/ public class EventBusManager { // 使用线程内部类存储同步事件处理过程中抛出的异常信息。 ThreadLocal<Throwable> threadLocal = new ThreadLocal(); // 定义同步事件消费,使用匿名内部类处理异常。 private EventBus syncEventBus = new EventBus((throwable, context) -> { threadLocal.set(throwable); }); // 定义异常事件消费,指定线程池。 private AsyncEventBus asyncEventBus = new AsyncEventBus(Executors.newCachedThreadPool()); // 使用静态内部类做单例模式。 private static class SINGLETON { private static final EventBusManager manager = new EventBusManager(); private static EventBusManager build() { EventBusSubmitter submitter = new EventBusSubmitter(); manager.asyncEventBus.register(submitter); manager.syncEventBus.register(submitter); return manager; } } // 获取manager单例 public static EventBusManager instance() { return SINGLETON.build(); } // 提交同步事件 public void postSync(Event event) throws Throwable { syncEventBus.post(event); // 往上抛出异常。 Throwable ex = threadLocal.get(); if (ex != null) { threadLocal.remove(); throw ex; } } // 提交异步事件 public void postAsync(Event event) { asyncEventBus.post(event); } }
定义事件监听器。也就是处理器。
package com.boot.demo.test.eventbus; import com.boot.demo.test.eventbus.event.ErrorEvent; import com.boot.demo.test.eventbus.event.PrintEvent; import com.google.common.base.Strings; import com.google.common.eventbus.Subscribe; /** * @author braska * @date 2020/3/26 **/ public class EventBusSubmitter { // 处理print事件 @Subscribe public void print(PrintEvent event) { if (Strings.isNullOrEmpty(event.getContent())) { throw new RuntimeException("content must be specify."); } System.out.println(String.format("hello %s.", event.getContent())); }
// 处理error事件 @Subscribe public void errorHandler(ErrorEvent event) { System.out.println(String.format("%s%s", event.getMessage(), event.getException().getMessage())); } }
测试代码
public static void main(String[] args) { EventBusManager manager = EventBusManager.instance(); try { manager.postSync(new PrintEvent("world")); manager.postSync(new PrintEvent(null)); } catch (Throwable e) { manager.postAsync(new ErrorEvent("receive a error message: ", (RuntimeException)e)); } }
控制台输出
Connected to the target VM, address: '127.0.0.1:54538', transport: 'socket' hello world. receive a error message: content must be specify.