Chomium MessageLoop实现原理

Chromium Message Loop

主要类和功能

Chomium MessageLoop实现原理

MessagePump(LibEvent)
如果是UI线程和IO线程会使用MessagePumpLibEvent 实现,其他线程则使用MessageDefault. 前者利用libEvent 的事件驱动特性,后者使用event 信号来控制线程阻塞和唤醒。

当线程启动时,线程的入口函数ThreadMain会创建并启动 MessagePump, 开始任务轮转,该类会不断的向ThreadControllerWithMessagePumpImpl 发出DoWork指令,驱动其向任务队列获取任务。当队列中仅剩延时任务或者有新入队的任务时,MessagePump 会在正确的时间将自己唤醒。

在MessagePumpLibEvent::Init 时会创建一对匿名管道,分别为 input 端和 output 端。当储存的任务全部完成时,MeesagePump 会利用 Libevent 监听 output 端是否可读,此时该线程会处于阻塞的状态。当其他有其他线程向该线程抛任务时,且检测到该线程正处于阻塞状态时,会向input端写入一个字节,由于libEvent事件驱动的特性,线程被唤醒重新执行调度。定时任务也是同样方案,当检测到当前只剩延时任务时,MessagePump会计算下一个任务还需要多长时间,创建TimeEvent,并且利用libevent将自己阻塞,等待定时完成。
MessagePumpDefault是通过 event信号量来实现线程阻塞和启动的,两者有什么区别呢?Chromium原文注释是

  // This type of pump only supports tasks and timers.
  DEFAULT,

  // This type of pump also supports native UI events (e.g., Windows
  // messages).
  UI,
  
  // This type of pump also supports asynchronous IO.
  IO,

这样看来,使用LibEvent的方式可能是为了潜在的跨进程唤醒。


ThreadControllerWithMessagePumpImpl

主要负责任务队列 SequenceManagerImpl 和 messagePump 之间的交互,负责向 SequenceManagerImpl 获取一批任务,然后在通过 TaskAnnotator 执行该任务,TaskAnnotator的作用是计算和处理一些堆栈信息和debug信息,以便打印堆栈和调试时可以查看任务执行过程。

在一批任务执行完成之后,还需要向 SequenceManagerImpl 请求下一个任务的状态 并返回给MessagePump (立即执行/延时/不存在下一个任务), MessagePump会根据这个返回值决定继续轮转还是阻塞。
另外这还是一个中转类,被包括RunLoop在内的大多数类持有,任务调度的大部分流程都有参与。


RunLoop 使用base::Thread 创建一个线程成功之后,该线程的入口函数MainThread 会创建 RunLoop对象,持有 ThreadControllerWithMessagePumpImpl, 负责启动线程和MessageLoop,线程完成后的回收等。

SequenceManagerImpl 负责任务的储存和调度,是核心类之一。 其成员 active_queues_ 中保存了至少一个 TaskQueue, 默认存在一个 default_tq (线程初始化时由SeqeuenceManagerThreadDelegate 调度创建)。每个task_queue都会设定一个优先级,优先级影响调度策略,目前仅在BrowserTaskQueue看到多个优先级的任务队列。
当 向SequenceManagerImpl 请求下一个任务时,会创建 TaskQueueSelector 对象,该对象负责从 active_queue_ 中选取一个WorkQueue,并从WorkQueue中获取一个任务。

TaskQueueSelector 由SequenceMangerImpl 持有,负责任务队列 WorkQueue的选取。
其包含两个work_queue_sets(immediate 和 delay), 里面存放所有TaskQueue持有的workQuque,并且按照优先级建立映射,以便获取任务时计算优先级。


TaskQueue(Impl) 管理任务的入队和出队,通过CreateTaskRunner 创建并暴露task_runner对象,其他线程可以通过静态方法获取该线程的TaskRunner并向该线程抛任务。

Chomium MessageLoop实现原理

每个 TaskQueueImpl 中都持有了两个对象 : MainThreadOnly 和 AnyThread,这两个对象分别表示当前线程可访问和任意线程可访问。
其中 MainThreadOnly 持有 delayed_work_queue 和 immediate_work_queue 两个WorkQueue类型的任务队列,注意这里的 delayed_work_queue 存放的实际上都是到期的延时任务,因此可以直接调度。MainThreadOnly 中还存在一个优先队列 delayed_incoming_queue, 延时任务都会push到这里,优先队列可以保证每次取出的都是最延时最小的任务。

