10.EventBus3.0源码分析和手写

简介: EventBus3.0源码写的很有代表性,其中涉及到的设计模式以及数据结构都很值得拿来一读,看源码的目的不能仅仅局限于了解其原理,更大的作用在于能提高自己构建稳健架构的能力。

EventBus3.0源码写的很有代表性,其中涉及到的设计模式以及数据结构都很值得拿来一读,看源码的目的不能仅仅局限于了解其原理,更大的作用在于能提高自己构建稳健架构的能力。EventBus中涉及了很多东西值得我们去学一学,比如设计模式中的享元 模式单例模式,涉及到并发编程的volatile,线程池,ThreadLocal,以及基本上每个开源库都会用到的反射和注解等。今天我们就来看看源码中是如何构建的

implementation 'org.greenrobot:eventbus:3.0.0'

我们通过三个切入点来看源码,分别是register,unregister和post方法

register

EvnetBus通过register将订阅者注册,从而在需要的地方将信息传递给这些注册的订阅者

EventTransition.getInstance().register(this);

EventBus的对象是单利模式构建的,保证实例的唯一性,双层保护锁再加上volatile关键字,同时保证了对象defaultInstance的原子性 可见性和有序性,在多线程操作中也能保证只创建一个实例对象,有一点要注意,双层锁保证的是对象的原子性操作,在一个线程操作这个对象时,其他线程进入等待状态,volatile关键字保证对象的有序性和可见性,并不能保证原子性,关于volatile关键字的详细介绍,可以查看这里http://www.importnew.com/24082.html

static volatile EventBus defaultInstance;

 public static EventBus getDefault() {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new EventBus();
                }
            }
        }
        return defaultInstance;
    }

进入register方法,subscriber就是当前注册的对象,可以理解为当前activity或者fragment或者其他注册的东西,这里我们以MainActivity为例,假设在MainActivity中注册了EventBus,通过获取到当前类的class,从这个class中获取到所有添加注解的方法保存在集合中。subscriberMethodFinder是一个封装好的对象,专门用于获取注解方法集合

public void register(Object subscriber) {
        //获取MainActivity的class
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

来到findSubscriberMethods方法中

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        //这里做了一层缓存,以订阅者的class为key,以注解方法集合为value,第一次进来是取不到缓存的,往下走
        //做缓存的目的在于。如果同一个页面被打开了多次,可以减少反射执行的次数,优化效率
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }
        //ignoreGeneratedIndex默认值为false,可以看到是EventBusBuilder中
        //传入的,EventBusBuilder是EventBus使用的一个默认的构造器,如果我们自
        //定义这个builder的话,可以设置这个值,这里先不看
        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            //存入缓存
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

findUsingInfo
这个方法的作用就是将subscriberClass中的所有添加注解的方法解析出来,保存在FindState 的subscriberMethods集合中,然后返回一个拷贝了一份的相同的集合

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        //FindState是一个封装类,用于存放subscriberClass和其相关信息,
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            findState.subscriberInfo = getSubscriberInfo(findState);
            //第一次会返回null,直接执行下边的findUsingReflectionInSingleClass方法
            if (findState.subscriberInfo != null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
                findUsingReflectionInSingleClass(findState);
            }
            findState.moveToSuperclass();
        }
        //返回存储着注解方法的集合
        return getMethodsAndRelease(findState);
    }

Tips:(获取FindState对象涉及到的享元设计模式)
可以看到,FindState对象的获取并非是单纯的new出来的,而是从一个固定大小的静态数组中去取,如果存在则取出,如果不存在再new出来,最终这个new出来的对象会被放入熟组中留用,这样可以减少频繁的new对象的操作,提高程序效率和减少内存占用,这就是享元设计模式

private static final int POOL_SIZE = 4;
private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];

......

    private FindState prepareFindState() {
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                FindState state = FIND_STATE_POOL[i];
                if (state != null) {
                    FIND_STATE_POOL[i] = null;
                    return state;
                }
            }
        }
        return new FindState();
    }
    //可以看到最终会在这里将使用完成的对象重新初始化并放入静态数组中,类似线程池的操作
    private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
        List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
        findState.recycle();
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                if (FIND_STATE_POOL[i] == null) {
                    FIND_STATE_POOL[i] = findState;
                    break;
                }
            }
        }
        return subscriberMethods;
    }

