停停走走又几天,断断续续这一年。时间还是如此之快,转眼间又到了17年的末尾了。趁着这些天还能静下心来看看代码,赶紧记录下自己的心得。
这一周,工作任务不多,总是想要看点什么但又不清楚该看啥,有些迷茫~不过既然自己还想要在Android路上继续前行,那么请深入探索吧!
OK,闲话不多说。今天打算记录下关于EventBus的原理以及如何实现。
1. 关于EventBus的使用
还依稀记得第一份工作中,那是还是个零基础的菜鸟。修修补补那个项目,项目中就用到了EventBus,当时并没有能力深入了解。当时版本的EventBus还没有使用注解的形式,还需要自定义(记得是重写onEvent方法还是啥)。
不过,现在EventBus的版本已经到了3.+了,使用方法也和原来大不相同了。总体而言,代码越来越简洁,耦合度也越来越低了。说下简单用法吧:
// 把当前的Activity或Fragment注册到EventBus
// 相应的,需要在页面销毁是调用unRegister方法
EventBus.getDefault().register(this);
// 通过注解订阅方法
// threadMode :方法执行的线程
// sticky:是否接受粘性事件
// priority:优先级
@Subscribe(threadMode = ThreadMode.POSTING, sticky = false, priority = 1)
public onEvent(EventMessage eventMessage){
// do something
}
// 使用:通过调用post或postSticky方法使用,在订阅的方法中执行要执行的代码
EventBus.getDefault().post(EventMessage);
其实使用是很简单的,不过3.x的版本中新加了Subscriber Index,使用起来比较麻烦,但是据说效率要高很多。这篇不算是教程,所以还是简单的来看基本的使用吧。
2. EventBus的register/unregister过程
如果不看源码的话,要我说这个注册过程是把对象保存起来的过程,而取消注册则是把对象释放的过程。那么,源码中如何实现的呢?下面进入源码阶段:
- EventBus.getDefault():
// 单例模式
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
通过单例模式获得EventBus对象,对象的创建过程:
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
// 保存事件参数类和Subscription List的Map
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
// 保存注册对象和事件参数类
private final Map<Object, List<Class<?>>> typesBySubscriber;
// 粘性事件
private final Map<Class<?>, Object> stickyEvents;
public class EventBusBuilder {
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
// 异常的设置
boolean logSubscriberExceptions = true;
boolean logNoSubscriberMessages = true;
boolean sendSubscriberExceptionEvent = true;
boolean sendNoSubscriberEvent = true;
boolean throwSubscriberException;
boolean eventInheritance = true;
// 是否忽略生成的Index,默认为false,不忽略
boolean ignoreGeneratedIndex;
boolean strictMethodVerification;
ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
List<Class<?>> skipMethodVerificationForClasses;
List<SubscriberInfoIndex> subscriberInfoIndexes;
EventBusBuilder() {
}
}
public EventBus() {
this(DEFAULT_BUILDER);
}
EventBus(EventBusBuilder builder) {
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
// 主线程发送器
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
// 后台线程发送器
backgroundPoster = new BackgroundPoster(this);
// 异步线程发送器
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
// 异常设置
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
EventBus是通过Builder形成创建,可以通过构建EventBusBuilder来设置参数,这里只记录下默认的情况。这里面最重要的是三个Map对象:
- subscriptionsByEventType:用来保存订阅的方法参数类型和Subscription List
- typesBySubscriber:用来保存订阅者对象和被订阅方法的参数类型
- stickyEvents:粘性事件
当然,在创建EventBus时创建了三个不同类型的线程类型发送器,可以匹配注解中的threadMode字段。对象创建完成后需要调用register方法去将Activity/Fragment注册:
Eventbus.java:
public void register(Object subscriber) {
// 获得注册对象的class
Class<?> subscriberClass = subscriber.getClass();
// 查找订阅者被订阅的方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
// 订阅
subscribe(subscriber, subscriberMethod);
}
}
}
register
方法,先要去查找订阅者的方法,subscriberMethodFinder
在创建对象的时候就被创建。接着调用subscriberMethodFinder.findSubscriberMethods(subscriberClass)
方法,通过传入注册对象的class来查找其需要订阅的方法:
SubscriberMethodFinder.java:
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 从方法缓存中查找
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
// 找到直接返回
if (subscriberMethods != null) {
return subscriberMethods;
}
// 是否忽略生成的Index(高级用法会通过编译时注解生成,这里不会)
// ignoreGeneratedIndex默认为false,进入下面的代码
if (ignoreGeneratedIndex) {
// 通过反射获取方法
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// 通过subscriberInfo获取方法
subscriberMethods = findUsingInfo(subscriberClass);
}
// 如果没有找到方法,抛出异常(常见的这个异常,没有@Subscribe方法)
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
// 找到后放入缓存中
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
首先从缓存中检查是否拥有当前订阅类,如果没有则需要根据ignoreGeneratedIndex
来判断是否直接通过反射获取。这里ignoreGeneratedIndex
默认值为false
,那么需要调用findUsingInfo
:
SubscriberMethodFinder.java:
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
// 从池中或创建或使用
FindState findState = prepareFindState();
// 初始化
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 获得subscriberInfo,
findState.subscriberInfo = getSubscriberInfo(findState);
// 如果没有找到(不使用Subscribe Index的话也不会找到)
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
// 通过反射来查找
findUsingReflectionInSingleClass(findState);
}
// 获得其父类
findState.moveToSuperclass();
}
// 获得方法并释放
return getMethodsAndRelease(findState);
}
这里通过池取出或创建FindState
对象,并且调用getSubscriberInfo
查找是否有添加的SubscriberInfoIndex
,这个是需要添加编译器支持,并且在编译器自动生成,需要手动添加的。这里默认的情况下是不存在的,所以就会进入findUsingReflectionInSingleClass
又通过反射来调用(应该就是这里影响了效率,需要遍历每个方法并且判断。使用SubscriberInfoIndex 的话会标记类和订阅的方法,所以速度会比这种方法快很多),接着调用moveToSuperclass
获得其父类(结束条件clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.")
),最后获得这些方法并将findState
回收:
SubscriberMethodFinder.java:
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
// 获得类的方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
// 如果方法的修饰符是public并且不是 Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
// 获得参数类型数组
Class<?>[] parameterTypes = method.getParameterTypes();
// 如果参数是一个
if (parameterTypes.length == 1) {
// 判断该方法是否被Subscribe注解修饰
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
// 如果还有Subscribe注解,则将参数类型
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
// 检验下是否之前已经添加了,具体下面分析
// 这里返回false,说明其父类也有这个方法,这里只需要子类的threadMode
if (findState.checkAdd(method, eventType)) {
// 添加后获得threadMode,findState.subscriberMethods中添加SubscriberMethod对象
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
// 抛出参数过多的异常
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
// 抛出方法修饰符不正确异常
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
通过反射来获取主要分为:
- 获得类的方法
- 遍历类,并且检查:
2.1 检查方法修饰符是不是public
2.2 检查参数数量是不是等于1
2.3 检查方法是否还有Subscribe注解
三个条件都满足的话则需要检查方法和参数类型是否已经存在,是否此方法需要添加threadMode(我的理解是主要针对于子类覆盖了父类的方法) - 根据检查结果来判断是否添加threadMode
关于checkAdd
方法,这里面挺有意思的,看下吧:
SubscriberMethodFinder.java:
boolean checkAdd(Method method, Class<?> eventType) {
// 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
// Usually a subscriber doesn't have methods listening to the same event type.
// 分两步检查:
// 1. 检查参数类型是不是已经存在
// 2. 通过方法签名(可以说是名称拼接)来判断
Object existing = anyMethodByEventType.put(eventType, method);
// 不存在接收该参数类型的方法
if (existing == null) {
return true;
} else {
// 存在接收该参数类型的方法,这里应该比较少见。应该就是不同的方法但是都有相同的参数
if (existing instanceof Method) {
if (!checkAddWithMethodSignature((Method) existing, eventType)) {
// Paranoia check
throw new IllegalStateException();
}
// Put any non-Method object to "consume" the existing Method
anyMethodByEventType.put(eventType, this);
}
return checkAddWithMethodSignature(method, eventType);
}
}
SubscriberMethodFinder.java:
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>').append(eventType.getName());
// 名称拼接 methodName>eventTypeName这种形式
// 如果有相同方法名称的话,那么需要将原methodClassOld保存
// 这里可以理解为方法的重写,子类重写了可以手动调用(super.method()),而此时只保留子类中的方法
String methodKey = methodKeyBuilder.toString();
Class<?> methodClass = method.getDeclaringClass();
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
// Only add if not already found in a sub class
return true;
} else {
// Revert the put, old class is further down the class hierarchy
subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}
从上面所讲的可以知道,这个遍历是从子类到其父类的一个遍历过程。这里假设当前Activity中有两个相同参数的方法a(Object)
和b(Object)
,并且其中一个方法是覆盖了父类的a(Object)
方法,而且父类的这个a(Object)
方法也有注解@Subscribe
。
当第一次添加了某个类型的参数,其添加值为b(Object)
,此时existing
还为null
。
接着会添加a(Object)
方法,此时existing
存在,并且为第一次添加的方法b(Object)
,所以此时进入if (!checkAddWithMethodSignature((Method) existing, eventType))
代码块,这里面第一次并没有并且将b(Object)
添加到subscriberClassByMethodKey
中,此时subscriberClassByMethodKey
保存了b(Object)
。并且anyMethodByEventType.put(eventType, this)
将findState对象作为该参数类型的值。通过这里来判断有多个需要订阅方法具有相同的参数类型。
接着又会调用checkAddWithMethodSignature
方法,将a(Object)
添加到subscriberClassByMethodKey
中。
当添加其父类的a(Object)
时,此时methodClassOld
存在并且是当前methodClass
的子类methodClassOld.isAssignableFrom(methodClass)
返回false
,将原methodClassOld
和methodKey
重新保存到subscriberClassByMethodKey
中,并返回false,告诉上级,这里是其父类的方法,不需要根据这里的threadMode来添加。
这里判断确实挺复杂,但是可以从这里看出写一个框架的严谨性,所以情况都需要考虑到。
最后是将方法返回以及findState
的回收:
SubscriberMethodFinder.java:
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
// 创建List并返回
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
// 池回收并将给池中的为空的位置赋值,方便下次使用
findState.recycle();
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
return subscriberMethods;
}
现在需要订阅的方法已经找到,接下来我们需要将方法订阅:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// 参数类型
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// subscriptionsByEventType检查是否存在
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
// 已经存在的话,抛出异常
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
//优先级排序
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// 把对象和subscribedEvents保存起来
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
// 添加新的参数类型
subscribedEvents.add(eventType);
// 如果是粘性
if (subscriberMethod.sticky) {
// 构造是默认为true
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
// 参数类型是存储的父类或者相同
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
// 如果有值的话发送事件
// 意味着如果粘性事件的参数类型存在,并且对应的值也存在的话将发送事件
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
- 订阅的过程是一个对方法的遍历过程,每个方法需要单独订阅。如果订阅过程中该方法已经订阅,则抛出异常。
- 根据优先级对订阅的方法进行添加。
- 把订阅者和订阅方法的参数类型保存起来,需要时则可以根据类型来调用订阅的方法。
- 检查订阅方法的是否为粘性方法,如果是的话,则会调用此方法。也就是说粘性方法在订阅的时候会如果有粘性事件,那么会调用一次该订阅的方法。
整个register的过程结束。其中最重要的还是将订阅者和其订阅方法的参数类型保存、订阅方法参数类型和订阅的方法保存以及对粘性方法的调用。
看完了register的过程,我想对应unregister的过程也应该有了猜想。嗯~没错,就是将保存的数据移除即可。下面简单看下:
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
// 将订阅者订阅的方法参数类型列表移除,防止内存泄漏
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
/** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
// 获得保存此参数类型的Subscription
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
// 如果Subscription的订阅者就是取消订阅这个对象,那么将其移除
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
这些操作就是对订阅者所订阅的方法的移除,防止内存泄漏。
3. post做了什么
订阅完成后如果想要发送事件的话,一般使用
EventBus.getDefault().post(Object);
即可,这里看下post()
的源码:
public void post(Object event) {
// 使用ThreadLocal来确保线程中局部变量PostingThreadState
PostingThreadState postingState = currentPostingThreadState.get();
// 获取事件队列
List<Object> eventQueue = postingState.eventQueue;
// 将最新的事件添加到队尾
eventQueue.add(event);
// 如果当前没有进行中的事件
if (!postingState.isPosting) {
// 设置当前线程是否为主线程
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
// 设置正在发送状态
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 队列循环,将所有事件全部执行完成
while (!eventQueue.isEmpty()) {
// 发送事件
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
// 使用ThreadLocal来确保线程中局部变量PostingThreadState
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<Object>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
在调用post
方法时,会通过使用ThreadLocal
来确保线程中存在一个PostingThreadState
局部变量,关于ThreadLocal
Handler中也使用了ThreadLocal
,有兴趣可以看下。接着将此次事件加入到PostingThreadState
的eventQueue
中,并且队列循环,依次调用postSingleEvent
发送处于队首的事件。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
// 获得事件的类型
Class<?> eventClass = event.getClass();
// 没有接受者被找到
boolean subscriptionFound = false;
if (eventInheritance) {
// 查找所有事件类型,这里将接口也全部放入
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
// 对查找后的结果遍历
Class<?> clazz = eventTypes.get(h);
// 为每个class发送事件
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
// 没有接受者
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
在postSingleEvent
中,根据参数类型在缓存中查找此参数类型的所有类包括接口(没有的话会放入缓存,并将类返回),遍历这些类,调用postSingleEventForEventType
方法:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
// 根据事件类型获得Subscription列表
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
// 遍历
for (Subscription subscription : subscriptions) {
// 事件赋值以及订阅赋值(postingState.subscription = subscription)
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
// 发送事件
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
根据传入的参数类型从subscriptionsByEventType
中获取订阅信息CopyOnWriteArrayList<Subscription> subscriptions
,如果存在的话,对postingState
的event
和subscription
赋值,并且调用postToSubscription
方法,最后将event
和subscription
置为null
:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
// 根据订阅方法的threadMode来判断如何执行
switch (subscription.subscriberMethod.threadMode) {
// 默认,在哪个线程发送在哪个线程执行
case POSTING:
invokeSubscriber(subscription, event);
break;
// 最终执行在主线程中
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
// 后台线程
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
// 异步线程
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
根据订阅信息的线程模型来判断在哪个线程执行此订阅方法,下面看一个BackgroundPoster
后台线程发送器吧:
BackgroundPoster.java:
final class BackgroundPoster implements Runnable {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 入队操作
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
// 这里用到了线程池去执行
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
// 出队,执行队列中所有的事件
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
// 再次检测,如果还没有则执行结束
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
// 存在PendingPost对象,调用EventBus去执行订阅者订阅的方法
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
// executorService的默认值是下面的线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
BackgroundPoster这里面维护了一个队列,如果此发送器正在执行,则将此次事件的PendingPost
(将要发送的事件)对象加入队列的尾部,在run
方法中通过PendingPostQueue.poll()
方法获得第一个PendingPost
对象并且调用eventBus.invokeSubscriber(pendingPost)
去执行订阅的方法。
EventBus.java:
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
// 如果没有取消注册,这个变量是volatile修饰,每次都会取最新值
// 调用invokeSubscriber方法执行订阅的方法
invokeSubscriber(subscription, event);
}
}
EventBus.java:
void invokeSubscriber(Subscription subscription, Object event) {
try {
// 执行订阅者订阅的方法,参数是event
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
到这里后台线程执行任务到这里结束,其他的最终也是调用invokeSubscriber
去执行订阅的方法,殊途同归,不多赘述。
4. 总结
EventBus的源码挺简单的,虽然简单,但是涉及的知识也挺多的,处理了同步问题,而且各种检测也是很细致。自己也根据原理写过一个框架,虽然代码少,但是咱们问题多啊!这里还是再说下原理:将订阅者和订阅的方法以及方法参数保存,根据发送的事件类型即可获得所有订阅此参数类型的订阅者和方法,最后可以通过Method.invok执行方法。
附美女一张~