Android Native层异步消息处理框架

 *本文系作者工作学习总结,尚有不完善及理解不恰当之处,欢迎批评指正*

一、前言

  在NuPlayer中,可以发现许多类似于下面的代码:

 //=======================================//
NuPlayerDriver::NuPlayerDriver(pid_t pid)
: ......
mLooper(new ALooper)
......{
mLooper->setName("NuPlayerDriver Looper");
mLooper->start(
false, /* runOnCallingThread */
true, /* canCallJava */
PRIORITY_AUDIO); mPlayer = new NuPlayer(pid);
mLooper->registerHandler(mPlayer)
} //=======================================//
sp<AMessage> msg = new AMessage(kWhatPerformSeek, this);
msg->setInt32("generation", ++mSeekGeneration);
msg->setInt64("timeUs", seekTimeUs);
status_t err = msg->postAndAwaitResponse(&response); //=======================================//
void NuPlayer::RTSPSource::onMessageReceived(const sp<AMessage> &msg) {
switch (what) {
case XXX:
case YYY:
......
default:
}
}

  这就是android在native层实现的一个异步消息处理机制,在这个机制中所有的处理都是异步的。其基本的处理流程可概述如下:

  • 将变量封装到一个消息AMessage结构体中,然后放到消息队列中去,后台专门有一个线程会从这个队列中取出消息并发送给指定的AHandler处理,在handler的onMessageReceived函数中根据AMessage的mWhat字段转向对应的分支进行处理。

  在这里我们可能会产生这样的疑问:在很多类中都会有各种消息post出来,而后台的异步消息处理线程又是怎么知道将一个消息发送给哪个类的onMessageReceived函数处理呢?

  要搞明白这个问题,就需要分析android在native层的异步消息处理框架。

二、基础类分析

  在android native层的异步消息处理框架中涉及到的类主要有:ALooper、AHandler、AMessage、ALooperRoster、LooperThread等。

  在android开源代码中,这些类的声明头文件位于:/frameworks/av/include/media/stagefright/foundation/ ,实现功能的源代码位于:/frameworks/av/media/libstagefright/foundation/ ,下面就分别对这些类进行分析。

  1、AMessage -- 消息的载体

  结构体AMessage是消息的载体,在传递消息的过程中可以携带各种信息。

 //AMessage类简析
struct AMessage : public RefBase {
//构造函数,在其中会对AMessage的两个重要数据成员mWhat和mTarget进行初始化设置。
AMessage();
AMessage(uint32_t what, const sp<const AHandler> &handler);
void setWhat(uint32_t what);//设置mWhat
uint32_t what() const;//返回mWhat
void setTarget(const sp<const AHandler> &handler);//设置mTarget
//一系列的set和find函数,用于在传递消息的过程中携带各种信息
void setInt32(const char *name, int32_t value);
void setInt64(const char *name, int64_t value);
//....
bool findInt32(const char *name, int32_t *value) const;
bool findInt64(const char *name, int64_t *value) const;
//投递消息到消息队列中
status_t post(int64_t delayUs = );
// Posts the message to its target and waits for a response (or error) before returning.
status_t postAndAwaitResponse(sp<AMessage> *response);
protected:
virtual ~AMessage();//析构函数
private:
friend struct ALooper; // deliver()
//两个重要的私有数据成员:
//mWhat指明这是一个什么消息,用于在onMessageReceived处理分支中进行匹配。
//mTarget用于后台线程在处理这个消息时判断发送给哪一个类处理。
uint32_t mWhat;
ALooper::handler_id mTarget; wp<AHandler> mHandler;
wp<ALooper> mLooper;

  进一步分析AMessage对象的构造函数,首先我们查看AMessage的source code,如下:

 AMessage::AMessage(uint32_t what, const sp<const AHandler> &handler)
: mWhat(what),
mNumItems() {
setTarget(handler);
} void AMessage::setTarget(const sp<const AHandler> &handler) {
if (handler == NULL) {
mTarget = ;
mHandler.clear();
mLooper.clear();
} else {
mTarget = handler->id();
mHandler = handler->getHandler();
mLooper = handler->getLooper();
}
}

  在构造函数中设置mWhat的值为指定的what值,用于指明这是一个什么消息,以便在onMessageReceived函数中找到匹配的分支进行处理。然后会调用setTarget函数,来设置mTarget为指定handler的mID值,mHandler为指定的handler,mLooper为与handler相关的ALooper,这样后台处理线程就可以据此判断将该消息发送给哪一个类(handler)进行处理了。

  构造消息并投递出去的过程,如下示例:

 void NuPlayer::start() {
(new AMessage(kWhatStart, this))->post();
}

