内容简介:首先从订阅开始
首先从订阅开始
EventBus.getDefault().register(this); 复制代码
register
方法会获取传入的object对象的class,通过 findSubscriberMethods
方法来查找这个class中订阅的方法,如下
public void register(Object subscriber) { Class<?> subscriberClass = subscriber.getClass(); List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); synchronized (this) { for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } } } 复制代码
findSubscriberMethods
方法实现如下,其中有一个 ConcurrentHashMap
类型的静态对象 METHOD_CACHE
,是用来缓存对应类的订阅方法的,以便后续再次订阅时不用重新去findMethods,可以直接从缓存中读取。
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) { List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass); if (subscriberMethods != null) { return subscriberMethods; } if (ignoreGeneratedIndex) { subscriberMethods = findUsingReflection(subscriberClass); } else { subscriberMethods = findUsingInfo(subscriberClass); } if (subscriberMethods.isEmpty()) { throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation"); } else { METHOD_CACHE.put(subscriberClass, subscriberMethods); return subscriberMethods; } } 复制代码
查找订阅方法通过 ignoreGeneratedIndex
字段分为两种方式
第一种 findUsingReflection
是通过反射来查找,找到被 @Subscribe
注解修饰的方法,并且根据具体的注解以及方法参数生成一个 SubscriberMethod
对象:
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); 复制代码
第二种 findUsingInfo
是通过apt的方式,提前找到订阅的方法,可以避免通过反射查找方法带来的耗时。
具体使用方法:在gradle配置apt,rebuild项目,会生成一个注解方法索引类,在 EventBusBuilder
中通过 addIndex
方法新建一个该类的对象传入即可。
这边还有一个问题,对于子类重写父类的订阅方法如何处理。在上面的两种方式中在查找完子类的订阅方法后都会继续去查找父类的订阅方法,都通过一个叫做 checkAdd
的方法进行支撑,该方法返回 true
表示可以添加到订阅方法的集合中去。
boolean checkAdd(Method method, Class<?> eventType) { // 2 level check: 1st level with event type only (fast), 2ndlevelwith complete signature when required. // Usually a subscriber doesn't have methods listening to thesameevent type. Object existing = anyMethodByEventType.put(eventType, method); if (existing == null) { return true; } else { if (existing instanceof Method) { if (!checkAddWithMethodSignature((Method)existing,eventType)) { // Paranoia check throw new IllegalStateException(); } // Put any non-Method object to "consume" theexistingMethod anyMethodByEventType.put(eventType, this); } return checkAddWithMethodSignature(method, eventType); } } 复制代码
checkAdd
中设置了两种检查方式,第一种是通过 eventType
也就是订阅方法的入参来检查,这种方式比较快,只需要看下之前有没有这种入参的方法就可以了。注释中也指出了通常一个类不会有多个入参相同的订阅方法。
第二种是通过 checkAddWithMethodSignature
方法来检查,源码如下:
private boolean checkAddWithMethodSignature(Method method,Class<?>eventType) { methodKeyBuilder.setLength(0); methodKeyBuilder.append(method.getName()); methodKeyBuilder.append('>').append(eventType.getName()); String methodKey = methodKeyBuilder.toString(); Class<?> methodClass = method.getDeclaringClass(); Class<?> methodClassOld=subscriberClassByMethodKey.put(methodKey, methodClass); if (methodClassOld == null||methodClassOld.isAssignableFrom(methodClass)) { // Only add if not already found in a sub class return true; } else { // Revert the put, old class is further down theclasshierarchy subscriberClassByMethodKey.put(methodKey, methodClassOld); return false; } } 复制代码
通过 method
以及 eventType
来生成一个key,来存储方法所在的类。其中 methodClassOld == null ||methodClassOld.isAssignableFrom(methodClass)
这个判断条件对应着两种情况, methodClassOld == null
说明是入参相同但是方法名不同的方法正在被添加,直接返回 true
就可以了 methodClassOld.isAssignableFrom(methodClass)
这个条件是为了过滤掉父类被子类重写的方法,前面说过了查找订阅方法是从子类开始遍历的,此时如果子类重写了父类的订阅方法,那么 methodClassOld
对应的是子类, methodClass
对应的是父类,显然这个判断就会为false,之后进入下面的else分支 return false
,也就是忽略掉父类被子类重写的方法。
查找订阅方法基本就这么点,查找完毕之后需要执行订阅操作,也就是 register
方法中的 subscribe(subscriber, subscriberMethod);
操作,直接看下该方法的实现:
private void subscribe(Object subscriber, SubscriberMethodsubscriberMethod) { Class<?> eventType = subscriberMethod.eventType; Subscription newSubscription = new Subscription(subscriber,subscriberMethod); CopyOnWriteArrayList<Subscription> subscriptions =subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " +subscriber.getClass() + " already registered to event " + eventType); } } int size = subscriptions.size(); for (int i = 0; i <= size; i++) { if (i == size || subscriberMethod.priority >subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break; } } List<Class<?>> subscribedEvents =typesBySubscriber.get(subscriber); if (subscribedEvents == null) { subscribedEvents = new ArrayList<>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); if (subscriberMethod.sticky) { if (eventInheritance) { // Existing sticky events of all subclasses of eventTypehave to be considered. // Note: Iterating over all events may be inefficientwith lots of sticky events, // thus data structure should be changed to allow a moreefficient lookup // (e.g. an additional map storing sub classes of superclasses: Class -> List<Class>). Set<Map.Entry<Class<?>, Object>> entries =stickyEvents.entrySet(); for (Map.Entry<Class<?>, Object> entry : entries) { Class<?> candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscriptin, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription,stickyEvent); } } } 复制代码
subscriptionsByEventType
这个Map是将 eventType
作为key保存其所对应的订阅方法的集合。该方法将刚查找到的方法添加到对应的集合中去,添加时有这样一层判断 i == size || subscriberMethod.priority >subscriptions.get(i).subscriberMethod.priority
这表示这个集合里的方法会按照所设定的优先级进行排序。紧接着又出现了个Map typesBySubscriber
将订阅者作为key保存一个 Class
的集合,暂时看不出有啥用,就先不管,最后再检查下是不是粘性事件,如果是粘性事件就根据所保存的粘性事件来执行该方法。 eventInheritance
也是在bulider中设置的,如果为 true
则会考虑事件的继承性,如果现在有 eventType
为正在订阅的方法的 eventType
的子类的粘性事件存在,那么这个粘性事件也会被正在订阅的方法接收到,直接说可能比较绕,举个栗子,现在我有两个事件,其中一个是另一个的子类,并且有两个粘性订阅方法,如下:
class EventMessage { } class SubEventMessage extends EventMessage { } @Subscribe(sticky = true) public void onEvent(EventMessage message) { // do something } @Subscribe(sticky = true) public void onSubEvent(SubEventMessage message) { // do something } 复制代码
当执行 register
时,如果内存中存在着一个类型为 SubEventMessage
的事件,那么订阅的时候 onEvent
方法会被执行,入参是内存中类型为 SubEventMessage
的事件。
现在 register
大致就分析完了,再来看下 unregister
方法:
public synchronized void unregister(Object subscriber) { List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber); if (subscribedTypes != null) { for (Class<?> eventType : subscribedTypes) { unsubscribeByEventType(subscriber, eventType); } typesBySubscriber.remove(subscriber); } else { logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } } 复制代码
unregister
方法十分简单, typesBySubscriber
是刚才在进行订阅的时候不知道用来干什么的Map,现在知道是在取消订阅时用到的,这个Map将订阅者作为key,将其所有的订阅方法的 eventType
存入到对应的List中去,取消订阅时将这个List取出来,遍历去移除对应的订阅方法,具体实现在 unsubscribeByEventType
中,也十分简单,就不赘述了。
订阅和取消订阅都看过了,还差个发送事件,发送事件分为 post
和 postSticky
两种,先看 post
:
public void post(Object event) { PostingThreadState postingState = currentPostingThreadState.get(); List<Object> eventQueue = postingState.eventQueue; eventQueue.add(event); if (!postingState.isPosting) { postingState.isMainThread = isMainThread(); postingState.isPosting = true; if (postingState.canceled) { throw new EventBusException("Internal error. Abort state was not reset"); } try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0), postingState); } } finally { postingState.isPosting = false; postingState.isMainThread = false; } } } 复制代码
currentPostingThreadState
是个 ThreadLocal
,然后从中取出当前线程的 postingState
,也就是说每个线程都会维护一个自己的 posting
状态,之后会有个循环将事件队列清空,通过 postSingleEvent
方法来进一步处理:
private void postSingleEvent(Object event, PostingThreadStatepostingState) throws Error { Class<?> eventClass = event.getClass(); boolean subscriptionFound = false; if (eventInheritance) { List<Class<?>> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); subscriptionFound |= postSingleEventForEventType(event,postingState, clazz); } } else { subscriptionFound = postSingleEventForEventType(event,postingState, eventClass); } if (!subscriptionFound) { if (logNoSubscriberMessages) { logger.log(Level.FINE, "No subscribers registered forevent " + eventClass); } if (sendNoSubscriberEvent && eventClass !=NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { post(new NoSubscriberEvent(this, event)); } } } 复制代码
同样是通过 eventInheritance
来判断是否要涉及 eventType
的父类,之后再通过 postSingleEventForEventType
方法的返回值来得到该事件是否被处理,如果没有被处理,那么会返回 false
进入下一个分支, logNoSubscriberMessages
和 sendNoSubscriberEvents
都是在 builder
中传入的,前者用于没有订阅者处理事件时打印日志,后者用于没有订阅者处理事件时发送一个 NoSubscriberEvent
类型的事件,所以具体是怎么处理事件的还要继续看 postSingleEventForEventType
方法:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) { CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) { subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { break; } } return true; } return false; } 复制代码
postSingleEventForEventType
方法从 subscriptionsByEventType
中去获取对应事件类型的所有订阅者,如果没有订阅者就返回 false
表示事件没有被处理,否则就遍历所有的订阅者,通过 postToSubscription
方法来处理事件,接着往里看:
private void postToSubscription(Subscription subscription, Objectevent, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case POSTING: invokeSubscriber(subscription, event); break; case MAIN: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break; case MAIN_ORDERED: if (mainThreadPoster != null) { mainThreadPoster.enqueue(subscription, event); } else { // temporary: technically not correct as poster notdecoupled from subscriber invokeSubscriber(subscription, event); } break; case BACKGROUND: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case ASYNC: asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " +subscription.subscriberMethod.threadMode); } } 复制代码
在这个方法内终于看到通过区分注解中的 threadMode
来区分不同的处理方式了,先来看下这几种 threadMode
分别代表什么意思:
Mode | 含义 |
---|---|
POSTING | 在当前线程执行 |
MAIN | 在主线程执行 |
MAIN_ORDERED | 在主线程有序执行 |
BACKGROUND | 在后台线程执行 |
ASYNC | 在新的线程执行 |
可以看到有几个差不多,那具体有什么区别呢?直接从代码里看,先说明几个东西, invokeSubscriber
就是直接调用订阅方法,还有几个后缀为 poster
的变量暂时先理解为调用了 enqueue
方法后,订阅方法就会在某个时间被执行,后面再详细讲。
现在可以看代码了, POSTING
没什么好说的,直接调用 invokeSubscriber
,也就是说在调用 eventBus.post
的线程执行。
MAIN
和 MAIN_ORDERED
都是在主线程执行,后者的 ORDERED
体现在什么地方呢,先看下 MAIN
的分支,其中通过 mainThreadPoster.enqueue
插入的事件会在主线程执行,判断当前线程是否是主线程来决定直接调用订阅方法还是通过 mainThreadPoster
来发布,这里应该没什么疑惑的,主要是 MAIN_ORDERED
:
if (mainThreadPoster != null) { mainThreadPoster.enqueue(subscription, event); } else { // temporary: technically not correct as poster notdecoupled from subscriber invokeSubscriber(subscription, event); } 复制代码
mainThreadPoster
不为空时,通过 mainThreadPoster
来发布事件,为空时直接调用订阅方法,说好的在主线程调用呢?这里注释也说明了是不正确的,实际上 mainThreadPoster
为空本身就是种异常情况,具体可以看下它的初始化过程,这里就不细说了。所以下面的 else
分支就先不管了,那么为什么说通过 mainThreadPoster
发布的事件就是“有序”的呢,实际上 mainThreadPoster
内部实现是个 handler
,可以将事件 post
到主线程中去执行,所以说是有序的,这里简单说明下原因:
主线程维护着一个消息队列,循环从里面取出消息来处理,我们知道可以通过 view
的 post
方法来获取它绘制完成之后的宽高,原因是 post
方法里的事件会被插入到消息队列的尾部,而 view
的 measure
, layout
, draw
都在新插入的消息的前面,所以当 post
的方法执行时, view
肯定已经绘制好了。
而 handler
通过 sendMessage
发送的消息也会被插入到主线程消息队列的尾部,这就是“有序”,比如现在有一个 ImageView
,在它的 onMeasure
中去发布一个事件,如果订阅方法的模式是 MAIN
那么会在 onMeasure
中调用订阅方法,而如果模式是 MAIN_ORDERED
那么会在 ImageView
绘制完成后调用订阅方法。
再来看下 BACKGROUND
和 ASYNC
的区别:
case BACKGROUND: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case ASYNC: asyncPoster.enqueue(subscription, event); break; 复制代码
其中 backgroundPoster
和 asyncPoster
都会开启一个新线程来执行订阅方法,暂时当成是一样的就行,那么区别就是 BACKGROUND
模式如果在子线程 post
一个事件,那么会直接在该线程调用订阅方法,只有在主线程 post
事件才会开启一个新线程。而 ASYNC
模式,不管是在哪 post
事件,都会开启一个新线程来调用订阅方法。
最后再看下几个 poster
基本上就看完了,几个 poster
都实现了同一个接口 Poster
:
interface Poster { /** * Enqueue an event to be posted for a particular subscription. * * @param subscription Subscription which will receive the event. * @param event Event that will be posted to subscribers. */ void enqueue(Subscription subscription, Object event); } 复制代码
可以看到里面只有一个需要实现的方法 enqueue
,是用来插入事件的,这个接口被三个类实现,分别是 HandlerPoster
, BackgroundPoster
和 AsyncPoster
,上面的 mainThreadPoster
对应的就是 HandlerPoster
,这三个类中都有个类型为 PendingPostQueue
的成员变量,这是个事件队列,具体实现就不看了,这个队列提供了入队和出队的方法。
先看下 HandlerPoster
的 enqueue
方法:
public class HandlerPoster extends Handler implements Poster { public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true; if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } } } } } 复制代码
HandlerPoster
继承了 Handler
,调用 enqueue
方法后会向事件队列中插入一个事件,然后将标记位 handlerActive
设置为 true
表示正在处理事件,然后调用 sendMessage
发送消息通知处理事件。 PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
这行是用来获取一个消息队列的 Node
用来插入到队列中去, EventBus
维护着一个 pool
用来保存闲置的 Node
当有需要时从中取出一个给事件使用, pool
不够用时才会 new
新的 Node
出来,具体可以看下 PendingPost
,这样做的好处是可以避免频繁创建对象带来的开销。
再看下 HandlerPoster
的 handleMessage
方法:
public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); while (true) { PendingPost pendingPost = queue.poll(); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { handlerActive = false; return; } } } eventBus.invokeSubscriber(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } rescheduled = true; return; } } } finally { handlerActive = rescheduled; } } 复制代码
首先会记录下开始处理事件的时间,然后从事件队列中取出事件,如果为空就将 handlerActive
设置为 false
直接 return
了,如果不为空,就调用 eventBus.invokeSubscriber(pendingPost);
来调用订阅方法,执行完后,再看下时间,如果超出了规定的时间那么重新发送一条消息,本次消息处理结束,等下次轮到自己的时候再处理事件,毕竟不能一直处理队列里的事件而阻塞了主线程,如果没有超出规定事件,那么说明还可以有事件可以处理下一个事件,就会再次进入循环。
BackgroundPoster
和 AsyncPoster
其实和 HandlerPoster
差不多,只是没有用 Handler
而是用了线程池去处理事件,具体就不看了。
对了,还有个发送粘性事件:
public void postSticky(Object event) { synchronized (stickyEvents) { stickyEvents.put(event.getClass(), event); } // Should be posted after it is putted, in case the subscriberwants to remove immediately post(event); } 复制代码
就是在 stickyEvents
这个 map
里存一下。
好了,完了。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Windows黑客编程技术详解
甘迪文 / 人民邮电出版社 / 2018-12 / 108
《Windows黑客编程技术详解》介绍的是黑客编程的基础技术,涉及用户层下的Windows编程和内核层下的Rootkit编程。本书分为用户篇和内核篇两部分,用户篇包括11章,配套49个示例程序源码;内核篇包括7章,配套28个示例程序源码。本书介绍的每个技术都有详细的实现原理,以及对应的示例代码(配套代码均支持32位和64位Windows 7、Windows 8.1及Windows 10系统),旨在......一起来看看 《Windows黑客编程技术详解》 这本书的介绍吧!