【Android 异步操作】Handler 机制 ( MessageQueue 消息队列的阻塞机制 | Java 层机制 | native 层阻塞机制 | native 层解除阻塞机制 )(一)

简介: 【Android 异步操作】Handler 机制 ( MessageQueue 消息队列的阻塞机制 | Java 层机制 | native 层阻塞机制 | native 层解除阻塞机制 )(一)

一、MessageQueue 的 Java 层机制


之前在 【Android 异步操作】手写 Handler ( 消息队列 MessageQueue | 消息保存到链表 | 从链表中获取消息 ) 中 , 模仿 Android 的 MessageQueue 手写的 MessageQueue , 使用了如下同步机制 ,


从 消息队列 MessageQueue 中取出 消息 Message ,


如果当前链表为空 , 此时会 调用 wait 方法阻塞 , 直到消息入队时 , 链表中有了元素 , 会调用 notify 解除该阻塞 ;



在实际的 Android 中的 消息队列 MessageQueue 的同步机制 是在 native 层实现 的 ;


在创建 消息队列 MessageQueue 时 , 调用了 nativeInit() 方法 , 销毁 MessageQueue 时调用 nativeDestroy 方法 ;


如果调用 next 获取下一个消息时 , 如果当前消息队列 MessageQueue 中没有消息 , 此时需要阻塞 , 调用 nativePollOnce 即可实现在 native 阻塞线程 ;


 

// 初始化 MessageQueue 时调用的方法 
    private native static long nativeInit();
    // 销毁 MessageQueue 时调用的方法 
    private native static void nativeDestroy(long ptr);
    // 线程阻塞方法
    @UnsupportedAppUsage
    private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
    // 线程唤醒方法 
    private native static void nativeWake(long ptr);
    private native static boolean nativeIsPolling(long ptr);
    private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
  // 此处初始化 MessageQueue , 调用了 nativeInit 方法 
    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        mPtr = nativeInit();
    }
  // 获取消息队列中的下一个消息 
    @UnsupportedAppUsage
    Message next() {
        // Return here if the message loop has already quit and been disposed.
        // This can happen if the application tries to restart a looper after quit
        // which is not supported.
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }
        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }
    // 此处阻塞线程 
            nativePollOnce(ptr, nextPollTimeoutMillis);
        }
    }






二、MessageQueue 的 native 层阻塞机制


线程阻塞方法 private native void nativePollOnce(long ptr, int timeoutMillis) , 是 native 方法 , 该方法在 frameworks/base/core/jni/android_os_MessageQueue.cpp 中实现 ,


从 Java 层传入 long 类型 , 然后转为 NativeMessageQueue* 类型指针 ,


该 Java 层传入的 long 类型是初始化消息队列时 , 由 nativeInit 方法返回 , 是 消息队列在 Native 层的指针 ,


之后 NativeMessageQueue 指针调用了其本身的 pollOnce 函数 , 该函数中 , 主要调用了 Looper 的 pollOnce 函数 , mLooper->pollOnce(timeoutMillis) ;


frameworks/base/core/jni/android_os_MessageQueue.cpp 中阻塞相关源码 :


void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    // 这是主要操作 
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;
    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}


参考 : frameworks/base/core/jni/android_os_MessageQueue.cpp



继续查看 Native 层 Looper.cpp 的 pollOnce 方法 ,


Looper.cpp 源码路径是 system/core/libutils/Looper.cpp ,


在该方法中 , 最终调用 Looper.cpp 的 pollInner 方法 ,


在 pollInner 方法中 , 调用了 epoll_wait 方法 , 该方法就是等待方法 , 在该方法中会监听 mEpollFd 文件句柄 ,


   

#include <sys/epoll.h>
       int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout);
       int epoll_pwait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout,
                      const sigset_t *sigmask);


参考 : epoll_wait



system/core/libutils/Looper.cpp 中阻塞相关源码 :