AnyThread 中仅存在TaskDeque类型的 immediate_incoming_queue。
每当SequenceMnagerImpl 通过 SelectNextTaskImpl 获取任务时,会通过标志位来检查所有的TaskQuque.main_thread_only. immediate_woek_queue_是否为空,然后会 使用 TakeTaskFromWorkQueue 将any_thread中的即时任务全部移动过来。

另外 SelectNextTaskImpl 还会调用 MoveReadyDelayedTasksToWorkQueues 检查 delayed_incoming_queue 中的任务是否到期,并移动到 delayed_work_queue 中,以便在下一个loop时挑选

TaskRunner 用于抛任务, 当抛出一个immediate任务时会先保存在 any_thread_.immediate_incoming_queue里。当抛出一个延时任务时, 如果从当前线程抛出, 会直接push到MainThreadOnly.delayed_incoming_queue里,如果从其他线程抛出,会新建一个特殊的immediate任务,让当前线程执行调度,该任务的工作就是将task 放到 delayed_incoming_queue 中。
这里执行一个额外步骤的原因:在设计中 MainThreadOnly 仅有主线程调用,而Anythread任意线程都可以调度,当其他线程需要抛出延时任务时,直接push到MainThreadOnly.delayed_incoming_queue 是线程不安全的,因此需要新建一个临时任务让线程自己完成push的操作。


初始化流程

对于子线程来说,在创建时就会自动绑定RunLoop,MessagePump等必要的类,对于主线程来说,由于不是base::thread直接创建出来的,需要手动进行绑定,我们只看一下子线程的情况。

  1. 通过Thread::StartWithOptions 创建一个线程,会创建SequenceManagerThreadDelegate 对象,绑定线程ID。并依次创建 SequenceManagerImpl, TaskRunnerImpl, TaskRunner 等对象,最后调用posix平台的pthread_create 完成真正的线程创建。
  2. 线程创建之后的入口函数为ThreadMain, 会首先调用 SequenceManagerThreadDelegate::BIndToCurrentThread方法创建并绑定MessagePump, 然后创建RunLoop对象,并调用Runloop的Run方法,会通过ThreadControllerWithMessagePumpImpl 最终调用到 MessagePump 的 Run 方法开启循环。

抛任务

如 TaskQueue(Impl) 中所述,会根据是否是延迟任务以及源线程是不是当前线程有不同的策略。为了容易理解,下面简化和折叠了大量的代码。

void TaskQueueImpl::PostImmediateTaskImpl(PostedTask task,
                                          CurrentThread current_thread) {
.......

    base::internal::CheckedAutoLock lock(any_thread_lock_); // 获取锁
.......
    any_thread_.immediate_incoming_queue.push_back(Task(
        std::move(task), delayed_run_time, sequence_number, sequence_number));  //统一放在Any_thread_中
.......
}


void TaskQueueImpl::PostDelayedTaskImpl(PostedTask task,
                                        CurrentThread current_thread) {
.... 
  if (current_thread == CurrentThread::kMainThread) {   
 	 main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
  } else {
 	  PostImmediateTaskImpl(  PostedTask(std::move(task_runner),
                 BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask,
                          Unretained(this), std::move(pending_task)),
                 FROM_HERE, TimeDelta(), Nestable::kNonNestable, task_type),
      CurrentThread::kNotMainThread);
 }
 ......
 }
 
// post了一个 “push 延迟任务”的 临时任务, 该任务在当前线程调度,将任务push到 main_thread_only , 当前线程不需要锁
 
 void TaskQueueImpl::ScheduleDelayedWorkTask(Task pending_task) {
.....
  if (delayed_run_time <= time_domain_now) {  // 已超时,直接放 work_queue 中
    main_thread_only().delayed_work_queue.push(std::move(pending_task));
  } else {                                                                                  // 未超时
 	 main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
  }
......
}
 

调度任务

  1. MessagePump 由于定时器或者新抛入任务而被唤醒,向 ThreadControllerWithMessagePumpImpl 发出执行任务的指令