findUsingReflectionInSingleClass(findState);
通过反射获取注解的方法保存在subscriberMethods集合中,这里是真正的开始获取注解的方法并且进行存储

private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // This is faster than getMethods, especially when subscribers are fat classes like Activities
            //获取到类中所有的方法
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            //这里是一个可能存在的bug,并且贴出了bug在stackoverflow中的地址
            // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        for (Method method : methods) {
            //判断方法修饰符,public private protested
            int modifiers = method.getModifiers();
            //必须是public类型并且不能是静态和抽象的,否则抛出异常
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                //获取方法参数的class,例如int add(String.class,String.class)
                Class<?>[] parameterTypes = method.getParameterTypes();
                //又一层限制,只能有一个参数,否则抛出异常
                if (parameterTypes.length == 1) {
                     //获取注解
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        //获取到方法第一个参数的class,(第一个参数也是唯一一个参数)
                        Class<?> eventType = parameterTypes[0];
                        //检查是否已经添加过
                        if (findState.checkAdd(method, eventType)) {
                            //将每一个方法封装成SubscriberMethod对象,对象包括的信息有
                            //方法的method(用于后期反射执行方法),方法的参数类型class,
                            //方法运行的线程mode,方法执行的优先级和是否有粘性
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }

Tips:(java知识)checkAdd是如何验证map中是否已经添加过这个方法

      boolean checkAdd(Method method, Class<?> eventType) {
            //通常一个订阅者中不会存在两个相同类型的订阅者回调方法,这里是做了一层判断,往map中添加一个key value,如果map中
            //已经存在相同的key值,那么会反回这个key对应的value值,返回值不等于null说明有相同的,等于null则是还没有添加过这个key值
            //这里也说了,通常一个订阅者中不会存在两个相同方法参数的回调方法,但是如果存在了,那么就要做处理
            // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
            // Usually a subscriber doesn't have methods listening to the same event type.
            Object existing = anyMethodByEventType.put(eventType, method);
            if (existing == null) {
                return true;
            } else {
                if (existing instanceof Method) {
                    if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                        // Paranoia check
                        throw new IllegalStateException();
                    }
                    // Put any non-Method object to "consume" the existing Method
                    anyMethodByEventType.put(eventType, this);
                }
                return checkAddWithMethodSignature(method, eventType);
            }
        }

也就是说执行完这一步之后

List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);

得到了一个集合,这个集合中存储着所有注解方法的信息,然后循环遍历去订阅这些方法,来到subscribe方法
,这里是将SubscriberMethod和subscriber封装为Subscription对象之后按照方法的优先级排序,保存在了两个集合中,一个集合subscriptionsByEventType根据方法参数类型进行存储,可以根据参数类型获取到所有参数类型相同的方法的集合,一个集合typesBySubscriber根据订阅者进行存储,可以根据订阅者获取到当前订阅者中的所有方法集合,这两个数据结构的设置都是为了unregister方法服务,在unregister的时候,先根据当前的订阅者subscriber获取到当前页中所有注解方法的参数class类型,然后从subscriptionsByEventType中获取到所有这个参数类型的方法(这些方法也包括其他页面的),根据比较这个方法所持有的订阅者对象来判断这个方法是不是当前需要注销的页面中的方法,如果是则进行移除,具体代码可以看后边的unregister方法

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        //获取方法的参数class
        Class<?> eventType = subscriberMethod.eventType;
        //将SubscriberMethod再进行一层封装,将订阅者对象和注解方法绑定在一起
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        //subscriptionsByEventType这个map集合以注解方法参数的class为key,以存储着Subscription对象的集合为value,这样定义数据结构
        //的目的是为了方便在unregister的时候,可以找到所有的注解方法的参数类型相同的方法集合
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }
        //将方法按照优先级顺序加入集合,这个优先级指的是相同参数类型的方法的优先级,不同参数类型的方法不存在优先级
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
        //以订阅者subscriber为key,以事件参数类型为value保存到typesBySubscriber集合
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);
        //粘性处理
        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

总结一下register方法做了哪些操作
1 获取到所有页面中加了注解的方法
2 把注解方法按照两种类型存入集合
a)以订阅者为key,以方法参数类型为value存入
b)以方法参数类型为key,以方法的封装对象为value存入

unregister

