观察者模式的应用,主要的行为就是注册和移除观察者(observer),以及通知所有已注册的Observers。这里介绍的是Chromium项目中实现的线程安全的观察者管理及通知的基础类ObserverListThreadSafe, 它的能力包括:
- 观察者可以在任意线程中注册,消息回调会发生在注册时所在的线程。
- 任意线程可以Notify()触发通知消息。
- 观察者可以在回调时从列表中移除自己。
- 如果一个线程正在通知观察者, 此时一个观察者正在从列表中移除自己, 通知会被丢弃。
线程安全的基础
实现这个基础是记录线程与观察者列表的对应关系,即某个线程上存在的观察者的列表。定义如下:
typedef std::map<base::PlatformThreadId, ObserverListContext*>
ObserversListMap;
其中PlatformThreadID为线程的ID, 而ObserverListContext是一个数组,其定义如下:
struct ObserverListContext {
scoped_refptr<base::MessageLoopProxy> loop;
ObserverList<ObserverType> list;
};
其中loop为Chromium线程机制中的MessageLoopProxy, 也可以理解为线程的消息队列代理,使用它就可以完成将某个操作抛到指定线程上运行。list就很好理解了,记录的是该线程中的观察者列表。
只要全局的持有这个表(ObserversListMap),就可以将观察者与线程关联起来,进而保证通知一定可以运行到它注册时所在的线程。
设计总览及使用方式
其中ObserverListThreadSafe就是我们的主角。它本身是为一个模板类:
template <class ObserverType>
class ObserverListThreadSafe
: public base::RefCountedThreadSafe<
ObserverListThreadSafe<ObserverType>,
ObserverListThreadSafeTraits<ObserverType> > {
下面是一个使用的示例:
// The interface to receive mouse movement events.
class MEDIA_EXPORT MouseEventListener {
public:
// |position| is the new mouse position.
virtual void OnMouseMoved(const SkIPoint& position) = 0;
protected:
virtual ~MouseEventListener() {}
};
typedef ObserverListThreadSafe<UserInputMonitor::MouseEventListener>
MouseListenerList;
scoped_refptr<MouseListenerList> mouse_listeners_;
添加/删除Observer方法很简单:
void UserInputMonitor::AddMouseListener(MouseEventListener* listener) {
mouse_listeners_->AddObserver(listener);
......
}
void UserInputMonitor::RemoveMouseListener(MouseEventListener* listener) {
mouse_listeners_->RemoveObserver(listener);
......
}
当需要通知各个观察者时,代码如下:
SkIPoint position(SkIPoint::Make(event->u.keyButtonPointer.rootX,
event->u.keyButtonPointer.rootY));
mouse_listeners_->Notify(
&UserInputMonitor::MouseEventListener::OnMouseMoved, position);
ObserverListThreadSafe实现要点
操作线程-观察者列表时加锁
即内部成员变量list_lock_,在操作observer_list_要使用如下方法加锁:
base::AutoLock lock(list_lock_);
添加及删除观察者
基本思路就是:
* 以当前线程ID,找到ObserverListContext。如果新增但又没有,则新建。
* 操作找到的ObserverListContext进行添加或删除操作。
下面是AddObserver()的实现:
// Add an observer to the list. An observer should not be added to
// the same list more than once.
void AddObserver(ObserverType* obs) {
// If there is not a current MessageLoop, it is impossible to notify on it,
// so do not add the observer.
if (!base::MessageLoop::current())
return;
ObserverList<ObserverType>* list = NULL;
base::PlatformThreadId thread_id = base::PlatformThread::CurrentId();
{
base::AutoLock lock(list_lock_);
if (observer_lists_.find(thread_id) == observer_lists_.end())
observer_lists_[thread_id] = new ObserverListContext(type_);
list = &(observer_lists_[thread_id]->list);
}
list->AddObserver(obs);
}
事件通知
这个过程实现上,因为需要兼容不同数理的参数,所以定义了一组模板方法。先说明一下基本思路:
* 封装要调用的通知方法为Callback形式:函数,及使用tuple(不是C++11的元组,而是base::tuple)封装起来的参数。
* 遍历observer_lists_,找到每个线程对应的ObserverListContext。
* 使用ObserverListContext中记录的MessageProxyLoop,执行NotifyWrapper,并传入Callback作为参数。
以上这个过程就是在通知的线程的完成的, 具体的代码如下:
template <class Method, class Params>
void Notify(const UnboundMethod<ObserverType, Method, Params>& method) {
base::AutoLock lock(list_lock_);
typename ObserversListMap::iterator it;
for (it = observer_lists_.begin(); it != observer_lists_.end(); ++it) {
ObserverListContext* context = (*it).second;
context->loop->PostTask(
FROM_HERE,
base::Bind(&ObserverListThreadSafe<ObserverType>::
template NotifyWrapper<Method, Params>, this, context, method));
}
}
下一步就是在各个observer所在的线程上触发NotifyWrapper了。它主要做两件事:
* 为每一个observer运行通知的函数:
{
typename ObserverList<ObserverType>::Iterator it(context->list);
ObserverType* obs;
while ((obs = it.GetNext()) != NULL)
method.Run(obs);
}
- 如果发现这个线程上已经没有可用的观察者,则将它从observer_list_中移除。
封装的通知方法
再回头看一下对回调方法封装,上面提到了参数是使用base::tuple封装的,它同时也提供了DispatchToMethod方法,把参数解开,再调用方法,详见base::tuple中的说明。
下面是UnboundMethod的定义:
// An UnboundMethod is a wrapper for a method where the actual object is
// provided at Run dispatch time.
template <class T, class Method, class Params>
class UnboundMethod {
public:
UnboundMethod(Method m, const Params& p) : m_(m), p_(p) {
COMPILE_ASSERT(
(base::internal::ParamsUseScopedRefptrCorrectly<Params>::value),
badunboundmethodparams);
}
void Run(T* obj) const {
DispatchToMethod(obj, m_, p_);
}
private:
Method m_;
Params p_;
};
如果没有Chromium的线程机制,也是可以实现的,核心是线程的抛转。
小结
这种方法适用于多线程下以函数+参数通知的方式。参数直接抛转到指定的线程不用特别担心线程安全问题。对于获得通知后仍然要跨线程访问数据的情况,则可以考虑: 1.以类似的方式,将数据通过函数参数传递(目前最多为6个)。 2. 如果数据量大,则可以考虑使用Multiversion Concurrency Control的算法,尽量避免加锁的开销。