Android——EventBus源码分析

简介: 停停走走又几天,断断续续这一年。时间还是如此之快,转眼间又到了17年的末尾了。趁着这些天还能静下心来看看代码,赶紧记录下自己的心得。这一周,工作任务不多,总是想要看点什么但又不清楚该看啥,有些迷茫~不过既然自己还想要在Android路上继续前行,那么请深入探索吧!OK,闲话不多说。

停停走走又几天,断断续续这一年。时间还是如此之快,转眼间又到了17年的末尾了。趁着这些天还能静下心来看看代码,赶紧记录下自己的心得。
这一周,工作任务不多,总是想要看点什么但又不清楚该看啥,有些迷茫~不过既然自己还想要在Android路上继续前行,那么请深入探索吧!
OK,闲话不多说。今天打算记录下关于EventBus的原理以及如何实现。

1. 关于EventBus的使用

还依稀记得第一份工作中,那是还是个零基础的菜鸟。修修补补那个项目,项目中就用到了EventBus,当时并没有能力深入了解。当时版本的EventBus还没有使用注解的形式,还需要自定义(记得是重写onEvent方法还是啥)。
不过,现在EventBus的版本已经到了3.+了,使用方法也和原来大不相同了。总体而言,代码越来越简洁,耦合度也越来越低了。说下简单用法吧:

// 把当前的Activity或Fragment注册到EventBus
// 相应的,需要在页面销毁是调用unRegister方法
EventBus.getDefault().register(this);
// 通过注解订阅方法
// threadMode :方法执行的线程
// sticky:是否接受粘性事件
// priority:优先级
@Subscribe(threadMode = ThreadMode.POSTING, sticky = false, priority = 1)
public onEvent(EventMessage eventMessage){
    // do something
}

// 使用:通过调用post或postSticky方法使用,在订阅的方法中执行要执行的代码
EventBus.getDefault().post(EventMessage);

其实使用是很简单的,不过3.x的版本中新加了Subscriber Index,使用起来比较麻烦,但是据说效率要高很多。这篇不算是教程,所以还是简单的来看基本的使用吧。

2. EventBus的register/unregister过程

如果不看源码的话,要我说这个注册过程是把对象保存起来的过程,而取消注册则是把对象释放的过程。那么,源码中如何实现的呢?下面进入源码阶段:

  1. EventBus.getDefault():
// 单例模式
public static EventBus getDefault() {
    if (defaultInstance == null) {
        synchronized (EventBus.class) {
            if (defaultInstance == null) {
                defaultInstance = new EventBus();
            }
        }
    }
    return defaultInstance;
}

通过单例模式获得EventBus对象,对象的创建过程:

private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
// 保存事件参数类和Subscription List的Map
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
// 保存注册对象和事件参数类
private final Map<Object, List<Class<?>>> typesBySubscriber;
// 粘性事件
private final Map<Class<?>, Object> stickyEvents;

public class EventBusBuilder {
    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
    // 异常的设置
    boolean logSubscriberExceptions = true;
    boolean logNoSubscriberMessages = true;
    boolean sendSubscriberExceptionEvent = true;
    boolean sendNoSubscriberEvent = true;
    boolean throwSubscriberException;
    boolean eventInheritance = true;
    // 是否忽略生成的Index,默认为false,不忽略
    boolean ignoreGeneratedIndex;
    boolean strictMethodVerification;
    ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
    List<Class<?>> skipMethodVerificationForClasses;
    List<SubscriberInfoIndex> subscriberInfoIndexes;

    EventBusBuilder() {
    }
}

public EventBus() {
    this(DEFAULT_BUILDER);
}

EventBus(EventBusBuilder builder) {
    subscriptionsByEventType = new HashMap<>();
    typesBySubscriber = new HashMap<>();
    stickyEvents = new ConcurrentHashMap<>();
    // 主线程发送器
    mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
    // 后台线程发送器
    backgroundPoster = new BackgroundPoster(this);
    // 异步线程发送器
    asyncPoster = new AsyncPoster(this);
    indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
    subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
            builder.strictMethodVerification, builder.ignoreGeneratedIndex);
    // 异常设置
    logSubscriberExceptions = builder.logSubscriberExceptions;
    logNoSubscriberMessages = builder.logNoSubscriberMessages;
    sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
    sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
    throwSubscriberException = builder.throwSubscriberException;
    eventInheritance = builder.eventInheritance;
    executorService = builder.executorService;
}

