参考文章:http://blog.csdn.net/huyiyang2010/archive/2010/08/10/5801597.aspx
// CThread.h #ifndef __MY_THREAD_H__
#define __MY_THREAD_H__ #include <windows.h>
#include <process.h> // 线程执行接口
class CRunnable
{
public:
CRunnable(){}
virtual ~CRunnable() {}
virtual void Run() = ;
}; class CThread : public CRunnable
{
enum
{
enmMaxThreadNameLen = ,
};
private:
explicit CThread(const CThread & rhs);
public:
CThread();
CThread(CRunnable * pRunnable); ~CThread(void);
bool Start();
virtual void Run();
void Join(int timeout = -);
void Resume();
void Suspend();
bool Terminate(unsigned long ExitCode);
void dump();
unsigned int GetThreadID();
friend bool operator==(const CThread& left,const CThread& right);
private:
static unsigned int WINAPI StaticThreadFunc(void * arg);
void init();
private:
HANDLE m_hHandle;
CRunnable * const m_pRunnable;
unsigned int m_nThreadID;
volatile bool m_bRun;
}; #endif
#include "CThread.h"
#include <iostream>
using namespace std; CThread::CThread(void):m_pRunnable(NULL)
{
init();
} CThread::CThread(CRunnable * pRunnable):m_pRunnable(pRunnable)
{
init();
} CThread::~CThread(void)
{
if(m_pRunnable != NULL )
{
delete m_pRunnable;
}
} void CThread::init()
{
m_bRun = false;
m_hHandle = (HANDLE)_beginthreadex(NULL, , StaticThreadFunc, this, CREATE_SUSPENDED, &m_nThreadID);
} bool CThread::Start()
{
Resume();
return (NULL != m_pRunnable);
} void CThread::Run()
{
if(NULL != m_pRunnable)
{
m_pRunnable->Run();
if(m_pRunnable != NULL)
{
delete m_pRunnable;
}
}
} void CThread::Join(int timeout)
{
if(NULL == m_hHandle || m_bRun == false )
{
return;
}
if(timeout <= )
{
timeout = INFINITE;
}
::WaitForSingleObject(m_hHandle, timeout);
} void CThread::Resume()
{
if( NULL == m_hHandle || m_bRun == true )
{
return;
}
m_bRun = true;
::ResumeThread(m_hHandle);
} void CThread::Suspend()
{
if(NULL == m_hHandle || m_bRun == false )
{
return;
}
::SuspendThread(m_hHandle);
} bool CThread::Terminate(unsigned long ExitCode)
{
if(NULL == m_hHandle || m_bRun == false )
{
return true;
}
if(::TerminateThread(m_hHandle, ExitCode))
{
::CloseHandle(m_hHandle);
return true;
}
return false;
} unsigned int CThread::GetThreadID()
{
return m_nThreadID;
} unsigned int CThread::StaticThreadFunc(void * arg)
{
CThread * pThread = (CThread *)arg;
pThread->Run();
pThread->m_bRun = false;
return ;
} void CThread::dump()
{
cout<<"run="<<m_bRun<<",id="<<m_nThreadID<<endl;
}
bool operator==(const CThread& left,const CThread& right)
{
return left.m_nThreadID == right.m_nThreadID;
}
#ifndef _STATIC_QUEUE_H_
#define _STATIC_QUEUE_H_ // 静态queue模板,用数组实现的队列,在初始化的时候需要指定长度
template<class T>
class Static_Queue{
public:
Static_Queue();
virtual ~Static_Queue();
bool init(const unsigned int size);
bool push(T t);
bool pop(T & t);
unsigned int GetElementCnt();
void dump();
private:
volatile unsigned int m_nSize;
T *m_arrT;
volatile unsigned int m_nHead;
volatile unsigned int m_nTail;
volatile unsigned int m_nEmptyCnt;
}; template<class T>
Static_Queue<T>::Static_Queue()
{
m_nSize = ;
m_arrT = NULL;
m_nEmptyCnt = ;
m_nHead = ;
m_nTail = ;
} template<class T>
Static_Queue<T>::~Static_Queue()
{
delete[] m_arrT;
m_arrT = NULL;
} template<class T>
bool Static_Queue<T>::init(const unsigned int size)
{
m_nSize = size;
m_arrT = new T[m_nSize];
if(m_arrT == NULL)
{
return false;
}
memset(m_arrT,,sizeof(T)*m_nSize);
m_nEmptyCnt = m_nSize;
return true;
} template<class T>
bool Static_Queue<T>::push(T t)
{
if( m_nEmptyCnt <= )
{
return false;
}
m_arrT[m_nTail++] = t;
if(m_nTail >= m_nSize)
{
m_nTail = ;
}
m_nEmptyCnt--;
return true;
} template<class T>
bool Static_Queue<T>::pop(T & t)
{
if( m_nEmptyCnt >= m_nSize )
{
return false;
}
t = m_arrT[m_nHead++];
if( m_nHead >= m_nSize )
{
m_nHead = ;
}
m_nEmptyCnt++;
return true;
} template<class T>
unsigned int Static_Queue<T>::GetElementCnt()
{
return m_nSize - m_nEmptyCnt;
} template<class T>
void Static_Queue<T>::dump()
{
cout<<"head= "<<m_nHead<<" "<<"tail= "<<m_nTail<<endl;
cout<<"[";
for(int i = ;i < m_nSize;i++ )
{
cout<<m_arrT[i]<<" ";
}
cout<<"]"<<endl;
} #endif
#ifndef _MY_HASH_INT_H_
#define _MY_HASH_INT_H_ template<class T,class K>
class HashInt{
public:
HashInt();
virtual ~HashInt();
private:
typedef struct tagElement
{
T data;
K key;
bool use;
tagElement(){use = false;}
~tagElement(){}
}Element;
volatile unsigned int m_nSize;
Element *m_arrT;
volatile unsigned int m_nElementCnt;
// 查找
bool find(K key,unsigned int &index);
public:
// 初始化,分配内存
bool init(const unsigned int size);
// 哈希函数
unsigned int hash(K key);
// ELF哈希函数
unsigned int hash_elf(char *str);
// 插入
bool insert(T data,K key);
// 删除
bool remove(K key);
// 查找
bool find(K key,T &data);
// 修改
bool modify(T data,K key);
// 元素个数
unsigned int get_element_cnt();
// 下标索引
bool get_by_index(unsigned int index,T &data);
void dump();
}; template<class T,class K>
bool HashInt<T, K>::get_by_index( unsigned int index,T &data )
{
if( m_nElementCnt == )
{
return false;
}
if(m_arrT[index].use == false)
{
return false;
}
data = m_arrT[index].data;
return true;
} template<class T,class K>
unsigned int HashInt<T, K>::get_element_cnt()
{
return m_nElementCnt;
} template<class T,class K>
unsigned int HashInt<T, K>::hash_elf( char *str)
{
unsigned int locate = ;
unsigned int x = ;
while (*str)
{
locate = (locate << ) + (*str++);//hash左移4位,当前字符ASCII存入hash低四位。
if ((x = locate & 0xF0000000L) != )
{//如果最高的四位不为0,则说明字符多余7个,如果不处理,再加第九个字符时,第一个字符会被移出,因此要有如下处理。
locate ^= (x >> );
//清空28-31位。
locate &= ~x;
}
}
return locate%m_nSize;
} template<class T,class K>
HashInt<T, K>::~HashInt()
{
if(m_arrT != NULL)
{
delete[] m_arrT;
}
} template<class T,class K>
HashInt<T, K>::HashInt()
{
m_arrT = NULL;
m_nSize = ;
m_nElementCnt = ;
} template<class T,class K>
void HashInt<T, K>::dump()
{
cout<<"m_nElementCnt="<<m_nElementCnt<<",m_nSize="<<m_nSize<<endl;
for(unsigned int i = ;i < m_nSize;i++)
{
if(m_arrT[i].use == true)
{
cout<<i<<"-";
cout<<"["<<m_arrT[i].key<<"="<<m_arrT[i].data<<"] ";
}
}
cout<<endl;
} template<class T,class K>
bool HashInt<T, K>::modify( T data,K key )
{
if( m_nElementCnt == )
{
return false;
}
bool exist = false;
unsigned int index;
exist = find(key,index);
if( exist == true )
{
m_arrT[index].data = data;
}
return false;
} template<class T,class K>
bool HashInt<T, K>::find( K key,T &data )
{
if( m_nElementCnt == )
{
return false;
}
bool exist = false;
unsigned int index;
exist = find(key,index);
if( exist == true )
{
data = m_arrT[index].data;
return true;
}
return false;
} template<class T,class K>
bool HashInt<T, K>::find( K key,unsigned int &index )
{
if( m_nElementCnt == )
{
return false;
}
unsigned int locate = hash(key),i = ;
while(i < m_nSize)
{
if( m_arrT[locate].use == true && m_arrT[locate].key == key)
{
index = locate;
return true;
}
locate = (locate + i)%m_nSize;
i++;
}
return false;
} template<class T,class K>
bool HashInt<T, K>::remove( K key )
{
// 表为空
if( m_nElementCnt == )
{
return false;
}
bool exist = false;
unsigned int index;
exist = find(key,index);
if( exist == true )
{
m_arrT[index].use = false;
m_nElementCnt--;
return true;
}
return false;
} template<class T,class K>
bool HashInt<T, K>::insert( T data,K key)
{
// 表已满
if( m_nElementCnt == m_nSize )
{
return false;
}
unsigned int locate = hash(key),i = ;
while(i < m_nSize)
{
if( m_arrT[locate].use == false)
{
m_arrT[locate].data = data;
m_arrT[locate].key = key;
m_arrT[locate].use = true;
m_nElementCnt++;
return true;
}
locate = (locate + i)%m_nSize;
i++;
}
return false;
} template<class T,class K>
unsigned int HashInt<T, K>::hash( K key )
{
return key%m_nSize;
} template<class T,class K>
bool HashInt<T, K>::init( const unsigned int size )
{
m_nSize = size;
m_arrT = new Element[m_nSize];
m_nElementCnt = ;
//cout<<"size = "<<sizeof(Element)*m_nSize<<endl;
return true;
} #endif
#ifndef __MY_MY_THREAD_POOL_H_
#define __MY_MY_THREAD_POOL_H_ #include "CThread.h"
#include <windows.h>
#include "StaticQueue.h"
#include "HashInt.h" using namespace std; class CMyThreadPool
{
public:
CMyThreadPool(void);
virtual ~CMyThreadPool(void);
// 初始化线程池,创建minThreads个线程
bool Initialize(unsigned int minThreadCnt,unsigned int maxThreadCnt,unsigned int maxTaskQueueLength);
bool AddTask( CRunnable *pRunnable, bool bRun = true);
void Terminate();
// 获取线程数量
unsigned int GetThreadCnt();
private:
// 从任务队列头中取出一个任务
CRunnable *GetTask();
// 执行任务线程
static unsigned int WINAPI StaticThreadFunc(void * arg);
private:
// 工作者类
class CWorker : public CThread
{
public:
CWorker(CMyThreadPool *pThreadPool,CRunnable *pFirstTask = NULL);
~CWorker();
void Run();
private:
CMyThreadPool * const m_pThreadPool;
CRunnable * m_pFirstTask;
volatile bool m_bRun;
}; typedef HashInt<CWorker*,unsigned int> ThreadPool;
typedef Static_Queue<CRunnable *> Tasks; CRITICAL_SECTION m_csTasksLock;
CRITICAL_SECTION m_csThreadPoolLock; // 线程池
ThreadPool m_ThreadPool;
// 垃圾线程
ThreadPool m_TrashThread;
// 任务队列
Tasks m_Tasks;
// 是否在运行
volatile bool m_bRun;
// 能否插入任务
volatile bool m_bEnableInsertTask;
// 最小线程数
volatile unsigned int m_minThreads;
// 最大线程数
volatile unsigned int m_maxThreads;
// 最大挂起任务数量
volatile unsigned int m_maxPendingTasks;
}; #endif
#include "CMyThreadPool.h"
#include <iostream>
#include <time.h> using namespace std; CMyThreadPool::CWorker::CWorker(CMyThreadPool *pThreadPool,CRunnable *pFirstTask)
:m_pThreadPool(pThreadPool),m_pFirstTask(pFirstTask),m_bRun(true)
{ } CMyThreadPool::CWorker::~CWorker()
{ } void CMyThreadPool::CWorker::Run()
{
CRunnable * pTask = NULL;
while(m_bRun)
{
// 取出一个任务
if(NULL == m_pFirstTask)
{
pTask = m_pThreadPool->GetTask();
}
else
{
pTask = m_pFirstTask;
m_pFirstTask = NULL;
}
// 如果没有取到任务
if(NULL == pTask)
{
EnterCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));
// 如果运转的线程数大于最小线程数,需要清除多余的线程
if(m_pThreadPool->GetThreadCnt() > m_pThreadPool->m_minThreads)
{
CWorker *p = NULL;
unsigned int thread_id = this->GetThreadID();
if(m_pThreadPool->m_ThreadPool.find(thread_id,p) == true)
{
if(p == NULL)
{
cout<<"find failed,p == NULL,thread_id = "<<thread_id<<endl;
continue;
}
m_pThreadPool->m_ThreadPool.remove(thread_id);
m_pThreadPool->m_TrashThread.insert(p,thread_id);
}
else
{
cout<<"find by thread_id failed,thread_id = "<<thread_id<<endl;
}
m_bRun = false;
}
else
{
// 等待已经开始运行的线程结束
for(unsigned int i = ;i < m_pThreadPool->m_TrashThread.get_element_cnt();i++)
{
CWorker *p = NULL;
if(m_pThreadPool->m_TrashThread.get_by_index(i,p) == true)
{
if(p == NULL)
{
cout<<"get_by_index failed,p == NULL"<<endl;
continue;
}
p->Join();
m_pThreadPool->m_TrashThread.remove(p->GetThreadID());
delete p;
}
}
}
LeaveCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));
continue;
}
else
{
pTask->Run();
delete pTask;
pTask = NULL;
}
}
} CMyThreadPool::CMyThreadPool(void):m_bRun(false),m_bEnableInsertTask(false)
{
InitializeCriticalSection(&m_csTasksLock);
InitializeCriticalSection(&m_csThreadPoolLock);
} CMyThreadPool::~CMyThreadPool(void)
{
DeleteCriticalSection(&m_csTasksLock);
DeleteCriticalSection(&m_csThreadPoolLock);
} bool CMyThreadPool::Initialize(unsigned int minThreadCnt,unsigned int maxThreadCnt,unsigned int maxTaskQueueLength)
{
if(minThreadCnt == )
{
return false;
}
if(minThreadCnt > maxThreadCnt)
{
return false;
}
if(m_ThreadPool.init(maxThreadCnt) == false
|| m_TrashThread.init(maxThreadCnt) == false
|| m_Tasks.init(maxTaskQueueLength) == false)
{
return false;
}
m_minThreads = minThreadCnt;
m_maxThreads = maxThreadCnt;
m_maxPendingTasks = maxTaskQueueLength;
unsigned int i = m_ThreadPool.get_element_cnt();
for(; i<minThreadCnt; i++)
{
//创建线程 minThreadCnt 个线程
CWorker * pWorker = new CWorker(this);
if(NULL == pWorker)
{
return false;
}
EnterCriticalSection(&m_csThreadPoolLock);
if(m_ThreadPool.insert(pWorker,pWorker->GetThreadID()) == false)
{
return false;
}
LeaveCriticalSection(&m_csThreadPoolLock);
pWorker->Start();
}
// 可以开始插入任务队列
m_bRun = true;
m_bEnableInsertTask = true;
return true;
} unsigned int CMyThreadPool::GetThreadCnt()
{
return m_ThreadPool.get_element_cnt();
} CRunnable * CMyThreadPool::GetTask()
{
CRunnable *Task = NULL;
EnterCriticalSection(&m_csTasksLock);
if( m_Tasks.GetElementCnt() != )
{
m_Tasks.pop(Task);
}
LeaveCriticalSection(&m_csTasksLock);
return Task;
} bool CMyThreadPool::AddTask( CRunnable *pRunnable, bool bRun /*= true*/ )
{
if(m_bEnableInsertTask == false)
{
return false;
}
if(NULL == pRunnable)
{
return false;
}
// 如果达到最大挂起任务数量
if(m_Tasks.GetElementCnt() >= m_maxPendingTasks)
{
// 如果小于最大线程数
if(m_ThreadPool.get_element_cnt() < m_maxThreads)
{
CWorker * pWorker = new CWorker(this, pRunnable);
if(NULL == pWorker)
{
cout<<"error:CWorker * pWorker = new CWorker(this, pRunnable)"<<endl;
return false;
}
EnterCriticalSection(&m_csThreadPoolLock);
if(m_ThreadPool.insert(pWorker,pWorker->GetThreadID()) == false)
{
cout<<"error:m_ThreadPool.insert(pWorker,pWorker->GetThreadID(),id"<<pWorker->GetThreadID()<<endl;
return false;
}
LeaveCriticalSection(&m_csThreadPoolLock);
pWorker->Start();
}
else
{
return false;
}
}
else
{
EnterCriticalSection(&m_csTasksLock);
if( m_Tasks.push(pRunnable) == false )
{
cout<<"error:m_Tasks.push(pRunnable) == false"<<endl;
return false;
}
LeaveCriticalSection(&m_csTasksLock);
}
//m_ThreadPool.dump();
return true;
} void CMyThreadPool::Terminate()
{
m_bEnableInsertTask = false;
while(m_Tasks.GetElementCnt() > )
{
Sleep();
}
m_bRun = false;
m_minThreads = ;
m_maxThreads = ;
m_maxPendingTasks = ;
while(m_ThreadPool.get_element_cnt() > )
{
Sleep();
}
EnterCriticalSection(&m_csThreadPoolLock);
for(unsigned int i = ;i < m_TrashThread.get_element_cnt();i++)
{
CWorker *p = NULL;
if(m_TrashThread.get_by_index(i,p) == true)
{
if(p == NULL)
{
cout<<"get_by_index failed,p == NULL"<<endl;
return;
}
p->Join();
m_TrashThread.remove(p->GetThreadID());
}
}
LeaveCriticalSection(&m_csThreadPoolLock);
}
#include <iostream>
#include <time.h>
#include <iomanip>
#include "CThread.h"
#include "CThreadPool.h"
#include "CMyThreadPool.h" using namespace std; class R : public CRunnable
{
public:
R(int t):m_nt(t)
{
}
~R()
{
cout<<"~R:"<<m_nt<<endl;
}
void Run()
{
Sleep(m_nt);
cout<<"i am "<<m_nt<<endl;
}
friend ostream &operator<<(ostream &out,const R &r)
{
out<<r.m_nt;
return out;
}
friend bool operator < (const R &l,const R &r)
{
return l < r;
}
void display()
{
cout<<m_nt<<" ";
}
int m_nt;
}; int main()
{
R *r1 = new R();
CMyThreadPool *myThreadPool = new CMyThreadPool();
if(myThreadPool->Initialize(,,) == false)
{
cout<<"inti failed!"<<endl;
return ;
}
for (int i = ;i < ;i++)
{
R *r = new R( -i);
if(myThreadPool->AddTask(r) == false)
{
cout<<"add task failed"<<endl;
}
}
myThreadPool->Terminate();
delete myThreadPool;
system("pause");
return ;
}