android系统核心机制 基础(06)handler message机制 Native

系列文章解读&说明:

Android系统核心机制基础 的 分析主要分为以下部分:

(01)智能指针wp & sp

(02)Thread类解析

(03)Thread同步机制

(04)handler message机制 java

(05)handler使用案例(Java)

(06)handler message机制 Native

(07)AsyncChannel机制浅析

(08)JNI 基础

(09)Android 添加新模块

本模块分享的内容:handler message机制 Native

本章关键点总结 & 说明:

android系统核心机制 基础(06)handler message机制 Native

这里关注➕ handler Message机制中Native层,通过MessageQueue创建来分析Native层消息处理流程,后以Java层投递Message来分析,最后发现都要走到native层的消息处理部分;最后以native的Activity案例来分析添加和处理 监控请求流程。

以前Android2.3 之前是只有Java层 可以向MessageQueue中添加消息,Native层不可以,在那之后MessageQueue的核心机制 转移到native层,Native层的代码也可以使用handler message机制。因此MessageQueue属于心系native层和java层的两个世界。

1 MessageQueue创建

MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        mPtr = nativeInit();//构造函数由Native实现
    }

nativeInit()真正实现是android_os_MessageQueue_nativeInit()函数,其代码如下:

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }
    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}

nativeInit函数在Native层创建了一个与MessageQueue对应的NativeMessageQueue对象。

NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) {
     /* Looper在Native层中出现,一个线程会有一个Looper来循环处理消息队列中的消息。下面一行的调用就是取得保存在线程本地存储空间(Thread Local Storage)中的Looper对象 */
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        //如果第一次执行,该线程没有设置本地存储,会创建一个Looper,将其保存到TLS中
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}

Native的Looper它的类名和Java层的Looper类一样,但此二者其实并无任何关系。

2 提取消息

一切就绪后,Java层Looper会在一个循环中提取并处理消息。MessageQueue同时支持Java层和Native层的事件,那么其next()方法如何实现呢?代码如下:

    Message next() {
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }

        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        for (;;) {
            //...
            // mPtr保存了NativeMessageQueue的指针,调用nativePollOnce进行等待
            nativePollOnce(ptr, nextPollTimeoutMillis);

            synchronized (this) {
                // mMessages用来存储消息,这里从其中取一个消息进行处理
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                if (msg != null && msg.target == null) {
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                    if (now < msg.when) {
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (false) Log.v("MessageQueue", "Returning message: " + msg);
                        return msg;//返回一个Message给Looper进行派发和处理
                    }
                } else {
                    // No more messages.
                    nextPollTimeoutMillis = -1;
                }
                //...
            }
            //...
            // Reset the idle handler count to 0 so we do not run them again.
            pendingIdleHandlerCount = 0;

            // While calling an idle handler, a new message could have been delivered
            // so go back and look again for a pending message without waiting.
            nextPollTimeoutMillis = 0;
        }
    }

这里要注意:要让nativePollOnce()返回,至少要添加一个消息到消息队列,否则nativePollOnce()不过是做了一次无用功。如果nativePollOnce()将在Native层等待,就表明Native层也可以投递Message,但是从Message类的实现代码上看,该类和Native层没有建立任何关系。那么nativePollOnce()在等待什么呢?因为nativePollOnce()不仅在等待Java层来的Message,实际上还在Native还做了大量的工作。

3 Message投递到MessageQueue

这里从Java层投递Message并触发nativePollOnce工作,根据之前文章的分析,MessageQueue的enqueueMessage函数完成将一个Message投递到MessageQueue中的工作,其代码如下:


    boolean enqueueMessage(Message msg, long when) {
        //...
        synchronized (this) {
            //...
            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            //如果p为空,表明消息队列中没有消息,那么msg将是第一个消息,needWake需要根据mBlocked的情况考虑是否触发
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                //如果p不为空,表明消息队列中还有剩余消息,需要将新的msg加到消息尾
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    //因为消息队列之前还剩余有消息,所以这里不用调用nativeWakeup
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }

            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }

上面代码主要功能:

  1. 将message按执行时间排序,并加入消息队。
  2. 根据情况调用nativeWake函数,以触发nativePollOnce函数,结束等待。

nativeWake函数的代码;实现如下:

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    return nativeMessageQueue->wake();
}

