EventBus是通过使用发布者/订阅者模式来实现解耦的Android和 Java 开源库。在Android开发中通常使用EventBus实现Activities, Fragments, Threads, Services等组件之间的通信。但EventBus不能实现跨进程间的通信。
源码分析 EventBus.java
package de.greenrobot.event; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.util.Log; /** * Class based event bus, optimized for Android. By default, subscribers will handle events in methods named "onEvent". * * @author Markus Junginger, greenrobot */ public class EventBus { /** * Log tag, apps may override it. */ public static String TAG = "Event"; private static final EventBus defaultInstance = new EventBus();//默认实例 public enum ThreadMode { /** * Subscriber will be called in the same thread, which is posting the event. */ PostThread,//发布事件所在线程订阅 /** * Subscriber will be called in Android's main thread (sometimes referred to as UI thread). */ MainThread,//主线程订阅 /* BackgroundThread */ } //表示某个类中同一个方法名的所有重载方法。(也就是一个类中所有的订阅方法),缓存,目的:提高性能 private static final Map<String, List<Method>> methodCache = new HashMap<String, List<Method>>(); //保存事件类型的所有的事件(包括父类和接口),懒加载的,缓存,目的:了提高性能 private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<Class<?>, List<Class<?>>>(); //一个事件类型的所有订阅者:发送事件时使用到(时间复杂度为:O(1)) //用CopyOnWriteArrayList是为了读取是线程安全的。 private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType; //一个订阅对象的所有订阅事件: // 订阅(register)时是加入 // 取消订阅(unregister)时使用:获取当前订阅对象的所有订阅事件,然后在订阅事件中将这个订阅者删除。 private final Map<Object, List<Class<?>>> typesBySubscriber; //每个线程的事件队列 private final ThreadLocal<List<Object>> currentThreadEventQueue = new ThreadLocal<List<Object>>() { @Override protected List<Object> initialValue() { return new ArrayList<Object>(); } }; //每个线程的队列是否在运转中 private final ThreadLocal<BooleanWrapper> currentThreadIsPosting = new ThreadLocal<BooleanWrapper>() { @Override protected BooleanWrapper initialValue() { return new BooleanWrapper(); } }; private String defaultMethodName = "onEvent"; private PostViaHandler mainThreadPoster; public static EventBus getDefault() { return defaultInstance; } public EventBus() { subscriptionsByEventType = new HashMap<Class<?>, CopyOnWriteArrayList<Subscription>>(); typesBySubscriber = new HashMap<Object, List<Class<?>>>(); mainThreadPoster = new PostViaHandler(Looper.getMainLooper()); } public void register(Object subscriber) {//注册 register(subscriber, defaultMethodName, ThreadMode.PostThread);//默认方法名,即onEvent, } public void registerForMainThread(Object subscriber) { register(subscriber, defaultMethodName, ThreadMode.MainThread); } //这里应该加同步synchronized public void register(Object subscriber, String methodName, ThreadMode threadMode) {//传入订阅者,遍历查找其订阅的所有事件 List<Method> subscriberMethods = findSubscriberMethods(subscriber.getClass(), methodName); for (Method method : subscriberMethods) { Class<?> eventType = method.getParameterTypes()[0];//参数的类型 //查找的时候是根据参数的类型来确定订阅者的 subscribe(subscriber, method, eventType, threadMode); } } //如果当前类含有父类,是否查找父类中的所有含指定名称的方法 private List<Method> findSubscriberMethods(Class<?> subscriberClass, String methodName) {//查找某个类中所有指定名称的方法 String key = subscriberClass.getName() + '.' + methodName; //类名+方法名为Key //key相同的同时进入,会出现两次创建 List<Method> subscriberMethods; synchronized (methodCache) { subscriberMethods = methodCache.get(key);//方法名称一样的存在多个重载方法 } if (subscriberMethods != null) { return subscriberMethods; } //同时进入 subscriberMethods = new ArrayList<Method>(); Class<?> clazz = subscriberClass; HashSet<Class<?>> eventTypesFound = new HashSet<Class<?>>();// 同一个类参数相同的方法只加入一次(就是重写的方法) while (clazz != null) { String name = clazz.getName(); if (name.startsWith("java.") || name.startsWith("javax.") || name.startsWith("android.")) {//过滤掉系统类 // Skip system classes, this just degrades performance break; } Method[] methods = clazz.getDeclaredMethods(); for (Method method : methods) { if (method.getName().equals(methodName)) { Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 1) {//参数为1个 if (eventTypesFound.add(parameterTypes[0])) { // Only add if not already found in a sub class subscriberMethods.add(method); } } } } clazz = clazz.getSuperclass();//查找父类,也就是父类方法也会被调用 } if (subscriberMethods.isEmpty()) {//没有订阅方法,抛出异常 throw new RuntimeException("Subscriber " + subscriberClass + " has no methods called " + methodName); } else { synchronized (methodCache) { methodCache.put(key, subscriberMethods); } return subscriberMethods; } } public void register(Object subscriber, Class<?> eventType, Class<?>... moreEventTypes) {//指定事件类型 register(subscriber, defaultMethodName, ThreadMode.PostThread, eventType, moreEventTypes); } public void registerForMainThread(Object subscriber, Class<?> eventType, Class<?>... moreEventTypes) {//指定事件类型注册 register(subscriber, defaultMethodName, ThreadMode.MainThread, eventType, moreEventTypes); } public synchronized void register(Object subscriber, String methodName, ThreadMode threadMode, Class<?> eventType, Class<?>... moreEventTypes) { Class<?> subscriberClass = subscriber.getClass(); Method method = findSubscriberMethod(subscriberClass, methodName, eventType); subscribe(subscriber, method, eventType, threadMode); for (Class<?> anothereventType : moreEventTypes) { method = findSubscriberMethod(subscriberClass, methodName, anothereventType); subscribe(subscriber, method, anothereventType, threadMode); } } //应该在对象lock中调用 private void subscribe(Object subscriber, Method subscriberMethod, Class<?> eventType, ThreadMode threadMode) { //调用方:register(Object subscriber, String methodName, ThreadMode threadMode)没有加锁 //并发的问题:多个线程同时进入,不同页面都含有同一事件 CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<Subscription>(); subscriptionsByEventType.put(eventType, subscriptions);//事件类型为key,订阅者为value,一对多 } else { for (Subscription subscription : subscriptions) {//同一事件的订阅者,多次设置 if (subscription.subscriber == subscriber) { throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } } subscriberMethod.setAccessible(true); Subscription subscription = new Subscription(subscriber, subscriberMethod, threadMode); subscriptions.add(subscription); List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);//订阅者为key,订阅的事件为value,一对多 if (subscribedEvents == null) { subscribedEvents = new ArrayList<Class<?>>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); } /** * Class.getMethod is slow on Android 2.3 (and probably other versions), so use getDeclaredMethod and go up in the * class hierarchy if neccessary. */ private Method findSubscriberMethod(Class<?> subscriberClass, String methodName, Class<?> eventType) { Class<?> clazz = subscriberClass; while (clazz != null) { try { return clazz.getDeclaredMethod(methodName, eventType); } catch (NoSuchMethodException ex) { clazz = clazz.getSuperclass(); } } throw new RuntimeException("Method " + methodName + " not found in " + subscriberClass + " (must have single parameter of event type " + eventType + ")"); } /** * Unregisters the given subscriber for the given event classes. */ public synchronized void unregister(Object subscriber, Class<?>... eventTypes) { if (eventTypes.length == 0) { throw new IllegalArgumentException("Provide at least one event class"); } List<Class<?>> subscribedClasses = typesBySubscriber.get(subscriber); if (subscribedClasses != null) { for (Class<?> eventType : eventTypes) { unubscribeByEventType(subscriber, eventType); subscribedClasses.remove(eventType); } if (subscribedClasses.isEmpty()) { typesBySubscriber.remove(subscriber); } } else { Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } } /** * Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */ private void unubscribeByEventType(Object subscriber, Class<?> eventType) { List<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions != null) { int size = subscriptions.size(); for (int i = 0; i < size; i++) { if (subscriptions.get(i).subscriber == subscriber) { subscriptions.remove(i); i--; size--; } } } } /** * Unregisters the given subscriber from all event classes. */ public synchronized void unregister(Object subscriber) { List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);//删除所有的订阅事件 if (subscribedTypes != null) { for (Class<?> eventType : subscribedTypes) { unubscribeByEventType(subscriber, eventType);//删除事件类型 } typesBySubscriber.remove(subscriber); } else { Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } } //post(Object event) //post(Object event) /** * Posts the given event to the event bus. */ public void post(Object event) {//String Object List<Object> eventQueue = currentThreadEventQueue.get(); //每个线程一个队列,保证同一个线程发送事件的有序性 eventQueue.add(event); BooleanWrapper isPosting = currentThreadIsPosting.get(); if (isPosting.value) {//当前线程队列正在运转 return; } else {//没有运转开始运转 isPosting.value = true; try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0)); //开始发送事件 } } finally { isPosting.value = false; } } } private void postSingleEvent(Object event) throws Error { List<Class<?>> eventTypes = findEventTypes(event.getClass()); //找到所有的事件类型(需要给父事件和接口事件也发送) boolean subscriptionFound = false; int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) {//只能保证Map的原子性 subscriptions = subscriptionsByEventType.get(clazz); } if (subscriptions != null) { for (Subscription subscription : subscriptions) {//因为是CopyOnWriteArrayList,所以写入时读取是线程安全的 if (subscription.threadMode == ThreadMode.PostThread) {//在发送线程中执行 postToSubscribtion(subscription, event); } else if (subscription.threadMode == ThreadMode.MainThread) {//在主线程中执行 mainThreadPoster.enqueue(event, subscription);//在主线程中执行 } else { throw new IllegalStateException("Unknown thread mode: " + subscription.threadMode); } } subscriptionFound = true; } } if (!subscriptionFound) { Log.d(TAG, "No subscripers registered for event " + event.getClass()); } } //查找所有的事件类型:当前类型和所有父类型,所有接口,包括父类型 // /** * Finds all Class objects including super classes and interfaces. */ private List<Class<?>> findEventTypes(Class<?> eventClass) { synchronized (eventTypesCache) { List<Class<?>> eventTypes = eventTypesCache.get(eventClass); //缓存中存在,直接返回 if (eventTypes == null) { eventTypes = new ArrayList<Class<?>>(); Class<?> clazz = eventClass; while (clazz != null) {//事件类型,父事件 eventTypes.add(clazz); //本身是一个事件类型 addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } eventTypesCache.put(eventClass, eventTypes); } return eventTypes; } } /** * Recurses through super interfaces. */ static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) { for (Class<?> interfaceClass : interfaces) { if (!eventTypes.contains(interfaceClass)) { eventTypes.add(interfaceClass); addInterfaces(eventTypes, interfaceClass.getInterfaces()); } } } static void postToSubscribtion(Subscription subscription, Object event) throws Error { try { subscription.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { Throwable cause = e.getCause(); Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), cause); if (cause instanceof Error) { throw (Error) cause; } } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } } final static class Subscription { final Object subscriber; final Method method; final ThreadMode threadMode; Subscription(Object subscriber, Method method, ThreadMode threadMode) { this.subscriber = subscriber; this.method = method; this.threadMode = threadMode; } @Override public boolean equals(Object other) { if (other instanceof Subscription) { Subscription otherSubscription = (Subscription) other; // Super slow (improve once used): http://code.google.com/p/android/issues/detail?id=7811 return subscriber == otherSubscription.subscriber && method.equals(otherSubscription.method); } else { return false; } } @Override public int hashCode() { // Check performance once used return subscriber.hashCode() + method.hashCode(); } } /** * For ThreadLocal, much faster to set than storing a new Boolean. */ final static class BooleanWrapper {//提高性能,使用Boolean,修改值时需要每次set boolean value; } final static class PostViaHandler extends Handler {//主线程发送队列 PostViaHandler(Looper looper) { super(looper); } void enqueue(Object event, Subscription subscription) { PendingPost pendingPost = PendingPost.obtainPendingPost(event, subscription); //待发送对象 Message message = obtainMessage(); message.obj = pendingPost; if (!sendMessage(message)) {//发送失败:usually because the looper processing the message queue is exiting. throw new RuntimeException("Could not send handler message"); } } @Override public void handleMessage(Message msg) { PendingPost pendingPost = (PendingPost) msg.obj; Object event = pendingPost.event; Subscription subscription = pendingPost.subscription; PendingPost.releasePendingPost(pendingPost); package de.greenrobot.event; import java.util.ArrayList; import java.util.List; import de.greenrobot.event.EventBus.Subscription; final class PendingPost {//使用了享元模式 private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>(); Object event; Subscription subscription; private PendingPost(Object event, Subscription subscription) { this.event = event; this.subscription = subscription; } static PendingPost obtainPendingPost(Object event, Subscription subscription) { synchronized (pendingPostPool) { int size = pendingPostPool.size(); if (size > 0) { PendingPost pendingPost = pendingPostPool.remove(size - 1); pendingPost.event = event; pendingPost.subscription = subscription; return pendingPost; } } return new PendingPost(event, subscription); } static void releasePendingPost(PendingPost pendingPost) { synchronized (pendingPostPool) { pendingPostPool.add(pendingPost); } } } 复制代码
1.在作者的注释中(Class based event bus[指Google guava库中的 event bus], optimized for Android. By default, subscribers will handle events in methods named "onEvent".)可以看到,为了优化性能,弃用了注解,使用固定或指定订阅方法的方式,
- 以下代码为何需要使用事件队列,直接发送事件给订阅者不是更快更直接吗?
public void post(Object event) {//String Object List<Object> eventQueue = currentThreadEventQueue.get(); //每个线程一个队列,保证同一个线程发送事件的有序性 eventQueue.add(event); BooleanWrapper isPosting = currentThreadIsPosting.get(); if (isPosting.value) {//当前线程队列正在运转 //在同一个线程中什么时候会走到这里呢?如果走不到这里那么这个队列就没有意义 return; } else {//没有运转开始运转 isPosting.value = true; try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0)); //开始发送事件 } } finally { isPosting.value = false; } } } 如果我们能找到队列正在运转的情况下,那么队列就有意义,能保证事件处理的有序性。 举例: public class RefreshEvent {} public class Activity1 extends AppCompatActivity{ @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); EventBus.getDefault().register(this); } @Override protected void onDestroy() { EventBus.getDefault().unregister(this); super.onDestroy(); } public void onEvent(TestEvent event) { EventBus.getDefault().post(new RefreshEvent()); } } public class ActivityB extends AppCompatActivity{ @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); EventBus.getDefault().register(this); } @Override protected void onDestroy() { EventBus.getDefault().unregister(this); super.onDestroy(); } public void onEvent(TestEvent event) { } } 我们在后台业务方法中发送了刷新事件: EventBus.getDefault().post(new RefreshEvent()); 上面的例子中:1、一个事件被多个订阅者订阅;2、订阅者收到事件后再次发送相同事件。 在这种情况下,如果我们不使用队列,那么在同一个线程中,先发送的事件订阅者就可能会被后收到。 这里使用ThreadLocal+队列,就很巧妙地维护了在同一个线程中事件处理的有序性。 复制代码
public void register(Object subscriber, String methodName, ThreadMode threadMode) 方法存在线程安全的问题,除非调用者都加了锁,但代码中最常用的调用public void register(Object subscriber, String methodName, ThreadMode threadMode)却没有加锁。
- 消息的订阅:register(Object subscriber)
- 消息的发布:post(Object event)
- 取消订阅:unregister(Object subscriber)
package org.hjb.eventbus; import android.util.Log; import androidx.annotation.NonNull; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; public class EventBus { private static final String TAG = "EventBus"; private static volatile EventBus mDefaultEventBus; private static final String DEFAULT_METHOD_NAME = "onEvent"; //key:订阅事件 //value:订阅者信息,订阅者信息包括:订阅对象,订阅方法 private final HashMap<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType = new HashMap<>(); private ThreadLocal<ArrayList<Object>> currentThreadEventQueue = new ThreadLocal<ArrayList<Object>>() { @Override protected ArrayList<Object> initialValue() { return new ArrayList<>(); } }; private ThreadLocal<Boolean> currentThreadIsPosting = new ThreadLocal<Boolean>() { @Override protected Boolean initialValue() { return Boolean.FALSE; } }; public EventBus() { } public static EventBus getDefault() { if (mDefaultEventBus == null) { synchronized (EventBus.class) { if (mDefaultEventBus == null) { mDefaultEventBus = new EventBus(); } } } return mDefaultEventBus; } /** * 订阅 * * @param subscriber 订阅者 */ public synchronized void register(@NonNull Object subscriber) {//订阅 //保存当前订阅者的所有订阅事件 //因为查询是通过订阅事件来查找的,所以使用map来保存订阅数据 //查找当前订阅对象的所有订阅方法 ArrayList<Method> methods = findSubscriberMethods(subscriber, DEFAULT_METHOD_NAME); for (Method method : methods) { //订阅事件 Class<?> eventType = method.getParameterTypes()[0]; CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { //需要判断是否重复添加(同一个事件已经加过订阅者),抛出异常 for (Subscription subscription : subscriptions) { if (subscription.subscriber == subscriber) { throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } } method.setAccessible(true);//防止私有方法不能访问 subscriptions.add(new Subscription(subscriber, method)); } } /** * 需要查找当前对象所有带onEvent的重载方法,包括父类 * <p> * 注意点: * 1、覆盖的方法不要重复添加 * 2、过滤掉系统类,java/javax/android开头的 * 3、只查找单个参数的方法 * 4、私有方法也查找 * * @param subscriber 订阅者 * @return 所有订阅对象 */ private ArrayList<Method> findSubscriberMethods(Object subscriber, String methodName) { ArrayList<Method> subscriberMethods = new ArrayList<>(); Class<?> clazz = subscriber.getClass(); while (clazz != null) { //过滤系统类 String name = clazz.getName(); if (name.startsWith("java") || name.startsWith("javax") || name.startsWith("android")) { break; } //查找目标方法 for (Method method : clazz.getDeclaredMethods()) { if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) { int i = 0, size = subscriberMethods.size(); for (; i < size; i++) { if (subscriberMethods.get(i).getParameterTypes()[0].equals(method.getParameterTypes()[0])) { break; } } if (i == size) {//表示不存在,加入 subscriberMethods.add(method); } } } clazz = clazz.getSuperclass(); } return subscriberMethods; } /** * 发送事件 * <p> * 需要给所有订阅者发送事件 * <p> * 注意:我们要发送的不仅是当前对象的订阅者,同时包括当前父类对象和接口对象的订阅者。 * <p> * 解决有有序性的问题:我们需要保证同一个线程中 * * @param event 事件 */ public void post(Object event) { List<Object> eventQueue = currentThreadEventQueue.get(); eventQueue.add(event); Boolean isPosting = currentThreadIsPosting.get(); if (!isPosting.booleanValue()) { currentThreadIsPosting.set(Boolean.TRUE); while (!eventQueue.isEmpty()) { postEvent(eventQueue.remove(0)); } currentThreadIsPosting.set(Boolean.FALSE); } else { Log.i("hejunbin", "正在运转"); } } /** * 发送事件 * * @param event 待发送事件 */ private void postEvent(Object event) { //1、找到当前对象的所有事件类型 ArrayList<Class<?>> eventTypes = findEventTypes(event.getClass()); //2、找到此事件所有的订阅者 boolean subscriptionFound = false; //是否找到了订阅者 for (Class<?> eventType : eventTypes) { CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) { subscriptions = subscriptionsByEventType.get(eventType); } if (subscriptions != null && subscriptions.size() > 0) { subscriptionFound = true; for (Subscription subscription : subscriptions) { try { subscription.method.invoke(subscription.subscriber, event); } catch (Exception e) { Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), e.getCause()); } } } } if (!subscriptionFound) { Log.d(TAG, "No subscripers registered for event " + event.getClass()); } } /** * 找到当前对象的所有事件类型(包括父类和接口) * * @param eventClass 事件类型 * @return 所有事件类型 */ private ArrayList<Class<?>> findEventTypes(Class<?> eventClass) { ArrayList<Class<?>> eventTypes = new ArrayList<>(); Class<?> clazz = eventClass; while (clazz != null) { eventTypes.add(clazz); //接口本身本身存在层级 addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } return eventTypes; } private void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) { for (Class<?> interfaceClass : interfaces) { if (!eventTypes.contains(interfaceClass)) { eventTypes.add(interfaceClass); addInterfaces(eventTypes, interfaceClass.getInterfaces()); } } } /** * 取消订阅 * * @param subscriber 订阅者 */ public synchronized void unregister(Object subscriber) {//取消订阅 for (Map.Entry<Class<?>, CopyOnWriteArrayList<Subscription>> entry : subscriptionsByEventType.entrySet()) { int size = entry.getValue().size(); for (int i = 0; i < size; i++) { if (entry.getValue().get(i).subscriber == subscriber) { entry.getValue().remove(i); i--; size--; } } } } /** * 订阅对象 */ static class Subscription { Object subscriber; Method method; public Subscription(Object subscriber, Method method) { this.subscriber = subscriber; this.method = method; } } } 复制代码
package org.hjb.eventbus; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.util.Log; import androidx.annotation.MainThread; import androidx.annotation.NonNull; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; public class EventBus { private static final String TAG = "EventBus"; private static volatile EventBus mDefaultEventBus; private static final String DEFAULT_METHOD_NAME = "onEvent"; public enum ThreadMode { PostThread, //post所在线程 MainThread,//主线程 } //key:订阅事件 //value:订阅者信息,订阅者信息包括:订阅对象,订阅方法 private final HashMap<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType = new HashMap<>(); private ThreadLocal<ArrayList<Object>> currentThreadEventQueue = new ThreadLocal<ArrayList<Object>>() { @Override protected ArrayList<Object> initialValue() { return new ArrayList<>(); } }; private ThreadLocal<Boolean> currentThreadIsPosting = new ThreadLocal<Boolean>() { @Override protected Boolean initialValue() { return Boolean.FALSE; } }; private MainThreadHandler mainThreadHandler; public EventBus() { mainThreadHandler = new MainThreadHandler(); } public static EventBus getDefault() { if (mDefaultEventBus == null) { synchronized (EventBus.class) { if (mDefaultEventBus == null) { mDefaultEventBus = new EventBus(); } } } return mDefaultEventBus; } /** * 订阅 * * @param subscriber 订阅者 */ public synchronized void register(@NonNull Object subscriber) {//订阅 subscribe(subscriber, ThreadMode.PostThread); } public synchronized void registerForMainThread(Object subscriber) { subscribe(subscriber, ThreadMode.MainThread); } public synchronized void register(Object subscriber, ThreadMode mode) { subscribe(subscriber, mode); } private void subscribe(Object subscriber, ThreadMode mode) {//需要在线程安全的方法中调用 //保存当前订阅者的所有订阅事件 //因为查询是通过订阅事件来查找的,所以使用map来保存订阅数据 //查找当前订阅对象的所有订阅方法 ArrayList<Method> methods = findSubscriberMethods(subscriber, DEFAULT_METHOD_NAME); for (Method method : methods) { //订阅事件 Class<?> eventType = method.getParameterTypes()[0]; CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { //需要判断是否重复添加(同一个事件已经加过订阅者),抛出异常 for (Subscription subscription : subscriptions) { if (subscription.subscriber == subscriber) { throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } } method.setAccessible(true);//防止私有方法不能访问 subscriptions.add(new Subscription(subscriber, method, mode)); } } /** * 需要查找当前对象所有带onEvent的重载方法,包括父类 * <p> * 注意点: * 1、覆盖的方法不要重复添加 * 2、过滤掉系统类,java/javax/android开头的 * 3、只查找单个参数的方法 * 4、私有方法也查找 * * @param subscriber 订阅者 * @return 所有订阅对象 */ private ArrayList<Method> findSubscriberMethods(Object subscriber, String methodName) { ArrayList<Method> subscriberMethods = new ArrayList<>(); Class<?> clazz = subscriber.getClass(); while (clazz != null) { //过滤系统类 String name = clazz.getName(); if (name.startsWith("java") || name.startsWith("javax") || name.startsWith("android")) { break; } //查找目标方法 for (Method method : clazz.getDeclaredMethods()) { if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) { int i = 0, size = subscriberMethods.size(); for (; i < size; i++) { if (subscriberMethods.get(i).getParameterTypes()[0].equals(method.getParameterTypes()[0])) { break; } } if (i == size) {//表示不存在,加入 subscriberMethods.add(method); } } } clazz = clazz.getSuperclass(); } return subscriberMethods; } /** * 发送事件 * <p> * 需要给所有订阅者发送事件 * <p> * 注意:我们要发送的不仅是当前对象的订阅者,同时包括当前父类对象和接口对象的订阅者。 * <p> * 解决有有序性的问题:我们需要保证同一个线程中 * * @param event 事件 */ public void post(Object event) { List<Object> eventQueue = currentThreadEventQueue.get(); eventQueue.add(event); Boolean isPosting = currentThreadIsPosting.get(); if (!isPosting.booleanValue()) { currentThreadIsPosting.set(Boolean.TRUE); while (!eventQueue.isEmpty()) { postEvent(eventQueue.remove(0)); } currentThreadIsPosting.set(Boolean.FALSE); } else { Log.i("hejunbin", "正在运转"); } } /** * 发送事件 * * @param event 待发送事件 */ private void postEvent(Object event) { //1、找到当前对象的所有事件类型 ArrayList<Class<?>> eventTypes = findEventTypes(event.getClass()); //2、找到此事件所有的订阅者 boolean subscriptionFound = false; //是否找到了订阅者 for (Class<?> eventType : eventTypes) { CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) { subscriptions = subscriptionsByEventType.get(eventType); } if (subscriptions != null && subscriptions.size() > 0) { subscriptionFound = true; for (Subscription subscription : subscriptions) { if (subscription.mode == ThreadMode.PostThread) { postEvent(subscription, event); } else if (subscription.mode == ThreadMode.MainThread) {//在主线程中发送 mainThreadHandler.enqueue(subscription, event); } else {//非法 throw new IllegalStateException("Unknown thread mode: " + subscription.mode); } } } } if (!subscriptionFound) { Log.d(TAG, "No subscripers registered for event " + event.getClass()); } } private static void postEvent(Subscription subscription, Object event) { try { subscription.method.invoke(subscription.subscriber, event); } catch (Exception e) { Log.e(TAG, "Could not dispatch event: " + event.getClass() + " to subscribing class " + subscription.subscriber.getClass(), e.getCause()); } } /** * 找到当前对象的所有事件类型(包括父类和接口) * * @param eventClass 事件类型 * @return 所有事件类型 */ private ArrayList<Class<?>> findEventTypes(Class<?> eventClass) { ArrayList<Class<?>> eventTypes = new ArrayList<>(); Class<?> clazz = eventClass; while (clazz != null) { eventTypes.add(clazz); //接口本身本身存在层级 addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } return eventTypes; } private void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) { for (Class<?> interfaceClass : interfaces) { if (!eventTypes.contains(interfaceClass)) { eventTypes.add(interfaceClass); addInterfaces(eventTypes, interfaceClass.getInterfaces()); } } } /** * 取消订阅 * * @param subscriber 订阅者 */ public synchronized void unregister(Object subscriber) {//取消订阅 for (Map.Entry<Class<?>, CopyOnWriteArrayList<Subscription>> entry : subscriptionsByEventType.entrySet()) { int size = entry.getValue().size(); for (int i = 0; i < size; i++) { if (entry.getValue().get(i).subscriber == subscriber) { entry.getValue().remove(i); i--; size--; } } } } /** * 订阅对象 */ static class Subscription { Object subscriber; Method method; ThreadMode mode; public Subscription(Object subscriber, Method method, ThreadMode mode) { this.subscriber = subscriber; this.method = method; this.mode = mode; } } static class MainThreadHandler extends Handler { MainThreadHandler() { super(Looper.getMainLooper()); } void enqueue(Subscription subscription, Object event) { Message message = Message.obtain(); message.obj = new PendingPost(subscription, event); //现在有两个对象需要传递,但Message只能传递单个对象,而Java没有元祖类型,所以需要自己创建对象或者放入数组中 if (!sendMessage(message)) {//发送失败:usually because the looper processing the message queue is exiting. throw new RuntimeException("Could not send handler message"); } } @Override public void handleMessage(Message msg) { PendingPost pendingPost = (PendingPost) msg.obj; postEvent(pendingPost.subscription, pendingPost.event); package de.greenrobot.event; import java.util.ArrayList; import java.util.List; import de.greenrobot.event.EventBus.Subscription; final class PendingPost { Object event; Subscription subscription; PendingPost(Object event, Subscription subscription) { this.event = event; this.subscription = subscription; } 复制代码
findSubscriberMethods 每次都要循环遍历查找,可以第一次查找后缓存起来,下次直接从缓存中获取。(我们经常在进入页面时register,在返回页面时unregister,当第二次进入时就可以使用缓存中获取了)
private static final Map<String, ArrayList<Method>> methodCache = new HashMap<>(); /** * 需要查找当前对象所有带onEvent的重载方法,包括父类 * <p> * 注意点: * 1、覆盖的方法不要重复添加 * 2、过滤掉系统类,java/javax/android开头的 * 3、只查找单个参数的方法 * 4、私有方法也查找 * * @param subscriberClass 订阅者类 * @return 所有订阅对象 */ private ArrayList<Method> findSubscriberMethods(Class<?> subscriberClass, String methodName) { String key = subscriberClass.getName() + '.' + methodName; //类名+方法名为Key ArrayList<Method> subscriberMethods = methodCache.get(key); if (subscriberMethods != null) { return subscriberMethods; } synchronized (methodCache) { subscriberMethods = methodCache.get(key); if (subscriberMethods != null) { return subscriberMethods; } subscriberMethods = new ArrayList<>(); Class<?> clazz = subscriberClass; while (clazz != null) { //过滤系统类 String name = clazz.getName(); if (name.startsWith("java") || name.startsWith("javax") || name.startsWith("android")) { break; } //查找目标方法 for (Method method : clazz.getDeclaredMethods()) { if (method.getName().equals(methodName) && method.getParameterTypes().length == 1) { int i = 0, size = subscriberMethods.size(); for (; i < size; i++) { if (subscriberMethods.get(i).getParameterTypes()[0].equals(method.getParameterTypes()[0])) { break; } } if (i == size) {//表示不存在,加入 subscriberMethods.add(method); } } } clazz = clazz.getSuperclass(); } if (subscriberMethods.isEmpty()) {//没有订阅方法,抛出异常 throw new RuntimeException("Subscriber " + subscriberClass + " has no methods called " + methodName); } else { methodCache.put(key, subscriberMethods); return subscriberMethods; } } } 复制代码
findEventTypes 每次发送事件都会调用,而内部实现也需要循环遍历,也可以通过缓存来提高性能。
private static final Map<Class<?>, ArrayList<Class<?>>> eventTypesCache = new HashMap<>(); /** * 找到当前对象的所有事件类型(包括父类和接口) * * @param eventClass 事件类型 * @return 所有事件类型 */ private ArrayList<Class<?>> findEventTypes(Class<?> eventClass) { ArrayList<Class<?>> eventTypes = eventTypesCache.get(eventClass); if (eventTypes != null) { return eventTypes; } synchronized (eventTypesCache) { eventTypes = eventTypesCache.get(eventClass); if (eventTypes != null) { return eventTypes; } eventTypes = new ArrayList<>(); Class<?> clazz = eventClass; while (clazz != null) { eventTypes.add(clazz); //接口本身本身存在层级 addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } eventTypesCache.put(eventClass, eventTypes); return eventTypes; } } 复制代码
//存储一个订阅对象的所有订阅事件(key:订阅对象,value:所有订阅事件) private final HashMap<Object, List<Class<?>>> eventTypesBySubscriber = new HashMap<>(); private void subscribe(Object subscriber, ThreadMode mode) {//需要在线程安全的方法中调用 //保存当前订阅者的所有订阅事件 //因为查询是通过订阅事件来查找的,所以使用map来保存订阅数据 //查找当前订阅对象的所有订阅方法 ArrayList<Method> methods = findSubscriberMethods(subscriber.getClass(), DEFAULT_METHOD_NAME); for (Method method : methods) { //订阅事件 Class<?> eventType = method.getParameterTypes()[0]; CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { //需要判断是否重复添加(同一个事件已经加过订阅者),抛出异常 for (Subscription subscription : subscriptions) { if (subscription.subscriber == subscriber) { throw new RuntimeException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } } method.setAccessible(true);//防止私有方法不能访问 subscriptions.add(new Subscription(subscriber, method, mode)); //订阅对象的所订阅的数据类型 List<Class<?>> subscribedEvents = eventTypesBySubscriber.get(subscriber);//订阅者为key,订阅的事件为value,一对多 if (subscribedEvents == null) { subscribedEvents = new ArrayList<>(); eventTypesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); } } /** * 取消订阅 * * @param subscriber 订阅者 */ public synchronized void unregister(Object subscriber) {//取消订阅 List<Class<?>> eventTypes = eventTypesBySubscriber.get(subscriber); if (eventTypes == null) { Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass()); return; } for (Class<?> eventType : eventTypes) { //虽然这里也是循环,但是这里的只是单个订阅者中的事件,而不是全局的事件的遍历 CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions != null) { int size =subscriptions.size(); for (int i = 0; i < size; i++) { if (subscriptions.get(i).subscriber == subscriber) { subscriptions.remove(i); i--; size--; } } } } eventTypesBySubscriber.remove(subscriber); } 复制代码
package org.hjb.eventbus; import java.util.ArrayList; import java.util.List; import org.hjb.eventbus.EventBus.Subscription; public class PendingPost { //使用了享元模式 private static final List<PendingPost> pendingPostPool = new ArrayList<>(0); Subscription subscription; Object event; private PendingPost(EventBus.Subscription subscription, Object event) { this.subscription = subscription; this.event = event; } static PendingPost obtain(Subscription subscription, Object event) { synchronized (pendingPostPool) {//看池子里是否有对象,如果有,使用池子里但对象 // if (!pendingPostPool.isEmpty()) { // PendingPost pendingPost = pendingPostPool.remove(0); // pendingPost.subscription = subscription; // pendingPost.event = event; // return pendingPost; // } //上面注释的代码使用第一个元素,应该使用最后一个元素, //因为使用的是数组存储,如果remove第一个的话,那么每次取出的操作都涉及到数组的搬移操作时间复杂度为O(n) //如果删除最后一个元素,不涉及搬移操作时间复杂度为O(1) //所以应该写成 int size = pendingPostPool.size(); if (size > 0) { PendingPost pendingPost = pendingPostPool.remove(size - 1); pendingPost.subscription = subscription; pendingPost.event = event; return pendingPost; } } return new PendingPost(subscription, event);//没有,创建 } void recycle() { synchronized (pendingPostPool) { pendingPostPool.add(this); } } } 复制代码
static class MainThreadHandler extends Handler { MainThreadHandler() { super(Looper.getMainLooper()); } void enqueue(Subscription subscription, Object event) { Message message = Message.obtain(); //现在有两个对象需要传递,但Message只能传递单个对象,而Java没有元祖类型,所以需要自己创建对象或者放入数组中 message.obj = PendingPost.obtain(subscription, event); if (!sendMessage(message)) {//发送失败:usually because the looper processing the message queue is exiting. throw new RuntimeException("Could not send handler message"); } } @Override public void handleMessage(Message msg) { PendingPost pendingPost = (PendingPost) msg.obj; postEvent(pendingPost.subscription, pendingPost.event); pendingPost.recycle();//回收 } } 复制代码
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