int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p",
                        this, ident, fd, events, data);
#endif
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }
        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }
        result = pollInner(timeoutMillis);
    }
}
int Looper::pollInner(int timeoutMillis) {
  // ... 
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
}



参考 : system/core/libutils/Looper.cpp






三、MessageQueue 的 native 层解除阻塞机制


在 MessageQueue 消息队列的 Java 层 , 将 Message 消息插入到链表表头后 , 调用了 nativeWake 方法 , 唤醒了线程 , 即解除了阻塞 ;


public final class MessageQueue {
    boolean enqueueMessage(Message msg, long when) {
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }
        if (msg.isInUse()) {
            throw new IllegalStateException(msg + " This message is already in use.");
        }
        synchronized (this) {
            if (mQuitting) {
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }
            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                // Inserted within the middle of the queue.  Usually we don't have to wake
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }
            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }
}



在 native 层的 frameworks/base/core/jni/android_os_MessageQueue.cpp 实现了上述


Java 层定义的 private native static void nativeWake(long ptr) 方法 ,


注册 JNI 方法方式是动态注册 , 注册的参数如下 , Java 层的 nativeWake 对应的 native 层方法是 android_os_MessageQueue_nativeWake 方法 ,


static const JNINativeMethod gMessageQueueMethods[] = {
    /* name, signature, funcPtr */
    { "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
    { "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },
    { "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
    { "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
    { "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },
    { "nativeSetFileDescriptorEvents", "(JII)V",
            (void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },
};


参考 : frameworks/base/core/jni/android_os_MessageQueue.cpp



下面是 frameworks/base/core/jni/android_os_MessageQueue.cpp 中的相关方法实现 ,


在 android_os_MessageQueue_nativeWake 方法中调用了 本身的 wake 方法 ,


在 wake 方法中调用了 system/core/libutils/Looper.cpp 中的 wake 方法 ;


void NativeMessageQueue::wake() {
  // 此处调用了 Looper 的 wake 函数 
    mLooper->wake();
}
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}


参考 : frameworks/base/core/jni/android_os_MessageQueue.cpp



查看 system/core/libutils/Looper.cpp 中的 wake 方法 , 在该方法中 ,


ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t))) 代码说明 ,


向 mWakeEventFd 文件句柄写入了数据 ;


void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif
    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",
                    mWakeEventFd, strerror(errno));
        }
    }
}


参考 : system/core/libutils/Looper.cpp



阻塞的时候使用的是 mEpollFd 文件句柄 ,


唤醒的时候使用的是 mWakeEventFd 文件句柄 ,


下面分析这两个文件句柄之间的联系 ;



Looper 构造函数 , 调用了 rebuildEpollLocked() 方法 ,


在 rebuildEpollLocked 方法 中调用 mEpollFd = epoll_create(EPOLL_SIZE_HINT) , 创建了一个句柄 ,


调用 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem) 注册事件监听 ,


注册 mEpollFd 句柄 , 监听 mWakeEventFd 句柄的 eventItem 事件 ,


监听的事件是 eventItem.events = EPOLLIN 事件 ,


该事件代表 , 向 mWakeEventFd 文件句柄写入数据 , 此时就对应解除 epoll_wait 阻塞 ;



system/core/libutils/Looper.cpp 中 Looper 构造函数 , rebuildEpollLocked 方法 :


Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
                        strerror(errno));
    AutoMutex _l(mLock);
    rebuildEpollLocked();
}
void Looper::rebuildEpollLocked() {
    // Close old epoll instance if we have one.
    if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
        ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
        close(mEpollFd);
    }
    // Allocate the new epoll instance and register the wake pipe.
    // 创建句柄 
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    // 注册监听的事件
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    // 注册事件监听 
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
                        strerror(errno));
    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);
        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
        if (epollResult < 0) {
            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
                  request.fd, strerror(errno));
        }
    }
}



