//本线程池能够实际运行。
//线程池类比较长,放在后面
//1.新建窗体,在画面上放两个按钮,一个用于扔任务到现场池,另一个用于查看任务是否完成。
//2.添加线程池类,较长,放在后面。
//3.在窗体cpp文件加入头文件
#include "visSimpleThreadPool.h"
//4.在窗体类cpp文件中加入
class Work1;
Work1 *pwork1;Work1 *pwork2;
class Work1:public TWork
{ public:
void Exec()
{ char str[100];
AnsiString t=WorkName;
for (int i = 0; i < 1000; i++)
{ sprintf(str,"%s:%d\n",t.c_str(),i);
OutputDebugStringA(str);
}
}
};
//5.在窗体的FormCreate()中加入
pwork1=new Work1;pwork2=new Work1;
pwork1->WorkName="WORK1";pwork2->WorkName="WORK2";
g_pThreadWorkList = new TThreadWorkList(10);
//6.在窗体的FormClose()中加入
if(g_pThreadWorkList) delete g_pThreadWorkList;
delete pwork1; delete pwork2;
//7.在的第1个按钮中加入
g_pThreadWorkList->AddWork(pwork1);
g_pThreadWorkList->AddWork(pwork2);
//8.在的第2个按钮中加入
ShowMessage(pwork1->WorkIsEnd());
//2. 包括2.1线程池类,以及2.2t emp.h头文件
//2.1线程池类
//----------------------------------------------------------------------------------------------------------------
//visSimpleThreadPool.h
//---------------------------------------------------------------------------
#include "temp.h"
#ifndef visSimpleThreadPoolH
#define visSimpleThreadPoolH
//---------------------------------------------------------------------------
#include <Classes.hpp>
#include <vector>
#include <queue>
#include <SyncObjs.hpp>
//---------------------------------------------------------------------------
class TWork_Thread ;
class PACKAGE TWork
{
public :
//--Tag开头的这几个变量留给用户使用。
void *Tag;
byte TagWait ; //
byte TagIndex ; //
//----
bool WorkEnd ; //任务已经完成 , 未完成则为排程中
bool IsWorking ; //计算中
unsigned int TimeStart;
unsigned int TimeEnd ;
String WorkMsg ;
String WorkName ;
TWork_Thread *Work_Thread ;
void (__closure *pOnWorkEnd)(TWork * );
void (__closure *pWorkExec)(TWork * );
TWork();
virtual ~TWork();
virtual void Exec() ;
bool WorkIsEnd();
void SetWorkIsEnd();
};
//---------------------------------------------------------------------------
class TThreadWorkList ;
class PACKAGE TWork_Thread : public TThread
{
friend class TThreadWorkList ;
private:
bool running ;
protected:
void __fastcall Execute();
TThreadWorkList *ThreadWorkList ;
public:
TWork *work ;
String WorkName ;
__fastcall TWork_Thread(bool CreateSuspended,TThreadWorkList *p);
};
//---------------------------------------------------------------------------
class PACKAGE TWorkList
{
TCriticalSection *CS ;
public :
std::vector<TWork *> FWorks ; //任务队列
void AddWork(TWork *pWork);
void ClearWorkList();
TWorkList();
~TWorkList();
};
//---------------------------------------------------------------------------
class PACKAGE TThreadWorkList
{
friend class TWork_Thread ;
TCriticalSection *ThreadPoolCS ;
TWork_Thread *FindFreeThread();
void RunWork(TWork_Thread *pThread,TWork *pWork);
std::queue<TWork *> FWorks ; //任务队列
std::vector<TWork_Thread *> FThreads ; //线程池 bool AutoRun;
public :
TThreadWorkList();
TThreadWorkList(int NThreads);
~TThreadWorkList();
bool AllThreadsIsFree();
int GetFreeThreadCount();
void AddWork(TWork *pWork);
int ClearWorkList();
};
//执行列表
extern PACKAGE TThreadWorkList *g_pThreadWorkList ;
//等待列表(未满足执行条件的待运行任务)
extern PACKAGE TWorkList *G_WaitingWorkList ;
extern PACKAGE bool G_StopAllThread ;
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
#endif
//----------------------------------------------------------------------------------------------------------------
//visSimpleThreadPool.cpp
// ---------------------------------------------------------------------------
#include <System.hpp>
#pragma hdrstop
#include "visSimpleThreadPool.h"
#pragma package(smart_init)
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
TThreadWorkList *g_pThreadWorkList = NULL;
TWorkList * G_WaitingWorkList = NULL;
bool G_StopAllThread = false;
__fastcall TWork_Thread::TWork_Thread(bool CreateSuspended, TThreadWorkList *p) :
TThread(CreateSuspended), work(NULL), ThreadWorkList(p), running(false)
{
}
// ---------------------------------------------------------------------------
void TWork::Exec()
{
if (pWorkExec)
pWorkExec(this);
};
// ---------------------------------------------------------------------------
void __fastcall TWork_Thread::Execute()
{
while (!Terminated && !G_StopAllThread)
{
if (work)
{
running = true;
work->IsWorking = true;
work->Work_Thread = this;
work->TimeStart = ::GetTickCount();
work->TimeEnd = 0;
this->WorkName = work->WorkName;
try
{
work->Exec();
}
catch(Exception &ee)
{
ThreadPostError(WorkName + "线程任务【异常】:" + ee.Message);
}
catch(...)
{
ThreadPostError(WorkName+"线程任务发生未知异常");
}
{
work->TimeEnd = ::GetTickCount();
work->WorkEnd = true;
work->Work_Thread = NULL;
work->IsWorking = false;
this->WorkName = "";
if (work->pOnWorkEnd)
work->pOnWorkEnd(work);
work = NULL;
}
if (ThreadWorkList && !ThreadWorkList->FWorks.empty())
{
bool WorkContinue = false;
ThreadWorkList->ThreadPoolCS->Acquire();
if (!ThreadWorkList->FWorks.empty())
{
work = ThreadWorkList->FWorks.front();
ThreadWorkList->FWorks.pop();
WorkContinue = true;
}
ThreadWorkList->ThreadPoolCS->Release();
if (WorkContinue)
continue;
}
}
running = false;
if (Terminated || G_StopAllThread)
break;
else
this->Suspend();
}
}
// ---------------------------------------------------------------------------
// TThreadWorkList
// ---------------------------------------------------------------------------
TThreadWorkList::TThreadWorkList() : FWorks(), FThreads()
{
ThreadPoolCS = new TCriticalSection;
}
TThreadWorkList::TThreadWorkList(int NThreads) : FWorks()
{
ThreadPoolCS = new TCriticalSection;
FThreads.resize(NThreads);
for (int i = 0; i < NThreads; ++i)
{
FThreads[i] = new TWork_Thread(true, this);
FThreads[i]->FreeOnTerminate = false;
}
}
// ---------------------------------------------------------------------------
TThreadWorkList::~TThreadWorkList()
{
try
{
for(size_t i = 0; i < FThreads.size(); ++i)
{
if(FThreads[i]->Suspended)
{
FThreads[i]->work = NULL;
FThreads[i]->Terminate();
FThreads[i]->Resume();
// FThreads[i]->Terminate();
}
else
{
FThreads[i]->Terminate();
}
}
}
catch(...)
{
}
Sleep(50);
try
{
for(size_t i = 0; i < FThreads.size(); ++i)
{
if(FThreads[i]->Suspended)
{
FThreads[i]->Resume();
Sleep(50);
}
else
{
Sleep(1);
}
if(FThreads[i]->running && !ThreadIsEnd(FThreads[i]))
{
if(FThreads[i]->work)
{
// && FThreads[i]->WorkName != ""
static AnsiString tmp = "强行终止未完成的作业:" + FThreads[i]->WorkName;
ThreadPostError(tmp.c_str());
Sleep(10);
if(FThreads[i]->running && !ThreadIsEnd(FThreads[i]))
{
try
{
// 可能造成无响应。堆没被释放。
void *p = (void *)FThreads[i]->Handle;
TerminateThread(p, 1);
Sleep(10);
CloseHandle(p);
}
catch(...)
{}
}
else
{
delete FThreads[i];
}
continue;
}
//
}
delete FThreads[i];
}
}
catch(...)
{
}
FThreads.clear();
while(!FWorks.empty())
{
FWorks.pop();
}
delete ThreadPoolCS;
}
// ---------------------------------------------------------------------------
TWork_Thread *TThreadWorkList::FindFreeThread()
{
for (size_t i = 0; i < FThreads.size(); ++i)
{
if (FThreads[i]->Suspended && FThreads[i]->work == NULL && !FThreads[i]->Terminated)
return FThreads[i];
}
return NULL;
}
// ---------------------------------------------------------------------------
int TThreadWorkList::GetFreeThreadCount()
{
int FreeCount = 0 ;
for (size_t i = 0; i < FThreads.size(); ++i)
{
if (FThreads[i]->Suspended && FThreads[i]->work == NULL && !FThreads[i]->Terminated)
FreeCount++;
}
return FreeCount;
}
// ---------------------------------------------------------------------------
int TThreadWorkList::ClearWorkList()
{
ThreadPoolCS->Acquire();
while (!FWorks.empty())
FWorks.pop();
ThreadPoolCS->Release();
return GetFreeThreadCount();
}
// ---------------------------------------------------------------------------
bool TThreadWorkList::AllThreadsIsFree()
{
for (size_t i = 0; i < FThreads.size(); ++i)
{
if ((!FThreads[i]->Suspended)) // || FThreads[i]->work)
return false;
}
return true;
}
// ---------------------------------------------------------------------------
void TWorkList::AddWork(TWork *pWork)
{
CS->Acquire();
int n = FWorks.size() ;
if(n > 6)
{
ThreadPostInfo(0,"线程池滞留任务数:"+AnsiString(n));
}
pWork->TimeStart = 0 ;
pWork->TimeEnd = 0 ;
pWork->WorkEnd = false ;
pWork->IsWorking = false;
FWorks.push_back(pWork);
CS->Release();
}
// ---------------------------------------------------------------------------
void TThreadWorkList::AddWork(TWork *pWork)
{
if (pWork)
{
pWork->WorkEnd = false;
pWork->IsWorking = false;
pWork->TimeStart = 0;
pWork->TimeEnd = 0;
ThreadPoolCS->Acquire();
TWork_Thread *pThread = TThreadWorkList::FindFreeThread();
if (pThread)
RunWork(pThread, pWork);
else
FWorks.push(pWork); // 放到后面去。
ThreadPoolCS->Release();
int n = GetFreeThreadCount();
if(n < 1)
{
ThreadPostInfo(0,"线程池暂无可用线程供分配:"+AnsiString(n)+"//"+AnsiString(FThreads.size()));
}
}
}
// ---------------------------------------------------------------------------
TWorkList::TWorkList()
{
CS = new TCriticalSection ;
}
TWorkList::~TWorkList()
{
delete CS ;
}
void TThreadWorkList::RunWork(TWork_Thread *pThread,TWork *pWork)
{
pThread->work = pWork;
pThread->Resume();
pWork->IsWorking = true;
}
TWork::TWork() : WorkEnd(true) , IsWorking(false),pOnWorkEnd(NULL),TagWait(0),Work_Thread(NULL)
,pWorkExec(NULL),TimeStart(0),TimeEnd(0) {}
TWork::~TWork(){}
bool TWork::WorkIsEnd(){return WorkEnd && (!IsWorking) ;}// && (TimeStart > 0) && (TimeEnd > 0)
void TWork::SetWorkIsEnd()
{
WorkEnd = true ;
IsWorking = false ;
}
//2.2 temp.h,
bool ThreadIsEnd(TThread *Thread)
{
// 增加判断线程是否退出条件!
DWORD dwExitCode = 0;
if(Thread != NULL)
{
::GetExitCodeThread( (HANDLE)(Thread->Handle), &dwExitCode);
}
return Thread == NULL || dwExitCode != STILL_ACTIVE;
}
void ThreadPostError(AnsiString temp)//这个函数你可以自己改
{
OutputDebugStringA(temp.c_str());
}
void ThreadPostInfo(int n,AnsiString temp1)//这个函数你也可以自己改
{ AnsiString temp=n;
temp=temp+temp1;
OutputDebugStringA(temp.c_str());
}