继续分析nativeMessageQueue->wake(); 实现如下:

void NativeMessageQueue::wake() {
    mLooper->wake();
}

继续分析wake的实现,代码如下:

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif

    ssize_t nWrite;
    do {
        // 向管道的写端写入一个字符
        nWrite = write(mWakeWritePipeFd, "W", 1);
    } while (nWrite == -1 && errno == EINTR);

    if (nWrite != 1) {
        if (errno != EAGAIN) {
            ALOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}

Wake()函数则更为简单,仅仅向管道的写端写入一个字符”W”,这样管道的读端就会因为有数据可读而从等待状态中醒来。

4 nativePollOnce函数分析

nativePollOnce()的实现函数是android_os_MessageQueue_nativePollOnce,代码如下:


static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jclass clazz,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    // 取出NativeMessageQueue对象,并调用它的pollOnce
    nativeMessageQueue->pollOnce(env, timeoutMillis);
}

继续分析pollOnce,代码如下:

void NativeMessageQueue::pollOnce(JNIEnv* env, int timeoutMillis) {
    mInCallback = true;
    mLooper->pollOnce(timeoutMillis);//关键点
    mInCallback = false;
    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}

这里传递到了Looper的pollOnce,代码如下:

inline int pollOnce(int timeoutMillis) {
    return pollOnce(timeoutMillis, NULL, NULL, NULL);
}

上面的函数将调用另外一个有4个参数的pollOnce函数,这个函数的原型如下:

int pollOnce(int timeoutMillis, int* outFd, int*outEvents, void** outData)

这里简单说下 该方法参数:

  1. timeoutMillis表示延迟
  2. outFd就是监听句柄
  3. outEvents用来存储在该文件描述符1上发生了哪些事件,目前支持可读、可写、错误和中断4个事件。这4个事件其实是从epoll事件转化而来。
  4. outData用于存储上下文数据,这个上下文数据是由用户在添加监听句柄时传递的,它的作用和pthread_create函数最后一个参数param一样,用来传递用户自定义的数据。

pollOnce函数的返回值也具有特殊的意义,具体如下:

返回值 含义
ALOOPER_POLL_WAKE 这次返回是由wake函数触发的,也就是管道写端的那次写事件触发的。
ALOOPER_POLL_TIMEOUT 等待超时
ALOOPER_POLL_ERROR 等待过程中发生错误
ALOOPER_POLL_CALLBACK 某个被监听的句柄因某种原因被触发。这时,outFd参数用于存储发生事件的文件句柄,outEvents用于存储所发生的事件。

 

继续分析pollOnce,代码如下:


int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            //mResponses是一个Vector,这里首先需要处理response
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                //对于没有callback的Response,pollOnce只是返回它的ident,并没有做什么处理。        
                //因为没有callback,所以系统也不知道如何处理
                return ident;
            }
        }

        if (result != 0) {
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }
        //调用pollInner函数。注意,它在for循环内部
        result = pollInner(timeoutMillis);
    }
}

继续pollInner,代码分析:

int Looper::pollInner(int timeoutMillis) {
    if(timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
       nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
       //...根据Native Message的信息计算此次需要等待的时间
       timeoutMillis = messageTimeoutMillis;
    }
	
    intresult = ALOOPER_POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;
#ifdef LOOPER_USES_EPOLL  // 只讨论使用epoll进行I/O复用的方式
    structepoll_event eventItems[EPOLL_MAX_EVENTS];
    //调用epoll_wait,等待感兴趣的事件或超时发生,关键点1
    inteventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS,timeoutMillis);
#else
    //...使用别的方式进行I/O复用
#endif
 
    //从epoll_wait返回,这时候一定发生了什么事情
    mLock.lock();
    if(eventCount < 0) { //返回值小于零,表示发生错误
        if(errno == EINTR) {
           goto Done;
        }
        //设置result为ALLOPER_POLL_ERROR,并跳转到Done
       result = ALOOPER_POLL_ERROR;
        gotoDone;
    }
 
    //eventCount为零,表示发生超时,因此直接跳转到Done
    if(eventCount == 0) {
      result = ALOOPER_POLL_TIMEOUT;
        gotoDone;
    }