unregister方法取消订阅的原理在上边已经提到了,就是操作那两个集合,从一个集合中根据订阅者获取到当前页面所有的注解方法的参数class类型,然后从另一个集合中根据注解方法的参数class类型获取到所有这种类型的方法,和当前订阅者做对比,将当前页面的所有方法从集合中移除

/** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */
    private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            int size = subscriptions.size();
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                if (subscription.subscriber == subscriber) {
                    subscription.active = false;
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }

    /** Unregisters the given subscriber from all event classes. */
    public synchronized void unregister(Object subscriber) {
        //获取到当前页面注解方法参数类型的集合
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                //根据参数类型获取到当前页面所有注解方法并移除
                unsubscribeByEventType(subscriber, eventType);
            }
            typesBySubscriber.remove(subscriber);
        } else {
            Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }

post

    /** Posts the given event to the event bus. */
    public void post(Object event) {
        //这里获取了一个PostingThreadState对象,我们等下看下这个对象是什么,如何获取的(涉及到线程安全操作)
        PostingThreadState postingState = currentPostingThreadState.get();
        //获取postingState中的一个队列,这个队列是一个list集合,可见postingState中保存了一个集合作为队列使用
        List<Object> eventQueue = postingState.eventQueue;
        //将发送的事件存入队列
        eventQueue.add(event);
        //PostingThreadState中保存了一系列的消息发送状态,如果正在发送就不再进行发送
        if (!postingState.isPosting) {
            //判断是否是主线程
            postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
            //设置状态为正在发送
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                //开启一个循环,将集合队列中的全部事件发送出去
                while (!eventQueue.isEmpty()) {
                    //eventQueue.remove(0)操作做了两步,第一将集合中第一个对象移除,移除后会返回这个移除的对象作为postSingleEvent的参数
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                //发送完成初始化状态信息
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

我们来看看PostingThreadState这个对象是什么,如何获取的,如何保证线程安全的
PostingThreadState只是一个普通的内部类,里边保存了一个集合和一些状态信息,那么他是如何获取的?

/** For ThreadLocal, much faster to set (and get multiple values). */
    final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<Object>();
        boolean isPosting;
        boolean isMainThread;
        Subscription subscription;
        Object event;
        boolean canceled;
    }

currentPostingThreadState.get()会得到这个PostingThreadState对象,这个currentPostingThreadState是什么?这里可以看出,currentPostingThreadState是一个将PostingThreadState对象包装了一层的对象,那么他有什么作用,为什么要这样做。ThreadLocal是Java为解决多线程程序的并发问题提供的一种新思路,当使用ThreadLocal维护变量时,ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本,所以看到这里我们可以大概猜到,用ThreadLocal是维护变量PostingThreadState是为了保证多线程的安全性,PostingThreadState是一个共享变量,所以多个线程如果同时操作这个变量,会导致一系列的问题,每一次单独的进行事件的分发,这个事件应该是有自己独立的队列和独立的发送取消状态,如果两个线程同时操作,会导致状态的紊乱,从而发生问题,那么用ThreadLocal进行维护之后,由于他为每个使用该变量的线程提供独立的变量副本的特性,可以保证每个事件队列的互不干扰

private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };

进入postSingleEvent方法

rivate void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        //获取到发送事件的class
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        //默认为true
        if (eventInheritance) {
            //获取到所有这个参数class类型的的class集合,包括父类和接口
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                //发送event
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                Log.d(TAG, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            //从subscriptionsByEventType集合中获取到所有的参数类型是eventClass的注解方法的封装对象subscriptions
            //这里是按照优先级排序过的集合,所以直接取出来已经是按照先后顺序执行了
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                //将信息封装到Subscription中
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    //发送之后重制postingState状态信息
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

真正的开始发送事件

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                //在当前线程执行,如果当前是主线程则执行在主线程,如果当前是自线程则执行在子线程
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                //在主线程执行,如果当前已经是主线程了,那么直接执行,如果当前是自线程,那么发送到主线程执行
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case BACKGROUND:
                //在子线程执行,如果当前是主线程,通过线程池执行在子线程
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                //异步执行,每次都开启一个新的线程执行
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

执行方法的原理很简单,直接通过反射调用

void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

关键在于线程的调度,重点来看这三行代码

mainThreadPoster.enqueue(subscription, event);
backgroundPoster.enqueue(subscription, event);
asyncPoster.enqueue(subscription, event);

mainThreadPoster是在EventBus构建的时候创建的HandlerPoster对象,构造方法中传入了一个主线程的Looper对象,可以猜想是通过Handler进行消息通讯的

HandlerPoster mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);

找到他的enqueue

void enqueue(Subscription subscription, Object event) {
        //封装了发送的event和对应的方法类Subscription的对象
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //PendingPostQueue是一个链表形式的队列
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                //使用handler发送消息到主线程
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

final class PendingPostQueue {
    private PendingPost head;
    private PendingPost tail;

    synchronized void enqueue(PendingPost pendingPost) {
        if (pendingPost == null) {
            throw new NullPointerException("null cannot be enqueued");
        }
        if (tail != null) {
            tail.next = pendingPost;
            tail = pendingPost;
        } else if (head == null) {
            head = tail = pendingPost;
        } else {
            throw new IllegalStateException("Head present, but no tail");
        }
        notifyAll();
    }

    synchronized PendingPost poll() {
        PendingPost pendingPost = head;
        if (head != null) {
            head = head.next;
            if (head == null) {
                tail = null;
            }
        }
        return pendingPost;
    }

    synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
        if (head == null) {
            wait(maxMillisToWait);
        }
        return poll();
    }

}

PendingPost对象的构建同样使用了享元设计模式,他是一个封装了发送的event和对应的方法类Subscription的对象

static PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > 0) {
                PendingPost pendingPost = pendingPostPool.remove(size - 1);
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                return pendingPost;
            }
        }
        return new PendingPost(event, subscription);
    }