EventBus是通过Builder形成创建,可以通过构建EventBusBuilder来设置参数,这里只记录下默认的情况。这里面最重要的是三个Map对象:

  • subscriptionsByEventType:用来保存订阅的方法参数类型和Subscription List
  • typesBySubscriber:用来保存订阅者对象和被订阅方法的参数类型
  • stickyEvents:粘性事件

当然,在创建EventBus时创建了三个不同类型的线程类型发送器,可以匹配注解中的threadMode字段。对象创建完成后需要调用register方法去将Activity/Fragment注册:

Eventbus.java:
public void register(Object subscriber) {
    // 获得注册对象的class
    Class<?> subscriberClass = subscriber.getClass();
    // 查找订阅者被订阅的方法
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            // 订阅
            subscribe(subscriber, subscriberMethod);
        }
    }
}

register方法,先要去查找订阅者的方法,subscriberMethodFinder在创建对象的时候就被创建。接着调用subscriberMethodFinder.findSubscriberMethods(subscriberClass)方法,通过传入注册对象的class来查找其需要订阅的方法:

SubscriberMethodFinder.java:
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    // 从方法缓存中查找
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
    // 找到直接返回
    if (subscriberMethods != null) {
        return subscriberMethods;
    }

    // 是否忽略生成的Index(高级用法会通过编译时注解生成,这里不会)
    // ignoreGeneratedIndex默认为false,进入下面的代码
    if (ignoreGeneratedIndex) {
        // 通过反射获取方法
        subscriberMethods = findUsingReflection(subscriberClass);
    } else {
        // 通过subscriberInfo获取方法
        subscriberMethods = findUsingInfo(subscriberClass);
    }
    // 如果没有找到方法,抛出异常(常见的这个异常,没有@Subscribe方法)
    if (subscriberMethods.isEmpty()) {
        throw new EventBusException("Subscriber " + subscriberClass
                + " and its super classes have no public methods with the @Subscribe annotation");
    } else {
        // 找到后放入缓存中
        METHOD_CACHE.put(subscriberClass, subscriberMethods);
        return subscriberMethods;
    }
}

首先从缓存中检查是否拥有当前订阅类,如果没有则需要根据ignoreGeneratedIndex来判断是否直接通过反射获取。这里ignoreGeneratedIndex默认值为false,那么需要调用findUsingInfo

SubscriberMethodFinder.java:
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
    // 从池中或创建或使用
    FindState findState = prepareFindState();
    // 初始化
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
        // 获得subscriberInfo,
        findState.subscriberInfo = getSubscriberInfo(findState);
        // 如果没有找到(不使用Subscribe Index的话也不会找到)
        if (findState.subscriberInfo != null) {
            SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
            for (SubscriberMethod subscriberMethod : array) {
                if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                    findState.subscriberMethods.add(subscriberMethod);
                }
            }
        } else {
            // 通过反射来查找
            findUsingReflectionInSingleClass(findState);
        }
        // 获得其父类
        findState.moveToSuperclass();
    }
    // 获得方法并释放
    return getMethodsAndRelease(findState);
}

