webrtc 线程设计

webrtc 线程设计

前言

本文主要对webrtc框架使用到的三种线程类进行分析,根据下面两方面去讨论:

  1. 使用场景(webrtc那些模块使用)
  2. 接口设计

1. webrtc::ProcessThread

1.1 使用场景

webrtc::ProcessThread 在 modules\utility\include 路径下,它其实本质上是一个定时器的作用,用于周期性的调度业务模块,比如pacing,发送RTCP报文和NACK重传包发送等模块。这些业务特点都是需要周期性的检测。

1.2 接口设计

webrtc::ProcessThread 是对外提供的接口类,真正的实现在ProcessThreadImpl

class ProcessThreadImpl : public ProcessThread {
 public:
  explicit ProcessThreadImpl(const char* thread_name);
  ~ProcessThreadImpl() override;

  void Start() override;
  void Stop() override;

  void WakeUp(Module* module) override;
  void PostTask(std::unique_ptr<QueuedTask> task) override;
  void PostDelayedTask(std::unique_ptr<QueuedTask> task,
                       uint32_t milliseconds) override;

  void RegisterModule(Module* module, const rtc::Location& from) override;
  void DeRegisterModule(Module* module) override;

 protected:
  static void Run(void* obj);
  bool Process();

 private:
  struct ModuleCallback {
    ModuleCallback() = delete;
    ModuleCallback(ModuleCallback&& cb) = default;
    ModuleCallback(const ModuleCallback& cb) = default;
    ModuleCallback(Module* module, const rtc::Location& location)
        : module(module), location(location) {}
    bool operator==(const ModuleCallback& cb) const {
      return cb.module == module;
    }

    Module* const module;
    int64_t next_callback = 0;  // Absolute timestamp.
    const rtc::Location location;

   private:
    ModuleCallback& operator=(ModuleCallback&);
  };
  struct DelayedTask {
    DelayedTask(int64_t run_at_ms, std::unique_ptr<QueuedTask> task)
        : run_at_ms(run_at_ms), task(task.release()) {}
    friend bool operator<(const DelayedTask& lhs, const DelayedTask& rhs) {
      // Earliest DelayedTask should be at the top of the priority queue.
      return lhs.run_at_ms > rhs.run_at_ms;
    }

    int64_t run_at_ms;
    // DelayedTask owns the |task|, but some delayed tasks must be removed from
    // the std::priority_queue, but mustn't be deleted. std::priority_queue does
    // not give non-const access to the values, so storing unique_ptr would
    // delete the task as soon as it is remove from the priority queue.
    // Thus lifetime of the |task| is managed manually.
    QueuedTask* task;
  };
  typedef std::list<ModuleCallback> ModuleList;

  void Delete() override;

  // Warning: For some reason, if |lock_| comes immediately before |modules_|
  // with the current class layout, we will  start to have mysterious crashes
  // on Mac 10.9 debug.  I (Tommi) suspect we're hitting some obscure alignemnt
  // issues, but I haven't figured out what they are, if there are alignment
  // requirements for mutexes on Mac or if there's something else to it.
  // So be careful with changing the layout.
  rtc::RecursiveCriticalSection
      lock_;  // Used to guard modules_, tasks_ and stop_.

  rtc::ThreadChecker thread_checker_;
  rtc::Event wake_up_;
  // TODO(pbos): Remove unique_ptr and stop recreating the thread.
  std::unique_ptr<rtc::PlatformThread> thread_;

  ModuleList modules_;
  std::queue<QueuedTask*> queue_;
  std::priority_queue<DelayedTask> delayed_tasks_ RTC_GUARDED_BY(lock_);
  bool stop_;
  const char* thread_name_;
};

为了节省资源,一般是许多模块共用一个webrtc::ProcessThread,提供了RegisterModule函数,注册相关的模块,只要调度的业务按照 class Module 实现接口就行了。

Module 有两个主要的接口,TimeUntilNextProcess() 这是返回当前模块需要调度的时间点,单位是毫秒;Process() 是模块实现具体的业务。

class Module {
 public:
  // Returns the number of milliseconds until the module wants a worker
  // thread to call Process.
  // This method is called on the same worker thread as Process will
  // be called on.
  // TODO(tommi): Almost all implementations of this function, need to know
  // the current tick count.  Consider passing it as an argument.  It could
  // also improve the accuracy of when the next callback occurs since the
  // thread that calls Process() will also have it's tick count reference
  // which might not match with what the implementations use.
  virtual int64_t TimeUntilNextProcess() = 0;

  // Process any pending tasks such as timeouts.
  // Called on a worker thread.
  virtual void Process() = 0;

  // This method is called when the module is attached to a *running* process
  // thread or detached from one.  In the case of detaching, |process_thread|
  // will be nullptr.
  //
  // This method will be called in the following cases:
  //
  // * Non-null process_thread:
  //   * ProcessThread::RegisterModule() is called while the thread is running.
  //   * ProcessThread::Start() is called and RegisterModule has previously
  //     been called.  The thread will be started immediately after notifying
  //     all modules.
  //
  // * Null process_thread:
  //   * ProcessThread::DeRegisterModule() is called while the thread is
  //     running.
  //   * ProcessThread::Stop() was called and the thread has been stopped.
  //
  // NOTE: This method is not called from the worker thread itself, but from
  //       the thread that registers/deregisters the module or calls Start/Stop.
  virtual void ProcessThreadAttached(ProcessThread* process_thread) {}