在handleMessage中处理消息,此时已经进入了主线程

@Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                //循环获取到队列中所有的PendingPost
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }  
                //反射执行所有的方法
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }

所以从子线程发送消息到主线程,底层还是handler消息机制实现的,它继承自Runnable

backgroundPoster也是在EventBus构建的时候就创建的一个对象

BackgroundPoster backgroundPoster = new BackgroundPoster(this);

来到enque方法

public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //同样是把PengdingPost对象加入到PendingPostQueue队列中,不同的是,执行的时候是通过线程池在子线程执行的
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }

执行run方法,从队列中取出然后反射执行

@Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

asyncPoster的创建时机和上边两者相同

AsyncPoster asyncPoster = new AsyncPoster(this);

来到enque方法,原理和上边相同

public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

这种构建线程池和handler的写法值得借鉴,手写了一个简易版的EventBus,GitHub地址https://github.com/renzhenming/MyEventBus.git,有助于加深理解

相关文章
|
7月前
|
存储 NoSQL 搜索推荐
若依框架----源码分析(@RateLimiter)
若依框架----源码分析(@RateLimiter)
506 0
|
XML Java Maven
源码分析系列教程(04) - 手写SpringIOC
源码分析系列教程(04) - 手写SpringIOC
41 0
|
4月前
|
Java 开发者 Spring
【SpringBoot 异步魔法】@Async 注解:揭秘 SpringBoot 中异步方法的终极奥秘!
【8月更文挑战第25天】异步编程对于提升软件应用的性能至关重要,尤其是在高并发环境下。Spring Boot 通过 `@Async` 注解简化了异步方法的实现。本文详细介绍了 `@Async` 的基本用法及配置步骤,并提供了示例代码展示如何在 Spring Boot 项目中创建与管理异步任务,包括自定义线程池、使用 `CompletableFuture` 处理结果及异常情况,帮助开发者更好地理解和运用这一关键特性。
288 1
|
7月前
|
开发框架 Java API
java反射机制的原理与简单使用
java反射机制的原理与简单使用
|
7月前
|
Java C++
Java反射的简单使用
Java反射的简单使用
49 3
|
Java 测试技术 API
源码分析系列教程(02) - 手写Spring事务框架
源码分析系列教程(02) - 手写Spring事务框架
74 0
|
7月前
|
Python
Validator 类的设计和简单实现
Validator 类的设计和简单实现
63 0
RxJava2源码分析(二):操作符原理分析
RxJava2源码分析(二):操作符原理分析
RxJava2源码分析(二):操作符原理分析
|
设计模式 存储 JavaScript
前端面试100道手写题(3)—— EventBus
EventBus作为发布订阅设计模式的经典应用场景,很值得我们去学习研究它的实现原理。
164 0