这里通过池取出或创建FindState对象,并且调用getSubscriberInfo查找是否有添加的SubscriberInfoIndex,这个是需要添加编译器支持,并且在编译器自动生成,需要手动添加的。这里默认的情况下是不存在的,所以就会进入findUsingReflectionInSingleClass又通过反射来调用(应该就是这里影响了效率,需要遍历每个方法并且判断。使用SubscriberInfoIndex 的话会标记类和订阅的方法,所以速度会比这种方法快很多),接着调用moveToSuperclass获得其父类(结束条件clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.")),最后获得这些方法并将findState回收:

SubscriberMethodFinder.java:
private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
        // This is faster than getMethods, especially when subscribers are fat classes like Activities
        // 获得类的方法
        methods = findState.clazz.getDeclaredMethods();
    } catch (Throwable th) {
        // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
        methods = findState.clazz.getMethods();
        findState.skipSuperClasses = true;
    }
    for (Method method : methods) {
        int modifiers = method.getModifiers();
        // 如果方法的修饰符是public并且不是 Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            // 获得参数类型数组
            Class<?>[] parameterTypes = method.getParameterTypes();
            // 如果参数是一个
            if (parameterTypes.length == 1) {
                // 判断该方法是否被Subscribe注解修饰
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                // 如果还有Subscribe注解,则将参数类型
                if (subscribeAnnotation != null) {
                    Class<?> eventType = parameterTypes[0];
                    // 检验下是否之前已经添加了,具体下面分析
                    // 这里返回false,说明其父类也有这个方法,这里只需要子类的threadMode
                    if (findState.checkAdd(method, eventType)) {
                        // 添加后获得threadMode,findState.subscriberMethods中添加SubscriberMethod对象
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                    }
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                // 抛出参数过多的异常
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException("@Subscribe method " + methodName +
                        "must have exactly 1 parameter but has " + parameterTypes.length);
            }
        } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
            // 抛出方法修饰符不正确异常
            String methodName = method.getDeclaringClass().getName() + "." + method.getName();
            throw new EventBusException(methodName +
                    " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
        }
    }
}

通过反射来获取主要分为:

  1. 获得类的方法
  2. 遍历类,并且检查:
    2.1 检查方法修饰符是不是public
    2.2 检查参数数量是不是等于1
    2.3 检查方法是否还有Subscribe注解
    三个条件都满足的话则需要检查方法和参数类型是否已经存在,是否此方法需要添加threadMode(我的理解是主要针对于子类覆盖了父类的方法)
  3. 根据检查结果来判断是否添加threadMode
    关于checkAdd方法,这里面挺有意思的,看下吧:
SubscriberMethodFinder.java:
boolean checkAdd(Method method, Class<?> eventType) {
    // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
    // Usually a subscriber doesn't have methods listening to the same event type.
    // 分两步检查:
    // 1. 检查参数类型是不是已经存在
    // 2. 通过方法签名(可以说是名称拼接)来判断
    Object existing = anyMethodByEventType.put(eventType, method);
    // 不存在接收该参数类型的方法
    if (existing == null) {
        return true;
    } else {
        // 存在接收该参数类型的方法,这里应该比较少见。应该就是不同的方法但是都有相同的参数
        if (existing instanceof Method) {
            if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                // Paranoia check
                throw new IllegalStateException();
            }
            // Put any non-Method object to "consume" the existing Method
            anyMethodByEventType.put(eventType, this);
        }
        return checkAddWithMethodSignature(method, eventType);
    }
}

SubscriberMethodFinder.java:
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
    methodKeyBuilder.setLength(0);
    methodKeyBuilder.append(method.getName());
    methodKeyBuilder.append('>').append(eventType.getName());
    // 名称拼接 methodName>eventTypeName这种形式
    // 如果有相同方法名称的话,那么需要将原methodClassOld保存
    // 这里可以理解为方法的重写,子类重写了可以手动调用(super.method()),而此时只保留子类中的方法
    String methodKey = methodKeyBuilder.toString();
    Class<?> methodClass = method.getDeclaringClass();
    Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
    if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
        // Only add if not already found in a sub class
        return true;
    } else {
        // Revert the put, old class is further down the class hierarchy
        subscriberClassByMethodKey.put(methodKey, methodClassOld);
        return false;
    }
}