MessageQueue 消息队列是通过 Linux 的 epoll 机制实现的阻塞 ;


目录
相关文章
|
8月前
|
Java API 调度
从阻塞到畅通:Java虚拟线程开启并发新纪元
从阻塞到畅通:Java虚拟线程开启并发新纪元
427 83
|
Java Android开发 C++
🚀Android NDK开发实战!Java与C++混合编程,打造极致性能体验!📊
在Android应用开发中,追求卓越性能是不变的主题。本文介绍如何利用Android NDK(Native Development Kit)结合Java与C++进行混合编程,提升应用性能。从环境搭建到JNI接口设计,再到实战示例,全面展示NDK的优势与应用技巧,助你打造高性能应用。通过具体案例,如计算斐波那契数列,详细讲解Java与C++的协作流程,帮助开发者掌握NDK开发精髓,实现高效计算与硬件交互。
620 1
|
Java 调度 Android开发
Android经典实战之Kotlin的delay函数和Java中的Thread.sleep有什么不同?
本文介绍了 Kotlin 中的 `delay` 函数与 Java 中 `Thread.sleep` 方法的区别。两者均可暂停代码执行,但 `delay` 适用于协程,非阻塞且高效;`Thread.sleep` 则阻塞当前线程。理解这些差异有助于提高程序效率与可读性。
405 1
|
安全 Java 测试技术
Java 中的阻塞方法
【8月更文挑战第22天】
305 4
|
Android开发
Cannot create android app from an archive...containing both DEX and Java-bytecode content
Cannot create android app from an archive...containing both DEX and Java-bytecode content
225 2
|
Java Android开发 C++
🚀Android NDK开发实战!Java与C++混合编程,打造极致性能体验!📊
【7月更文挑战第28天】在 Android 开发中, NDK 让 Java 与 C++ 混合编程成为可能, 从而提升应用性能。**为何选 NDK?** C++ 在执行效率与内存管理上优于 Java, 特别适合高性能需求场景。**环境搭建** 需 Android Studio 和 NDK, 工具如 CMake。**JNI** 构建 Java-C++ 交互, 通过声明 `native` 方法并在 C++ 中实现。**实战** 示例: 使用 C++ 计算斐波那契数列以提高效率。**总结** 混合编程增强性能, 但增加复杂性, 使用前需谨慎评估。
538 4
|
SQL Java Unix
Android经典面试题之Java中获取时间戳的方式有哪些?有什么区别?
在Java中获取时间戳有多种方式,包括`System.currentTimeMillis()`(毫秒级,适用于日志和计时)、`System.nanoTime()`(纳秒级,高精度计时)、`Instant.now().toEpochMilli()`(毫秒级,ISO-8601标准)和`Instant.now().getEpochSecond()`(秒级)。`Timestamp.valueOf(LocalDateTime.now()).getTime()`适用于数据库操作。选择方法取决于精度、用途和时间起点的需求。
407 3
|
Java Android开发 Kotlin
Android面试题:App性能优化之Java和Kotlin常见的数据结构
Java数据结构摘要:ArrayList基于数组,适合查找和修改;LinkedList适合插入删除;HashMap1.8后用数组+链表/红黑树,初始化时预估容量可避免扩容。SparseArray优化查找,ArrayMap减少冲突。 Kotlin优化摘要:Kotlin的List用`listOf/mutableListOf`,Map用`mapOf/mutableMapOf`,支持操作符重载和扩展函数。序列提供懒加载,解构用于遍历Map,扩展函数默认参数增强灵活性。
263 0
|
5月前
|
JSON 网络协议 安全
【Java】(10)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
274 1
|
5月前
|
JSON 网络协议 安全
【Java基础】(1)进程与线程的关系、Tread类;讲解基本线程安全、网络编程内容;JSON序列化与反序列化
几乎所有的操作系统都支持进程的概念,进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位一般而言,进程包含如下三个特征。独立性动态性并发性。
294 1