  2、ALooper类

  这个类定义了消息循环和后台处理线程。

 //ALooper类简析
struct ALooper : public RefBase {
//定义的两个整型变量类型
typedef int32_t event_id;//event id
typedef int32_t handler_id;//handler id ALooper();//构造函数 // Takes effect in a subsequent call to start().
void setName(const char *name); handler_id registerHandler(const sp<AHandler> &handler);//注册handler
void unregisterHandler(handler_id handlerID);//注销handler
//启动后台处理线程处理事件
status_t start(
bool runOnCallingThread = false,
bool canCallJava = false,
int32_t priority = PRIORITY_DEFAULT
);
protected:
virtual ~ALooper();
private:
friend struct AMessage; // post()
//事件的结构体封装
struct Event {
int64_t mWhenUs;
sp<AMessage> mMessage;
};
AString mName;
List<Event> mEventQueue;
struct LooperThread;
sp<LooperThread> mThread;//后台处理线程,LooperThread继承自Thread
// posts a message on this looper with the given timeout
void post(const sp<AMessage> &msg, int64_t delayUs);
bool loop();
}

  3、LooperThread -- 后台处理线程

 struct ALooper::LooperThread : public Thread {
LooperThread(ALooper *looper, bool canCallJava)
: Thread(canCallJava),
mLooper(looper),
mThreadId(NULL) {
}
....
virtual bool threadLoop() {
return mLooper->loop();
}
....
protected:
virtual ~LooperThread() {} private:
ALooper *mLooper;
android_thread_id_t mThreadId;
};

  LooperThread类继承自Thread,其函数threadLoop()用于在线程启动后进行消息处理。在调用run()函数后便会执行threadLoop()函数中的程序,当其返回值为true时,该函数会被再次调用。

  4、AHandler -- 消息处理类的父类

  AHandler类用于对接收到的消息进行对应的处理。下面我们结合source code进行分析如下:

 struct AHandler : public RefBase {
AHandler() //构造函数
: mID(), //初始化mID为0
mVerboseStats(false),
mMessageCounter() {
} ALooper::handler_id id() const {//这个函数用于返回内部变量mID的值,其初始值为0,但是可以通过setID()设置
return mID;
} protected:
virtual void onMessageReceived(const sp<AMessage> &msg) = ; private:
friend struct AMessage; // deliverMessage()
friend struct ALooperRoster; // setID() ALooper::handler_id mID;
wp<ALooper> mLooper;
//setID()在其友元类ALooperRoster的registerHandler()函数中调用。
inline void setID(ALooper::handler_id id, wp<ALooper> looper) {
mID = id;
mLooper = looper;
} };

  5、ALooperRoster类

  ALooperRoster可以看做是ALooper的一个辅助类,主要用于完成消息handler的注册与注销工作。下面结合source code分析如下:

 //ALooperRoster简析
struct ALooperRoster {
ALooperRoster(); ALooper::handler_id registerHandler(//注册handler
const sp<ALooper> looper, const sp<AHandler> &handler); void unregisterHandler(ALooper::handler_id handlerID);//注销handler
void unregisterStaleHandlers();//注销旧有的handler,在构造新的ALooper对象时会调用
private:
struct HandlerInfo {
wp<ALooper> mLooper;
wp<AHandler> mHandler;
}; Mutex mLock;
KeyedVector<ALooper::handler_id, HandlerInfo> mHandlers;
ALooper::handler_id mNextHandlerID;
};

三、异步消息处理框架

  首先我们先copy一段代码来看一看异步消息处理框架的搭建过程:

 sp<ALooper> mLooper;
mLooper(new ALooper)//构造函数的初始化列表中
mLooper->setName("NuPlayerDriver Looper");
mLooper->start(false, true, PRIORITY_AUDIO);
mPlayer = new NuPlayer;// struct NuPlayer : public AHandler
mLooper->registerHandler(mPlayer);

  搭建框架1

  调用ALooper的构造函数,实例化一个新的ALooper对象:

 ALooper::ALooper()
: mRunningLocally(false) {
// clean up stale AHandlers. Doing it here instead of in the destructor avoids
// the side effect of objects being deleted from the unregister function recursively.
gLooperRoster.unregisterStaleHandlers();
}

  搭建框架2

  调用ALooper::setName(const char *name)设置ALooper的名字:

 void ALooper::setName(const char *name) {
mName = name;
}

  搭建框架3

  调用ALooper::start()启动异步消息处理后台线程:

