【从源码看Android】03Android MessageQueue消息循环处理机制(epoll实现)

1 enqueueMessage


handler发送一条消息

mHandler.sendEmptyMessage(1);
经过层层调用,进入到sendMessageAtTime函数块,最后调用到enqueueMessage

Handler.java

public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        if (queue == null) {
            RuntimeException e = new RuntimeException(
                    this + " sendMessageAtTime() called with no mQueue");
            Log.w("Looper", e.getMessage(), e);
            return false;
        }
        return enqueueMessage(queue, msg, uptimeMillis);
    }


最后调用到Handler私有的函数enqueueMessage,把handler对象赋值给msg.target,调用queue.enqueueMessage

Handler.java
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        msg.target = this;
        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }


下面是核心代码,首先是获得同步锁,

MessageQueue.java

boolean enqueueMessage(Message msg, long when) {
        if (msg.isInUse()) {
            throw new AndroidRuntimeException(msg + " This message is already in use.");
        }
        if (msg.target == null) {
            throw new AndroidRuntimeException("Message must have a target.");
        }

        synchronized (this) {
            if (mQuitting) {
                RuntimeException e = new RuntimeException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w("MessageQueue", e.getMessage(), e);
                return false;
            }

            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                // Inserted within the middle of the queue.  Usually we don't have to wake
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }

            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }

首先是获得自身的同步锁synchronized (this),接着这个msg跟MessageQueue实例的头结点Message进行触发时间先后的比较,

如果触发时间比现有的头结点Message前,则这个新的Message作为整个MessageQueue的头结点,如果阻塞着,则立即唤醒线程处理

如果触发时间比头结点晚,则按照触发时间先后,在消息队列中间插入这个结点

接着如果需要唤醒,则调用nativeWake函数


在android_os_MessageQueue.cpp里定义了nativeWake函数

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    return nativeMessageQueue->wake();
}

实际调用到mLooper->wake();

android_os_MessageQueue.cpp

void NativeMessageQueue::wake() {
    mLooper->wake();
}
而mLooper是cpp层的Looper对象,

framework/base/libs/utils/Looper.cpp

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ wake", this);
#endif

#ifdef LOOPER_STATISTICS
    // FIXME: Possible race with awoken() but this code is for testing only and is rarely enabled.
    if (mPendingWakeCount++ == 0) {
        mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC);
    }
#endif

    ssize_t nWrite;
    do {
        nWrite = write(mWakeWritePipeFd, "W", 1);
    } while (nWrite == -1 && errno == EINTR);

    if (nWrite != 1) {
        if (errno != EAGAIN) {
            LOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}

是不是很熟悉?基本就是上一讲epoll原型的唤醒函数,向mWakeWritePipeFD写入1字节,唤醒监听block在mWakeReadPipeFD端口的epoll_wait



2 dequeueMessage


首先dequeueMessage只是我取的一个叫法,当java层的Looper进行loop的时候,就已经在不停地读取MessageQueue里的Message了

Looper.java

public static void loop() {
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        final MessageQueue queue = me.mQueue;

        // Make sure the identity of this thread is that of the local process,
        // and keep track of what that identity token actually is.
        Binder.clearCallingIdentity();
        final long ident = Binder.clearCallingIdentity();

        for (;;) {
            Message msg = queue.next(); // might block
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }

            // This must be in a local variable, in case a UI event sets the logger
            Printer logging = me.mLogging;
            if (logging != null) {
                logging.println(">>>>> Dispatching to " + msg.target + " " +
                        msg.callback + ": " + msg.what);
            }

            msg.target.dispatchMessage(msg);

            if (logging != null) {
                logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
            }

            // Make sure that during the course of dispatching the
            // identity of the thread wasn't corrupted.
            final long newIdent = Binder.clearCallingIdentity();
            if (ident != newIdent) {
                Log.wtf(TAG, "Thread identity changed from 0x"
                        + Long.toHexString(ident) + " to 0x"
                        + Long.toHexString(newIdent) + " while dispatching to "
                        + msg.target.getClass().getName() + " "
                        + msg.callback + " what=" + msg.what);
            }

            msg.recycle();
        }
    }

调用queue.next()读取下一条消息(在loop调用的线程中),如果读取到了就msg,target.dispatchMessage,

下面来看看queue.next()如何实现


MessageQueue.java

