手写简化EventBus,理解框架核心原理

前言

本来想学习下EventBus最新框架的源码,但是最新的框架的代码量已经很大了,很多都是锦上添花的东西,很多核心的原理代码需要从中剥离出来去了解。但是对于刚开始看源码就直接拿到这么功能丰富并完善的代码,可能收效甚微。为了自己学习并且帮助读者同志们学习,这里自己根据以前学习的经验理解,手写一份简化Eventbus源码,和大家一起学习此框架的要义。

EventBus技术架构

简介

Eventbus的存在是为了解决组件间的简单通信问题,采用发布-订阅者模式,通过统一的管理机构(bus总线)维护所有的订阅者,将发布者发布的消息发送对应订阅者的处理方法处理。
发布者和订阅者可以是任意Activity、Fragment、Service、Thread等,只要在一个进程中就可以,其不支持多进程。目前git地址为:https://github.com/greenrobot/EventBus。截至今日,最新版本3.2.0。
使用方法:

EventBus in 3 steps

   // 1.Define events: 定义Event,这里为用户使用时自定义的部分,框架不包含此格式,为任意java bean包含自己需要传输的信息。

    public static class MessageEvent { /* Additional fields if needed */ }


    //2 Prepare subscribers: Declare and annotate your subscribing method, optionally specify a thread mode:
    //准备订阅者方法,方法名是任意的,只是必须带@Subscribe注解,必须只有一个参数,参数需是第一步发送的消息。

    @Subscribe(threadMode = ThreadMode.MAIN)  
    public void onMessageEvent(MessageEvent event) {/* Do something */};

    //3.Register and unregister your subscriber. For example on Android, activities and fragments should usually register according to their life cycle:
   // 注册和解注册eventbus,可以在activity和fragment、或者线程中,根据场景需要的时机进行注册解注册。

     @Override
     public void onStart() {
         super.onStart();
         EventBus.getDefault().register(this);
     }

     @Override
     public void onStop() {
         super.onStop();
         EventBus.getDefault().unregister(this);
     }

    //4.Post events: 发送event,可以是任何地方

     EventBus.getDefault().post(new MessageEvent());

架构原理

此架构图为EventBus原版的架构图。
手写简化EventBus,理解框架核心原理中间的EventBus是个大管家,负责所有消息流转和注册信息保存,程序运行起来时,注册者如Activity调用了register方法并把自己的this作为参数传入,大管家会登记所有的登记者和接收处理方法(@Subscribe注解方法)。当有发布者发布消息,即调用了EventBus.getDefault().post方法,将参数自定义Event发布到管家EventBus中, 大管家找到这个进程大家庭中所有注册了此Event类型的方法,并进行调用处理。

手写框架

模块

要手写框架,即需要包括框架核心原理和实现核心方法。这里进行列举,以明确程序框架。根据上节EventBus简介的使用方法可知需要实现:

  • 自定义注解@Subscribe和线程类型枚举。
  • EventBus类,并实现注册register方法、解注册方法unregister、post方法,且有getDefault单例方法。
  • 获取所有注册类中的加了@Subscribe注解的方法、方法参数和注解信息(线程和优先级信息)。
  • 实现post逻辑,即将发送Event到对应订阅方法参数是Event类型的方法

源码设计

菜谱有了,那就可以开始做菜了。如果想同时看着源码看文章的话,可以从github查看Demo。https://github.com/qingdaofu1/ZephyrBus
自定义注解和创建ThreadMode枚举类
自定义注解可以添加默认值,并且这里标注此注解是使用在方法上,并且是运行时保留策略,关于注解的详细原理可以参照我的另一篇总结:Android注解-看这篇文章就够了

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Subscribe {
    ThreadMode threadMode() default ThreadMode.POSTING;
    int priority() default 0;
}

线程模型肯定包括默认值POSTING,即和发送线程保持一致。MAIN线程即主线程,BACKGROUND即后台线程,非主线程。这里仅作示例,可以学习EventBus增加其他选项进行串行队列的处理或者线程池多线程等特性。

public enum ThreadMode {
    POSTING,
    MAIN,
    BACKGROUD
}

创建MyEventBus类和必要方法

  • 数据结构

为了保存订阅者的处理方法需要两个数据结构。
由于用户不一定在一个地方注册,即如果进程中Activity中、Fragment、Service等都有注册,则需要一个结合记录每个注册类中所有的订阅方法,这个集合即typesBySubscriber 。
然后比如用户有多种Event类型,比如Aevent、Bevent…,需要有个能记录所有处理Aevent的订阅者方法类和所有处理Bevent的方法类。如果有更多则记录更多。这个数据结构即subscriptionsByEventType ;其中的CopyOnWriteArrayList数据结构用法可以参考:CopyOnWriteArrayList的原理及使用
这里学习了源码中的数据结构名,以便读者返回读源码时更容易理解。

