内容简介:EventBus是通过使用发布者/订阅者模式来实现解耦的Android和Java开源库。在Android开发中通常使用EventBus实现Activities, Fragments, Threads, Services等组件之间的通信。但EventBus不能实现跨进程间的通信。我们可以看出,1.0.1版本还是有些bug和不足的,后面我将分析2.x和3.x版本。
EventBus是通过使用发布者/订阅者模式来实现解耦的Android和 Java 开源库。在Android开发中通常使用EventBus实现Activities, Fragments, Threads, Services等组件之间的通信。但EventBus不能实现跨进程间的通信。
图文表达
源码剖析
-
UML类图
-
源码分析 EventBus.java
package de.greenrobot.event; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.util.Log; /** * Class based event bus, optimized for Android. By default, subscribers will handle events in methods named "onEvent". * * @author Markus Junginger, greenrobot */ public class EventBus { /** * Log tag, apps may override it. */ public static String TAG = "Event"; private static final EventBus defaultInstance = new EventBus();//默认实例 public enum ThreadMode { /** * Subscriber will be called in the same thread, which is posting the event. */ PostThread,//发布事件所在线程订阅 /** * Subscriber will be called in Android's main thread (sometimes referred to as UI thread). */ MainThread,//主线程订阅 /* BackgroundThread */ } //表示某个类中同一个方法名的所有重载方法。(也就是一个类中所有的订阅方法),缓存,目的:提高性能 private static final Map<String, List<Method>> methodCache = new HashMap<String, List<Method>>(); //保存事件类型的所有的事件(包括父类和接口),懒加载的,缓存,目的:了提高性能 private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<Class<?>, List<Class<?>>>(); //一个事件类型的所有订阅者:发送事件时使用到(时间复杂度为:O(1)) //用CopyOnWriteArrayList是为了读取是线程安全的。 private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType; //一个订阅对象的所有订阅事件: // 订阅(register)时是加入 // 取消订阅(unregister)时使用:获取当前订阅对象的所有订阅事件,然后在订阅事件中将这个订阅者删除。 private final Map<Object, List<Class<?>>> typesBySubscriber; //每个线程的事件队列 private final ThreadLocal<List<Object>> currentThreadEventQueue = new ThreadLocal<List<Object>>() { @Override protected List<Object> initialValue() { return new ArrayList<Object>(); } }; //每个线程的队列是否在运转中 private final ThreadLocal<BooleanWrapper> currentThreadIsPosting = new ThreadLocal<BooleanWrapper>() { @Override protected BooleanWrapper initialValue() { return new BooleanWrapper(); } }; private String defaultMethodName = "onEvent"; private PostViaHandler mainThreadPoster; public static EventBus getDefault() { return defaultInstance; } public EventBus() { subscriptionsByEventType = new HashMap<Class<?>, CopyOnWriteArrayList<Subscription>>(); typesBySubscriber = new HashMap<Object, List<Class<?>>>(); mainThreadPoster = new PostViaHandler(Looper.getMainLooper()); } public void register(Object subscriber) {//注册 register(subscriber, defaultMethodName, ThreadMode.PostThread);//默认方法名,即onEvent, } public void registerForMainThread(Object subscriber) { register(subscriber, defaultMethodName, ThreadMode.MainThread); } //这里应该加同步synchronized public void register(Object subscriber, String methodName, ThreadMode threadMode) {//传入订阅者,遍历查找其订阅的所有事件 List<Method> subscriberMethods = findSubscriberMethods(subscriber.getClass(), methodName); for (Method method : subscriberMethods) { Class<?> eventType = method.getParameterTypes()[0];//参数的类型 //查找的时候是根据参数的类型来确定订阅者的 subscribe(subscriber, method, eventType, threadMode); } } //如果当前类含有父类,是否查找父类中的所有含指定名称的方法 private List<Method> findSubscriberMethods(Class<?> subscriberClass, String methodName) {//查找某个类中所有指定名称的方法 String key = subscriberClass.getName() + '.' + methodName; //类名+方法名为Key //key相同的同时进入,会出现两次创建 List<Method> subscriberMethods; synchronized (methodCache) { subscriberMethods = methodCache.get(key);//方法名称一样的存在多个重载方法 } if (subscriberMethods != null) { return subscriberMethods; } //同时进入 subscriberMethods = new ArrayList<Method>(); Class<?> clazz = subscriberClass; HashSet<Class<?>> eventTypesFound = new HashSet<Class<?>>();// 同一个类参数相同的方法只加入一次(就是重写的方法) while (clazz != null) { String name = clazz.getName(); if (name.startsWith("java.") || name.startsWith("javax.") || name.startsWith("android.")) {//过滤掉系统类 // Skip system classes, this just degrades performance break; } Method[] methods = clazz.getDeclaredMethods(); for (Method method : methods) { if (method.getName().equals(methodName)) { Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 1) {//参数为1个 if (eventTypesFound.add(parameterTypes[0])) { // Only add if not already found in a sub class subscriberMethods.add(method); } } } } clazz = clazz.getSuperclass();//查找父类,也就是父类方法也会被调用 } if (subscriberMethods.isEmpty()) {//没有订阅方法,抛出异常 throw new RuntimeException("Subscriber " + subscriberClass + " has no methods called " + methodName); } else { synchronized (methodCache) { methodCache.put(key, subscriberMethods); } return subscriberMethods; } } public void register(Object subscriber, Class<?> eventType, Class<?>... moreEventTypes) {//指定事件类型 register(subscriber, defaultMethodName, ThreadMode.PostThread, eventType, moreEventTypes); } public void registerForMainThread(Object subscriber, Class<?> eventType, Class<?>... moreEventTypes) {//指定事件类型注册 register(subscriber, defaultMethodName, ThreadMode.MainThread, eventType, moreEventTypes); } public synchronized void register(Object subscriber, String methodName, ThreadMode threadMode, Class<?> eventType, Class<?>... moreEventTypes) { Class<?> subscriberClass = subscriber.getClass(); Method method = findSubscriberMethod(subscriberClass, methodName, eventType); subscribe(subscriber, method, eventType, threadMode); for (Class<?> anothereventType : moreEventTypes) { method = findSubscriberMethod(subscriberClass, methodName, anothereventType); subscribe(subscriber, method, anothereventType, threadMode); } } //应该在对象lock中调用 private void subscribe(Object subscriber, Method subscriberMethod, Class<?> eventType, ThreadMode threadMode) { //调用方:register(Object subscriber, String methodName, ThreadMode threadMode)没有加锁 //并发的问题:多个线程同时进入,不同页面都含有同一事件 CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<Subscription>(); subscriptionsByEventType.put(eventType, subscriptions);//事件类型为key,订阅者为value,一对多 } else { for (Subscription subscription : subscriptions) {//同一事件的订阅者,多次设置 if (subscription.subscriber == subscriber) { throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } } subscriberMethod.setAccessible(true); Subscription subscription = new Subscription(subscriber, subscriberMethod, threadMode); subscriptions.add(subscription); List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);//订阅者为key,订阅的事件为value,一对多 if (subscribedEvents == null) { subscribedEvents = new ArrayList<Class<?>>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); } /** * Class.getMethod is slow on Android 2.3 (and probably other versions), so use getDeclaredMethod and go up in the * class hierarchy if neccessary. */ private Method findSubscriberMethod(Class<?> subscriberClass, String methodName, Class<?> eventType) { Class<?> clazz = subscriberClass; while (clazz != null) { try { return clazz.getDeclaredMethod(methodName, eventType); } catch (NoSuchMethodException ex) { clazz = clazz.getSuperclass(); } } throw new RuntimeException("Method " + methodName + " not found in " + subscriberClass + " (must have single parameter of event type " + eventType + ")"); } /** * Unregisters the given subscriber for the given event classes. */ public synchronized void unregister(Object subscriber, Class<?>... eventTypes) { if (eventTypes.length == 0) { throw new IllegalArgumentException("Provide at least one event class"); } List<Class<?>> subscribedClasses = typesBySubscriber.get(subscriber); if (subscribedClasses != null) { for (Class<?> eventType : eventTypes) { unubscribeByEventType(subscriber, eventType); subscribedClasses.remove(eventType); } if (subscribedClasses.isEmpty()) { typesBySubscriber.remove(subscriber); } } else { Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } } /** * Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */ private void unubscribeByEventType(Object subscriber, Class<?> eventType) { List<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions != null) { int size = subscriptions.size(); for (int i = 0; i < size; i++) { if (subscriptions.get(i).subscriber == subscriber) { subscriptions.remove(i); i--; size--; } } } } /** * Unregisters the given subscriber from all event classes. */ public synchronized void unregister(Object subscriber) { List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);//删除所有的订阅事件 if (subscribedTypes != null) { for (Class<?> eventType : subscribedTypes) { unubscribeByEventType(subscriber, eventType);//删除事件类型 } typesBySubscriber.remove(subscriber); } else { Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } } //post(Object event) //post(Object event) /** * Posts the given event to the event bus. */ public void post(Object event) {//String Object List<Object> eventQueue = currentThreadEventQueue.get(); //每个线程一个队列,保证同一个线程发送事件的有序性 eventQueue.add(event); BooleanWrapper isPosting = currentThreadIsPosting.get(); if (isPosting.value) {//当前线程队列正在运转 return; } else {//没有运转开始运转 isPosting.value = true; try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0)); //开始发送事件 } } finally { isPosting.value = false; } } } private void postSingleEvent(Object event) throws Error { List<Class<?>> eventTypes = findEventTypes(event.getClass()); //找到所有的事件类型(需要给父事件和接口事件也发送) boolean subscriptionFound = false; int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) {//只能保证Map的原子性 subscriptions = subscriptionsByEventType.get(clazz); } if (subscriptions != null) { for (Subscription subscription : subscriptions) {//因为是CopyOnWriteArrayList,所以写入时读取是线程安全的 if (subscription.threadMode == ThreadMode.PostThread) {//在发送线程中执行 postToSubscribtion(subscription, event); } else if (subscription.threadMode == ThreadMode.MainThread) {//在主线程中执行 mainThreadPoster.enqueue(event, subscription);//在主线程中执行 } else { throw new IllegalStateException("Unknown thread mode: " + subscription.threadMode); } } subscriptionFound = true; } } if (!subscriptionFound) { Log.d(TAG, "No subscripers registered for event " + event.getClass()); } } //查找所有的事件类型:当前类型和所有父类型,所有接口,包括父类型 // /** * Finds all Class objects including super classes and interfaces. */ private List<Class<?>> findEventTypes(Class<?> eventClass) { synchronized (eventTypesCache) { List<Class<?>> eventTypes = eventTypesCache.get(eventClass); //缓存中存在,直接返回 if (eventTypes == null) { eventTypes = new ArrayList<Class<?>>(); Class<?> clazz = eventClass; while (clazz != null) {//事件类型,父事件 eventTypes.add(clazz); //本身是一个事件类型 addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } eventTypesCache.put(eventClass, eventTypes); } return eventTypes; } } /** * Recurses through super interfaces. */ static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) { for (Class<?> interfaceClass : interfaces) { if (!eventTypes.contains(interfaceClass)) { eventTypes.add(interfaceClass); addInterfaces(eventTypes, interfaceClass.getInterfaces()); } } } static void postToSubscribtion(Subscription subscription, Object event) throws Error { try { subscription.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { Throwable cause = e.getCause(); Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), cause); if (cause instanceof Error) { throw (Error) cause; } } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } } final static class Subscription { final Object subscriber; final Method method; final ThreadMode threadMode; Subscription(Object subscriber, Method method, ThreadMode threadMode) { this.subscriber = subscriber; this.method = method; this.threadMode = threadMode; } @Override public boolean equals(Object other) { if (other instanceof Subscription) { Subscription otherSubscription = (Subscription) other; // Super slow (improve once used): http://code.google.com/p/android/issues/detail?id=7811 return subscriber == otherSubscription.subscriber && method.equals(otherSubscription.method); } else { return false; } } @Override public int hashCode() { // Check performance once used return subscriber.hashCode() + method.hashCode(); } } /** * For ThreadLocal, much faster to set than storing a new Boolean. */ final static class BooleanWrapper {//提高性能,使用Boolean,修改值时需要每次set boolean value; } final static class PostViaHandler extends Handler {//主线程发送队列 PostViaHandler(Looper looper) { super(looper); } void enqueue(Object event, Subscription subscription) { PendingPost pendingPost = PendingPost.obtainPendingPost(event, subscription); //待发送对象 Message message = obtainMessage(); message.obj = pendingPost; if (!sendMessage(message)) {//发送失败:usually because the looper processing the message queue is exiting. throw new RuntimeException("Could not send handler message"); } } @Override public void handleMessage(Message msg) { PendingPost pendingPost = (PendingPost) msg.obj; Object event = pendingPost.event; Subscription subscription = pendingPost.subscription; PendingPost.releasePendingPost(pendingPost); postToSubscribtion(subscription, event); } } } 复制代码
PendingPost.java
package de.greenrobot.event; import java.util.ArrayList; import java.util.List; import de.greenrobot.event.EventBus.Subscription; final class PendingPost {//使用了享元模式 private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>(); Object event; Subscription subscription; private PendingPost(Object event, Subscription subscription) { this.event = event; this.subscription = subscription; } static PendingPost obtainPendingPost(Object event, Subscription subscription) { synchronized (pendingPostPool) { int size = pendingPostPool.size(); if (size > 0) { PendingPost pendingPost = pendingPostPool.remove(size - 1); pendingPost.event = event; pendingPost.subscription = subscription; return pendingPost; } } return new PendingPost(event, subscription); } static void releasePendingPost(PendingPost pendingPost) { synchronized (pendingPostPool) { pendingPostPool.add(pendingPost); } } } 复制代码
-
优点
1.在作者的注释中(Class based event bus[指Google guava库中的 event bus], optimized for Android. By default, subscribers will handle events in methods named "onEvent".)可以看到,为了优化性能,弃用了注解,使用固定或指定订阅方法的方式,
2.轻巧,不依赖Context。一些处理细节上也做了相应的优化,如:使用BooleanWrapper代替Boolean,这样就不需要每次更改值都调用ThreadLocal的set方法;PendingPost的创建采用了享元模式提高了获取其实例的性能,不需要频繁创建和销毁;利用CopyOnWriteArrayList在写入安全的情况下提高了读取性能。
3.能指定在主线程中订阅事件。因为通常情况下,我们会在非UI线程中发送事件来更新UI,而更新UI需要在主线程中执行。
-
不足
1.为了性能最求性能的极致,失去了订阅方法的灵活性,只能指定onEvent方法才能监听,或者注册时调用指定方法名,没有使用注解那么灵活。
2.订阅者接收事件的线程不能根据订阅事件设置。因为经常我们在一个类中会订阅多个事件,而每个事件基本上是没有相关性的,在哪个线程上订阅是根据事件本身来决定的。
3、强制使用事件的传递性,如:我们发送某个事件,那么这个事件所有的父类及接口都会被强制触发,最好有设置开关,让用户决定。
-
解读
- 以下代码为何需要使用事件队列,直接发送事件给订阅者不是更快更直接吗?
public void post(Object event) {//String Object List<Object> eventQueue = currentThreadEventQueue.get(); //每个线程一个队列,保证同一个线程发送事件的有序性 eventQueue.add(event); BooleanWrapper isPosting = currentThreadIsPosting.get(); if (isPosting.value) {//当前线程队列正在运转 //在同一个线程中什么时候会走到这里呢?如果走不到这里那么这个队列就没有意义 return; } else {//没有运转开始运转 isPosting.value = true; try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0)); //开始发送事件 } } finally { isPosting.value = false; } } } 如果我们能找到队列正在运转的情况下,那么队列就有意义,能保证事件处理的有序性。 举例: public class RefreshEvent {} public class Activity1 extends AppCompatActivity{ @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); EventBus.getDefault().register(this); } @Override protected void onDestroy() { EventBus.getDefault().unregister(this); super.onDestroy(); } public void onEvent(TestEvent event) { EventBus.getDefault().post(new RefreshEvent()); } } public class ActivityB extends AppCompatActivity{ @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); EventBus.getDefault().register(this); } @Override protected void onDestroy() { EventBus.getDefault().unregister(this); super.onDestroy(); } public void onEvent(TestEvent event) { } } 我们在后台业务方法中发送了刷新事件: EventBus.getDefault().post(new RefreshEvent()); 上面的例子中:1、一个事件被多个订阅者订阅;2、订阅者收到事件后再次发送相同事件。 在这种情况下,如果我们不使用队列,那么在同一个线程中,先发送的事件订阅者就可能会被后收到。 这里使用ThreadLocal+队列,就很巧妙地维护了在同一个线程中事件处理的有序性。 复制代码
- 以下代码为何需要使用事件队列,直接发送事件给订阅者不是更快更直接吗?
-
bug
public void register(Object subscriber, String methodName, ThreadMode threadMode) 方法存在线程安全的问题,除非调用者都加了锁,但代码中最常用的调用public void register(Object subscriber, String methodName, ThreadMode threadMode)却没有加锁。
自实现
-
第一步,实现主逻辑:
- 消息的订阅:register(Object subscriber)
- 消息的发布:post(Object event)
- 取消订阅:unregister(Object subscriber)
EventBus.java 代码
package org.hjb.eventbus; import android.util.Log; import androidx.annotation.NonNull; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; public class EventBus { private static final String TAG = "EventBus"; private static volatile EventBus mDefaultEventBus; private static final String DEFAULT_METHOD_NAME = "onEvent"; //key:订阅事件 //value:订阅者信息,订阅者信息包括:订阅对象,订阅方法 private final HashMap<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType = new HashMap<>(); private ThreadLocal<ArrayList<Object>> currentThreadEventQueue = new ThreadLocal<ArrayList<Object>>() { @Override protected ArrayList<Object> initialValue() { return new ArrayList<>(); } }; private ThreadLocal<Boolean> currentThreadIsPosting = new ThreadLocal<Boolean>() { @Override protected Boolean initialValue() { return Boolean.FALSE; } }; public EventBus() { } public static EventBus getDefault() { if (mDefaultEventBus == null) { synchronized (EventBus.class) { if (mDefaultEventBus == null) { mDefaultEventBus = new EventBus(); } } } return mDefaultEventBus; } /** * 订阅 * * @param subscriber 订阅者 */ public synchronized void register(@NonNull Object subscriber) {//订阅 //保存当前订阅者的所有订阅事件 //因为查询是通过订阅事件来查找的,所以使用map来保存订阅数据 //查找当前订阅对象的所有订阅方法 ArrayList<Method> methods = findSubscriberMethods(subscriber, DEFAULT_METHOD_NAME); for (Method method : methods) { //订阅事件 Class<?> eventType = method.getParameterTypes()[0]; CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { //需要判断是否重复添加(同一个事件已经加过订阅者),抛出异常 for (Subscription subscription : subscriptions) { if (subscription.subscriber == subscriber) { throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } } method.setAccessible(true);//防止私有方法不能访问 subscriptions.add(new Subscription(subscriber, method)); } } /** * 需要查找当前对象所有带onEvent的重载方法,包括父类 * <p> * 注意点: * 1、覆盖的方法不要重复添加 * 2、过滤掉系统类,java/javax/android开头的 * 3、只查找单个参数的方法 * 4、私有方法也查找 * * @param subscriber 订阅者 * @return 所有订阅对象 */ private ArrayList<Method> findSubscriberMethods(Object subscriber, String methodName) { ArrayList<Method> subscriberMethods = new ArrayList<>(); Class<?> clazz = subscriber.getClass(); while (clazz != null) { //过滤系统类 String name = clazz.getName(); if (name.startsWith("java") || name.startsWith("javax") || name.startsWith("android")) { break; } //查找目标方法 for (Method method : clazz.getDeclaredMethods()) { if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) { int i = 0, size = subscriberMethods.size(); for (; i < size; i++) { if (subscriberMethods.get(i).getParameterTypes()[0].equals(method.getParameterTypes()[0])) { break; } } if (i == size) {//表示不存在,加入 subscriberMethods.add(method); } } } clazz = clazz.getSuperclass(); } return subscriberMethods; } /** * 发送事件 * <p> * 需要给所有订阅者发送事件 * <p> * 注意:我们要发送的不仅是当前对象的订阅者,同时包括当前父类对象和接口对象的订阅者。 * <p> * 解决有有序性的问题:我们需要保证同一个线程中 * * @param event 事件 */ public void post(Object event) { List<Object> eventQueue = currentThreadEventQueue.get(); eventQueue.add(event); Boolean isPosting = currentThreadIsPosting.get(); if (!isPosting.booleanValue()) { currentThreadIsPosting.set(Boolean.TRUE); while (!eventQueue.isEmpty()) { postEvent(eventQueue.remove(0)); } currentThreadIsPosting.set(Boolean.FALSE); } else { Log.i("hejunbin", "正在运转"); } } /** * 发送事件 * * @param event 待发送事件 */ private void postEvent(Object event) { //1、找到当前对象的所有事件类型 ArrayList<Class<?>> eventTypes = findEventTypes(event.getClass()); //2、找到此事件所有的订阅者 boolean subscriptionFound = false; //是否找到了订阅者 for (Class<?> eventType : eventTypes) { CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) { subscriptions = subscriptionsByEventType.get(eventType); } if (subscriptions != null && subscriptions.size() > 0) { subscriptionFound = true; for (Subscription subscription : subscriptions) { try { subscription.method.invoke(subscription.subscriber, event); } catch (Exception e) { Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), e.getCause()); } } } } if (!subscriptionFound) { Log.d(TAG, "No subscripers registered for event " + event.getClass()); } } /** * 找到当前对象的所有事件类型(包括父类和接口) * * @param eventClass 事件类型 * @return 所有事件类型 */ private ArrayList<Class<?>> findEventTypes(Class<?> eventClass) { ArrayList<Class<?>> eventTypes = new ArrayList<>(); Class<?> clazz = eventClass; while (clazz != null) { eventTypes.add(clazz); //接口本身本身存在层级 addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } return eventTypes; } private void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) { for (Class<?> interfaceClass : interfaces) { if (!eventTypes.contains(interfaceClass)) { eventTypes.add(interfaceClass); addInterfaces(eventTypes, interfaceClass.getInterfaces()); } } } /** * 取消订阅 * * @param subscriber 订阅者 */ public synchronized void unregister(Object subscriber) {//取消订阅 for (Map.Entry<Class<?>, CopyOnWriteArrayList<Subscription>> entry : subscriptionsByEventType.entrySet()) { int size = entry.getValue().size(); for (int i = 0; i < size; i++) { if (entry.getValue().get(i).subscriber == subscriber) { entry.getValue().remove(i); i--; size--; } } } } /** * 订阅对象 */ static class Subscription { Object subscriber; Method method; public Subscription(Object subscriber, Method method) { this.subscriber = subscriber; this.method = method; } } } 复制代码
-
可以在主线程中订阅事件
在Android利用Handler的机制,将事件发送到主队列中执行。
因为需要将订阅信息(Subscription)和事件(event)都需要传递,而Handler中的Message只能传递单个Object,所以自己创建了PendingPost包装类。
EventBus.java代码
package org.hjb.eventbus; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.util.Log; import androidx.annotation.MainThread; import androidx.annotation.NonNull; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; public class EventBus { private static final String TAG = "EventBus"; private static volatile EventBus mDefaultEventBus; private static final String DEFAULT_METHOD_NAME = "onEvent"; public enum ThreadMode { PostThread, //post所在线程 MainThread,//主线程 } //key:订阅事件 //value:订阅者信息,订阅者信息包括:订阅对象,订阅方法 private final HashMap<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType = new HashMap<>(); private ThreadLocal<ArrayList<Object>> currentThreadEventQueue = new ThreadLocal<ArrayList<Object>>() { @Override protected ArrayList<Object> initialValue() { return new ArrayList<>(); } }; private ThreadLocal<Boolean> currentThreadIsPosting = new ThreadLocal<Boolean>() { @Override protected Boolean initialValue() { return Boolean.FALSE; } }; private MainThreadHandler mainThreadHandler; public EventBus() { mainThreadHandler = new MainThreadHandler(); } public static EventBus getDefault() { if (mDefaultEventBus == null) { synchronized (EventBus.class) { if (mDefaultEventBus == null) { mDefaultEventBus = new EventBus(); } } } return mDefaultEventBus; } /** * 订阅 * * @param subscriber 订阅者 */ public synchronized void register(@NonNull Object subscriber) {//订阅 subscribe(subscriber, ThreadMode.PostThread); } public synchronized void registerForMainThread(Object subscriber) { subscribe(subscriber, ThreadMode.MainThread); } public synchronized void register(Object subscriber, ThreadMode mode) { subscribe(subscriber, mode); } private void subscribe(Object subscriber, ThreadMode mode) {//需要在线程安全的方法中调用 //保存当前订阅者的所有订阅事件 //因为查询是通过订阅事件来查找的,所以使用map来保存订阅数据 //查找当前订阅对象的所有订阅方法 ArrayList<Method> methods = findSubscriberMethods(subscriber, DEFAULT_METHOD_NAME); for (Method method : methods) { //订阅事件 Class<?> eventType = method.getParameterTypes()[0]; CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { //需要判断是否重复添加(同一个事件已经加过订阅者),抛出异常 for (Subscription subscription : subscriptions) { if (subscription.subscriber == subscriber) { throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } } method.setAccessible(true);//防止私有方法不能访问 subscriptions.add(new Subscription(subscriber, method, mode)); } } /** * 需要查找当前对象所有带onEvent的重载方法,包括父类 * <p> * 注意点: * 1、覆盖的方法不要重复添加 * 2、过滤掉系统类,java/javax/android开头的 * 3、只查找单个参数的方法 * 4、私有方法也查找 * * @param subscriber 订阅者 * @return 所有订阅对象 */ private ArrayList<Method> findSubscriberMethods(Object subscriber, String methodName) { ArrayList<Method> subscriberMethods = new ArrayList<>(); Class<?> clazz = subscriber.getClass(); while (clazz != null) { //过滤系统类 String name = clazz.getName(); if (name.startsWith("java") || name.startsWith("javax") || name.startsWith("android")) { break; } //查找目标方法 for (Method method : clazz.getDeclaredMethods()) { if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) { int i = 0, size = subscriberMethods.size(); for (; i < size; i++) { if (subscriberMethods.get(i).getParameterTypes()[0].equals(method.getParameterTypes()[0])) { break; } } if (i == size) {//表示不存在,加入 subscriberMethods.add(method); } } } clazz = clazz.getSuperclass(); } return subscriberMethods; } /** * 发送事件 * <p> * 需要给所有订阅者发送事件 * <p> * 注意:我们要发送的不仅是当前对象的订阅者,同时包括当前父类对象和接口对象的订阅者。 * <p> * 解决有有序性的问题:我们需要保证同一个线程中 * * @param event 事件 */ public void post(Object event) { List<Object> eventQueue = currentThreadEventQueue.get(); eventQueue.add(event); Boolean isPosting = currentThreadIsPosting.get(); if (!isPosting.booleanValue()) { currentThreadIsPosting.set(Boolean.TRUE); while (!eventQueue.isEmpty()) { postEvent(eventQueue.remove(0)); } currentThreadIsPosting.set(Boolean.FALSE); } else { Log.i("hejunbin", "正在运转"); } } /** * 发送事件 * * @param event 待发送事件 */ private void postEvent(Object event) { //1、找到当前对象的所有事件类型 ArrayList<Class<?>> eventTypes = findEventTypes(event.getClass()); //2、找到此事件所有的订阅者 boolean subscriptionFound = false; //是否找到了订阅者 for (Class<?> eventType : eventTypes) { CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) { subscriptions = subscriptionsByEventType.get(eventType); } if (subscriptions != null && subscriptions.size() > 0) { subscriptionFound = true; for (Subscription subscription : subscriptions) { if (subscription.mode == ThreadMode.PostThread) { postEvent(subscription, event); } else if (subscription.mode == ThreadMode.MainThread) {//在主线程中发送 mainThreadHandler.enqueue(subscription, event); } else {//非法 throw new IllegalStateException("Unknown thread mode: " + subscription.mode); } } } } if (!subscriptionFound) { Log.d(TAG, "No subscripers registered for event " + event.getClass()); } } private static void postEvent(Subscription subscription, Object event) { try { subscription.method.invoke(subscription.subscriber, event); } catch (Exception e) { Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), e.getCause()); } } /** * 找到当前对象的所有事件类型(包括父类和接口) * * @param eventClass 事件类型 * @return 所有事件类型 */ private ArrayList<Class<?>> findEventTypes(Class<?> eventClass) { ArrayList<Class<?>> eventTypes = new ArrayList<>(); Class<?> clazz = eventClass; while (clazz != null) { eventTypes.add(clazz); //接口本身本身存在层级 addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } return eventTypes; } private void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) { for (Class<?> interfaceClass : interfaces) { if (!eventTypes.contains(interfaceClass)) { eventTypes.add(interfaceClass); addInterfaces(eventTypes, interfaceClass.getInterfaces()); } } } /** * 取消订阅 * * @param subscriber 订阅者 */ public synchronized void unregister(Object subscriber) {//取消订阅 for (Map.Entry<Class<?>, CopyOnWriteArrayList<Subscription>> entry : subscriptionsByEventType.entrySet()) { int size = entry.getValue().size(); for (int i = 0; i < size; i++) { if (entry.getValue().get(i).subscriber == subscriber) { entry.getValue().remove(i); i--; size--; } } } } /** * 订阅对象 */ static class Subscription { Object subscriber; Method method; ThreadMode mode; public Subscription(Object subscriber, Method method, ThreadMode mode) { this.subscriber = subscriber; this.method = method; this.mode = mode; } } static class MainThreadHandler extends Handler { MainThreadHandler() { super(Looper.getMainLooper()); } void enqueue(Subscription subscription, Object event) { Message message = Message.obtain(); message.obj = new PendingPost(subscription, event); //现在有两个对象需要传递,但Message只能传递单个对象,而Java没有元祖类型,所以需要自己创建对象或者放入数组中 if (!sendMessage(message)) {//发送失败:usually because the looper processing the message queue is exiting. throw new RuntimeException("Could not send handler message"); } } @Override public void handleMessage(Message msg) { PendingPost pendingPost = (PendingPost) msg.obj; postEvent(pendingPost.subscription, pendingPost.event); } } } 复制代码
PendingPost.java代码
package de.greenrobot.event; import java.util.ArrayList; import java.util.List; import de.greenrobot.event.EventBus.Subscription; final class PendingPost { Object event; Subscription subscription; PendingPost(Object event, Subscription subscription) { this.event = event; this.subscription = subscription; } 复制代码
-
性能优化
-
findSubscriberMethods 每次都要循环遍历查找,可以第一次查找后缓存起来,下次直接从缓存中获取。(我们经常在进入页面时register,在返回页面时unregister,当第二次进入时就可以使用缓存中获取了)
实现代码为:
private static final Map<String, ArrayList<Method>> methodCache = new HashMap<>(); /** * 需要查找当前对象所有带onEvent的重载方法,包括父类 * <p> * 注意点: * 1、覆盖的方法不要重复添加 * 2、过滤掉系统类,java/javax/android开头的 * 3、只查找单个参数的方法 * 4、私有方法也查找 * * @param subscriberClass 订阅者类 * @return 所有订阅对象 */ private ArrayList<Method> findSubscriberMethods(Class<?> subscriberClass, String methodName) { String key = subscriberClass.getName() + '.' + methodName; //类名+方法名为Key ArrayList<Method> subscriberMethods = methodCache.get(key); if (subscriberMethods != null) { return subscriberMethods; } synchronized (methodCache) { subscriberMethods = methodCache.get(key); if (subscriberMethods != null) { return subscriberMethods; } subscriberMethods = new ArrayList<>(); Class<?> clazz = subscriberClass; while (clazz != null) { //过滤系统类 String name = clazz.getName(); if (name.startsWith("java") || name.startsWith("javax") || name.startsWith("android")) { break; } //查找目标方法 for (Method method : clazz.getDeclaredMethods()) { if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) { int i = 0, size = subscriberMethods.size(); for (; i < size; i++) { if (subscriberMethods.get(i).getParameterTypes()[0].equals(method.getParameterTypes()[0])) { break; } } if (i == size) {//表示不存在,加入 subscriberMethods.add(method); } } } clazz = clazz.getSuperclass(); } if (subscriberMethods.isEmpty()) {//没有订阅方法,抛出异常 throw new RuntimeException("Subscriber " + subscriberClass + " has no methods called " + methodName); } else { methodCache.put(key, subscriberMethods); return subscriberMethods; } } } 复制代码
-
findEventTypes 每次发送事件都会调用,而内部实现也需要循环遍历,也可以通过缓存来提高性能。
修改代码为:
private static final Map<Class<?>, ArrayList<Class<?>>> eventTypesCache = new HashMap<>(); /** * 找到当前对象的所有事件类型(包括父类和接口) * * @param eventClass 事件类型 * @return 所有事件类型 */ private ArrayList<Class<?>> findEventTypes(Class<?> eventClass) { ArrayList<Class<?>> eventTypes = eventTypesCache.get(eventClass); if (eventTypes != null) { return eventTypes; } synchronized (eventTypesCache) { eventTypes = eventTypesCache.get(eventClass); if (eventTypes != null) { return eventTypes; } eventTypes = new ArrayList<>(); Class<?> clazz = eventClass; while (clazz != null) { eventTypes.add(clazz); //接口本身本身存在层级 addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } eventTypesCache.put(eventClass, eventTypes); return eventTypes; } } 复制代码
-
unregister方法,不管有没有订阅过,都需要双层遍历,时间复杂度为O(n2),可以建立以subscriber为key的哈希表,将时间复杂度降为O(n)。
修改代码为:
//存储一个订阅对象的所有订阅事件(key:订阅对象,value:所有订阅事件) private final HashMap<Object, List<Class<?>>> eventTypesBySubscriber = new HashMap<>(); private void subscribe(Object subscriber, ThreadMode mode) {//需要在线程安全的方法中调用 //保存当前订阅者的所有订阅事件 //因为查询是通过订阅事件来查找的,所以使用map来保存订阅数据 //查找当前订阅对象的所有订阅方法 ArrayList<Method> methods = findSubscriberMethods(subscriber.getClass(), DEFAULT_METHOD_NAME); for (Method method : methods) { //订阅事件 Class<?> eventType = method.getParameterTypes()[0]; CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { //需要判断是否重复添加(同一个事件已经加过订阅者),抛出异常 for (Subscription subscription : subscriptions) { if (subscription.subscriber == subscriber) { throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } } method.setAccessible(true);//防止私有方法不能访问 subscriptions.add(new Subscription(subscriber, method, mode)); //订阅对象的所订阅的数据类型 List<Class<?>> subscribedEvents = eventTypesBySubscriber.get(subscriber);//订阅者为key,订阅的事件为value,一对多 if (subscribedEvents == null) { subscribedEvents = new ArrayList<>(); eventTypesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); } } /** * 取消订阅 * * @param subscriber 订阅者 */ public synchronized void unregister(Object subscriber) {//取消订阅 List<Class<?>> eventTypes = eventTypesBySubscriber.get(subscriber); if (eventTypes == null) { Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass()); return; } for (Class<?> eventType : eventTypes) { //虽然这里也是循环,但是这里的只是单个订阅者中的事件,而不是全局的事件的遍历 CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions != null) { int size =subscriptions.size(); for (int i = 0; i < size; i++) { if (subscriptions.get(i).subscriber == subscriber) { subscriptions.remove(i); i--; size--; } } } } eventTypesBySubscriber.remove(subscriber); } 复制代码
-
PendingPost使用享元模式(重复利用对象)
修改代码为:
package org.hjb.eventbus; import java.util.ArrayList; import java.util.List; import org.hjb.eventbus.EventBus.Subscription; public class PendingPost { //使用了享元模式 private static final List<PendingPost> pendingPostPool = new ArrayList<>(0); Subscription subscription; Object event; private PendingPost(EventBus.Subscription subscription, Object event) { this.subscription = subscription; this.event = event; } static PendingPost obtain(Subscription subscription, Object event) { synchronized (pendingPostPool) {//看池子里是否有对象,如果有,使用池子里但对象 // if (!pendingPostPool.isEmpty()) { // PendingPost pendingPost = pendingPostPool.remove(0); // pendingPost.subscription = subscription; // pendingPost.event = event; // return pendingPost; // } //上面注释的代码使用第一个元素,应该使用最后一个元素, //因为使用的是数组存储,如果remove第一个的话,那么每次取出的操作都涉及到数组的搬移操作时间复杂度为O(n) //如果删除最后一个元素,不涉及搬移操作时间复杂度为O(1) //所以应该写成 int size = pendingPostPool.size(); if (size > 0) { PendingPost pendingPost = pendingPostPool.remove(size - 1); pendingPost.subscription = subscription; pendingPost.event = event; return pendingPost; } } return new PendingPost(subscription, event);//没有,创建 } void recycle() { synchronized (pendingPostPool) { pendingPostPool.add(this); } } } 复制代码
使用时也需要修改,代码为:
static class MainThreadHandler extends Handler { MainThreadHandler() { super(Looper.getMainLooper()); } void enqueue(Subscription subscription, Object event) { Message message = Message.obtain(); //现在有两个对象需要传递,但Message只能传递单个对象,而Java没有元祖类型,所以需要自己创建对象或者放入数组中 message.obj = PendingPost.obtain(subscription, event); if (!sendMessage(message)) {//发送失败:usually because the looper processing the message queue is exiting. throw new RuntimeException("Could not send handler message"); } } @Override public void handleMessage(Message msg) { PendingPost pendingPost = (PendingPost) msg.obj; postEvent(pendingPost.subscription, pendingPost.event); pendingPost.recycle();//回收 } } 复制代码
-
总结
我们可以看出,1.0.1版本还是有些bug和不足的,后面我将分析2.x和3.x版本。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Usability for the Web
Tom Brinck、Darren Gergle、Scott D. Wood / Morgan Kaufmann / 2001-10-15 / USD 65.95
Every stage in the design of a new web site is an opportunity to meet or miss deadlines and budgetary goals. Every stage is an opportunity to boost or undercut the site's usability. Thi......一起来看看 《Usability for the Web》 这本书的介绍吧!