Message next() {
        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }

            // We can assume mPtr != 0 because the loop is obviously still running.
            // The looper will not call this method after the loop quits.
            nativePollOnce(mPtr, nextPollTimeoutMillis);

            synchronized (this) {
                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                    if (now < msg.when) {
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (false) Log.v("MessageQueue", "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                    }
                } else {
                    // No more messages.
                    nextPollTimeoutMillis = -1;
                }

                // Process the quit message now that all pending messages have been handled.
                if (mQuitting) {
                    dispose();
                    return null;
                }

                // If first time idle, then get the number of idlers to run.
                // Idle handles only run if the queue is empty or if the first message
                // in the queue (possibly a barrier) is due to be handled in the future.
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) {
                    // No idle handlers to run.  Loop and wait some more.
                    mBlocked = true;
                    continue;
                }

                if (mPendingIdleHandlers == null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }

            // Run the idle handlers.
            // We only ever reach this code block during the first iteration.
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; // release the reference to the handler

                boolean keep = false;
                try {
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf("MessageQueue", "IdleHandler threw exception", t);
                }

                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }

            // Reset the idle handler count to 0 so we do not run them again.
            pendingIdleHandlerCount = 0;

            // While calling an idle handler, a new message could have been delivered
            // so go back and look again for a pending message without waiting.
            nextPollTimeoutMillis = 0;
        }
    }

首先是个包内函数,所以在同一个包中(android.os)的Looper对象能调用到

nativePollOnce(mPtr, nextPollTimeoutMillis);函数待会展开,功能是调用上一讲的epoll_wait,

nextPollTimeoutMillis超时时间为下一条Message的触发时间,如果没有消息则会一直阻塞到超过超时时间

被唤醒后,我们暂时先忽略barrier类型的Message(这是android4.1后加入的一个特性Choreographer,http://blog.csdn.net/innost/article/details/8272867),

如果头结点msg不为null,就判断现在到了这条msg触发时间没有,

如果没到,则nextPollTimeoutMillis设置为这个条消息需要执行的时间和现在的时间差,给for循环下一次调用nativePollOnce时使用

如果到了甚至超过了,则取出这条msg,退出for循环返回这条msg,给上面上的handler进行dispatch


那么nativePollOnce具体是如何实现的呢?

android_os_MessageQueue.cpp

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jint ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(timeoutMillis);
}


调用到了nativeMessageQueue->pollOnce

android_os_MessageQueue.cpp

void NativeMessageQueue::pollOnce(int timeoutMillis) {
    mLooper->pollOnce(timeoutMillis);
}


调用到了mLooper->pollOnce

同样,在framework/base/libs/utils/Looper.cpp中

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            if (! response.request.callback) {
#if DEBUG_POLL_AND_WAKE
                LOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p", this,
                        response.request.ident, response.request.fd,
                        response.events, response.request.data);
#endif
                if (outFd != NULL) *outFd = response.request.fd;
                if (outEvents != NULL) *outEvents = response.events;
                if (outData != NULL) *outData = response.request.data;
                return response.request.ident;
            }
        }

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            LOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = NULL;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}
因为这个流程和mResponses无关,先忽略这部分,

调用到pollInner

framework/base/libs/utils/Looper.cpp

nt Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif

    int result = ALOOPER_POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

#ifdef LOOPER_STATISTICS
    nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC);
#endif

#ifdef LOOPER_USES_EPOLL
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    bool acquiredLock = false;
#else
    // Wait for wakeAndLock() waiters to run then set mPolling to true.
    mLock.lock();
    while (mWaiters != 0) {
        mResume.wait(mLock);
    }
    mPolling = true;
    mLock.unlock();

    size_t requestedCount = mRequestedFds.size();
    int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis);
#endif

    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }

        LOGW("Poll failed with an unexpected error, errno=%d", errno);
        result = ALOOPER_POLL_ERROR;
        goto Done;
    }

    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        LOGD("%p ~ pollOnce - timeout", this);
#endif
        result = ALOOPER_POLL_TIMEOUT;
        goto Done;
    }

#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

#ifdef LOOPER_USES_EPOLL
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeReadPipeFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
            }
        } else {
            if (! acquiredLock) {
                mLock.lock();
                acquiredLock = true;
            }

            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
    if (acquiredLock) {
        mLock.unlock();
    }
Done: ;
#else
    for (size_t i = 0; i < requestedCount; i++) {
        const struct pollfd& requestedFd = mRequestedFds.itemAt(i);

        short pollEvents = requestedFd.revents;
        if (pollEvents) {
            if (requestedFd.fd == mWakeReadPipeFd) {
                if (pollEvents & POLLIN) {
                    awoken();
                } else {
                    LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents);
                }
            } else {
                int events = 0;
                if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT;
                if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR;
                if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP;
                if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID;
                pushResponse(events, mRequests.itemAt(i));
            }
            if (--eventCount == 0) {
                break;
            }
        }
    }