/**
     * 同一类型EventType类与所有注册方法的集合
     */
    private Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType = new HashMap<>();
    /**
     * 所有类有注册MyEventBus的  类与其内部所有处理方法集合。
     */
    private static final Map<Object, List<Class<?>>> typesBySubscriber = new HashMap<>();
    
    //订阅处理方法收集和调用策略抽象类,目前只实现了反射策略,另增加了APT类,待后边完善
	MethodInvokeStrategy invokeStrategy;

MethodInvokeStrategy为抽象方法类,定义了收集订阅方法和调用方法的接口,并创建了主线程的Handler和工作线程Handler供调用时使用。

public abstract class MethodInvokeStrategy {
    private static final String TAG = "MethodInvokeStrategy";

    private static HandlerThread handlerThread = new HandlerThread("workThread");

    protected static Handler mainHandler;

    protected static Handler workHander;

    public MethodInvokeStrategy() {
        handlerThread.start();
        mainHandler = new Handler(Looper.getMainLooper());
        workHander = new Handler(handlerThread.getLooper());
    }

    public List<SubscribedMethod> getAllSubscribedMethods(Object subscriber) {
        return null;
    }

    public void invokeMethod(Subscription subscription, Object event) {

    }

这里的必要方法包括单例方法getDefault、注册方法register、解注册方法unregister、发送方法post。

  • getDefault方法单例

这是较常用写法,如果想写不一样的单例可以参考:
设计模式之单例模式的几种写法

/**
 * 单例
 *  * @return
 */
public static MyEventBus getDefault() {
    if (instance == null) {
        synchronized (MyEventBus.class) {
            if (instance == null) {
                instance = new MyEventBus();
            }
        }
    }
    return instance;
}
  • register方法
    根据上边描述的,为了能把发布者的Event消息发布到所有的订阅者,需要记录所有处理Event的消息方法类信息。这里包含实际的处理方法,最终返回上一步的存储集合。
    由于register将当前类的this传入,即相当于获取了这个类的实例,即能够拿到其内的所有方法参数等信息。这些都是供实际收集方法的类需要的。
 /**
     * 注册subscriber到MyEventBus,并获取其所有加了{@link Subscribe} 的方法,并放入集合中
     *
     * @param subscriber 订阅者类,即通过register将this参数传过来的类,可以是activity、service、fragment、thread等。
     */
    public void register(Object subscriber) {
        List<SubscribedMethod> allSubscribedMethods = invokeStrategy.getAllSubscribedMethods(subscriber);
        for (SubscribedMethod subscribedMethod : allSubscribedMethods) {
            Class<?> eventType = subscribedMethod.getEventType();
            CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
            if (subscriptions == null) {
                subscriptions = new CopyOnWriteArrayList<>();
                subscriptionsByEventType.put(eventType, subscriptions);
            }
            // TODO: 2020/4/18  如果做priority逻辑,则在此处需要排序添加
            subscriptions.add(new Subscription(subscriber, subscribedMethod));
            // 获取这个订阅者类中记录的所有的eventType类型
            List<Class<?>> eventTypesInSubscriber = typesBySubscriber.get(subscriber);
            if (eventTypesInSubscriber == null) {
                eventTypesInSubscriber = new ArrayList<>();
                typesBySubscriber.put(subscriber, eventTypesInSubscriber);
            }
            eventTypesInSubscriber.add(eventType);
        }
        printTypesBySubscriber(typesBySubscriber, subscriber);
    }
  • unregister方法

解注册就比较简单了,直接将集合中对应的注册类的key去除。

/**
 * 解注册eventbus
 *
 * @param subscriber
 */
public void unregister(Object subscriber) {
    if (typesBySubscriber != null) {
        typesBySubscriber.remove(subscriber);
    }
}

post方法
post方法将参数event发送,发送时根据eventbus中大管家保存的所有能处理此event的方法发送过去处理。

/**
     * 发送event消息到订阅者 处理方法
     *
     * @param event
     */
    public void post(Object event) {
        if (subscriptionsByEventType.size() <= 0) {
            Log.e(TAG, "post: no any eventbus registed named" + event.toString());
            return;
        }
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(event.getClass());
        for (Subscription subscription : subscriptions) {
            invokeStrategy.invokeMethod(subscription, event);
        }
    }

到这里基本所有的方法都定义好了,理解上我们可以把整个EventBus的架构原理能够理解透了。但是实际的反射策略还没有讲。
实现获取所有订阅者处理方法,这里既然是反射实现的,自然需要懂反射的原理,这里我在写的时候也是很多的不熟悉,后边会跟上一篇介绍反射的文章,听别人讲或者看别人文章不如自己去实现下,你会发现写的时候好像不是那么回事,还是有些不理解的。
总之核心思想就是,通过注册类,可以获取所有的方法,然后获取方法的注解,获取注解的值,获取带特定注解方法的Event类,这些都是大管家需要收集的资料。

/**
     * 获取这个订阅者类中所有的带{@link Subscribe}的方法
     *
     * @param subscriber 订阅者类,即通过register将this参数传过来的类,可以是activity、service、fragment、thread等。
     */
    @Override
    public List<SubscribedMethod> getAllSubscribedMethods(Object subscriber) {
        //记录订阅者方法参数
        //CopyOnWriteArrayList<Subscription> subscribedMethods = new CopyOnWriteArrayList<Subscription>();
        List<SubscribedMethod> subscribedMethods = new ArrayList<>();
        Class<?> aClass = subscriber.getClass();
        //获取所有方法
        Method[] declaredMethods = aClass.getDeclaredMethods();
        for (Method declaredMethod : declaredMethods) {
            if (declaredMethod.isAnnotationPresent(Subscribe.class)) {
                Class<?>[] parameterTypes = declaredMethod.getParameterTypes();
                if (parameterTypes == null || parameterTypes.length > 1) {
                    throw new IllegalArgumentException("参数不能为空,且只能有一个参数");
                }
                Class<?> parameterType = parameterTypes[0];
                Log.d(TAG, "getAllSubscribedMethods: parameterType=" + parameterType.getName());
                Subscribe annotation = declaredMethod.getAnnotation(Subscribe.class);
                int priority = annotation.priority();
                ThreadMode threadMode = annotation.threadMode();
                SubscribedMethod subscribedMethod = new SubscribedMethod(declaredMethod, parameterType, threadMode, priority);
                Log.d(TAG, "getAllSubscribedMethods: subscribedMethod=" + subscribedMethod.toString());
                subscribedMethods.add(subscribedMethod);
            }
        }
        return subscribedMethods;
    }

最后便是根据线程模型调用方法的实现了,这里首先想想上边说的抽象类的两个handler,有主线程的和工作线程的handler。这里可以根据注解的值ThreadMode进行对应处理。相信大家对Handler也比较认识,如果需要进一步理解Handler和线程的关系,可以查看:Handler通信机制源码解读

public void invokeMethod(Subscription subscription, final Object event) {
        final Object subscriber = subscription.getSubscriber();
        SubscribedMethod subscribedMethod = subscription.getSubscribedMethod();
        final Method method = subscribedMethod.getMethod();
        switch (subscribedMethod.getThreadMode()){
            case POSTING:
                Log.d(TAG, "invokeMethod: ThreadMode=POSTING");
                invoke(method, subscriber, event);
                break;
            case MAIN:
                Log.d(TAG, "invokeMethod: ThreadMode=MAIN");
                mainHandler.post(new Runnable() {
                    @Override
                    public void run() {
                        invoke(method, subscriber, event);
                    }
                });
                break;
            case BACKGROUD:
                Log.d(TAG, "invokeMethod: ThreadMode=BACKGROUND");
                workHander.post(new Runnable() {
                    @Override
                    public void run() {
                        invoke(method, subscriber, event);
                    }
                });
                break;
        }

    }

