Thread pool in Chromium
This article attempts to analyze the implementation of the thread pool in Chromium and some of its applications.
Starting from posting a task
First, let's look at the flow from the top-level interface, TaskRunner::PostTask, to the point where the task actually starts running.
As the sequence diagram shows, a task passes through multiple classes and multiple steps between being posted and actually executing. To understand Chromium's design intent, and what it is trying to achieve, we need to understand the role each of these classes plays in the call flow and what their lifetimes are.
To simplify understanding, the classes can be grouped as follows:
- TaskRunner
- ThreadPoolImpl, TaskTracker, TaskAnnotator. TaskTracker owns the TaskAnnotator and is itself owned by ThreadPoolImpl; the three share the same lifetime.
- ThreadGroup, ThreadGroupImpl. A ThreadGroup is a subset of the threads owned by the ThreadPool. The ThreadPool always has a foreground ThreadGroup and, depending on startup parameters, may create a background ThreadGroup; the background group can be switched to the ThreadGroupNative implementation via configuration, while the foreground group is always a ThreadGroupImpl. GetThreadGroupForTraits chooses a ThreadGroup based on task priority; lower-priority tasks are preferentially placed in the background group (a simplified sketch follows this list).
- WorkerThread, WorkerThreadDelegateImpl. The worker thread itself, and the bridge through which the worker thread communicates with its ThreadGroup.
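As a rough illustration of that routing, the group selection paraphrased from thread_pool_impl.cc looks approximately like this (abridged sketch, not a verbatim copy; details may differ):
// Abridged sketch of ThreadPoolImpl's group selection. Member names match
// the real code, but the body is simplified.
ThreadGroup* ThreadPoolImpl::GetThreadGroupForTraits(const TaskTraits& traits) {
  if (traits.priority() == TaskPriority::BEST_EFFORT &&
      traits.thread_policy() == ThreadPolicy::PREFER_BACKGROUND &&
      background_thread_group_) {
    return background_thread_group_.get();  // low-priority work goes to the back end
  }
  return foreground_thread_group_.get();  // everything else stays on the front end
}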
Following the call chain, let's look at the actual code. Note that many of the thread pool's interfaces are accessed from multiple threads, so thread safety has to be kept in mind.
// base/task/thread_pool/pooled_parallel_task_runner.cc:21
bool PooledParallelTaskRunner::PostDelayedTask(const Location& from_here,
OnceClosure closure,
TimeDelta delay) {
// pooled_task_runner_delegate_ is the ThreadPoolImpl; this check only exists for tests, so we can ignore it here.
if (!PooledTaskRunnerDelegate::MatchesCurrentDelegate(
pooled_task_runner_delegate_)) {
return false;
}
// Create a one-off, single-task Sequence.
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
traits_, this, TaskSourceExecutionMode::kParallel);
{
CheckedAutoLock auto_lock(lock_);
sequences_.insert(sequence.get());
}
return pooled_task_runner_delegate_->PostTaskWithSequence(
Task(from_here, std::move(closure), TimeTicks::Now(), delay),
std::move(sequence));
}
A few data classes deserve explanation:
traits_ : TaskTraits, the metadata describing the Tasks posted by this TaskRunner, including TaskPriority, TaskShutdownBehavior, ThreadPolicy, MayBlock and WithBaseSyncPrimitives.
Sequence : inherits from TaskSource and stores Tasks through the Transaction mechanism; it is backed by a queue, which is what makes ordered task execution possible.
Task : stores the task's information; at creation time this includes the posting location, the closure to run, the task's queue time and its delay.
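To make the traits concrete, here is a hedged usage sketch (not taken from this post; the function name PostExampleTask is made up) of how they are specified when posting through the public API in base/task/thread_pool.h:
#include "base/bind.h"
#include "base/task/thread_pool.h"
void PostExampleTask() {
  // The brace-initialized TaskTraits become the metadata carried alongside
  // the posted Task, exactly the |traits_| discussed above.
  base::ThreadPool::PostTask(
      FROM_HERE,
      {base::TaskPriority::USER_VISIBLE, base::MayBlock(),
       base::TaskShutdownBehavior::SKIP_ON_SHUTDOWN},
      base::BindOnce([] { /* placeholder work */ }));
}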
Now that the Task and Sequence we need have been created, let's continue into the next method.
// base/task/thread_pool/thread_pool_impl.cc:480
bool ThreadPoolImpl::PostTaskWithSequence(Task task,
scoped_refptr<Sequence> sequence) {
CHECK(task.task);
DCHECK(sequence);
if (!task_tracker_->WillPostTask(&task, sequence->shutdown_behavior()))
return false;
if (task.delayed_run_time.is_null()) {
return PostTaskWithSequenceNow(std::move(task), std::move(sequence));
} else {
// It's safe to take a ref on this pointer since the caller must have a ref
// to the TaskRunner in order to post.
scoped_refptr<TaskRunner> task_runner = sequence->task_runner();
delayed_task_manager_.AddDelayedTask(
std::move(task),
BindOnce(
[](scoped_refptr<Sequence> sequence,
ThreadPoolImpl* thread_pool_impl, Task task) {
thread_pool_impl->PostTaskWithSequenceNow(std::move(task),
std::move(sequence));
},
std::move(sequence), Unretained(this)),
std::move(task_runner));
}
return true;
}
First, task_tracker_ checks whether we are currently in the shutdown state, and task_annotator_ fills in the task's backtrace frames (used to show where the task came from in a core dump if a crash occurs).
Next the task's delay is examined; here we only consider the case without a delay.
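For reference, the delayed branch is what a call like the following exercises (a hedged sketch, not from this post; the lambda body is a placeholder): the task is parked in delayed_task_manager_ until its run time and only then reaches PostTaskWithSequenceNow, whereas a zero-delay post continues directly into PostTaskWithSequenceNow below.
#include "base/bind.h"
#include "base/task/thread_pool.h"
#include "base/time/time.h"
void PostDelayedExample() {
  base::ThreadPool::PostDelayedTask(
      FROM_HERE, {base::TaskPriority::BEST_EFFORT},
      base::BindOnce([] { /* runs roughly one second after posting */ }),
      base::Seconds(1));
}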
// base/task/thread_pool/thread_pool_impl.cc:418
bool ThreadPoolImpl::PostTaskWithSequenceNow(Task task,
scoped_refptr<Sequence> sequence) {
auto transaction = sequence->BeginTransaction();
const bool sequence_should_be_queued = transaction.WillPushTask();
RegisteredTaskSource task_source;
if (sequence_should_be_queued) {
task_source = task_tracker_->RegisterTaskSource(sequence);
// We shouldn't push |task| if we're not allowed to queue |task_source|.
if (!task_source)
return false;
}
if (!task_tracker_->WillPostTaskNow(task, transaction.traits().priority()))
return false;
transaction.PushTask(std::move(task));
if (task_source) {
const TaskTraits traits = transaction.traits();
GetThreadGroupForTraits(traits)->PushTaskSourceAndWakeUpWorkers(
{std::move(task_source), std::move(transaction)});
}
return true;
}
First a transaction is begun. It maintains the sequence's state: the constructor locks the sequence and the destructor unlocks it. transaction.WillPushTask() returns whether the sequence's queue is empty and the sequence is not yet bound to a WorkerThread; in our case this is true, because the sequence was just created and is entering PostTaskWithSequenceNow for the first time.
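The Transaction is essentially a scoped-lock style RAII object. A minimal self-contained sketch of the idea (illustrative only; these are not Chromium's classes):
#include <mutex>
#include <queue>
// Stand-in for a task source: a queue of tasks guarded by a lock.
struct FakeTaskSource {
  std::mutex lock;
  std::queue<int> tasks;  // placeholder for a queue of Tasks
};
// The transaction holds the task source's lock for its whole lifetime, so
// checking WillPushTask() and then PushTask() cannot race with other threads.
class FakeTransaction {
 public:
  explicit FakeTransaction(FakeTaskSource& source) : source_(source) {
    source_.lock.lock();
  }
  ~FakeTransaction() { source_.lock.unlock(); }
  bool WillPushTask() const { return source_.tasks.empty(); }
  void PushTask(int task) { source_.tasks.push(task); }
 private:
  FakeTaskSource& source_;
};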
Next, task_tracker_->RegisterTaskSource(sequence) produces a RegisteredTaskSource. Shutdown still has to be checked here (under multi-threaded access, shutdown can start at any moment); the count of incomplete task sources is incremented, and a RegisteredTaskSource, a RAII wrapper around the task source, is created. task_tracker_->WillPostTaskNow checks state, and transaction.PushTask(std::move(task)) appends the task to the queue. Finally the ThreadGroup matching the traits is looked up and PushTaskSourceAndWakeUpWorkers is invoked on it; here we assume the foreground ThreadGroupImpl is chosen.
void ThreadGroupImpl::PushTaskSourceAndWakeUpWorkers(
TransactionWithRegisteredTaskSource transaction_with_task_source) {
ScopedCommandsExecutor executor(this);
PushTaskSourceAndWakeUpWorkersImpl(&executor,
std::move(transaction_with_task_source));
}
void ThreadGroup::PushTaskSourceAndWakeUpWorkersImpl(
BaseScopedCommandsExecutor* executor,
TransactionWithRegisteredTaskSource transaction_with_task_source) {
CheckedAutoLock auto_lock(lock_);
DCHECK(!replacement_thread_group_);
DCHECK_EQ(delegate_->GetThreadGroupForTraits(
transaction_with_task_source.transaction.traits()),
this);
if (transaction_with_task_source.task_source->heap_handle().IsValid()) {
// If the task source changed group, it is possible that multiple concurrent
// workers try to enqueue it. Only the first enqueue should succeed.
executor->ScheduleReleaseTaskSource(
std::move(transaction_with_task_source.task_source));
return;
}
auto sort_key = transaction_with_task_source.task_source->GetSortKey(
disable_fair_scheduling_);
priority_queue_.Push(std::move(transaction_with_task_source.task_source),
sort_key);
EnsureEnoughWorkersLockRequired(executor);
}
// base/task/thread_pool/thread_group_impl.cc:173
void FlushImpl() {
CheckedLock::AssertNoLockHeldOnCurrentThread();
// Wake up workers.
workers_to_wake_up_.ForEachWorker(
[](WorkerThread* worker) { worker->WakeUp(); });
// Start workers. Happens after wake ups to prevent the case where a worker
// enters its main function, is descheduled because it wasn't woken up yet,
// and is woken up immediately after.
workers_to_start_.ForEachWorker([&](WorkerThread* worker) {
worker->Start(outer_->after_start().worker_thread_observer);
if (outer_->worker_started_for_testing_)
outer_->worker_started_for_testing_->Wait();
});
if (must_schedule_adjust_max_tasks_)
outer_->ScheduleAdjustMaxTasks();
if (!scheduled_histogram_samples_->empty()) {
DCHECK_LE(scheduled_histogram_samples_->size(),
kHistogramSampleStackSize);
for (auto& scheduled_sample : scheduled_histogram_samples_)
scheduled_sample.first->Add(scheduled_sample.second);
scheduled_histogram_samples_->clear();
}
}
ScopedCommandsExecutor : calls FlushImpl() in its destructor; we will come back to that function at the end.
PushTaskSourceAndWakeUpWorkersImpl : after taking the lock, it first checks task_source->heap_handle, which is assigned when a TaskSource is inserted into the priority queue, i.e. it tells whether the task source has already been enqueued. (Even though the Transaction object holds the task source's lock, that does not prevent it from being enqueued inside ReEnqueueTaskSourceLockRequired, a function we will return to later.) If the task source is already in the priority queue, the executor takes ownership of it, so that the task source is destroyed only after the Transaction has been destroyed correctly; the resulting work includes updating the reference count, the count of incomplete task sources, and so on. Otherwise the task source is pushed into the priority queue, and EnsureEnoughWorkersLockRequired picks the workers that need to be woken up and adds them to the executor. This touches dynamic WorkerThread management, which will be analyzed in detail later; for now, assume an idle WorkerThread is added.
FlushImpl : we focus on the first two calls. It first asserts that no lock is currently held, because both the ThreadGroup's lock and the TaskSource's lock were released automatically at the end of the previous function. It then wakes up the WorkerThreads that need to run tasks.
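The pattern implemented by ScopedCommandsExecutor is worth spelling out: operations that must not run under the ThreadGroup lock (such as WorkerThread::WakeUp) are only recorded while the lock is held and are executed from the destructor after the lock has been released. A self-contained sketch of the idea (illustrative only, not the real class):
#include <functional>
#include <mutex>
#include <vector>
// Records commands while a lock is held and runs them once it is released,
// mirroring the ScopedCommandsExecutor / FlushImpl() split above.
class ScopedDeferredCommands {
 public:
  void Schedule(std::function<void()> command) {
    commands_.push_back(std::move(command));
  }
  // Runs when the object goes out of scope, after the caller's lock guard
  // has already been destroyed, i.e. with no lock held.
  ~ScopedDeferredCommands() {
    for (auto& command : commands_) command();
  }
 private:
  std::vector<std::function<void()>> commands_;
};
void PushAndWakeUp(std::mutex& group_lock, std::function<void()> wake_worker) {
  ScopedDeferredCommands executor;  // declared first, destroyed last
  {
    std::lock_guard<std::mutex> auto_lock(group_lock);
    // ... push the task source into the priority queue here ...
    executor.Schedule(std::move(wake_worker));  // don't wake while locked
  }
  // |executor| flushes here, after |group_lock| has been released.
}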
At this point the flow on the posting thread is complete. A short summary of ThreadPool: it starts and stops the thread pool, creates the various kinds of TaskRunner, and lets tasks have different priorities, execution policies and delays; it is a per-process singleton. The sketch below illustrates the different TaskRunner flavors; after that we continue with the WorkerThread side.
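A hedged sketch (not from this post; the function name CreateRunners is made up) of the public factories in base/task/thread_pool.h that produce the different TaskRunner execution modes:
#include "base/task/thread_pool.h"
void CreateRunners() {
  // Parallel: tasks may run on any worker, in any order.
  auto parallel = base::ThreadPool::CreateTaskRunner(
      {base::TaskPriority::USER_VISIBLE});
  // Sequenced: tasks run one at a time in posting order, but not
  // necessarily on the same physical thread.
  auto sequenced = base::ThreadPool::CreateSequencedTaskRunner(
      {base::MayBlock()});
  // Single-threaded: tasks are bound to one dedicated worker thread.
  auto single_thread = base::ThreadPool::CreateSingleThreadTaskRunner(
      {}, base::SingleThreadTaskRunnerThreadMode::SHARED);
}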
WorkerThread
The worker thread of the thread pool. Unlike base/threading/thread, it is an abstraction designed specifically for the needs of the thread pool and TaskRunner, with tighter logic and a simpler interface.
Let's see how it runs. A WorkerThread is created dynamically by its ThreadGroup, then Start() is called; the real OS thread is created and started by PlatformThread (which has POSIX and Windows implementations) and calls back into RunWorker():
Start(worker_thread_observer);
PlatformThread::CreateWithPriority(kDefaultStackSize, this, &thread_handle_,
current_thread_priority_);
ThreadMain();
Run[Background]<ThreadLabel>Worker(); // makes the thread type visible in call stacks
RunWorker();
// base/task/thread_pool/worker_thread.cc
void WorkerThread::RunWorker() {
delegate_->OnMainEntry(this);
// A WorkerThread starts out waiting for work.
delegate_->WaitForWork(&wake_up_event_);
while (!ShouldExit()) {
UpdateThreadPriority(GetDesiredThreadPriority());
// Get the task source containing the next task to execute.
RegisteredTaskSource task_source = delegate_->GetWork(this);
if (!task_source) {
// Exit immediately if GetWork() resulted in detaching this worker.
if (ShouldExit())
break;
delegate_->WaitForWork(&wake_up_event_);
continue;
}
task_source = task_tracker_->RunAndPopNextTask(std::move(task_source));
delegate_->DidProcessTask(std::move(task_source));
// Calling WakeUp() guarantees that this WorkerThread will run Tasks from
// TaskSources returned by the GetWork() method of |delegate_| until it
// returns nullptr. Resetting |wake_up_event_| here doesn't break this
// invariant and avoids a useless loop iteration before going to sleep if
// WakeUp() is called while this WorkerThread is awake.
wake_up_event_.Reset();
}
// Important: It is unsafe to access unowned state (e.g. |task_tracker_|)
// after invoking OnMainExit().
delegate_->OnMainExit(this);
// Release the self-reference to |this|. This can result in deleting |this|
// and as such no more member accesses should be made after this point.
self_ = nullptr;
}
At a high level the thread loop matches what one would expect: inside a loop guarded by ShouldExit, GetWork tries to obtain a RegisteredTaskSource; if none is returned the worker waits, otherwise the task source is handed to task_tracker_ to run via RunAndPopNextTask, followed by DidProcessTask. delegate_ is a WorkerThreadDelegateImpl object, the bridge through which the WorkerThread communicates with the ThreadGroupImpl; its interface is sketched below.
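For orientation, the delegate interface that RunWorker programs against looks roughly like the following (abridged and paraphrased from worker_thread.h, not copied verbatim):
// Abridged sketch of WorkerThread::Delegate. ThreadGroupImpl's
// WorkerThreadDelegateImpl implements these hooks, which is how the loop
// above talks back to its ThreadGroup.
class Delegate {
 public:
  virtual ~Delegate() = default;
  // Called once when the worker's main function starts / ends.
  virtual void OnMainEntry(WorkerThread* worker) = 0;
  virtual void OnMainExit(WorkerThread* worker) {}
  // Returns the next TaskSource to run, or null if the worker should sleep.
  virtual RegisteredTaskSource GetWork(WorkerThread* worker) = 0;
  // Called after RunAndPopNextTask with the (possibly re-enqueueable)
  // task source.
  virtual void DidProcessTask(RegisteredTaskSource task_source) = 0;
  // How long WaitForWork() blocks before timing out.
  virtual TimeDelta GetSleepTimeout() = 0;
  virtual void WaitForWork(WaitableEvent* wake_up_event);
};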
OnMainEntry/OnMainExit only perform some checks and do not change state. The first call to WaitForWork looks odd: why not enter the loop directly instead of waiting once up front? The answer again has to do with state; let's look at GetWork first.
// base/task/thread_pool/thread_group_impl.cc
RegisteredTaskSource ThreadGroupImpl::WorkerThreadDelegateImpl::GetWork(
WorkerThread* worker) {
// outer_ is a reference back to the owning ThreadGroupImpl.
ScopedCommandsExecutor executor(outer_.get());
CheckedAutoLock auto_lock(outer_->lock_);
// If needed, try to create new workers here, so PostTask is not blocked by that work.
if (!outer_->after_start().wakeup_after_getwork &&
outer_->after_start().wakeup_strategy !=
WakeUpStrategy::kCentralizedWakeUps) {
outer_->EnsureEnoughWorkersLockRequired(&executor);
executor.FlushWorkerCreation(&outer_->lock_);
}
if (!CanGetWorkLockRequired(&executor, worker)) // (1)
return nullptr;
RegisteredTaskSource task_source;
TaskPriority priority;
while (!task_source && !outer_->priority_queue_.IsEmpty()) {
// Enforce the CanRunPolicy and that no more than |max_best_effort_tasks_|
// BEST_EFFORT tasks run concurrently.
priority = outer_->priority_queue_.PeekSortKey().priority();
if (!outer_->task_tracker_->CanRunPriority(priority) ||
(priority == TaskPriority::BEST_EFFORT &&
outer_->num_running_best_effort_tasks_ >=
outer_->max_best_effort_tasks_)) {
break;
}
task_source = outer_->TakeRegisteredTaskSource(&executor);
}
if (!task_source) {
OnWorkerBecomesIdleLockRequired(worker); // (4)
return nullptr;
}
// Running task bookkeeping.
outer_->IncrementTasksRunningLockRequired(priority);
DCHECK(!outer_->idle_workers_stack_.Contains(worker));
write_worker().current_task_priority = priority;
write_worker().current_shutdown_behavior = task_source->shutdown_behavior();
if (outer_->after_start().wakeup_after_getwork &&
outer_->after_start().wakeup_strategy !=
WakeUpStrategy::kCentralizedWakeUps) {
outer_->EnsureEnoughWorkersLockRequired(&executor);
}
return task_source;
}
bool ThreadGroupImpl::WorkerThreadDelegateImpl::CanGetWorkLockRequired(
ScopedCommandsExecutor* executor,
WorkerThread* worker) {
// To avoid searching through the idle stack : use GetLastUsedTime() not being
// null (or being directly on top of the idle stack) as a proxy for being on
// the idle stack.
const bool is_on_idle_workers_stack =
outer_->idle_workers_stack_.Peek() == worker ||
!worker->GetLastUsedTime().is_null();
DCHECK_EQ(is_on_idle_workers_stack,
outer_->idle_workers_stack_.Contains(worker));
if (is_on_idle_workers_stack) { // (2)
if (CanCleanupLockRequired(worker))
CleanupLockRequired(executor, worker); // (3)
return false;
}
// Excess workers should not get work, until they are no longer excess (i.e.
// max tasks increases). This ensures that if we have excess workers in the
// thread group, they get a chance to no longer be excess before being cleaned
// up.
if (outer_->GetNumAwakeWorkersLockRequired() > outer_->max_tasks_) {
OnWorkerBecomesIdleLockRequired(worker); // (4)
return false;
}
return true;
}
// base/task/thread_pool/thread_group_impl.h
class ThreadGroupImpl {
...
// Stack of idle workers. Initially, all workers are on this stack. A worker
// is removed from the stack before its WakeUp() function is called and when
// it receives work from GetWork() (a worker calls GetWork() when its sleep
// timeout expires, even if its WakeUp() method hasn't been called). A worker
// is pushed on this stack when it receives nullptr from GetWork().
WorkerThreadStack idle_workers_stack_ GUARDED_BY(lock_);
}
First, note that WaitForWork has a timeout, so GetWork may be triggered by a worker whose wait timed out as well as by one that was genuinely woken up.
According to the code and comments, a WorkerThread is in one of two states, idle or awake. Idle threads are kept on idle_workers_stack_, and all workers start out idle. In GetWork (1), if the worker is found to be idle (2), it attempts to clean up and exit the thread (3). Otherwise, when the number of awake workers exceeds the maximum number of tasks, or no task can be obtained, the WorkerThread becomes idle again (4).
This, together with the dynamic creation of WorkerThreads that we skipped earlier, is part of dynamic thread management: at the cost of creating and destroying threads, it guarantees that incoming tasks run as soon as possible. It also explains why WaitForWork is called first: if GetWork were called directly, a freshly created thread would immediately be treated as idle and destroyed.
Next, let's look at how the task is handed to task_tracker_ and run.
// base/task/thread_pool/task_tracker.cc
RegisteredTaskSource TaskTracker::RunAndPopNextTask(
RegisteredTaskSource task_source) {
DCHECK(task_source);
// Is shutdown in progress, or should this task block shutdown?
const bool should_run_tasks = BeforeRunTask(task_source->shutdown_behavior());
// Run the next task in |task_source|.
absl::optional<Task> task;
TaskTraits traits;
{
// Begin a transaction to keep |task_source|'s state consistent.
auto transaction = task_source->BeginTransaction();
task = should_run_tasks ? task_source.TakeTask(&transaction)
: task_source.Clear(&transaction);
traits = transaction.traits();
}
if (task) {
// Run the |task| (whether it's a worker task or the Clear() closure).
RunTask(std::move(task.value()), task_source.get(), traits);
}
if (should_run_tasks)
// For SKIP_ON_SHUTDOWN, decrement the count of tasks blocking shutdown.
AfterRunTask(task_source->shutdown_behavior());
const bool task_source_must_be_queued = task_source.DidProcessTask();
// |task_source| should be reenqueued iff requested by DidProcessTask().
if (task_source_must_be_queued)
return task_source;
return nullptr;
}
void TaskTracker::RunTask(Task task,
TaskSource* task_source,
const TaskTraits& traits) {
DCHECK(task_source);
const auto environment = task_source->GetExecutionEnvironment();
absl::optional<ScopedDisallowSingleton> disallow_singleton;
absl::optional<ScopedDisallowBlocking> disallow_blocking;
absl::optional<ScopedDisallowBaseSyncPrimitives> disallow_sync_primitives;
if (traits.shutdown_behavior() == TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN)
disallow_singleton.emplace();
if (!traits.may_block())
disallow_blocking.emplace();
if (!traits.with_base_sync_primitives())
disallow_sync_primitives.emplace();
{
DCHECK(environment.token.IsValid());
ScopedSetSequenceTokenForCurrentThread
scoped_set_sequence_token_for_current_thread(environment.token);
ScopedSetTaskPriorityForCurrentThread
scoped_set_task_priority_for_current_thread(traits.priority());
// Local storage map used if none is provided by |environment|.
absl::optional<SequenceLocalStorageMap> local_storage_map;
if (!environment.sequence_local_storage)
local_storage_map.emplace();
ScopedSetSequenceLocalStorageMapForCurrentThread
scoped_set_sequence_local_storage_map_for_current_thread(
environment.sequence_local_storage
? environment.sequence_local_storage
: &local_storage_map.value());
// Set up TaskRunnerHandle as expected for the scope of the task.
absl::optional<SequencedTaskRunnerHandle> sequenced_task_runner_handle;
absl::optional<ThreadTaskRunnerHandle> single_thread_task_runner_handle;
absl::optional<EphemeralTaskExecutor> ephemeral_task_executor;
switch (task_source->execution_mode()) {
case TaskSourceExecutionMode::kJob:
case TaskSourceExecutionMode::kParallel:
break;
case TaskSourceExecutionMode::kSequenced:
DCHECK(task_source->task_runner());
sequenced_task_runner_handle.emplace(
static_cast<SequencedTaskRunner*>(task_source->task_runner()));
ephemeral_task_executor.emplace(
static_cast<SequencedTaskRunner*>(task_source->task_runner()),
nullptr, &traits);
break;
case TaskSourceExecutionMode::kSingleThread:
DCHECK(task_source->task_runner());
single_thread_task_runner_handle.emplace(
static_cast<SingleThreadTaskRunner*>(task_source->task_runner()));
ephemeral_task_executor.emplace(
static_cast<SequencedTaskRunner*>(task_source->task_runner()),
static_cast<SingleThreadTaskRunner*>(task_source->task_runner()),
&traits);
break;
}
// Run<ShutdownBehavior> => RunTaskImpl => task_annotator_.RunTask => RunTaskImpl
RunTaskWithShutdownBehavior(task, traits, task_source, environment.token);
// Make sure the arguments bound to the callback are deleted within the
// scope in which the callback runs.
task.task = OnceClosure();
}
}
void TaskAnnotator::RunTaskImpl(PendingTask& pending_task) {
debug::ScopedTaskRunActivity task_activity(pending_task);
TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION(
pending_task.posted_from.file_name());
// Before running the task, store the IPC context and the task backtrace with
// the chain of PostTasks that resulted in this call and deliberately alias it
// to ensure it is on the stack if the task crashes. Be careful not to assume
// that the variable itself will have the expected value when displayed by the
// optimizer in an optimized build. Look at a memory dump of the stack.
static constexpr int kStackTaskTraceSnapshotSize =
PendingTask::kTaskBacktraceLength + 4;
std::array<const void*, kStackTaskTraceSnapshotSize> task_backtrace;
// Store a marker to locate |task_backtrace| content easily on a memory
// dump. The layout is as follows:
//
// +------------ +----+---------+-----+-----------+----------+-------------+
// | Head Marker | PC | frame 0 | ... | frame N-1 | IPC hash | Tail Marker |
// +------------ +----+---------+-----+-----------+----------+-------------+
//
// Markers glossary (compliments of wez):
// cool code,do it dude!
// 0x c001 c0de d0 17 d00d
// o dude,i did it biig
// 0x 0 d00d 1 d1d 17 8119
task_backtrace.front() = reinterpret_cast<void*>(0xc001c0ded017d00d);
task_backtrace.back() = reinterpret_cast<void*>(0x0d00d1d1d178119);
task_backtrace[1] = pending_task.posted_from.program_counter();
ranges::copy(pending_task.task_backtrace, task_backtrace.begin() + 2);
task_backtrace[kStackTaskTraceSnapshotSize - 2] =
reinterpret_cast<void*>(pending_task.ipc_hash);
debug::Alias(&task_backtrace);
auto* tls = GetTLSForCurrentPendingTask();
auto* previous_pending_task = tls->Get();
tls->Set(&pending_task);
if (g_task_annotator_observer)
g_task_annotator_observer->BeforeRunTask(&pending_task);
std::move(pending_task.task).Run();
tls->Set(previous_pending_task);
// Stomp the markers. Otherwise they can stick around on the unused parts of
// stack and cause |task_backtrace| to be associated with an unrelated stack
// sample on this thread later in the event of a crash. Alias once again after
// these writes to make sure the compiler doesn't optimize them out (unused
// writes to a local variable).
task_backtrace.front() = nullptr;
task_backtrace.back() = nullptr;
debug::Alias(&task_backtrace);
}
RunTask uses many RAII helpers; they guarantee that the task runs in the correct environment. Then, in RunTaskImpl, we see the task's originating backtrace written onto the stack, bracketed by two amusing magic numbers, before the task is finally run. The cleanup after execution is done by DidProcessTask, which re-enqueues the queue into the ThreadGroup if it still has tasks and reverts the state changes caused by the task.
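The same stack-pinning trick is available outside the task machinery. A hedged sketch (MightCrash and DoRiskyWork are hypothetical) of using base::debug::Alias to keep a value recoverable from a crash dump, just like the task_backtrace markers above:
#include "base/debug/alias.h"
void MightCrash();  // hypothetical function that may crash
void DoRiskyWork(int request_id) {
  // Copy the interesting value into a local and alias it so the optimizer
  // cannot drop it; if MightCrash() crashes, |id_for_debugging| can be found
  // in the stack memory captured in the minidump.
  int id_for_debugging = request_id;
  base::debug::Alias(&id_for_debugging);
  MightCrash();
}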
Shutdown
Thread pool shutdown starts from ThreadPoolImpl::Shutdown.
void ThreadPoolImpl::Shutdown() {
task_tracker_->StartShutdown();
// Allow all tasks to run. Done after initiating shutdown to ensure that non-
// BLOCK_SHUTDOWN tasks don't get a chance to run and that BLOCK_SHUTDOWN
// tasks run with a normal thread priority.
UpdateCanRunPolicy();
// Ensures that there are enough background worker to run BLOCK_SHUTDOWN
// tasks.
foreground_thread_group_->OnShutdownStarted();
if (background_thread_group_)
background_thread_group_->OnShutdownStarted();
task_tracker_->CompleteShutdown();
}
void TaskTracker::StartShutdown() {
CheckedAutoLock auto_lock(shutdown_lock_);
// This method can only be called once.
DCHECK(!shutdown_event_);
DCHECK(!state_->HasShutdownStarted());
shutdown_event_ = std::make_unique<WaitableEvent>();
const bool tasks_are_blocking_shutdown = state_->StartShutdown();
// From now, if a thread causes the number of tasks blocking shutdown to
// become zero, it will call OnBlockingShutdownTasksComplete().
if (!tasks_are_blocking_shutdown) {
// If another thread posts a BLOCK_SHUTDOWN task at this moment, it will
// block until this method releases |shutdown_lock_|. Then, it will fail
// DCHECK(!shutdown_event_->IsSignaled()). This is the desired behavior
// because posting a BLOCK_SHUTDOWN task after StartShutdown() when no
// tasks are blocking shutdown isn't allowed.
shutdown_event_->Signal();
return;
}
}
void TaskTracker::CompleteShutdown() {
// It is safe to access |shutdown_event_| without holding |lock_| because the
// pointer never changes after being set by StartShutdown(), which must
// happen-before this.
DCHECK(TS_UNCHECKED_READ(shutdown_event_));
{
base::ScopedAllowBaseSyncPrimitives allow_wait;
// Allow tests to wait for and introduce logging about the shutdown tasks
// before we block this thread.
BeginCompleteShutdown(*TS_UNCHECKED_READ(shutdown_event_));
// Now block the thread until all tasks are done.
TS_UNCHECKED_READ(shutdown_event_)->Wait();
}
// Unblock FlushForTesting() and perform the FlushAsyncForTesting callback
// when shutdown completes.
{
CheckedAutoLock auto_lock(flush_lock_);
flush_cv_->Broadcast();
}
CallFlushCallbackForTesting();
}
void ThreadGroupImpl::OnShutdownStarted() {
ScopedCommandsExecutor executor(this);
CheckedAutoLock auto_lock(lock_);
// Don't do anything if the thread group isn't started.
if (max_tasks_ == 0 || UNLIKELY(join_for_testing_started_))
return;
// Start a MAY_BLOCK scope on each worker that is already running a task.
for (scoped_refptr<WorkerThread>& worker : workers_) {
// The delegates of workers inside a ThreadGroupImpl should be
// WorkerThreadDelegateImpls.
WorkerThreadDelegateImpl* delegate =
static_cast<WorkerThreadDelegateImpl*>(worker->delegate());
AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
delegate->OnShutdownStartedLockRequired(&executor);
}
EnsureEnoughWorkersLockRequired(&executor);
shutdown_started_ = true;
}
void ThreadGroupImpl::WorkerThreadDelegateImpl::OnShutdownStartedLockRequired(
ScopedCommandsExecutor* executor) {
if (!read_any().is_running_task())
return;
// Workers running a CONTINUE_ON_SHUTDOWN tasks are replaced by incrementing
// max_tasks/max_best_effort_tasks. The effect is reverted in
// DidProcessTask().
if (*read_any().current_shutdown_behavior ==
TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) {
incremented_max_tasks_for_shutdown_ = true;
IncrementMaxTasksLockRequired();
}
}
Shutdown begins with StartShutdown. shutdown_event_ blocks CompleteShutdown, ensuring that all shutdown-blocking tasks have finished running. Then OnShutdownStarted is called on the ThreadGroups: OnShutdownStartedLockRequired accounts for workers currently running CONTINUE_ON_SHUTDOWN tasks (by raising the max-task counts), and EnsureEnoughWorkersLockRequired wakes up the corresponding WorkerThreads so that the remaining tasks are guaranteed to run.
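To connect this back to the call sites, here is a hedged usage sketch (the callee functions are hypothetical) of the three shutdown behaviors whose bookkeeping the code above performs:
#include "base/bind.h"
#include "base/task/thread_pool.h"
void FlushCriticalState();   // hypothetical
void BestEffortWork();       // hypothetical
void BackgroundCleanup();    // hypothetical
void PostWithShutdownBehaviors() {
  // Shutdown waits for this task; it is what shutdown_event_ blocks on.
  base::ThreadPool::PostTask(
      FROM_HERE, {base::TaskShutdownBehavior::BLOCK_SHUTDOWN},
      base::BindOnce(&FlushCriticalState));
  // Skipped if it has not started by the time shutdown begins.
  base::ThreadPool::PostTask(
      FROM_HERE, {base::TaskShutdownBehavior::SKIP_ON_SHUTDOWN},
      base::BindOnce(&BestEffortWork));
  // May keep running during shutdown; the worker running it is "replaced"
  // via IncrementMaxTasksLockRequired() as seen above.
  base::ThreadPool::PostTask(
      FROM_HERE, {base::TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN},
      base::BindOnce(&BackgroundCleanup));
}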
ThreadGroupImpl itself appears never to be destroyed; the comments also say it must not be destroyed except when creation fails or in test environments. Let's first look at how a WorkerThread exits.
// Makes a request to cleanup the worker. This may be called from any thread.
// The caller is expected to release its reference to this object after
// calling Cleanup(). Further method calls after Cleanup() returns are
// undefined.
//
// Expected Usage:
// scoped_refptr<WorkerThread> worker_ = /* Existing Worker */
// worker_->Cleanup();
// worker_ = nullptr;
void WorkerThread::Cleanup() {
DCHECK(!should_exit_.IsSet());
should_exit_.Set();
wake_up_event_.Signal();
}
bool WorkerThread::ShouldExit() const {
// The ordering of the checks is important below. This WorkerThread may be
// released and outlive |task_tracker_| in unit tests. However, when the
// WorkerThread is released, |should_exit_| will be set, so check that
// first.
return should_exit_.IsSet() || join_called_for_testing_.IsSet() ||
task_tracker_->IsShutdownComplete();
}
Cleanup is invoked in the case mentioned above where a worker discovers during GetWork that it is idle, while ShouldExit checks should_exit_ and task_tracker_. When RunWorker finishes it calls back into ThreadGroupImpl through OnMainExit, so the ThreadGroupImpl must outlive every WorkerThread it has created. ThreadPoolImpl's destructor releases the unique_ptr holding the ThreadGroupImpl, so the logical destruction order would be: all WorkerThreads, then ThreadGroupImpl, then ThreadPoolImpl. In practice, however, ThreadPoolImpl is a process-wide singleton that is never explicitly destroyed, so neither ThreadPoolImpl nor ThreadGroupImpl is ever destructed; they are leaked on purpose.
Finally, a class diagram summarizes the relationships between these classes.
References
- Threading and Tasks in Chrome
- Chromium source 97 cc7bf2a5b9d50883948c2102873445d2d783d120