 protected:
  virtual ~Module() {}
};

2、rtc::TaskQueue

2.1 使用场景

rtc::TaskQueue 在 rtc_base\task_queue 下,它是一个异步处理任务线程模型,使用者继承QueuedTask 实现Run接口,投递到TaskQueue中,TaskQueue就会自动调度任务处理。这个线程模型在webrtc中使用广泛,如webrtc的音频和视频的编码线程就是采用这个线程调度模型

2.2 使用接口

PostTask 投递任务
PostDelayedTask 投递任务,但任务设定在milliseconds 后执行

class RTC_LOCKABLE RTC_EXPORT TaskQueue {
public:
 // TaskQueue priority levels. On some platforms these will map to thread
 // priorities, on others such as Mac and iOS, GCD queue priorities.
 using Priority = ::webrtc::TaskQueueFactory::Priority;

 explicit TaskQueue(std::unique_ptr<webrtc::TaskQueueBase,
                                    webrtc::TaskQueueDeleter> task_queue);
 ~TaskQueue();

 // Used for DCHECKing the current queue.
 bool IsCurrent() const;

 // Returns non-owning pointer to the task queue implementation.
 webrtc::TaskQueueBase* Get() { return impl_; }

 // TODO(tommi): For better debuggability, implement RTC_FROM_HERE.

 // Ownership of the task is passed to PostTask.
 void PostTask(std::unique_ptr<webrtc::QueuedTask> task);

 // Schedules a task to execute a specified number of milliseconds from when
 // the call is made. The precision should be considered as "best effort"
 // and in some cases, such as on Windows when all high precision timers have
 // been used up, can be off by as much as 15 millseconds (although 8 would be
 // more likely). This can be mitigated by limiting the use of delayed tasks.
 void PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task,
                      uint32_t milliseconds);

 // std::enable_if is used here to make sure that calls to PostTask() with
 // std::unique_ptr<SomeClassDerivedFromQueuedTask> would not end up being
 // caught by this template.
 template <class Closure,
           typename std::enable_if<!std::is_convertible<
               Closure,
               std::unique_ptr<webrtc::QueuedTask>>::value>::type* = nullptr>
 void PostTask(Closure&& closure) {
   PostTask(webrtc::ToQueuedTask(std::forward<Closure>(closure)));
 }

 // See documentation above for performance expectations.
 template <class Closure,
           typename std::enable_if<!std::is_convertible<
               Closure,
               std::unique_ptr<webrtc::QueuedTask>>::value>::type* = nullptr>
 void PostDelayedTask(Closure&& closure, uint32_t milliseconds) {
   PostDelayedTask(webrtc::ToQueuedTask(std::forward<Closure>(closure)),
                   milliseconds);
 }

private:
 webrtc::TaskQueueBase* const impl_;

 RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue);
};

rtc::TaskQueue有两个具体的实现类,一个是TaskQueueStdlib
路径是rtc_basc\task_queue_stdlib.h,非window的平台采用这个实现,采用了一个队列实现任务的分发处理;

一个是TaskQueueWin(rtc_basc\task_queue_win.h),TaskQueueWin是window下专有的,采用window的消息机制实现任务的分发处理。

3、rtc::Thread

3.1 使用场景

在创建peerconnectionfactory 的时候,如下图所示会设置对应的rtc::Thread 线程:

RTC_EXPORT rtc::scoped_refptr<PeerConnectionFactoryInterface>
CreatePeerConnectionFactory(
   rtc::Thread* network_thread,
   rtc::Thread* worker_thread,
   rtc::Thread* signaling_thread,
   rtc::scoped_refptr<AudioDeviceModule> default_adm,
   rtc::scoped_refptr<AudioEncoderFactory> audio_encoder_factory,
   rtc::scoped_refptr<AudioDecoderFactory> audio_decoder_factory,
   std::unique_ptr<VideoEncoderFactory> video_encoder_factory,
   std::unique_ptr<VideoDecoderFactory> video_decoder_factory,
   rtc::scoped_refptr<AudioMixer> audio_mixer,
   rtc::scoped_refptr<AudioProcessing> audio_processing,
   AudioFrameProcessor* audio_frame_processor = nullptr);

}  // namespace webrtc

下面这些是来自WebRTC 的线程模型的引用内容

signaling_thread:
一般是工作在 PeerConnection 层,主要是完成控制平面的逻辑,用于和应用层交互。比如,CreateOffer,SetRemoteSession 等接口都是通过 Signal threa 完成的。默认是采用 PeerConnectionFactory 初始化线程作为信令线程

network_thread 会创建带有socket的线程:

  1. Transport 的初始化
  2. 从网络接收数据,发送给 Worker thread
  3. 从 Worker thread 接收数据,发送到网络

worker_thread 不带有socket的线程,处理以下等业务:

  1. 音频设备初始化
  2. 视频设备初始化
  3. 流对象的初始化
  4. 从网络线程接收数据,传给解码器线程
  5. 从编码器线程接收数据,传给网络线程

3.2 接口设计

接口设计参看下面这篇文章:rtc::thread 设计

4. 引用文章

https://zhuanlan.zhihu.com/p/136070941
https://blog.csdn.net/xiaomucgwlmx/article/details/103287398

上一篇:【OBS-WEBRTC】obs-output 集成owt 采坑


下一篇:《项目经理指导手册》监控篇1,职能指标