 status_t ALooper::start(
bool runOnCallingThread, bool canCallJava, int32_t priority) {
// ...... if (mThread != NULL || mRunningLocally) {
return INVALID_OPERATION;
}
mThread = new LooperThread(this, canCallJava);//构造LooperThread对象
status_t err = mThread->run( //run()之后就会执行LooperThread::threadLooper()函数,在threadlooper()函数中又会调用ALooper::loop()函数
mName.empty() ? "ALooper" : mName.c_str(), priority);
if (err != OK) {
mThread.clear();
}
return err;
}

  搭建框架4

  调用ALooper::registerHandler()注册消息处理器类AHandler:

 ALooper::handler_id ALooper::registerHandler(const sp<AHandler> &handler) {
return gLooperRoster.registerHandler(this, handler);
}

  从这个函数的定义可以看出,能作为handler进行注册的类都必须继承自AHandler这个类,注册的过程也是交给了LooperRoster::registerHandler()函数来处理的。接下来我们分析LooperRoster::registerHandler()处理的具体过程。

 //注册handler的过程可以理解为建立ALooper对象与AHandler对象间的联系
ALooper::handler_id ALooperRoster::registerHandler(
const sp<ALooper> looper, const sp<AHandler> &handler) {
Mutex::Autolock autoLock(mLock);
//由前面对AHandler的介绍可知,在AHandler构造函数中,其mID初始值为0.
//调用AHandler::id()可获取mID值
//调用AHandler::setID()可设置mID值
if (handler->id() != ) {
//任何一个AHandler对象只能注册一次,未注册的mID==0
CHECK(!"A handler must only be registered once.");
return INVALID_OPERATION;
}
//结构体ALooperRoster::HandlerInfo的定义为:
//struct HandlerInfo {
// wp<ALooper> mLooper;
// wp<AHandler> mHandler;
//};
HandlerInfo info;
info.mLooper = looper;
info.mHandler = handler;
//mNextHandlerID是ALooperRoster的私有数据成员,其初始值为1,每注册一个handler其值加1
//gALooperRoster是一个ALooperRoster类型的全局对象,利用ALooperRoster的registerHandler()
//函数注册handler过程中mNextHandlerID从1开始递增,这样保证每一个AHandler对象均有一个独一无二的handler_id
//最后在发送Message进行处理时就是根据这个独一无二的handler_id找到对应得handler进行处理的。
ALooper::handler_id handlerID = mNextHandlerID++;
//在gALooperRoster中有定义KeyedVector<ALooper::handler_id, HandlerInfo> mHandlers;
//mHandlers是一个存储handler_id与HandlerInfo对的容器
//注:一个looper可以有多个handler,但一个handler只能有一个对应的looper
mHandlers.add(handlerID, info);
//设置handler的mID
handler->setID(handlerID, looper); return handlerID;
}

  注:LooperRoster对象gLooperRoster并不是某一个ALooper对象的私有数据成员,而是一个全局的对象,这样就保证了在整个程序运行中利用gLooperRoster::registerHandler()进行handler注册时,每一个AHandler对象均会有一个独一无二的handler_id.

四、消息处理

  消息处理首先要产生消息并将其投递出去,然后等待处理。下面是一段产生消息并投递的实例:

     sp<AMessage> msg = new AMessage(kWhatPerformSeek, mReflector->id());
msg->setInt32("generation", ++mSeekGeneration);
msg->setInt64("timeUs", seekTimeUs);
msg->post(200000ll);

  可以看到通过调用AMessage::post()函数将消息投递出去,在分析post()函数前,我们先了解一下AMessage对象如何建立与ALooper、AHandler的对象的联系?

  首先我们看一下AMessage的构造函数:

 AMessage::AMessage(uint32_t what, const sp<const AHandler> &handler)
  : mWhat(what),
  mNumItems() {
    setTarget(handler);
}

  在构造函数中传递进来what(这是个什么消息)和handler(由谁处理)两个参数,首先初始化mWhat字段,然后调用setTarget函数:

 void AMessage::setTarget(const sp<const AHandler> &handler) {
if (handler == NULL) {
mTarget = ;
mHandler.clear();
mLooper.clear();
} else {
mTarget = handler->id();
mHandler = handler->getHandler();
mLooper = handler->getLooper();
}
}

  在setTarget函数中将mTarget初始化为handler_id,mTarget用于后台线程在处理这个消息时判断发送给哪一个类处理。同时获取与handler相关联的ALooper对象--mLooper = handler->getLooper()。这样就建立了AMessage对象与ALooper、AHandler的对象的联系。

  接下来我们分析AMessage::post()函数,source code如下:

 status_t AMessage::post(int64_t delayUs) {
sp<ALooper> looper = mLooper.promote();
if (looper == NULL) {
ALOGW("failed to post message as target looper for handler %d is gone.", mTarget);
return -ENOENT;
} looper->post(this, delayUs);
return OK;
}

