Guava之EventBus事件驱动。

EventBus是Guava框架对观察者模式的一种实现,使用EventBus可以很简洁的实现事件注册监听和消费。

 

package com.boot.demo.test.eventbus;

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

/**
 * @author braska
 * @date 2020/3/26
 **/
public class EventBusTest {
    // 定义监听事件
    static class TestEvent {
        String word;
        public TestEvent(String word) {
            this.word = word;
        }
    }

    // 定义监听者,可以监听多种类型的事件。
    static class EventListener {
        // @Subscribe注解必写,入参为具体的事件,如果入参事件B是另一个事件A的子类。则A、B事件会同时出触发。
        @Subscribe
        public void run(TestEvent event) {
            System.out.println(String.format("hello %s.", event.word));
        }
     // 事件可以是一个简单的引用类型。 
        @Subscribe
        public void run(String event) {
            System.out.println(event);
        }
    }


    public static void main(String[] args) throws Throwable {
        //  实例化EventBus
        EventBus bus = new EventBus("id");
        // 注册一个监听者
        bus.register(new EventListener());
        //  事件发布
        bus.post(new TestEvent("world"));
        bus.post("hello java.");
    }
}

  

Guava框架里面提供了两种相关的实现,一种是单线程同步事件消费(EventBus),另外一直是多线程异步事件消费(AsyncEventBus)。我们来看看demo。

定义事件

package com.boot.demo.test.eventbus.event;

/**
 * @author braska
 * @date 2020/3/26
 **/
public interface Event {
}

  

package com.boot.demo.test.eventbus.event;

/**
 * @author braska
 * @date 2020/3/26
 **/
public class PrintEvent implements Event{

    private String content;

    public String getContent() {
        return content;
    }

    public PrintEvent(String content) {
        this.content = content;
    }
}

  

package com.boot.demo.test.eventbus.event;

/**
 * @author braska
 * @date 2020/3/26
 **/
public class ErrorEvent implements Event {

    private String message;
    private Exception exception;

    public String getMessage() {
        return message;
    }

    public Exception getException() {
        return exception;
    }

    public ErrorEvent(String messsage, Exception exception) {
        this.message = messsage;
        this.exception = exception;
    }
}

  

定义事件消费管理器。

package com.boot.demo.test.eventbus;

import com.boot.demo.test.eventbus.event.Event;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;

import java.util.concurrent.Executors;

/**
 * 使用单例模式管理事件
 * @author braska
 * @date 2020/3/26
 **/
public class EventBusManager {
    
    // 使用线程内部类存储同步事件处理过程中抛出的异常信息。
    ThreadLocal<Throwable> threadLocal = new ThreadLocal();
    
    // 定义同步事件消费,使用匿名内部类处理异常。
    private EventBus syncEventBus = new EventBus((throwable, context) -> {
        threadLocal.set(throwable);
    });
    // 定义异常事件消费,指定线程池。
    private AsyncEventBus asyncEventBus = new AsyncEventBus(Executors.newCachedThreadPool());

    // 使用静态内部类做单例模式。
    private static class SINGLETON {
        private static final EventBusManager manager = new EventBusManager();

        private static EventBusManager build() {
            EventBusSubmitter submitter = new EventBusSubmitter();
            manager.asyncEventBus.register(submitter);
            manager.syncEventBus.register(submitter);
            return manager;
        }
    }

   // 获取manager单例
    public static EventBusManager instance() {
        return SINGLETON.build();
    }
   
    // 提交同步事件
    public void postSync(Event event) throws Throwable {
        syncEventBus.post(event);
        // 往上抛出异常。
        Throwable ex = threadLocal.get();
        if (ex != null) {
            threadLocal.remove();
            throw ex;
        }
    }
    
    // 提交异步事件
    public void postAsync(Event event) {
        asyncEventBus.post(event);
    }
}

  

定义事件监听器。也就是处理器。

package com.boot.demo.test.eventbus;

import com.boot.demo.test.eventbus.event.ErrorEvent;
import com.boot.demo.test.eventbus.event.PrintEvent;
import com.google.common.base.Strings;
import com.google.common.eventbus.Subscribe;

/**
 * @author braska
 * @date 2020/3/26
 **/
public class EventBusSubmitter {
    // 处理print事件
    @Subscribe
    public void print(PrintEvent event) {
        if (Strings.isNullOrEmpty(event.getContent())) {
            throw new RuntimeException("content must be specify.");
        }
        System.out.println(String.format("hello %s.", event.getContent()));
    }

// 处理error事件 @Subscribe public void errorHandler(ErrorEvent event) { System.out.println(String.format("%s%s", event.getMessage(), event.getException().getMessage())); } }

  

测试代码

public static void main(String[] args) {
        EventBusManager manager = EventBusManager.instance();
        try {
            manager.postSync(new PrintEvent("world"));
            manager.postSync(new PrintEvent(null));
        } catch (Throwable e) {
            manager.postAsync(new ErrorEvent("receive a error message: ", (RuntimeException)e));
        }
    }

  

控制台输出

Connected to the target VM, address: '127.0.0.1:54538', transport: 'socket'
hello world.
receive a error message: content must be specify.

  

上一篇:思考一种好的架构(六)


下一篇:使用EventBus实现兄弟组件之间的通信