#ifdef LOOPER_USES_EPOLL
    // 根据epoll的用法,此时的eventCount表示发生事件的个数
    for (inti = 0; i < eventCount; i++) {
        intfd = eventItems[i].data.fd;
       uint32_t epollEvents = eventItems[i].events;
        /* 之前通过pipe函数创建过两个fd,这里根据fd知道是管道读端有可读事件。
         读者还记得对nativeWake函数的分析吗?在那里我们向管道写端写了一个”W”字符,这样
         就能触发管道读端从epoll_wait函数返回了 */
        if(fd == mWakeReadPipeFd) {
           if (epollEvents & EPOLLIN) {
                // awoken函数直接读取并清空管道数据,读者可自行研究该函数
               awoken();
           }
           //......
        }else {
          /* mRequests和前面的mResponse相对应,它也是一个KeyedVector,其中存储了
           fd和对应的Request结构体,该结构体封装了和监控文件句柄相关的一些上下文信息,
            例如回调函数等。我们在后面的小节会再次介绍该结构体 */
           ssize_t requestIndex = mRequests.indexOfKey(fd);
           if (requestIndex >= 0) {
               int events = 0;
               // 将epoll返回的事件转换成上层LOOPER使用的事件
               if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
               if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
               if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
               if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
               // 每处理一个Request,就相应构造一个Response
               pushResponse(events, mRequests.valueAt(requestIndex));
           } 
           //...
        }
    }
Done: ;
#else
     //...
#endif
 
    // 除了处理Request外,还处理Native的Message
    mNextMessageUptime = LLONG_MAX;
    while(mMessageEnvelopes.size() != 0) {
       nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
       const MessageEnvelope& messageEnvelope =mMessageEnvelopes.itemAt(0);
        if(messageEnvelope.uptime <= now) {
           {
               sp<MessageHandler> handler = messageEnvelope.handler;
               Message message = messageEnvelope.message;
               mMessageEnvelopes.removeAt(0);
               mSendingMessage = true;
               mLock.unlock();
               /* 调用Native的handler处理Native的Message
                从这里也可看出Native Message和Java层的Message没有什么关系 */
               handler->handleMessage(message);
           }
           mLock.lock();
           mSendingMessage = false;
           result = ALOOPER_POLL_CALLBACK;
        }else {
            mNextMessageUptime = messageEnvelope.uptime;
           break;
        }
    }

    mLock.unlock();
    // 处理那些带回调函数的Response
    for (size_t i = 0; i < mResponses.size();i++) {
       const Response& response = mResponses.itemAt(i);
       ALooper_callbackFunc callback = response.request.callback;
        if(callback) {// 有了回调函数,就能知道如何处理所发生的事情了
           int fd = response.request.fd;
           int events = response.events;
           void* data = response.request.data;
           // 调用回调函数处理所发生的事件
           int callbackResult = callback(fd, events, data);
           if (callbackResult == 0) {
               // callback函数的返回值很重要,如果为0,表明不需要再次监视该文件句柄
                removeFd(fd);
           }
           result = ALOOPER_POLL_CALLBACK;
        }
    }
    returnresult;
}

这里回顾下pollInner函数几个关键点:

计算一下真正需要等待的时间;调用epoll_wait函数等待;epoll_wait函数返回,这时候可能有三种情况:

  1. 发生错误,则跳转到Done处。
  2. 超时,这时候也跳转到Done处。
  3. epoll_wait监测到FD上有事件发生。

如果e poll_wait因为文件句柄有事件而返回,此时需要根据文件句柄来分别处理:

  1. 如果是管道读这一端有事件,认为是控制命令,可以直接读取管道中的数据。
  2. 如果是其他FD发生事件,则根据Request构造Response,并push到Response数组中。

真正开始处理事件是在有Done标志的位置:

  1. 首先处理Native的Message。调用Native Handler的handleMessage处理该Message。
  2. 处理Response数组中那些带有callback的事件。

以上 处理流程比较清晰,另一个关键点是mRequests,接下来分析下mRequests。

5 添加监控请求

添加监控请求其实就是调用epoll_ctl增加文件句柄。下面通过从Native的Activity找到的一个例子来分析mRequests。

static jint loadNativeCode_native(JNIEnv* env, jobject clazz,jstring path,

                          jstringfuncName,jobject messageQueue,

                          jstringinternalDataDir, jstring obbDir,

                          jstringexternalDataDir, int sdkVersion,

                          jobject jAssetMgr,jbyteArray savedState)