从上面所讲的可以知道,这个遍历是从子类到其父类的一个遍历过程。这里假设当前Activity中有两个相同参数的方法a(Object)b(Object),并且其中一个方法是覆盖了父类的a(Object)方法,而且父类的这个a(Object)方法也有注解@Subscribe
当第一次添加了某个类型的参数,其添加值为b(Object),此时existing还为null
接着会添加a(Object)方法,此时existing存在,并且为第一次添加的方法b(Object),所以此时进入if (!checkAddWithMethodSignature((Method) existing, eventType))代码块,这里面第一次并没有并且将b(Object)添加到subscriberClassByMethodKey中,此时subscriberClassByMethodKey保存了b(Object)。并且anyMethodByEventType.put(eventType, this)将findState对象作为该参数类型的值。通过这里来判断有多个需要订阅方法具有相同的参数类型。
接着又会调用checkAddWithMethodSignature方法,将a(Object)添加到subscriberClassByMethodKey中。
当添加其父类的a(Object)时,此时methodClassOld存在并且是当前methodClass的子类methodClassOld.isAssignableFrom(methodClass)返回false,将原methodClassOldmethodKey重新保存到subscriberClassByMethodKey中,并返回false,告诉上级,这里是其父类的方法,不需要根据这里的threadMode来添加。
这里判断确实挺复杂,但是可以从这里看出写一个框架的严谨性,所以情况都需要考虑到。
最后是将方法返回以及findState的回收:

SubscriberMethodFinder.java:
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
    // 创建List并返回
    List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
    
    // 池回收并将给池中的为空的位置赋值,方便下次使用
    findState.recycle();
    synchronized (FIND_STATE_POOL) {
        for (int i = 0; i < POOL_SIZE; i++) {
            if (FIND_STATE_POOL[i] == null) {
                FIND_STATE_POOL[i] = findState;
                break;
            }
        }
    }
    return subscriberMethods;
}

现在需要订阅的方法已经找到,接下来我们需要将方法订阅:

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    // 参数类型
    Class<?> eventType = subscriberMethod.eventType;
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    // subscriptionsByEventType检查是否存在
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        // 已经存在的话,抛出异常
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }
    
    //优先级排序
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    // 把对象和subscribedEvents保存起来
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    // 添加新的参数类型
    subscribedEvents.add(eventType);
    // 如果是粘性
    if (subscriberMethod.sticky) {
        // 构造是默认为true
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventType have to be considered.
            // Note: Iterating over all events may be inefficient with lots of sticky events,
            // thus data structure should be changed to allow a more efficient lookup
            // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                // 参数类型是存储的父类或者相同
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    // 如果有值的话发送事件
                    // 意味着如果粘性事件的参数类型存在,并且对应的值也存在的话将发送事件
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}
  1. 订阅的过程是一个对方法的遍历过程,每个方法需要单独订阅。如果订阅过程中该方法已经订阅,则抛出异常。
  2. 根据优先级对订阅的方法进行添加。
  3. 把订阅者和订阅方法的参数类型保存起来,需要时则可以根据类型来调用订阅的方法。
  4. 检查订阅方法的是否为粘性方法,如果是的话,则会调用此方法。也就是说粘性方法在订阅的时候会如果有粘性事件,那么会调用一次该订阅的方法。

整个register的过程结束。其中最重要的还是将订阅者和其订阅方法的参数类型保存、订阅方法参数类型和订阅的方法保存以及对粘性方法的调用
看完了register的过程,我想对应unregister的过程也应该有了猜想。嗯~没错,就是将保存的数据移除即可。下面简单看下:

public synchronized void unregister(Object subscriber) {
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    if (subscribedTypes != null) {
        for (Class<?> eventType : subscribedTypes) {
            unsubscribeByEventType(subscriber, eventType);
        }
        // 将订阅者订阅的方法参数类型列表移除,防止内存泄漏
        typesBySubscriber.remove(subscriber);
    } else {
        Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
    }
}

/** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
    // 获得保存此参数类型的Subscription
    List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions != null) {
        int size = subscriptions.size();
        for (int i = 0; i < size; i++) {
            Subscription subscription = subscriptions.get(i);
            // 如果Subscription的订阅者就是取消订阅这个对象,那么将其移除
            if (subscription.subscriber == subscriber) {
                subscription.active = false;
                subscriptions.remove(i);
                i--;
                size--;
            }
        }
    }
}

这些操作就是对订阅者所订阅的方法的移除,防止内存泄漏。

3. post做了什么

订阅完成后如果想要发送事件的话,一般使用

EventBus.getDefault().post(Object);

即可,这里看下post()的源码:

public void post(Object event) {
    // 使用ThreadLocal来确保线程中局部变量PostingThreadState
    PostingThreadState postingState = currentPostingThreadState.get();
    // 获取事件队列
    List<Object> eventQueue = postingState.eventQueue;
    // 将最新的事件添加到队尾
    eventQueue.add(event);
    
    // 如果当前没有进行中的事件
    if (!postingState.isPosting) {
        // 设置当前线程是否为主线程
        postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
        // 设置正在发送状态
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            // 队列循环,将所有事件全部执行完成
            while (!eventQueue.isEmpty()) {
                // 发送事件
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}
// 使用ThreadLocal来确保线程中局部变量PostingThreadState
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
    @Override
    protected PostingThreadState initialValue() {
        return new PostingThreadState();
    }
};

final static class PostingThreadState {
    final List<Object> eventQueue = new ArrayList<Object>();
    boolean isPosting;
    boolean isMainThread;
    Subscription subscription;
    Object event;
    boolean canceled;
}

在调用post方法时,会通过使用ThreadLocal来确保线程中存在一个PostingThreadState局部变量,关于ThreadLocalHandler中也使用了ThreadLocal,有兴趣可以看下。接着将此次事件加入到PostingThreadStateeventQueue中,并且队列循环,依次调用postSingleEvent发送处于队首的事件。

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    // 获得事件的类型
    Class<?> eventClass = event.getClass();
    // 没有接受者被找到
    boolean subscriptionFound = false;
    if (eventInheritance) {
        // 查找所有事件类型,这里将接口也全部放入
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            // 对查找后的结果遍历
            Class<?> clazz = eventTypes.get(h);
            // 为每个class发送事件
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    // 没有接受者
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            Log.d(TAG, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

postSingleEvent中,根据参数类型在缓存中查找此参数类型的所有类包括接口(没有的话会放入缓存,并将类返回),遍历这些类,调用postSingleEventForEventType方法:

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        // 根据事件类型获得Subscription列表
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        // 遍历
        for (Subscription subscription : subscriptions) {
            // 事件赋值以及订阅赋值(postingState.subscription = subscription)
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
                // 发送事件
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}

根据传入的参数类型从subscriptionsByEventType中获取订阅信息CopyOnWriteArrayList<Subscription> subscriptions,如果存在的话,对postingStateeventsubscription赋值,并且调用postToSubscription方法,最后将eventsubscription置为null

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    // 根据订阅方法的threadMode来判断如何执行
    switch (subscription.subscriberMethod.threadMode) {
        // 默认,在哪个线程发送在哪个线程执行
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        // 最终执行在主线程中
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        // 后台线程
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        // 异步线程
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

根据订阅信息的线程模型来判断在哪个线程执行此订阅方法,下面看一个BackgroundPoster后台线程发送器吧:

BackgroundPoster.java:
final class BackgroundPoster implements Runnable {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            // 入队操作
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                // 这里用到了线程池去执行
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                // 出队,执行队列中所有的事件
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            // 再次检测,如果还没有则执行结束
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    // 存在PendingPost对象,调用EventBus去执行订阅者订阅的方法
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

}

// executorService的默认值是下面的线程池
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

BackgroundPoster这里面维护了一个队列,如果此发送器正在执行,则将此次事件的PendingPost(将要发送的事件)对象加入队列的尾部,在run方法中通过PendingPostQueue.poll()方法获得第一个PendingPost对象并且调用eventBus.invokeSubscriber(pendingPost)去执行订阅的方法。

EventBus.java:
void invokeSubscriber(PendingPost pendingPost) {
    Object event = pendingPost.event;
    Subscription subscription = pendingPost.subscription;
    PendingPost.releasePendingPost(pendingPost);
    if (subscription.active) {
        // 如果没有取消注册,这个变量是volatile修饰,每次都会取最新值
        // 调用invokeSubscriber方法执行订阅的方法
        invokeSubscriber(subscription, event);
    }
}

EventBus.java:
void invokeSubscriber(Subscription subscription, Object event) {
    try {
        // 执行订阅者订阅的方法,参数是event
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

到这里后台线程执行任务到这里结束,其他的最终也是调用invokeSubscriber去执行订阅的方法,殊途同归,不多赘述。

4. 总结

EventBus的源码挺简单的,虽然简单,但是涉及的知识也挺多的,处理了同步问题,而且各种检测也是很细致。自己也根据原理写过一个框架,虽然代码少,但是咱们问题多啊!这里还是再说下原理:将订阅者和订阅的方法以及方法参数保存,根据发送的事件类型即可获得所有订阅此参数类型的订阅者和方法,最后可以通过Method.invok执行方法
附美女一张~

img_a89c052ca6328558c8ae37d10cebdd60.jpe

目录
相关文章
|
存储 监控 Android开发
Android卡顿优化 | ANR分析与实战(附ANR-WatchDog源码分析及实战、与AndroidPerformanceMonitor的区别)
Android卡顿优化 | ANR分析与实战(附ANR-WatchDog源码分析及实战、与AndroidPerformanceMonitor的区别)
|
9月前
|
Android开发
Android PackageManagerService源码分析和APK安装原理详解
Android PackageManagerService源码分析和APK安装原理详解
215 1
|
XML Java Android开发
Android8.1 MTK平台 SystemUI源码分析之 网络信号栏显示刷新(下)
Android8.1 MTK平台 SystemUI源码分析之 网络信号栏显示刷新(下)
170 0
|
XML 数据库 网络虚拟化
Android8.1 MTK平台 SystemUI源码分析之 网络信号栏显示刷新(上)
Android8.1 MTK平台 SystemUI源码分析之 网络信号栏显示刷新
221 0
|
XML Java Linux
Android8.1 MTK平台 SystemUI源码分析之 电池时钟刷新
Android8.1 MTK平台 SystemUI源码分析之 电池时钟刷新
273 0
|
XML 存储 搜索推荐
|
Java 调度 Android开发
android体系课-系统启动流程-之zygote进程启动过程源码分析
笔者刚开始学习Android的时候也和大部分同学一样,只会使用一些应用层面的知识,对于一些比较常见的开源框架如<mark>RxJava</mark>,<mark>OkHttp</mark>,<mark>Retrofit</mark>,以及后来谷歌推出的<mark>协程</mark>等,都只在使用层面,对于他们<mark>内部原理</mark>,基本没有去了解觉得够用就可以了,又比如Activity,Service等四大组件的使用原理,系统开机过程,Launcher启动过程等知之甚少,知其然而不知其所以然,结果就是出现某些问题,不知道从哪里找原因,只能依赖万能的百度,但是百度看多了,你会发现自己
|
Java 调度 Android开发
android体系课-系统启动流程-之SystemServer启动过程源码分析
笔者刚开始学习Android的时候也和大部分同学一样,只会使用一些应用层面的知识,对于一些比较常见的开源框架如<mark>RxJava</mark>,<mark>OkHttp</mark>,<mark>Retrofit</mark>,以及后来谷歌推出的<mark>协程</mark>等,都只在使用层面,对于他们<mark>内部原理</mark>,基本没有去了解觉得够用就可以了,又比如Activity,Service等四大组件的使用原理,系统开机过程,Launcher启动过程等知之甚少,知其然而不知其所以然,结果就是出现某些问题,不知道从哪里找原因,只能依赖万能的百度,但是百度看多了,你会发现自己
|
设计模式 XML 缓存
Android体系课学习 之 网络请求库Retrofit源码分析-看这一篇就够了
- 网络请求在我们开发中起的很大比重,有一个好的网络框架可以节省我们的开发工作量,也可以避免一些在开发中不该出现的bug - *Retrofit*是一个轻量级框架,基于*OkHttp*的一个*Restful*框架
|
存储 缓存 Java
Android体系课-开源框架-这是一份详细的Glide源码分析文章
最近在`组件化`开发中准备封装一个`图片加载库`,于是乎就有了这篇文章 本篇文章对`Glide`源码过程做了一个详细的讲解,也是为了记录下自己对`Glide`的理解,以后忘记还可以从这里查找。