1.前言
webrtc线程源于chromium,其中有消息队列,通信等功能,相对于原始的std::thread或者posix pthread而言,好用不少,本文介绍了webrtc 线程的常用功能以及实现;
2.正文
2.1 webrtc中的主要线程
出于管理接口即时性,平衡IO任务或其它任务阻塞性等,通过异步的方式,将不同类型的任务归类到不同的异步线程去处理是常见的处理方式,webrtc中一共有三大线程
-
signaling_thread_: 处理PeerConnection有关的接口任务和observer回调
-
network_thread_:网络io等
-
worker_thread_:其它阻塞耗时的任务
2.2 使用Invoke在异步线程执行任务
当需要将一个任务放到异步线程的时候,只需要使用thread->Invoke<>()
函数即可,取JsepTransportController::SetLocalDescription()
作为例子, 在函数最开头就检查了,network_thread线程是否是当前线程,如果不是则通过network_thread_->Invoke<>()
将当前函数投递到network_thread_中去执行:
RTCError JsepTransportController::SetLocalDescription(
SdpType type,
const cricket::SessionDescription* description) {
// network线程运行
if (!network_thread_->IsCurrent()) {
return network_thread_->Invoke<RTCError>(
RTC_FROM_HERE, [=] { return SetLocalDescription(type, description); });
}
RTC_DCHECK_RUN_ON(network_thread_);
if (!initial_offerer_.has_value()) {
initial_offerer_.emplace(type == SdpType::kOffer);
if (*initial_offerer_) {
SetIceRole_n(cricket::ICEROLE_CONTROLLING);
} else {
SetIceRole_n(cricket::ICEROLE_CONTROLLED);
}
}
return ApplyDescription_n(/*local=*/true, type, description);
}
2.3 Invoke的实现和线程任务管理
2.3.1 任务的投递
以上述的SetLocalDescription()
为例,其调用的network_thread_->Invoke()
如下:
template中的第一个参数ReturnT
为执行函数的返回值类型,第二个参数用来SFINAE确保返回值不是void
(还有一个适配void类型的版本)
作为task的任务被转化成了FunctionView
类型的functor,传入到函数InvokeInternal()
中
template <
class ReturnT,
typename = typename std::enable_if<!std::is_void<ReturnT>::value>::type>
ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) {
ReturnT result;
InvokeInternal(posted_from, [functor, &result] { result = functor(); });
return result;
}
InvokeInternal()
会将functor转化成Msg handler然后放到this线程队列中去
void Thread::InvokeInternal(const Location& posted_from,
rtc::FunctionView<void()> functor) {
TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(),
"src_func", posted_from.function_name());
class FunctorMessageHandler : public MessageHandler {
public:
explicit FunctorMessageHandler(rtc::FunctionView<void()> functor)
: functor_(functor) {}
void OnMessage(Message* msg) override { functor_(); }
private:
rtc::FunctionView<void()> functor_;
// 将funtor转化成Msg handler
} handler(functor);
// 发送到this线程队列中
Send(posted_from, &handler);
}
在Thread::Send()
函数中有非常多的细节,首先会判断当前线程和this线程是否相同,是就直接执行,否则生成一个QueueTask 投递到this线程的队列中去
void Thread::Send(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
RTC_DCHECK(!IsQuitting());
if (IsQuitting())
return;
// Sent messages are sent to the MessageHandler directly, in the context
// of "thread", like Win32 SendMessage. If in the right context,
// call the handler directly.
// 构造成msg
Message msg;
msg.posted_from = posted_from;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
// 如果当前线程就是this线程的话,直接将
// 执行任务即可
if (IsCurrent()) {
msg.phandler->OnMessage(&msg);
return;
}
AssertBlockingIsAllowedOnCurrentThread();
// 获取当前线程
Thread* current_thread = Thread::Current();
#if RTC_DCHECK_IS_ON
if (current_thread) {
RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this));
ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread,
this);
}
#endif
// Perhaps down the line we can get rid of this workaround and always require
// current_thread to be valid when Send() is called.
std::unique_ptr<rtc::Event> done_event;
if (!current_thread)
done_event.reset(new rtc::Event());
bool ready = false;
// 将msg封装达成QueueTask,放到线程队列中
PostTask(webrtc::ToQueuedTask(
[&msg]() mutable { msg.phandler->OnMessage(&msg); },
[this, &ready, current_thread, done = done_event.get()] {
if (current_thread) {
CritScope cs(&crit_);
ready = true;
current_thread->socketserver()->WakeUp();
} else {
done->Set();
}
}));
if (current_thread) {
// 当前的thread是google thread
bool waited = false;
crit_.Enter();
while (!ready) {
// 任务未执行完,阻塞等待到任务完成被唤醒
crit_.Leave();
current_thread->socketserver()->Wait(kForever, false); // epoll wait
waited = true;
crit_.Enter();
}
crit_.Leave();
// Our Wait loop above may have consumed some WakeUp events for this
// Thread, that weren't relevant to this Send. Losing these WakeUps can
// cause problems for some SocketServers.
//
// Concrete example:
// Win32SocketServer on thread A calls Send on thread B. While processing
// the message, thread B Posts a message to A. We consume the wakeup for
// that Post while waiting for the Send to complete, which means that when
// we exit this loop, we need to issue another WakeUp, or else the Posted
// message won't be processed in a timely manner.
if (waited) {
// socketserver有两个使用场景
// 1.像这种给别的线程投递了阻塞任务后,进行wait等到执行完毕
// 2.Thread::Get()函数中获取消息的时候,如果获取不到就会陷入永久的wait直到被wakup()
// 对于第二点,此处提到了一个问题,A向B投递了一个阻塞任务task1后wait等待结果,此时别的线程
// 向A的队列投递了一个任务task1,投递的时候会有wakeup()的操作,那么上面检测ready的loop会把
// 这个wakeup()给吃掉,当任务完成时,由于wakeup被吃掉了,导致线程获取task得时候会陷入wait
// 无法及时处理task,(表述上确实如此,但代码上看似乎没有这样得问题,因为是先检测队列是否为空
// 再继续wait的)
current_thread->socketserver()->WakeUp();
}
} else {
// 非常google thread
done_event->Wait(rtc::Event::kForever);
}
}
先跟着主流程走看看这个PostTask()
函数, PostTask()
内部直接调用了POST()
, 在POST()
中把msg再次封装成一个rtc::message
然后投递到this线程的任务队列messages_
中,然后执行WakeUpSocketServer()
唤醒this线程消费任务,至此,任务的投递过程就完成了;
void Thread::PostTask(std::unique_ptr<webrtc::QueuedTask> task) {
// Though Post takes MessageData by raw pointer (last parameter), it still
// takes it with ownership.
Post(RTC_FROM_HERE, &queued_task_handler_,
/*id=*/0, new ScopedMessageData<webrtc::QueuedTask>(std::move(task)));
}
void Thread::Post(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata,
bool time_sensitive) {
RTC_DCHECK(!time_sensitive);
if (IsQuitting()) {
delete pdata;
return;
}
// Keep thread safe
// Add the message to the end of the queue
// Signal for the multiplexer to return
{
CritScope cs(&crit_);
// 将QueueTask 封装成 rtc::message 放到message队列中
Message msg;
msg.posted_from = posted_from;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
messages_.push_back(msg);
}
// 唤醒this线程消费任务
WakeUpSocketServer();
}
2.3.2 任务的消费
接下来看看task的消费流程,thread启动之后会运行Run()
然后运行ProcessMessages()
void Thread::Run() {
ProcessMessages(kForever);
}
bool Thread::ProcessMessages(int cmsLoop) {
// Using ProcessMessages with a custom clock for testing and a time greater
// than 0 doesn't work, since it's not guaranteed to advance the custom
// clock's time, and may get stuck in an infinite loop.
RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 ||
cmsLoop == kForever);
int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
int cmsNext = cmsLoop;
while (true) {
#if defined(WEBRTC_MAC)
ScopedAutoReleasePool pool;
#endif
Message msg;
// 获取消息
if (!Get(&msg, cmsNext))
return !IsQuitting();
// 分发处理
Dispatch(&msg);
if (cmsLoop != kForever) {
cmsNext = static_cast<int>(TimeUntil(msEnd));
if (cmsNext < 0)
return true;
}
}
}
在Thread::Get()
中会从消息队列messages_
获取消息,看起来很长,核心的只有几句:
遍历delay_messages_,获取到期消息并放入到messages_队列中
将messages_存在的消息取出,返回出去,如果没有,则break陷入阻塞直到被唤醒
bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
// Return and clear peek if present
// Always return the peek if it exists so there is Peek/Get symmetry
if (fPeekKeep_) {
*pmsg = msgPeek_;
fPeekKeep_ = false;
return true;
}
// Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
int64_t cmsTotal = cmsWait;
int64_t cmsElapsed = 0;
int64_t msStart = TimeMillis();
int64_t msCurrent = msStart;
while (true) {
// Check for posted events
int64_t cmsDelayNext = kForever;
bool first_pass = true;
while (true) {
// All queue operations need to be locked, but nothing else in this loop
// (specifically handling disposed message) can happen inside the crit.
// Otherwise, disposed MessageHandlers will cause deadlocks.
{
CritScope cs(&crit_);
// On the first pass, check for delayed messages that have been
// triggered and calculate the next trigger time.
if (first_pass) {
first_pass = false;
// 遍历delay message到期消息
while (!delayed_messages_.empty()) {
if (msCurrent < delayed_messages_.top().run_time_ms_) {
cmsDelayNext =
TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
break;
}
// 将到期消息移动到messages_队列中
messages_.push_back(delayed_messages_.top().msg_);
delayed_messages_.pop();
}
}
// Pull a message off the message queue, if available.
// 获取messages_任务
if (messages_.empty()) {
break;
} else {
*pmsg = messages_.front();
messages_.pop_front();
}
} // crit_ is released here.
// If this was a dispose message, delete it and skip it.
if (MQID_DISPOSE == pmsg->message_id) {
RTC_DCHECK(nullptr == pmsg->phandler);
delete pmsg->pdata;
*pmsg = Message();
continue;
}
return true;
}
if (IsQuitting())
break;
// Which is shorter, the delay wait or the asked wait?
int64_t cmsNext;
if (cmsWait == kForever) {
cmsNext = cmsDelayNext;
} else {
cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
cmsNext = cmsDelayNext;
}
{
// 阻塞直到消息来
// Wait and multiplex in the meantime
if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
return false;
}
// If the specified timeout expired, return
msCurrent = TimeMillis();
cmsElapsed = TimeDiff(msCurrent, msStart);
if (cmsWait != kForever) {
if (cmsElapsed >= cmsWait)
return false;
}
}
return false;
}
当消息被获取出,就调用dispatch()
然后执行,至此,任务就被执行完成了
void Thread::Dispatch(Message* pmsg) {
TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file",
pmsg->posted_from.file_name(), "src_func",
pmsg->posted_from.function_name());
RTC_DCHECK_RUN_ON(this);
int64_t start_time = TimeMillis();
pmsg->phandler->OnMessage(pmsg);// 执行
int64_t end_time = TimeMillis();
int64_t diff = TimeDiff(end_time, start_time);
if (diff >= dispatch_warning_ms_) {
RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
<< "ms to dispatch. Posted from: "
<< pmsg->posted_from.ToString();
// To avoid log spew, move the warning limit to only give warning
// for delays that are larger than the one observed.
dispatch_warning_ms_ = diff + 1;
}
}
调用QueuedTaskHandler::OnMessage()
,从msg->pdata中还原成QueueTask运行,然后release释放掉
void Thread::QueuedTaskHandler::OnMessage(Message* msg) {
RTC_DCHECK(msg);
//取出data 还原成queue task
auto* data = static_cast<ScopedMessageData<webrtc::QueuedTask>*>(msg->pdata);
std::unique_ptr<webrtc::QueuedTask> task = std::move(data->data());
// Thread expects handler to own Message::pdata when OnMessage is called
// Since MessageData is no longer needed, delete it.
delete data;
// 运行之后释放
// QueuedTask interface uses Run return value to communicate who owns the
// task. false means QueuedTask took the ownership.
if (!task->Run())
task.release();
}
2.3.3 执行结果的返回
上述task执行的时候涉及到两个非常重要的函数task->Run()
和 task.release()
:
task->Run()
会将最初的投递进来的函数运行然后存到result中,流程如下
首先构造QueueTask时的lambda
void Thread::Send(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
......
PostTask(webrtc::ToQueuedTask(
[&msg]() mutable { msg.phandler->OnMessage(&msg); }, // <= Run()
[this, &ready, current_thread, done = done_event.get()] {
if (current_thread) {
CritScope cs(&crit_);
ready = true;
current_thread->socketserver()->WakeUp();
} else {
done->Set();
}
}));
......
}
msg.phandler->OnMessage(&msg)
中的phanlder是InvokeInternal
构造的FunctorMessageHandler,调用的override的OnMessage()函数
void Thread::InvokeInternal(const Location& posted_from,
rtc::FunctionView<void()> functor) {
TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(),
"src_func", posted_from.function_name());
class FunctorMessageHandler : public MessageHandler {
public:
explicit FunctorMessageHandler(rtc::FunctionView<void()> functor)
: functor_(functor) {}
void OnMessage(Message* msg) override { functor_(); } // <= OnMessage()
private:
rtc::FunctionView<void()> functor_;
} handler(functor);
Send(posted_from, &handler);
}
functor_()
是Invoke中是将result = functor();
封装成的一个lambda,所以执行完成后的result会存在该result中;
template <
class ReturnT,
typename = typename std::enable_if<!std::is_void<ReturnT>::value>::type>
ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) {
ReturnT result;
InvokeInternal(posted_from, [functor, &result] { result = functor(); });
return result;
}
在之前投递任务的send函数中,当前线程PostTask()
后就开始current_thread->socketserver()->Wait(kForever, false)
, 陷入阻塞;
task.release()
则会唤醒投递任务后陷入wait的当前线程,让其将result返回
void Thread::Send(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {
......
std::unique_ptr<rtc::Event> done_event;
if (!current_thread)
done_event.reset(new rtc::Event());
bool ready = false;
// 将msg封装达成QueueTask,放到线程队列中
PostTask(webrtc::ToQueuedTask(
[&msg]() mutable { msg.phandler->OnMessage(&msg); },
[this, &ready, current_thread, done = done_event.get()] { // <= task.release()
if (current_thread) {
CritScope cs(&crit_);
ready = true;
current_thread->socketserver()->WakeUp(); // 唤醒
} else {
done->Set();
}
}));
if (current_thread) {
// 当前的thread是google thread
bool waited = false;
crit_.Enter();
while (!ready) {
// 任务未执行完,阻塞等待到任务完成被唤醒
crit_.Leave();
current_thread->socketserver()->Wait(kForever, false); // epoll wait
waited = true;
crit_.Enter();
}
crit_.Leave();
// Our Wait loop above may have consumed some WakeUp events for this
// Thread, that weren't relevant to this Send. Losing these WakeUps can
// cause problems for some SocketServers.
//
// Concrete example:
// Win32SocketServer on thread A calls Send on thread B. While processing
// the message, thread B Posts a message to A. We consume the wakeup for
// that Post while waiting for the Send to complete, which means that when
// we exit this loop, we need to issue another WakeUp, or else the Posted
// message won't be processed in a timely manner.
if (waited) {
// socketserver有两个使用场景
// 1.像这种给别的线程投递了阻塞任务后,进行wait等到执行完毕
// 2.Thread::Get()函数中获取消息的时候,如果获取不到就会陷入永久的wait直到被wakup()
// 对于第二点,此处提到了一个问题,A向B投递了一个阻塞任务task1后wait等待结果,此时别的线程
// 向A的队列投递了一个任务task1,投递的时候会有wakeup()的操作,那么上面检测ready的loop会把
// 这个wakeup()给吃掉,当任务完成时,由于wakeup被吃掉了,导致线程获取task得时候会陷入wait
// 无法及时处理task,(表述上确实如此,但代码上看似乎没有这样得问题,因为是先检测队列是否为空
// 再继续wait的)
current_thread->socketserver()->WakeUp();
}
} else {
// 非常google thread
done_event->Wait(rtc::Event::kForever);
}
}
2.4 API类的异步代理类
signaling_thread_ 是用来处理Api层次任务的线程,类对外的接口会通过代理类,将内部的接口任务投递到signaling_thread_ 中;
比如类PeerConnection,在api\peer_connection_proxy.h
定义了其代理类如下
BEGIN_PROXY_MAP(PeerConnection)
PROXY_PRIMARY_THREAD_DESTRUCTOR()
PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, local_streams)
PROXY_METHOD0(rtc::scoped_refptr<StreamCollectionInterface>, remote_streams)
PROXY_METHOD1(bool, AddStream, MediaStreamInterface*)
PROXY_METHOD1(void, RemoveStream, MediaStreamInterface*)
PROXY_METHOD2(RTCErrorOr<rtc::scoped_refptr<RtpSenderInterface>>,
AddTrack,
rtc::scoped_refptr<MediaStreamTrackInterface>,
const std::vector<std::string>&)
.....
这样的代理类是通过api/proxy.h
的一组宏完成的的,这组宏的用法在文件有很详细的说明:
//
// Example usage:
// 1. 创建interface类
// class TestInterface : public rtc::RefCountInterface {
// public:
// std::string FooA() = 0;
// std::string FooB(bool arg1) const = 0;
// std::string FooC(bool arg1) = 0;
// };
//
// Note that return types can not be a const reference.
// 2.继承接口类,实现
// class Test : public TestInterface {
// ... implementation of the interface.
// };
//
// 3. 通过宏生命代理类
// BEGIN_PROXY_MAP(Test)
// PROXY_PRIMARY_THREAD_DESTRUCTOR()
// PROXY_METHOD0(std::string, FooA)
// PROXY_CONSTMETHOD1(std::string, FooB, arg1)
// PROXY_SECONDARY_METHOD1(std::string, FooC, arg1)
// END_PROXY_MAP()
//
// Where the destructor and first two methods are invoked on the primary
// thread, and the third is invoked on the secondary thread.
//
// The proxy can be created using
// 4.创建代理对象
// TestProxy::Create(Thread* signaling_thread, Thread* worker_thread,
// TestInterface*).
//
将PeerConnection的代理宏展开
template <class INTERNAL_CLASS>
class PeerConnectionProxyWithInternal;
typedef PeerConnectionProxyWithInternal<PeerConnectionInterface> PeerConnectionProxy;
// 代理类继承PeerConnectionInterface接口
template <class INTERNAL_CLASS> class PeerConnectionProxyWithInternal : public PeerConnectionInterface {
protected:
typedef PeerConnectionInterface C;
public:
const INTERNAL_CLASS* internal() const { return c_; }
INTERNAL_CLASS* internal() { return c_; }
protected:
PeerConnectionProxyWithInternal(rtc::Thread* primary_thread,
rtc::Thread* secondary_thread, INTERNAL_CLASS* PeerConnection) :
primary_thread_(primary_thread), secondary_thread_(secondary_thread),
c_(PeerConnection) {}
private:
// 放入的两个线程
mutable rtc::Thread* primary_thread_;
mutable rtc::Thread* secondary_thread_;
protected:
~PeerConnectionProxyWithInternal() {
MethodCall<PeerConnectionProxyWithInternal, void> call( this, &PeerConnectionProxyWithInternal::DestroyInternal);
call.Marshal(::rtc::Location(__FUNCTION__, "E:\\git\\webrtc\\webrtc-checkout\\src\\api\\peer_connection_proxy.h", 28), destructor_thread());
}
private:
void DestroyInternal() { c_ = nullptr; }
rtc::scoped_refptr<INTERNAL_CLASS> c_;
public:
// 创建代理对象静态方法
static rtc::scoped_refptr<PeerConnectionProxyWithInternal>
Create( rtc::Thread* primary_thread, rtc::Thread* secondary_thread, INTERNAL_CLASS* PeerConnection) {
return new rtc::RefCountedObject<PeerConnectionProxyWithInternal>( primary_thread, secondary_thread, PeerConnection);
}
private:
rtc::Thread* destructor_thread() const { return primary_thread_; }
public:
// local_streams的代理方法
rtc::scoped_refptr<StreamCollectionInterface> local_streams() override {
MethodCall<C, rtc::scoped_refptr<StreamCollectionInterface>> call(c_, &C::local_streams);
return call.Marshal(::rtc::Location(__FUNCTION__, "E:\\git\\webrtc\\webrtc-checkout\\src\\api\\peer_connection_proxy.h", 30), primary_thread_);
}
代理类创建的时候可以放入两个目的线程primary_thread_
和secondary_thread_
,用来提供给代理方法使用。
local_streams()
的代理方法是通过宏PROXY_METHOD0()
创建的,该宏会创建一个MethodCall<> call
封装要执行的函数local_streams()
,然后通过call.Marshal()
将任务投递到primary_thread_
Marshal()
方法如下所示:
R Marshal(const rtc::Location& posted_from, rtc::Thread* t) {
if (t->IsCurrent()) {
// 是当前线程,Invoke
Invoke(std::index_sequence_for<Args...>());
} else {
// 不是则PostTask 然后阻塞等
t->PostTask(std::unique_ptr<QueuedTask>(this));
event_.Wait(rtc::Event::kForever);
}
return r_.moved_result();
}