第11章 Windows线程池
11.1 传统的Windows线程池及API
(1)线程池中的几种底层线程
①可变数量的长任务线程:WT_EXECUTELONGFUNCTION
②Timer线程:调用CreateTimerQueueTimer时,将在Timer线程上创建以APC方式通知的可等待计时器对象,并让该线程在可警告状态下等待定时器的APC。由于这个线程一旦创建就贯穿进程生命期而不会被销毁,因此WT_EXECUTEINPERSISTENTHREAD标志的线程池回调函数也由这种线程执行。
③多个Wait线程:服务于RegisterWaitForSingleObject,每个线程用WaitForMultipleObjects等待最多63(MAXIMUM_WAIT_OBJECTS减去一个用于维护对象数组的工作对象)个内核对象,对象触发后执行回调函数。
④可变数量的IO线程:因线程在发出异步IO请求(如ReadFileEx)后,一旦线程结束,请求就会被撤消,因此在请求完成之前,发出请求的线程一定要存在。但线程池的被设计为会根据CPU繁忙情况动态地创建和删除线程。因此线程池中有一部分线程比较特殊,他们会检测自己在执行回调函数里发出的异步IO请求是否完成。如果没有,就不会结束,这种会追踪自己发出的异步IO请求的线程被称为IO线程。
⑤可变数量的非IO线程:线程池内部实现了一个IO完成端口,服务于BindIoCompletionCallback,其中IOCP服务线程(即在GetQueueCompletionStatus上休眠)由于其数量会根据CPU情况动态调整,所以不应在这种线程上执行异步IO,故称为非IO线程。
(2) 传统的线程池对象及对应的API
线程池对象 |
API |
普通任务线程池 |
QueueUserWorkItem |
计时器线程池 |
CreateTimerQueue(创建线程池) |
CreateTimerQueueTimer(创建计时器) |
|
ChangeTimerQueueTimer |
|
DeleteTimerQueueTimer |
|
DeteTimerQueueEx |
|
同步对象等待线程池 |
RegisterWaitForSingleObject |
UnregisterWaitEx |
|
完成端口线程池 |
BindIoCompletionCallback |
11.1.2 普通任务线程池
(1)QueueUserWorkItem函数
参数 |
描述 |
LPTHREAD_START_ROUTINE pfnCallback |
工作项,即要排队到线程池中的回调函数(类似于线程函数),原型声明为 DWORD WINAPI ThreadProc(LPVOID lpParameter); |
PVOID Context |
要传给线程函数的额外数据 |
ULONG Flags |
用于指明线程池中的线程在什么条件下调用这个回调函数 ①WT_EXECUTEDEFAULT:普通线程不可警告状态下运行。 ②WT_EXECUTEINIOTHREAD:以IO可警告状态运行线程回调函数。 ③WT_EXECUTEINPERSISTENTTHREAD:该线程一直运行而不会终止。 ④WT_EXECUTELONGFUNCTION:执行一个运行时间较长的任务(这会使系统考虑是否在线程池中创建新的线程)。 ⑤WT_TRANSFER_IMPERSONATION:以当前的访问令牌运行线程并回调函数 |
备注:CreateThread函数与QueueUserWorkItem函数要求的线程函数的原型一致。因此可以方便的将一个线程函数创建为线程或线程池的线程池回调函数。 |
【QueueUserWorkItem示例程序】
#include <windows.h>
#include <locale.h>
#include <tchar.h>
#include <strsafe.h> #define BEGINTHREAD(Fun,Param) CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)Fun,Param,0,NULL); DWORD WINAPI ThreadProc(LPVOID lpParameter); int _tmain()
{
_tsetlocale(LC_ALL, _T("chs"));
int iWaitLen = ; do{
_tprintf(_T("请输入一个等待的时间常量,单位秒(输入0退出):"));
_tscanf_s(_T("%i"), &iWaitLen);
if (iWaitLen>){
//下面的代码演示了,使用CreateThread和QueueUserWorkItem,实际效果
//是一样的,当然线程不多的情况下如此,如果线程很多时一定要使用QueueUserWorkItem
QueueUserWorkItem(ThreadProc, (PVOID)iWaitLen, WT_EXECUTELONGFUNCTION); //显示使用CreateThread来创建多个线程的效果
//BEGINTHREAD(ThreadProc, (LPVOID)iWaitLen);
} } while (iWaitLen); return ;
} //该函数可以由CreateThread的线程启动,也可以使用QueueUserWorkItem线程池中的线程启动
DWORD WINAPI ThreadProc(LPVOID lpParameter){
int iWaitLen = (int)lpParameter;
_tprintf(_T("\n线程[ID:0x%X]将等待%u秒..."), GetCurrentThreadId(), iWaitLen);
Sleep(iWaitLen * );
_tprintf(_T("\n线程[ID:0x%X]将等待结束!\n"), GetCurrentThreadId(), iWaitLen);
return ;
}
11.1.3 同步对象等待线程池——当对象被触发时调用函数
(1)RegisterWaitForSingleObject函数
参数 |
描述 |
phNewWaitObject |
返回的线程池对象(同步对象等待线程池),该对象句柄不能用CloseHandle来关闭。 |
hObject |
要等待触发的内核对象 |
pfnCallback |
回调函数,其原型如下: VOID CALLBACK WaitOrTimerCallback( PVOID lpParameter, // thread data BOOLEAN TimerOrWaitFired // reason ); |
pvContext |
传给回调函数的额外参数 |
dwMilliseconds |
等待的时间 |
dwFlags |
与QueueUserWorkItem函数的dwFlags意义相同 |
备注:CreateThread函数与QueueUserWorkItem函数要求的线程函数的原型一致。因此可以方便的将一个线程函数创建为线程或线程池的线程池回调函数。 |
(2)撤销等待:UnregisterWait(hNewWaitObject),传入RegisterWaitForSingleObject时返回的第1个句柄。注意不能用CloseHandle(hNewWaitObject)来关闭
【WaitableCallback程序】
线程池调用同一线程来执行回调函数 线程池调用不同的线程来执行回调函数
#include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h> //bWaitFired:TRUE表示超时,FALSE表示事件对象被触发
void CALLBACK WaitCallback(PVOID lpParameter, BOOLEAN bWaitFired)
{
if (!bWaitFired){
_tprintf(_T("[ID:0x%X] WaitCallback Success\n"), GetCurrentThreadId());
} else{
_tprintf(_T("[ID:0x%X] WaitCallback Failed\n"), GetCurrentThreadId());
}
} int _tmain(){
_tsetlocale(LC_ALL, _T("chs")); _tprintf(_T("主线程[ID:0x%X] Runing\n"), GetCurrentThreadId()); //创建一个事件对象
HANDLE hEvent = NULL;
hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); //自动,初始未触发状态
if (NULL == hEvent)
return ; //模拟等待五次
HANDLE hNewWait = NULL; //用来保存线程池对象
//WT_EXECUTEONLYONCE:表示回调函数只被执行一次。适用于进程/线程句柄这种触发后不再重置的对象
RegisterWaitForSingleObject(&hNewWait, hEvent, WaitCallback, NULL, INFINITE, WT_EXECUTEDEFAULT); for (int i = ; i < ;i++){
SetEvent(hEvent); //触发5次,让回调函数被执行5次(但并不关心是那个线程执行了该函数)
Sleep(); //改变这个时间,可以看到线程池会调用不同线程来执行回调函数
} UnregisterWaitEx(hNewWait, INVALID_HANDLE_VALUE); CloseHandle(hEvent);
_tsystem(_T("PAUSE")); return ;
}
11.1.4 定时器回调线程池
(1)调定定时器线程池对象的一般步骤
①调用CreateTimerQueue来创建一个定时器线程池的对象
②调用CreateTimerQueueTimer创建一个定时器,并指定计时器的回调函数及参数
③调用ChangeTimerQueueTimer可修改一个已有的定时器的计时周期
④调用DeleteTimerQueueTimer删除一个定时器对象(该计时器的回调函数也会被停止调用)
⑤调用DeleteTimerQueueEx删除定时器线程池对象。
(2)CreateTimerQueueTimer函数
参数 |
描述 |
phNewTimer |
用来接收创建好的计时器对象句柄的指针 |
hTimerQueue |
计时器线程池对象。为NULL时,使用默认的计时器线程池,此时可以不需调用CreateTimerQueue来创建线程池对象。 |
pfnCallback |
新计时器对象的回调函数。函数原型如下: Void WINAPI WaitOrTimerCallback(PVOID pvContext,BOOL fTimerOrWaitFired);其中的fTimerOrWaitFired为TRUE时,表示调用回调函数时,计时器己经触发。 |
pvContext |
传给回调函数的额外参数 |
dwDueTime |
预计从调用该函数开始后,多少毫秒后第一次调用回调函数。如果为0,只有可能,就会调用回调函数。 |
dwPeriod |
调用回调函数的周期(毫米数)。如果为0,表示一个单步计时器,即回调函数只被调用一次。 |
dwFlags |
用于指明线程池中的线程在什么条件下调用这个回调函数。该参数的意义与QueueUserWorkItem函数相应的参数相同 |
备注:如果dwDueTime和dwPeriod均不为0,计时器在dwDueTime后会第1次被触发,以后每经过dwPeriod时间后,周期性地触发。每次触发时都会调用回调函数,哪怕前一个回调函数还没执行完(会启动另一个线程来调用该回调函数)。 |
(3)修改计时器对象周期:ChangeTimeQueueTimer(hTimerQueue,hTimer,dwDueTime,dwPeriod);
(4)DeleteTimerQueueTimer:删除一个计时器对象
参数 |
描述 |
phNewTimer |
用来接收创建好的计时器对象句柄的指针 |
hTimerQueue |
指定要删除的计时器对象位于哪个线程队列(线程池对象)中 |
hTimer |
要删除的计时器对象 |
hCompletionEvent |
当系统取消计时器且队列中的所有回调函数都执行完毕时,会触发该事件对象。如果指定为INVALID_HANDLE_VALUE:将会一直等待计时器其回调函数,DeleteTimerQueueTimer函数才返回。如果为NULL,函数会给该计时器对象做个删除标志并立即返回。但是以后并不会接收到回调函数执行完的任何通知,一般为指定为NULL。如果指定为一个有效的事件内核对象的句柄,将函数会立即返回,当所有排队的工作项目完成之后,该内核对象被触发。 |
备注: 从当回调函数内部删除计时器对象可能会造成死锁。 |
(5)DeleteTimerQueueEx:删除计时器队列及其中的所有计时器对象
DeleteTimerQueueEx(hTimerQueue,hCompletionEvent);
【TimerQueue程序】
#include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h> HANDLE gDoneEvent = NULL; //定时器回调函数
//参数TimerOrWaitFired:TRUE表示当函数被调用时,定时器己经被触发
void CALLBACK TimerRoutine(PVOID lpParam, BOOLEAN TimerOrWaitFired); int _tmain(){
_tsetlocale(LC_ALL, _T("chs")); HANDLE hTimer = NULL;
HANDLE hTimerQueue = NULL;
int arg = ; //使用事件对象来追踪TimerRoutine的运行
gDoneEvent = CreateEvent(NULL, TRUE, FALSE, NULL); //手动重置
if (NULL == gDoneEvent){
_tprintf(_T("创建事件对象失败!(%d)\n"), GetLastError());
return ;
} //创建定时器线程池对象
hTimerQueue = CreateTimerQueue();
if (NULL == hTimerQueue){
_tprintf(_T("定时器线程池对象失败!(%d)\n"), GetLastError());
return ;
} //创建定时器对象
if (!CreateTimerQueueTimer(&hTimer, hTimerQueue,
(WAITORTIMERCALLBACK)TimerRoutine, &arg, , , )){
_tprintf(_T("定时器对象失败!(%d)\n"), GetLastError());
return ;
} _tprintf(_T("将在10秒后调用TimerRoutine函数...\n")); if (WAIT_OBJECT_0 != WaitForSingleObject(gDoneEvent, INFINITE)){
_tprintf(_T("WaitForSingleObject失败(%d)\n"), GetLastError());
}
CloseHandle(gDoneEvent); //因gDoneEvent被触发时,说明回调函数己执行完,但计时器对象还没被完全删除,
//所以指定参数为INVALID_HANDLE_VALUE会等到所有对象都被删除时,Delete*函数才会返回。
//注意DleteTimerQueueEx虽然是删除线池队列的,但该函数被调用时,会将线程池中的所有
//计时器对象一起删除,因此此处可以不必调用DeleteTimerQueueTimer函数。
if (!DeleteTimerQueueEx(hTimerQueue,INVALID_HANDLE_VALUE)){
_tprintf(_T("DeleteTimerQueue失败(%d)\n"), GetLastError());
} _tsystem(_T("PAUSE"));
return ;
} void CALLBACK TimerRoutine(PVOID lpParam, BOOLEAN TimerOrWaitFired){
if (lpParam == NULL){
_tprintf(_T("TimerRoutine lpParam为空!\n"));
} else{
_tprintf(_T("TimerRoutine函数被调用。参数为%d.\n"), *(int*)lpParam);
} SetEvent(gDoneEvent);
}
11.1.4 完成端口回调线程池
(1)BindIoCompletionCallback函数
参数 |
描述 |
hDevice |
设备句柄(如文件、Socket等) |
pfnCallback |
回调函数(完成例程)函数原型如下: VOID WINAPI OverlappedCompletionRoutine( DWORD dwErrorCode, DWORD dwNumberOfBytesTransferred, POVERLAPPED pOverlapped); |
dwFlags |
保留字段,这个标志必须为0. |
备注: ①该函数内部是使用非IO线程来调用回调函数的,所以在回调函数内部不能再发出其他的异步IO请求。(注意,该函数内部使用非IO线程来处理向设备发出异步IO请求的。) ②该函数并没有OVERLAPPED结构体的参数,但OVERLAPPED结构体会被传递给ReadFile和WriteFile之类的函数,操作系统会跟踪这个结构体,并在请求完成时,传入完成端口,并最终传到我们的回调函数中来。 ③如果有其他信息要传入回调函数中,可以在自定封装OVERLAPPED结构体,设计出一个“单IO数据”结构体以便传递额外的参数。 ④CloseHandle(hDevice)来关闭设备,会导致所有待处理IO请求立即完成,并产生一个错误码,可以在回调函数中处理这种情况。因此在关闭设备时,如果要避免这种错误,可以引入引用计数,当每发出一个IO请求时,计数加1,完成一个IO请求时,计数减1,当计数为0时,才能关闭。 |
(2)完成端口回调线程池的优点
①不用自己再去创建和管理完成端口对象。
②不用创建线程和管理线程。
③不用调用GetQueuedCompletionStatus(Ex)方法去等待IO完成操作。
④开发者只需要集中设计好每个IO完成后的回调函数即可。
【IOCPPool程序】利用完成端口回调线程池模拟文件的写入操作
#define _WIN32_WINNT 0x0600 //用于 Windows Vista
#include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h> #define GRS_ALLOC(sz) HeapAlloc(GetProcessHeap(),0,sz)
#define GRS_CALLOC(sz) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sz)
#define GRS_SAFEFREE(p) if(NULL !=p){HeapFree(GetProcessHeap(),0,p);p=NULL;} #define GRS_ASSERT(s) if (!(s)){DebugBreak();} #define OP_READ 0x1 //读取操
#define OP_WRITE 0x2 //写入操作 #define GRS_BEGINTHREAD(Fun,Param) CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)Fun,Param,0,NULL);
#define MAX_WRITEPERTHREAD 20 //每个线程最大写入次数
#define MAXWRITE 10 //写入线程数量 //单IO数据(扩展OVERLAPPED结构体
typedef struct _tagPER_IO_CONTEXT{
OVERLAPPED m_ol; //Overlapped结构体
HANDLE m_hFile; //操作的文件句柄
DWORD m_dwOp; //操作类型,OP_READ或OP_WRITE
LPVOID m_pData; //操作的数据
UINT m_nLen; //操作的数据长度
DWORD m_dwWrite; //写入的字节数
DWORD m_dwTimestamp; //起始操作的时间戳 }PER_IO_CONTEXT, *PPER_IO_CONTEXT; //IOCP线程池回调函数,实际就是完成通知响应函数
VOID CALLBACK FileIoCompletionRoutine(DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped); //写入文件的线程
DWORD WINAPI WriteThread(LPVOID lpParameter); //当前操作的文件对象的指针
LARGE_INTEGER g_liFilePointer = {}; //获取文件所在的完整路径(不含文件名,但包含最后的\)
VOID GetAppPath(LPTSTR pszBuffer){
DWORD dwLen = ;
if ( == (dwLen = GetModuleFileName(NULL, pszBuffer, MAX_PATH)))
return; for (DWORD i = dwLen; i > ;i--){
if ('\\'== pszBuffer[i]){
pszBuffer[i + ] = '\0';
break;
}
}
} int _tmain(){ _tsetlocale(LC_ALL, _T("chs")); TCHAR pFileName[MAX_PATH] = { };
GetAppPath(pFileName);
StringCchCat(pFileName, MAX_PATH, _T("OldIOCPFile.txt")); HANDLE ahWThread[MAX_WRITEPERTHREAD] = { };
DWORD dwWrited = ; //创建文件(使用FILE_FLAG_OVERLAPPED标志,表示异步设备)
HANDLE hTxtFile = CreateFile(pFileName, GENERIC_WRITE, , NULL, CREATE_ALWAYS,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,NULL); if (INVALID_HANDLE_VALUE == hTxtFile){
_tprintf(_T("创建文件(%s)失败,错误码:0x%08X\n"), pFileName, GetLastError());
_tsystem(_T("PAUSE"));
return ;
} //将文件句柄与IOCP线程池绑定
BindIoCompletionCallback(hTxtFile, FileIoCompletionRoutine, ); //写入UNICODE文件的前缀码,以便正确打开
PER_IO_CONTEXT* pIo = (PPER_IO_CONTEXT)GRS_CALLOC(sizeof(PER_IO_CONTEXT)); GRS_ASSERT(NULL != pIo); pIo->m_dwOp = OP_WRITE;
pIo->m_hFile = hTxtFile;
pIo->m_pData = GRS_CALLOC(sizeof(WORD));
GRS_ASSERT(NULL != pIo->m_pData); *(WORD*)pIo->m_pData = MAKEWORD(0xff, 0xfe); //UNICODE文本文件的前缀
pIo->m_nLen = sizeof(WORD); //偏移文件指针
pIo->m_ol.Offset = g_liFilePointer.LowPart;
pIo->m_ol.OffsetHigh = g_liFilePointer.HighPart;
g_liFilePointer.QuadPart += pIo->m_nLen; //重新文件指针的位置 pIo->m_dwTimestamp = GetTickCount(); //记录时间戳 WriteFile(hTxtFile, pIo->m_pData, pIo->m_nLen, &pIo->m_dwWrite, &pIo->m_ol);//写入Unicode前缀 //启动写入线程进行日志写入操作
for (int i = ; i < MAXWRITE; i++){
ahWThread[i] = GRS_BEGINTHREAD(WriteThread, hTxtFile);
} //让主线程等待写入线程结束
WaitForMultipleObjects(MAXWRITE, ahWThread, TRUE, INFINITE); for (int i = ; i < MAXWRITE; i++){
CloseHandle(ahWThread[i]);
} //关闭日志文件
if (INVALID_HANDLE_VALUE != hTxtFile){
CloseHandle(hTxtFile);
hTxtFile = INVALID_HANDLE_VALUE;
} _tsystem(_T("PAUSE")); return ;
} VOID CALLBACK FileIoCompletionRoutine(DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)
{
if (ERROR_SUCCESS != dwErrorCode){
_tprintf(_T("I/O操作出错,错误码:%u\n"), dwErrorCode);
return;
} PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(lpOverlapped, PER_IO_CONTEXT, m_ol); DWORD dwCurTimestamp = GetTickCount(); switch (pIoContext->m_dwOp)
{
case OP_WRITE:
_tprintf(_T("线程[0x%X]得到IO完成通知,完成操作(%s),缓冲区(0x%08X)长度(%ubytes),写入时间戳(%u),当前时间戳(%u),时差(%u)\n"),
GetCurrentThreadId(),OP_WRITE == pIoContext->m_dwOp ? _T("Write"):_T("Read"),
pIoContext->m_pData,pIoContext->m_nLen,pIoContext->m_dwTimestamp,dwCurTimestamp,dwCurTimestamp-pIoContext->m_dwTimestamp);
GRS_SAFEFREE(pIoContext->m_pData);
GRS_SAFEFREE(pIoContext);
break; case OP_READ: //这里没用到
break; default:
break;
}
} //写入文件的线程
#define MAX_LOGLEN 256
DWORD WINAPI WriteThread(LPVOID lpParameter){
TCHAR pTxtContext[MAX_LOGLEN] = {};
PER_IO_CONTEXT* pIo = NULL;
size_t szLen = ;
LPTSTR pWriteText = NULL; StringCchPrintf(pTxtContext, MAX_LOGLEN, _T("这是一条模拟的日志记录,由线程[0x%08X]写入.\r\n"),
GetCurrentThreadId());
StringCchLength(pTxtContext, MAX_LOGLEN, &szLen); szLen +=; int i=;
//每个线程写入20次
for (; i < MAX_WRITEPERTHREAD;i++){
pWriteText = (LPTSTR)GRS_CALLOC(szLen*sizeof(TCHAR)); //每条记录
GRS_ASSERT(NULL != pWriteText);
StringCchCopy(pWriteText, szLen, pTxtContext); //为每个IO操作申请一个“单IO数据”结构体
pIo = (PER_IO_CONTEXT*)GRS_CALLOC(sizeof(PER_IO_CONTEXT));
GRS_ASSERT(NULL != pIo);
pIo->m_dwOp = OP_WRITE;
pIo->m_hFile = (HANDLE)lpParameter;
pIo->m_pData = pWriteText; //pWriteText为堆上分配的数据!生命期长于线程函数
pIo->m_nLen = (szLen-)* sizeof(TCHAR); //注意,这里不写入每行最后的\0 //这里使用原子操作同步文件指针,写入不会相互覆盖
//函数执行时,先比较第1个参数和第3个参数是否相同(此处必然相等)。相等时
//将参数2的值更新到g_liFilePointer里,同时将g_liFilePointer中旧的值赋值给
//pIo->m_ol.Pointer(这是一个技巧,体现了lock-free算法的精髓)
*((LONGLONG*)&pIo->m_ol.Pointer) =
InterlockedCompareExchange64(&g_liFilePointer.QuadPart,
g_liFilePointer.QuadPart + pIo->m_nLen,g_liFilePointer.QuadPart); pIo->m_dwTimestamp = GetTickCount(); //记录写入时间戳 //写入
WriteFile((HANDLE)lpParameter, pIo->m_pData, pIo->m_nLen, &pIo->m_dwWrite, &pIo->m_ol);
} return i; //该线程写入的次数
}