可以看到,post之后调用了ALooper::post()来处理,下面是器source code:

 void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {
Mutex::Autolock autoLock(mLock); int64_t whenUs;
if (delayUs > ) {
whenUs = GetNowUs() + delayUs;
} else {
whenUs = GetNowUs();
} List<Event>::iterator it = mEventQueue.begin();
while (it != mEventQueue.end() && (*it).mWhenUs <= whenUs) {
++it;
}
//上面这一段代码,主要是遍历消息队列中的消息的时间,然后和我们的消息做对比,最终目的就是所有的消息必须按照时间先后顺序放在队列中等待执行。
//构造一个Event消息,将我们的msg设置进去
Event event;
event.mWhenUs = whenUs;
event.mMessage = msg; if (it == mEventQueue.begin()) {
mQueueChangedCondition.signal();
}
//将我们的消息放入到消息队列中,等待执行
mEventQueue.insert(it, event);
}

五、后台线程消息处理

  前面有提到创建ALooper对象后调用ALooper::start()函数启动了一个后台消息处理线程,接下来我们进行详细分析:

 status_t ALooper::start(
bool runOnCallingThread, bool canCallJava, int32_t priority) {
//......
if (mThread != NULL || mRunningLocally) {
return INVALID_OPERATION;
}
mThread = new LooperThread(this, canCallJava);//创建LooperThread对象
//**********************************************************************
status_t err = mThread->run( //
mName.empty() ? "ALooper" : mName.c_str(), priority);
//**********************************************************************
if (err != OK) {
mThread.clear();
}
return err;
}

  调用LooperThread::run()函数启动后台处理线程,就会转去执行LooperThread::threadLoop()函数:

virtual bool threadLoop() {
return mLooper->loop();
}

  在threadLoop函数中又会调用创建LooperThread对象时传进来的ALooper的loop()函数:

 bool ALooper::loop() {
Event event; {
Mutex::Autolock autoLock(mLock);
....
//取出Event队列的队头消息,取出后然后把它从队列中删掉
event = *mEventQueue.begin();
mEventQueue.erase(mEventQueue.begin());
}
//调用AMessage的deliver函数发送出去执行
event.mMessage->deliver(); return true;
}

  注:如果ALooper::loop()返回true,则threadLoop()函数会被再次调用,后台线程继续执行,如果返回false则后台线程停止运行。

 在ALooper::loop()函数中取出队头的消息并调用AMessage::deliver()函数发送出去:

 void AMessage::deliver() {
sp<AHandler> handler = mHandler.promote();
if (handler == NULL) {
ALOGW("failed to deliver message as target handler %d is gone.", mTarget);
return;
}
handler->deliverMessage(this);
}

  AMessage::deliver()函数中,

  首先得到处理message的handler -- sp<AHandler> handler = mHandler.promote();

  然后会调用AHandler::deliverMessage()函数 -- handler->deliverMessage(this);

  转到处理该message的handler进行处理:

 void AHandler::deliverMessage(const sp<AMessage> &msg) {
onMessageReceived(msg);
mMessageCounter++; if (mVerboseStats) {
uint32_t what = msg->what();
ssize_t idx = mMessages.indexOfKey(what);
if (idx < ) {
mMessages.add(what, );
} else {
mMessages.editValueAt(idx)++;
}
}
}

  在AHandler::deliverMessage()函数中调用了AHandler::onMessageReceived(msg)函数,上面有提到该函数中多多个case分支,根据message的what字段转到对应的分支进行处理,比如在NuPlayer中:

 void NuPlayer::onMessageReceived(const sp<AMessage> &msg) {
switch (msg->what()) {
case kWhatSetDataSource:
case kWhatVideoNotify:
case kWhatAudioNotify:
case kWhatRendererNotify:
case kWhatMoreDataQueued:
case kWhatReset:
case kWhatSeek:
case kWhatPause:
case kWhatResume:
default:
}
}

  至此消息处理的整个流程结束。

六、总结

  1. 首先需要构造一个AMessage对象,必须携带两个参数:what(什么消息)和handler(谁处理);
  2. ALooper中有一个后台线程--LooperThread,该线程维护着一个消息队列List<Event> mEventQueue,线程函数不断从这个队列中取出消息执行;
  3. 消息的发送和取出需要调用ALooper,AMessage,AHandler等的函数。

微信扫一扫,关注玖零日记,获取更多相关资讯及源码 -- 虽无面朝大海,依旧春暖花开

Android Native层异步消息处理框架

上一篇:回收进程用户空间资源 exit()函数 _exit()函数 atexit()函数 on_exit()函数


下一篇:终止函数 atexit()