void MessagePumpLibevent::Run(Delegate* delegate) {
for (;;) {
    Delegate::NextWorkInfo next_work_info = delegate->DoWork();  // 驱动任务调度,并获取下一个任务的状态
    bool immediate_work_available = next_work_info.is_immediate();  
    bool attempt_more_work = immediate_work_available || processed_io_events_;

    if (attempt_more_work)  // 如果仍有即时任务,继续调度
      continue;

    attempt_more_work = delegate->DoIdleWork(); //执行Idel任务

    if (!next_work_info.delayed_run_time.is_max()) { // 如果是延迟任务,设定定时器
      const TimeDelta delay = next_work_info.remaining_delay();
      struct timeval poll_tv;
      poll_tv.tv_sec = delay.InSeconds();
      poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
      event_set(timer_event.get(), -1, 0, timer_callback, event_base_);
      event_base_set(event_base_, timer_event.get());   
      event_add(timer_event.get(), &poll_tv);
      did_set_timer = true;
    }
    event_base_loop(event_base_, EVLOOP_ONCE); // 如果接下来没有任务,监听output 阻塞当前线程

  }
}

  1. ThreadControllerWithMessagePumpImpl::DoWorkImpl 向 SequenceManagerImpl 请求一批任务,由参数 work_batch_size 表示一次 MessagePump 的调度可以执行多少个任务,目前CrRenderMain/Compositor线程该参数是4, 其他线程为1。然后使用TaskAnnotator 执行该任务,TaskAnnotator 能处理堆栈和trace等debug信息,使栈帧能够按照顺序链接起来。
    执行完一批任务之后, 还需要通过SequenceManagerImpl::DelayTillNextTask 接口获取下一个任务的延时,并返回给MessagePump。

  2. 获取任务
    SequenceManagerImpl::SelectNextTaskImpl 首先通过 ReloadEmptyWorkQueues 检查每个 TaskQueue 持有的 mainThreadOnly对象中 immediate_work_queue 是否为空,如果为空,会从对应的anythread中移动过来最新一批任务。
    ( 每个TaskQueueImpl 构造时,会创建一个 empty_queues_to_reload_handle_,在 SequenceManagerImpl 中注册一个 atomicFlag 并且绑定一个回调 。 当通过 TaskQueueImpl::PostImmediateTaskImpl 将新任务入队时,会通过Anythread_中保存的镜像标志位 immediate_work_queue_empty 知道 MainThreadOnly中队列状态,如果为空,会将注册的atomicflag置位为 true. 当从workqueue中获取了最后一个任务时,也会将该标志置true.
    获取任务时,ReloadEmptyWorkQueues 会调用 RunActiveCallbacks 寻找标志位为true的 taskQuque, 并执行之前绑定的回调 TaskQueueImpl::ReloadEmptyImmediateWorkQueue, 将对应的anyThread 中保存的 immediate_work_queue 全部移动 MainThread中。)

  3. 接下来调用 MoveReadyDelayedTasksToWorkQueues 检查所有 TaskQueue 中的延迟任务队列 delayed_incoming_queue 里 ,如果任务到期,则移动到对应的 delayed_work_queue中。

  4. 任务更新完成之后,会使用 TaskQueueSelector 从所有 taskQueue 中挑选出一个workqueue, 当获取的queue是延时任务队列时,immediate_starvation_count_计数加一,表示即时任务队列饥饿,当计数器超过 kMaxDelayedStarvationTasks=3 时,下次选择必定是immediate队列。

    关于挑选taskQueue的策略,首先是根据优先级从work_queue_sets中选取一组候选的workQueue,
    另外每个 workqueue 还维护一个最早入队的任务的时间戳, TaskQueueSelector 会挑选存在最早任务的 workqueue 作为结果返回。
    至此,任务选择完毕返回给 ThreadControllerWithMessagePumpImpl 执行。

后记

  1. chromium 中还提供一种可撤回的 delayTask, 使用 PostDelayedCancellableTask 抛出。其实现原理是在原有的task 上包装一个 TaskHandle::Runner, 并生成 TaskHandle 返回给调用者,而抛出的任务使用的是weakPtr指针。 当调用者需要撤销任务,调用cancel方法时,会调用计数指针的 InvalidateWeakPtrs方法, 将保存在 TaskQueue 中 的 WeakPtr 无效化,达到撤销任务的目的。 TaskHandle本身 使用 WFT ThreadSafeRefCounted 模板,因此无需考虑线程安全问题

  2. 待分析: 可重入问题, MessagePump的Run是可重入的,流程中也经常可以看到 Nestable::kNonNestable, nesting_depth类似的标识,但是没有找到重入MessagePump的入口和完整流程。

上一篇:9.14.10-PointersOnC-20220302


下一篇:【linux驱动基础】linux工作队列work_struct,delayed_work的使用