ThreadPool(线程池)介绍

>>返回《C# 并发编程》

1. 线程池的由来

1.1. 线程池出现前

解决三个需求

  1. 异步调用方法
  2. 按时间间隔调用方法
  3. 当一个内核对象收到信号时调用方法

开发人员经常创建一个新线程来执行单个任务,当任务完成时,该线程就会死亡。

  • 与进程相比,创建和销毁线程速度更快,并且占用的OS资源更少,但是创建销毁线程肯定不是免费的
  • 创建线程过程
    1. 分配和初始化内核对象,分配和初始化线程的堆栈内存
    2. Windows 向进程中的每个DLL发送 DLL_THREAD_ATTACH 通知,从而导致磁盘中的pages被转移到内存中以便代码可以执行
  • 销毁线程过程
    • 当线程死亡时,将向每个DLL发送 DLL_THREAD_DETACH 通知,该线程的堆栈内存被释放,并且内核对象被释放(如果其使用计数变为0
  • 因此,创建和销毁线程有很多开销,这些开销与创建线程最初要执行的工作无关

1.2. 线程池的诞生

  • Microsoft实现了一个线程池,该线程池首次在 Windows2000 中获得支持。
  • 当 .NET Framework 团队设计和构建公共语言运行库(CLR)时,他们决定在CLR本身中实现一个线程池
    • 这样,任何托管应用程序都可以利用线程池,即使该应用程序运行在Windows 2000之前的Windows版本(例如Windows 98)上。

1.3. CLR线程池工作过程

线程池执行任务流程

  • CLR初始化时,其线程池不包含任何线程。
  • 当应用程序要创建线程来执行任务时,应用程序应请求该任务由线程池线程执行。
  • 线程池知道后,将创建一个初始线程。这个新线程将与其他任何线程进行相同的初始化。
  • 当任务完成时,线程不会销毁自己。而是,线程将进入挂起(suspended)状态下返回线程池。
  • 线程池已有线程能满足运算需求
    • 如果应用程序再次请求线程池,则被挂起的线程将被唤醒执行任务,不会创建新线程。
    • 只要应用程序将任务列入到线程池的速度不超过一个线程处理每个列入的任务的速度,就节省了线程创建销毁产生的开销
  • 线程池已有线程不能满足运算需求
    • 如果应用程序将任务列入到线程池的速度超过一个线程处理每个列入的任务的速度,则线程池将创建其他线程。
    • 当然,创建新线程确实会产生开销,但是应用程序很可能仅需要几个线程来处理应用程序生命周期内向它抛出的所有任务
  • 线程池线程超过需要需要的算力
    • 当线程池线程自身挂起时,如果一段时间(如40秒)没有被使用时,线程将唤醒并自行销毁
      • 从而释放它正在使用的所有OS资源(堆栈,内核对象等)
  • 总的来说,使用线程池可以提高应用程序的性能。

线程池提供了四种功能:

  • 异步调用方法
  • 按时间间隔调用方法
  • 当发信号通知单个内核对象时调用方法
  • 异步I/O请求完成时调用方法
    • 应用程序开发人员很少使用这个功能,因此在此不做说明

要为线程池中的任务排队,请使用 System.Threading 命名空间中定义的 ThreadPool 静态类。

2. 线程池解决的问题

2.1. 异步调用方法

要使线程池线程异步调用方法,您的代码必须调用 ThreadPoolQueueUserWorkItem 方法,如下所示:

public static Boolean QueueUserWorkItem(WaitCallback wc, Object state);
public static Boolean QueueUserWorkItem(WaitCallback wc); 

这些方法将“工作项”(和可选的状态数据)排队到线程池中的线程,然后立即返回。

  • “工作项”只是一个委托
    • System.Threading.WaitCallback 委托类型定义如下:

      public delegate void WaitCallback(Object state);
  • “状态数据”是一个 Object 类型的数据,作为参数传递给“工作项”委托
    • 没有 “状态数据” 参数的QueueUserWorkItem版本将 null 传递给WaitCallback方法
  • 最终,池中的某些线程将处理工作项,从而导致您的方法被调用

CLR的线程池将在必要时自动创建线程,并在可能的情况下重用现有线程。该线程在处理回调方法后不会立即被销毁。它返回线程池,以便准备处理队列中的任何其他工作项

线程池调用异步方法

using System;
using System.Threading;

class App {
   static void Main() {
      Console.WriteLine("Main thread: 列入一个异步操作.");
      ThreadPool.QueueUserWorkItem(new WaitCallback(MyAsyncOperation));

      Console.WriteLine("Main thread: 执行其他操作.");
      // ...

      Console.WriteLine("Main thread: 暂停在这,以模拟执行其他操作。");
      Console.ReadLine();
   }

   static void MyAsyncOperation(Object state) {
      Console.WriteLine("ThreadPool thread: 执行异步操作.");
      // ...
      Thread.Sleep(5000);    
      // 等待5s,模拟执行工作项
      // 方法返回,导致线程挂起自身,以等待其他“工作项”
   }
}

输出为:

Main thread: 列入一个异步操作.
Main thread: 执行其他操作.
Main thread: 暂停在这,以模拟执行其他操作。
ThreadPool thread: 执行异步操作.

2.2. 按时间间隔调用方法

System.Threading 命名空间定义了 Timer 类。当构造 Timer 类的实例时,您在告诉线程池您希望在将来的特定时间回调您的方法。 Timer 类提供了四个构造函数:

public Timer(TimerCallback callback, Object state,
   Int32 dueTime, Int32 period);
public Timer(TimerCallback callback, Object state,
   UInt32 dueTime, UInt32 period);
public Timer(TimerCallback callback, Object state,
   Int64 dueTime, Int64 period);
public Timer(TimerCallback callback, Object state,
   Timespan dueTime, TimeSpan period); 

System.Threading.TimerCallback 委托类型定义如下:

public delegate void TimerCallback(Object state);
  • 构造传递的委托调用时将构造传递的Object对象 state 作为参数传递
  • 可以使用dueTime参数来告诉线程池第一次调用回调方法之前要等待多少毫秒。
    • 立即调用回调方法设置 dueTime 参数为 0
    • 防止调用回调方法设置 dueTime 参数为 Timeout.Infinite0
  • 参数 period 允许您指定每个连续调用之前等待的时间(以毫秒为单位)
    • 为0时,线程池将仅调用一次回调方法。
  • 构造 Timer 对象后,线程池自动监视时间
    • Timer类提供了一些方法,使您可以与线程池进行通信,以修改何时回调该方法
    • ChangeDispose 方法
      cs public Boolean Change(Int32 dueTime, Int32 period); public Boolean Change(UInt32 dueTime, UInt32 period); public Boolean Change(Int64 dueTime, Int64 period); public Boolean Change(TimeSpan dueTime, TimeSpan period); public Boolean Dispose(); public Boolean Dispose(WaitHandle notifyObject);
      • Change 方法使您可以更改 Timer对象dueTimeperiod
      • 使用 Dispose 方法可以完全和有选择的取消回调
        • 当所有正在执行的回调完成后,通过notifyObject参数,发送信号到内核对象,取消后续执行

每2000毫秒(或2秒)调用一次方法示例:

using System;
using System.Threading;

class App {
   static void Main() {
      Console.WriteLine("每2秒检查一次修改状态.");
      Console.WriteLine("   (按 Enter 键停止示例程序)");
      Timer timer = new Timer(new TimerCallback(CheckStatus), null, 0, 2000);
      Console.ReadLine();
   }

   static void CheckStatus(Object state) {
      Console.WriteLine("检查状态.");
      // ...
   }
}

输出为:

每2秒检查一次修改状态.
   (按 Enter 键停止示例程序)
检查状态.
检查状态.
检查状态.
... ...

3. 当单个内核对象接收到信号通知时调用方法

在进行性能研究时,Microsoft研究人员发现许多应用程序生成线程,只是为了等待单个内核对象接收发出的信号。

对象发出信号后,线程将某种通知发布到另一个线程,然后返回回来以等待对象再次发出信号。

一些开发人员甚至编写了其中多个线程各自等待一个对象的代码。这是对系统资源的极大浪费。
因此,如果您的应用程序中当前有等待单个内核对象发出信号的线程,那么线程池再次是您提高应用程序性能的理想资源。

3.1. 注册WaitHandle

System.Threading.ThreadPool 类中定义的一些静态方法(RegisterWaitForSingleObject)。要使线程池线程在发出内核对象信号时调用方法

public static RegisterWaitHandle RegisterWaitForSingleObject(
   WaitHandle waitObject, WaitOrTimerCallback callback, Object state, 
   UInt32 milliseconds, Boolean executeOnlyOnce);

public static RegisterWaitHandle RegisterWaitForSingleObject(
   WaitHandle waitObject, WaitOrTimerCallback callback, Object state, 
   Int32 milliseconds, Boolean executeOnlyOnce);

public static RegisterWaitHandle RegisterWaitForSingleObject(
   WaitHandle waitObject, WaitOrTimerCallback callback, Object state, 
   TimeSpan milliseconds, Boolean executeOnlyOnce);

public static RegisterWaitHandle RegisterWaitForSingleObject(
   WaitHandle waitObject, WaitOrTimerCallback callback, Object state,
   Int64 milliseconds, Boolean executeOnlyOnce);

当您调用RegisterWaitForSingleObject方法时

  • waitObject 参数,需要线程池等待的内核对象
    • 可以将引用传递给AutoResetEvent,ManualResetEvent或Mutex对象
  • callback 参数,需要线程池线程调用的方法
    • System.Threading.WaitOrTimerCallback 委托类型定义:

      public delegate void WaitOrTimerCallback(Object state, Boolean timedOut);
  • state 参数, callback 委托对象运行时需要的 state 参数
    • 如果不需要可以为 null
  • milliseconds参数, 单位:毫秒,需要线程池等待多久后向内核对象发出超时信号
    • 通常在此处传递-1表示无限超时
  • executeOnlyOnce参数
    • true ,则线程池线程将仅执行一次回调方法,之后线程将不再在 waitObject 参数上等待
    • false ,则每次向内核对象发出信号时,线程池线程都会执行回调方法(这对于AutoResetEvent对象最有用)
      • 表示每次完成等待操作后都重置计时器,直到 waitObject 取消注册

调用WaitOrTimerCallback委托类型的回调方法,bool类型的 timedOut 参数值:

  • false ,说明内核对象收到信号,导致该方法被调用
  • true ,说明在指定的时间内没有发信号通知内核对象,超时后导致该方法被调用
    • 回调方法应执行必要的操作(超时处理)

3.2. 注销 WaitHandle

  • 在前提到的 RegisterWaitForSingleObject 方法返回一个 RegisteredWaitHandle 对象
  • 该对象标识线程池正在等待的内核对象
    • 如果由于某种原因您的应用程序想要告诉线程池停止监视已注册的 WaitHandle ,则您的应用程序可以调用 RegisteredWaitHandleUnregister 方法:
      cs public Boolean Unregister(WaitHandle waitObject);
    • waitObject 参数,指示当所有排队的工作项均已执行时,如何通知您
      • 如果您不想收到通知,则应为此参数传递 null
      • 如果将派生自 WaitHandle 的对象引用传递给Unregister方法,当已注册的WaitHandle的所有待处理工作项均已执行时,线程池将向该对象;。;(waitObject)发出信号

3.3. 代码示例

using System;
using System.Threading;

class App {
   static void Main() {
      AutoResetEvent are = new AutoResetEvent(false);
      RegisteredWaitHandle rwh = ThreadPool.RegisterWaitForSingleObject(
         are, new WaitOrTimerCallback(EventSignalled), null, 1100, false);
      for (Int32 x = 0 ; x < 5; x++) {
         Thread.Sleep(1000);
         are.Set();
      }
      
      Thread.Sleep(2400);

      rwh.Unregister(null);
      Console.WriteLine("按 Enter 键停止示例程序");
      Console.ReadLine();
   }

   static void EventSignalled(Object state, Boolean timedOut) {
      if (timedOut) {
         Console.WriteLine("等待 AutoResetEvent 操作超时.");
      } else {
         Console.WriteLine("AutoResetEvent 接到信号.");
      }
   }
}

输出为:

; 五次 AutoResetEvent 发送信号
AutoResetEvent 接到信号.
AutoResetEvent 接到信号.
AutoResetEvent 接到信号.
AutoResetEvent 接到信号.
AutoResetEvent 接到信号.
; 等待2.4s导致两次超时
等待 AutoResetEvent 操作超时.
等待 AutoResetEvent 操作超时.
; 注销后不在调用委托
按 Enter 键停止示例程序

4. 结语

关于线程池相关的 API 不需要熟练掌握,但是我们了解了线程池的设计初衷和具备的功能,为我们更好的理解 异步编程、并行、多线程 有很大的助益。

前一章我们了解了 同步上下文 是如何编排线程执行代码的,这一章我们了解了线程池,加深了异步任务交给线程池执行的理解,后面我们开始对 异步编程、数据流块、Rx等我们常用的技术进行讲解。

上一篇:用Rust编写web server,实现线程池的清除


下一篇:半同步半异步线程池框架代码实现