{
    //...
    /* 调用Looper的addFd函数。
        第一个参数表示监听的fd;
        第二个参数0表示ident;
        第三个参数表示需要监听的事件,这里为只监听可读事件;
        第四个参数为回调函数,当该fd发生,指定事件时,looper将回调该函数;
        第五个参数code为回调函数的参数*/
    code->messageQueue->getLooper()->addFd(code->mainWorkRead, 0,                   
                        ALOOPER_EVENT_INPUT, mainWorkCallback, code);
        
    //...
}

继续分析addFd()。代码如下:

//addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data)->
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
    if (!callback.get()) {
        //...
    } else {
        ident = POLL_CALLBACK;
    }

    // 将用户的事件转换成epoll使用的值
    int epollEvents = 0;
    if (events & EVENT_INPUT) epollEvents |= EPOLLIN;
    if (events & EVENT_OUTPUT) epollEvents |= EPOLLOUT;

    { // acquire lock
        AutoMutex _l(mLock);

        Request request;// 创建一个Request对象
        request.fd = fd;
        request.ident = ident;
        request.callback = callback;
        request.data = data;// 保存用户自定义数据

        struct epoll_event eventItem;
        memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
        eventItem.events = epollEvents;
        eventItem.data.fd = fd;

        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex < 0) {
            // 如果是新的文件句柄,则需要为epoll增加该fd
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
            //...
            // 保存Request到mRequests键值数组
            mRequests.add(fd, request);
        } else {
            // 如果之前加过,那么就修改该监听句柄的一些信息
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
            //...
            mRequests.replaceValueAt(requestIndex, request);
        }
    } // release lock
    return 1;
}

6 处理监控请求

我们发现在pollInner()函数中,当某个监控fd上发生事件后,就会把对应的Request取出来。pushResponse()的实现如下所示:


void Looper::pushResponse(int events, const Request& request) {
    Response response;
    response.events = events;
    response.request = request;//保存所发生的事情和对应的Request
    mResponses.push(response); //保存到mResponse数组
}

根据前面的知识可知,并不是单独处理Request,而是需要先收集Request,等到Native Message消息处理完之后再做处理。这表明,在处理逻辑上,Native Message的优先级高于监控FD的优先级。

7 Native的sendMessage

从4.0开始,Native层也支持sendMessage()了。sendMessage()的代码如下:

void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    sendMessageAtTime(now, handler, message);
}

继续分析 sendMessageAtTime,代码如下:

void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
        const Message& message) {
    size_t i = 0;
    { // acquire lock
        AutoMutex _l(mLock);

        size_t messageCount = mMessageEnvelopes.size();
        // 按时间排序,将消息插入到正确的位置上
        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
            i += 1;
        }

        MessageEnvelope messageEnvelope(uptime, handler, message);
        mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
        // mSendingMessage和Java层中的那个mBlocked一样,是一个小小的优化措施
        if (mSendingMessage) {
            return;
        }
    } // release lock
    // Wake the poll loop only when we enqueue a new message at the head.
    if (i == 0) {// 唤醒epoll_wait,让它处理消息
        wake();
    }
}

8 MessageQueue总结

接下来在累的关系图的角度上来认识一下MessageQueue,

android系统核心机制 基础(06)handler message机制 Native

这里,Java层提供了Looper类(循环处理消息)和MessageQueue类(消息队列封装),Handler辅助消息处理。MessageQueue的相关总结如下:

  1. MessageQueue内部通过(mPtr=NativeMessageQueue)保存对象,mMessages保存来自Java层的Message消息。
  2. NativeMessageQueue保存一个native的Looper对象,该Looper从ALooper派生,提供pollOnce和addFd等函数。
  3. Native层对应Java层也有Message类和MessageHandler抽象类。在编码时,一般使用WeakMessageHandler类。
  4. MessageQueue支持之前Java层的Message加Handler的处理方式。同时MessageQueue在Native层的代表NativeMessageQueue支持来自Native层的Message,通过Native的Message和MessageHandler来处理。
  5. NativeMessageQueue还处理通过addFd添加的Request。

 

上一篇:RocketMQ学习笔记(11)----RocketMQ的PushConsumer和PullConsumer


下一篇:2021年字节跳动+京东+美团面试总结!附超全教程文档