    private void invoke(Method method, Object subscriber, Object event){
        try {
            method.invoke(subscriber, event);
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
    }

源码测试

首先在测试module中添加发送Event源码,btn_send1为主线程发送,btn_send2会从主线程和线程中交替发送。而处理端则在注解Subscribe中分别使用默认线程和主线程。

findViewById(R.id.btn_send1).setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                MyEventBus.getDefault().post(new WorkEvent(5));
            }
        });
        findViewById(R.id.btn_send2).setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                if (isThreadMain) {
                    MyEventBus.getDefault().post(new ViewEvent("主线程测试文字"));
                } else {
                    new Thread() {
                        @Override
                        public void run() {
                            super.run();
                            MyEventBus.getDefault().post(new ViewEvent("子线程测试文字"));
                        }
                    }.start();
                }
                isThreadMain = !isThreadMain;
            }
        });
        
 @Subscribe()
    public void onEvent(final WorkEvent event) {
     //...省略
    }

    @Subscribe(threadMode = ThreadMode.MAIN)
    public void handleView(final ViewEvent event) {
      //...省略
        });
    }

测试结果:
手写简化EventBus,理解框架核心原理本Demo的github地址为:https://github.com/qingdaofu1/ZephyrBus,希望有需要的同学可以下载下来实操下加深理解。
下一步计划是要增加apt模式的策略,有需要的可以继续关注。

纸上得来终觉浅,绝知此事要躬行。
喜欢的可以点个赞鼓励下,有问题可以留言沟通。

上一篇:ROS入门三 话题中的发布者Publisher和订阅者Subscriber


下一篇:RACSignal的Subscription深入分析