一、MessageQueue 的 Java 层机制
之前在 【Android 异步操作】手写 Handler ( 消息队列 MessageQueue | 消息保存到链表 | 从链表中获取消息 ) 中 , 模仿 Android 的 MessageQueue 手写的 MessageQueue , 使用了如下同步机制 ,
从 消息队列 MessageQueue 中取出 消息 Message ,
如果当前链表为空 , 此时会 调用 wait 方法阻塞 , 直到消息入队时 , 链表中有了元素 , 会调用 notify 解除该阻塞 ;
在实际的 Android 中的 消息队列 MessageQueue 的同步机制 是在 native 层实现 的 ;
在创建 消息队列 MessageQueue 时 , 调用了 nativeInit() 方法 , 销毁 MessageQueue 时调用 nativeDestroy 方法 ;
如果调用 next 获取下一个消息时 , 如果当前消息队列 MessageQueue 中没有消息 , 此时需要阻塞 , 调用 nativePollOnce 即可实现在 native 阻塞线程 ;
// 初始化 MessageQueue 时调用的方法 private native static long nativeInit(); // 销毁 MessageQueue 时调用的方法 private native static void nativeDestroy(long ptr); // 线程阻塞方法 @UnsupportedAppUsage private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/ // 线程唤醒方法 private native static void nativeWake(long ptr); private native static boolean nativeIsPolling(long ptr); private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events); // 此处初始化 MessageQueue , 调用了 nativeInit 方法 MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; mPtr = nativeInit(); } // 获取消息队列中的下一个消息 @UnsupportedAppUsage Message next() { // Return here if the message loop has already quit and been disposed. // This can happen if the application tries to restart a looper after quit // which is not supported. final long ptr = mPtr; if (ptr == 0) { return null; } int pendingIdleHandlerCount = -1; // -1 only during first iteration int nextPollTimeoutMillis = 0; for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); } // 此处阻塞线程 nativePollOnce(ptr, nextPollTimeoutMillis); } }
二、MessageQueue 的 native 层阻塞机制
线程阻塞方法 private native void nativePollOnce(long ptr, int timeoutMillis) , 是 native 方法 , 该方法在 frameworks/base/core/jni/android_os_MessageQueue.cpp 中实现 ,
从 Java 层传入 long 类型 , 然后转为 NativeMessageQueue* 类型指针 ,
该 Java 层传入的 long 类型是初始化消息队列时 , 由 nativeInit 方法返回 , 是 消息队列在 Native 层的指针 ,
之后 NativeMessageQueue 指针调用了其本身的 pollOnce 函数 , 该函数中 , 主要调用了 Looper 的 pollOnce 函数 , mLooper->pollOnce(timeoutMillis) ;
frameworks/base/core/jni/android_os_MessageQueue.cpp 中阻塞相关源码 :
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) { mPollEnv = env; mPollObj = pollObj; // 这是主要操作 mLooper->pollOnce(timeoutMillis); mPollObj = NULL; mPollEnv = NULL; if (mExceptionObj) { env->Throw(mExceptionObj); env->DeleteLocalRef(mExceptionObj); mExceptionObj = NULL; } } static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->pollOnce(env, obj, timeoutMillis); }
参考 : frameworks/base/core/jni/android_os_MessageQueue.cpp
继续查看 Native 层 Looper.cpp 的 pollOnce 方法 ,
Looper.cpp 源码路径是 system/core/libutils/Looper.cpp ,
在该方法中 , 最终调用 Looper.cpp 的 pollInner 方法 ,
在 pollInner 方法中 , 调用了 epoll_wait 方法 , 该方法就是等待方法 , 在该方法中会监听 mEpollFd 文件句柄 ,
#include <sys/epoll.h> int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); int epoll_pwait(int epfd, struct epoll_event *events, int maxevents, int timeout, const sigset_t *sigmask);
参考 : epoll_wait
system/core/libutils/Looper.cpp 中阻塞相关源码 :
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { while (mResponseIndex < mResponses.size()) { const Response& response = mResponses.itemAt(mResponseIndex++); int ident = response.request.ident; if (ident >= 0) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning signalled identifier %d: " "fd=%d, events=0x%x, data=%p", this, ident, fd, events, data); #endif if (outFd != NULL) *outFd = fd; if (outEvents != NULL) *outEvents = events; if (outData != NULL) *outData = data; return ident; } } if (result != 0) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning result %d", this, result); #endif if (outFd != NULL) *outFd = 0; if (outEvents != NULL) *outEvents = 0; if (outData != NULL) *outData = NULL; return result; } result = pollInner(timeoutMillis); } } int Looper::pollInner(int timeoutMillis) { // ... int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); }
参考 : system/core/libutils/Looper.cpp
三、MessageQueue 的 native 层解除阻塞机制
在 MessageQueue 消息队列的 Java 层 , 将 Message 消息插入到链表表头后 , 调用了 nativeWake 方法 , 唤醒了线程 , 即解除了阻塞 ;
public final class MessageQueue { boolean enqueueMessage(Message msg, long when) { if (msg.target == null) { throw new IllegalArgumentException("Message must have a target."); } if (msg.isInUse()) { throw new IllegalStateException(msg + " This message is already in use."); } synchronized (this) { if (mQuitting) { IllegalStateException e = new IllegalStateException( msg.target + " sending message to a Handler on a dead thread"); Log.w(TAG, e.getMessage(), e); msg.recycle(); return false; } msg.markInUse(); msg.when = when; Message p = mMessages; boolean needWake; if (p == null || when == 0 || when < p.when) { // New head, wake up the event queue if blocked. msg.next = p; mMessages = msg; needWake = mBlocked; } else { // Inserted within the middle of the queue. Usually we don't have to wake // up the event queue unless there is a barrier at the head of the queue // and the message is the earliest asynchronous message in the queue. needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } if (needWake && p.isAsynchronous()) { needWake = false; } } msg.next = p; // invariant: p == prev.next prev.next = msg; } // We can assume mPtr != 0 because mQuitting is false. if (needWake) { nativeWake(mPtr); } } return true; } }
在 native 层的 frameworks/base/core/jni/android_os_MessageQueue.cpp 实现了上述
Java 层定义的 private native static void nativeWake(long ptr) 方法 ,
注册 JNI 方法方式是动态注册 , 注册的参数如下 , Java 层的 nativeWake 对应的 native 层方法是 android_os_MessageQueue_nativeWake 方法 ,
static const JNINativeMethod gMessageQueueMethods[] = { /* name, signature, funcPtr */ { "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit }, { "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy }, { "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce }, { "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake }, { "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling }, { "nativeSetFileDescriptorEvents", "(JII)V", (void*)android_os_MessageQueue_nativeSetFileDescriptorEvents }, };
参考 : frameworks/base/core/jni/android_os_MessageQueue.cpp
下面是 frameworks/base/core/jni/android_os_MessageQueue.cpp 中的相关方法实现 ,
在 android_os_MessageQueue_nativeWake 方法中调用了 本身的 wake 方法 ,
在 wake 方法中调用了 system/core/libutils/Looper.cpp 中的 wake 方法 ;
void NativeMessageQueue::wake() { // 此处调用了 Looper 的 wake 函数 mLooper->wake(); } static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->wake(); }
参考 : frameworks/base/core/jni/android_os_MessageQueue.cpp
查看 system/core/libutils/Looper.cpp 中的 wake 方法 , 在该方法中 ,
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t))) 代码说明 ,
向 mWakeEventFd 文件句柄写入了数据 ;
void Looper::wake() { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ wake", this); #endif uint64_t inc = 1; ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t))); if (nWrite != sizeof(uint64_t)) { if (errno != EAGAIN) { LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s", mWakeEventFd, strerror(errno)); } } }
参考 : system/core/libutils/Looper.cpp
阻塞的时候使用的是 mEpollFd 文件句柄 ,
唤醒的时候使用的是 mWakeEventFd 文件句柄 ,
下面分析这两个文件句柄之间的联系 ;
Looper 构造函数 , 调用了 rebuildEpollLocked() 方法 ,
在 rebuildEpollLocked 方法 中调用 mEpollFd = epoll_create(EPOLL_SIZE_HINT) , 创建了一个句柄 ,
调用 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem) 注册事件监听 ,
注册 mEpollFd 句柄 , 监听 mWakeEventFd 句柄的 eventItem 事件 ,
监听的事件是 eventItem.events = EPOLLIN 事件 ,
该事件代表 , 向 mWakeEventFd 文件句柄写入数据 , 此时就对应解除 epoll_wait 阻塞 ;
system/core/libutils/Looper.cpp 中 Looper 构造函数 , rebuildEpollLocked 方法 :
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false), mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s", strerror(errno)); AutoMutex _l(mLock); rebuildEpollLocked(); } void Looper::rebuildEpollLocked() { // Close old epoll instance if we have one. if (mEpollFd >= 0) { #if DEBUG_CALLBACKS ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this); #endif close(mEpollFd); } // Allocate the new epoll instance and register the wake pipe. // 创建句柄 mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno)); struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union // 注册监听的事件 eventItem.events = EPOLLIN; eventItem.data.fd = mWakeEventFd; // 注册事件监听 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem); LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s", strerror(errno)); for (size_t i = 0; i < mRequests.size(); i++) { const Request& request = mRequests.valueAt(i); struct epoll_event eventItem; request.initEventItem(&eventItem); int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem); if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s", request.fd, strerror(errno)); } } }
MessageQueue 消息队列是通过 Linux 的 epoll 机制实现的阻塞 ;