Done:
    // Set mPolling to false and wake up the wakeAndLock() waiters.
    mLock.lock();
    mPolling = false;
    if (mWaiters != 0) {
        mAwake.broadcast();
    }
    mLock.unlock();
#endif

#ifdef LOOPER_STATISTICS
    nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC);
    mSampledPolls += 1;
    if (timeoutMillis == 0) {
        mSampledZeroPollCount += 1;
        mSampledZeroPollLatencySum += pollEndTime - pollStartTime;
    } else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) {
        mSampledTimeoutPollCount += 1;
        mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime
                - milliseconds_to_nanoseconds(timeoutMillis);
    }
    if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) {
        LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this,
                0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount,
                0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount);
        mSampledPolls = 0;
        mSampledZeroPollCount = 0;
        mSampledZeroPollLatencySum = 0;
        mSampledTimeoutPollCount = 0;
        mSampledTimeoutPollLatencySum = 0;
    }
#endif

    for (size_t i = 0; i < mResponses.size(); i++) {
        const Response& response = mResponses.itemAt(i);
        if (response.request.callback) {
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            LOGD("%p ~ pollOnce - invoking callback: fd=%d, events=0x%x, data=%p", this,
                    response.request.fd, response.events, response.request.data);
#endif
            int callbackResult = response.request.callback(
                    response.request.fd, response.events, response.request.data);
            if (callbackResult == 0) {
                removeFd(response.request.fd);
            }

            result = ALOOPER_POLL_CALLBACK;
        }
    }
    return result;
}

主要看#ifdef  LOOPER_USES_EPOLL部分

int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

等待所有attach到mEpollFd上的事件,如果收到唤醒信号继续执行,否则阻塞等待

之后的#ifdef  LOOPER_USES_EPOLL部分

#ifdef LOOPER_USES_EPOLL
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeReadPipeFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
            }
        } else {
            if (! acquiredLock) {
                mLock.lock();
                acquiredLock = true;
            }

            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
    if (acquiredLock) {
        mLock.unlock();
    }
Done: ;

对所有attach在mEpollFd上的事件进行遍历,如果对象文件描述符有mWakeReadPipeFd,则awoken()

framework/base/libs/utils/Looper.cpp

void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ awoken", this);
#endif

#ifdef LOOPER_STATISTICS
    if (mPendingWakeCount == 0) {
        LOGD("%p ~ awoken: spurious!", this);
    } else {
        mSampledWakeCycles += 1;
        mSampledWakeCountSum += mPendingWakeCount;
        mSampledWakeLatencySum += systemTime(SYSTEM_TIME_MONOTONIC) - mPendingWakeTime;
        mPendingWakeCount = 0;
        mPendingWakeTime = -1;
        if (mSampledWakeCycles == SAMPLED_WAKE_CYCLES_TO_AGGREGATE) {
            LOGD("%p ~ wake statistics: %0.3fms wake latency, %0.3f wakes per cycle", this,
                    0.000001f * float(mSampledWakeLatencySum) / mSampledWakeCycles,
                    float(mSampledWakeCountSum) / mSampledWakeCycles);
            mSampledWakeCycles = 0;
            mSampledWakeCountSum = 0;
            mSampledWakeLatencySum = 0;
        }
    }
#endif

    char buffer[16];
    ssize_t nRead;
    do {
        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}

awoken()即上一讲中得awoken()函数,用于把mWakeReadPipeFd上的数据读取干净,因为mWakeWriteReadPipeFd可能写入多次

读取干净后下一次epoll_wait时就会等待mWakeWriteReadPipeFd写入,如果没有读取干净,即通知epoll内核和mWakeReadPipeFd这个事件相关的处理完毕了,

否则epoll_wait就一直会触发对应的事件了(不等待新的写入,一直不阻塞)


3 总结

那么至此,enqueueMessage和定义dequeueMessage都解释清楚,感觉豁然开朗了有木有!!!!

下一讲讲nativeapp的线程消息循环处理过程(主要解读android_native_app_glue.c)

欢迎各位指正!!


4 reference

android sdk sourcecode

android framework sourcecode




【从源码看Android】03Android MessageQueue消息循环处理机制(epoll实现),布布扣,bubuko.com

【从源码看Android】03Android MessageQueue消息循环处理机制(epoll实现)

上一篇:移动中间件产品的解决方案


下一篇:关于苹果最新语言Swift