出自:http://blog.csdn.net/tian*st/article/details/9335243
在我们的工作中,我们经常需要异步执行一些任务,下面介绍的这个可伸缩多线程队列,可满足我们的需求。
出自:http://www.codeproject.com/Articles/4148/Multithreaded-Job-Queue,主要有以下几个功能:
1、任务队列是多线程,许多任务可以异步进行,任务队列使用线程池来执行任务。
2、任务队列支持优先级,优先级高的任务优先执行(即使是后来添加的)
3、任务队列可以被暂停,但是用户还是可以添加任务,当任务队列被唤醒时,任务可以继续执行下去
4、在运行过程中,任务队列使用的线程池,用户可以自行增加和减少
大体框架主要由3个类构成
1、CJob,任务类,用户需要从该类派生来实现自身需要完成的任务
2、CJobExecuter,任务执行类,任务均由该类来调用执行,每一个类相当于对应一个线程
3、CMThreadedJobQ,多线程任务队列,添加任务已经任务的分发均由该类完成,该类维护一个任务队列和一个完成队列的线程池。
类图如下:
该例子中,CJobExecuter和CMThreadJobQ这两个类的调用关系是非常值得我们学习的,同时,CJob作为一个基类,子类派生可以实现不同的任务,可扩展性也不错。源代码解析如下:
Job.h文件:
1 class CJob 2 { 3 public: 4 CJob(); 5 virtual ~CJob(); 6 BOOL m_Completed; //任务是否完成:TRUE 完成,FALSE 未完成 7 static long lastUsedID; //最后的ID 8 //================================================================================================ 9 //函数名: setPriority 10 //函数描述: 设置任务优先级 11 //输入: [in] priority 优先级别 12 //输出: 无 13 //返回: 无 14 //================================================================================================ 15 void setPriority(int priority); 16 //================================================================================================ 17 //函数名: getPriority 18 //函数描述: 返回任务优先级 19 //输入: 无 20 //输出: 无 21 //返回: 任务优先级 22 //================================================================================================ 23 int getPriority(); 24 //================================================================================================ 25 //函数名: getID 26 //函数描述: 返回任务ID 27 //输入: 无 28 //输出: 无 29 //返回: 任务ID 30 //================================================================================================ 31 long getID(); 32 //================================================================================================ 33 //函数名: setAutoDelete 34 //函数描述: 设置完成任务后是否删除任务 35 //输入: [in] autoDeleteFlag 36 //输出: 无 37 //返回: 无 38 //================================================================================================ 39 void setAutoDelete(BOOL autoDeleteFlag = TRUE); 40 //================================================================================================ 41 //函数名: AutoDelete 42 //函数描述: 返回删除任务标记 43 //输入: 无 44 //输出: 无 45 //返回: 任务标记 46 //================================================================================================ 47 BOOL AutoDelete(); 48 //================================================================================================ 49 //函数名: execute 50 //函数描述: 任务真正工作的函数,纯虚函数,需要子类化实现 51 //输入: 无 52 //输出: 无 53 //返回: 任务ID 54 //================================================================================================ 55 virtual void execute() = 0; 56 private: 57 long m_ID; //任务ID 58 BOOL m_autoDeleteFlag; //是否自动删除任务标记,TRUE 删除,FALSE 不删除,默认为TRUE 59 int m_priority; //任务优先级,默认为5 60 };
Job.cpp文件:
1 long CJob::lastUsedID = 0; 2 3 CJob::CJob() 4 { 5 this->m_ID = InterlockedIncrement(&lastUsedID); 6 this->m_autoDeleteFlag = TRUE; 7 this->m_priority = 5; 8 this->m_Completed= FALSE; 9 } 10 CJob::~CJob() 11 { 12 } 13 BOOL CJob::AutoDelete() 14 { 15 return m_autoDeleteFlag; 16 } 17 void CJob::setAutoDelete(BOOL autoDeleteFlag) 18 { 19 m_autoDeleteFlag = autoDeleteFlag; 20 } 21 long CJob::getID() 22 { 23 return this->m_ID; 24 } 25 int CJob::getPriority() 26 { 27 return this->m_priority; 28 } 29 void CJob::setPriority(int priority) 30 { 31 this->m_priority = priority; 32 }
JobExecuter.h文件:
1 //一个对象对应一个线程,执行任务Job 2 class CJobExecuter 3 { 4 public: 5 CJobExecuter(CMThreadedJobQ *pJobQ); 6 virtual ~CJobExecuter(); 7 //================================================================================================ 8 //函数名: stop 9 //函数描述: 停止执行任务 10 //输入: 无 11 //输出: 无 12 //返回: 无 13 //================================================================================================ 14 void stop(); 15 //================================================================================================ 16 //函数名: execute 17 //函数描述: 执行一个任务 18 //输入: [in] pJob 任务指针 19 //输出: 无 20 //返回: 无 21 //================================================================================================ 22 void execute(CJob* pJob); 23 static UINT ThreadFunction(LPVOID pParam); //线程函数 24 CMThreadedJobQ* m_pJobQ; //指向线程任务队列指针 25 CJob* m_pJob2Do; //指向正在执行任务的指针 26 int m_flag; //线程执行标记 27 CWinThread* m_pExecuterThread; //线程标识符 28 };
JobExecuter.cpp文件:
1 #define STOP_WORKING -1 2 #define KEEP_WORKING 0 3 CJobExecuter::CJobExecuter(CMThreadedJobQ *pJobQ) 4 { 5 this->m_pJobQ= pJobQ; 6 this->m_pExecuterThread= AfxBeginThread(ThreadFunction,this); 7 this->m_pJob2Do = NULL; 8 this->m_flag = KEEP_WORKING; 9 } 10 CJobExecuter::~CJobExecuter() 11 { 12 if(this->m_pExecuterThread!= NULL ) 13 { 14 this->m_pExecuterThread->ExitInstance(); 15 delete m_pExecuterThread; 16 } 17 } 18 UINT CJobExecuter::ThreadFunction(LPVOID pParam) 19 { 20 CJobExecuter *pExecuter = (CJobExecuter *)pParam; 21 pExecuter->m_flag = 1; 22 ::Sleep(1); 23 CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs); 24 while(pExecuter->m_flag !=STOP_WORKING ) 25 { 26 if(pExecuter->m_pJob2Do!= NULL) 27 { 28 pExecuter->m_pJob2Do->execute(); 29 pExecuter->m_pJob2Do->m_Completed = TRUE; 30 if(pExecuter->m_pJob2Do->AutoDelete()) 31 delete pExecuter->m_pJob2Do; 32 pExecuter->m_pJob2Do = NULL; 33 } 34 35 if(pExecuter->m_pJobQ == NULL) break; 36 CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs); 37 singleLock.Lock(); 38 if(pExecuter->m_pJobQ->getNoOfExecuter() > pExecuter->m_pJobQ->getMaxNoOfExecuter()) //CJobExecuter个数大于最大值,自动销毁 39 { 40 pExecuter->stop(); 41 singleLock.Unlock(); 42 } 43 else 44 { 45 pExecuter->m_pJobQ->addFreeJobExecuter(pExecuter); //完成任务后,添加到CMThreadedJobQ的空闲队列中 46 singleLock.Unlock(); 47 pExecuter->m_pJobQ->m_pObserverThread->ResumeThread(); 48 pExecuter->m_pExecuterThread->SuspendThread(); 49 } 50 } 51 if(pExecuter->m_pJobQ != NULL) 52 { 53 pExecuter->m_pJobQ->deleteJobExecuter(pExecuter); 54 } 55 else 56 { 57 delete pExecuter; 58 } 59 return 0; 60 } 61 void CJobExecuter::execute(CJob* pJob) 62 { 63 this->m_pJob2Do = pJob; 64 ::Sleep(0); 65 this->m_pExecuterThread->ResumeThread(); 66 } 67 void CJobExecuter::stop() 68 { 69 this->m_flag = STOP_WORKING; 70 this->m_pExecuterThread->ResumeThread(); 71 }
MThreadedJobQ.h文件
1 typedef CTypedPtrList< CPtrList ,CJob*>CJobQList; 2 //线程池任务队列 3 class CMThreadedJobQ 4 { 5 public: 6 typedef struct THNODE 7 { 8 CJobExecuter* pExecuter; 9 THNODE * pNext ; 10 } THNODE; 11 12 CMThreadedJobQ(); 13 virtual ~CMThreadedJobQ(); 14 //================================================================================================ 15 //函数名: deleteJobExecuter 16 //函数描述: 删除一个JobExecuter对象 17 //输入: [in] pEx 18 //输出: 无 19 //返回: 无 20 //================================================================================================ 21 void deleteJobExecuter(CJobExecuter *pEx); 22 //================================================================================================ 23 //函数名: setMaxNoOfExecuter 24 //函数描述: 设置CJobExecuter的个数 25 //输入: [in] value 26 //输出: 无 27 //返回: 无 28 //================================================================================================ 29 void setMaxNoOfExecuter(int value); 30 //================================================================================================ 31 //函数名: addJobExecuter 32 //函数描述: 添加一个CJobExecuter 33 //输入: [in] pEx 34 //输出: 无 35 //返回: 无 36 //================================================================================================ 37 void addJobExecuter(CJobExecuter *pEx); 38 //================================================================================================ 39 //函数名: getJobExecuter 40 //函数描述: 返回一个CJobExecuter 41 //输入: 无 42 //输出: 无 43 //返回: 处理任务的指针 44 //================================================================================================ 45 CJobExecuter* getJobExecuter(); 46 //================================================================================================ 47 //函数名: addFreeJobExecuter 48 //函数描述: 添加一个CJobExecuter 49 //输入: [in] pEx 50 //输出: 无 51 //返回: 无 52 //================================================================================================ 53 void addFreeJobExecuter(CJobExecuter *pEx); 54 //================================================================================================ 55 //函数名: addJob 56 //函数描述: 添加一个任务 57 //输入: [in] pJob 58 //输出: 无 59 //返回: 无 60 //================================================================================================ 61 void addJob(CJob *pJob); 62 //================================================================================================ 63 //函数名: getMaxNoOfExecuter 64 //函数描述: 获取CJobExecuter个数的最大值 65 //输入: 无 66 //输出: 无 67 //返回: 无 68 //================================================================================================ 69 int getMaxNoOfExecuter(); 70 //================================================================================================ 71 //函数名: getNoOfExecuter 72 //函数描述: 获取当前CJobExecuter的个数 73 //输入: 无 74 //输出: 无 75 //返回: 无 76 //================================================================================================ 77 int getNoOfExecuter(); 78 static UINT JobObserverThreadFunction(LPVOID); 79 //================================================================================================ 80 //函数名: pause 81 //函数描述: 挂起JobObserverThread线程 82 //输入: 无 83 //输出: 无 84 //返回: 无 85 //================================================================================================ 86 void pause(); 87 //================================================================================================ 88 //函数名: resume 89 //函数描述: 唤醒JobObserverThread线程 90 //输入: 无 91 //输出: 无 92 //返回: 无 93 //================================================================================================ 94 void resume(); 95 CWinThread* m_pObserverThread; //向空闲的executer线程添加任务的线程 96 CCriticalSection m_cs; //关键代码段,用于互斥 97 CJobQList m_jobQList; //任务队列 98 private : 99 BOOL m_pause; //JobObserverThread线程运行标记 100 int m_MaxNoOfExecuter; //CJobExecuter最大个数 101 int m_NoOfExecuter; //当前CJobExecuter个数 102 THNODE* m_pFreeEList; //维护空闲处理任务线程的队列 103 THNODE* m_pAllEList; //维护所有处理任务线程的队列 104 };
MThreadedJobQ.cpp文件:
1 CMThreadedJobQ::CMThreadedJobQ() 2 { 3 m_MaxNoOfExecuter = 2; 4 m_pause = FALSE; 5 m_pObserverThread = AfxBeginThread(JobObserverThreadFunction,this); 6 m_pFreeEList =NULL; 7 m_NoOfExecuter =0; 8 m_pAllEList = NULL; 9 } 10 CMThreadedJobQ::~CMThreadedJobQ() 11 { 12 THNODE* pTempNode; 13 while (m_pAllEList != NULL) 14 { 15 pTempNode = m_pAllEList->pNext; 16 delete m_pAllEList->pExecuter; 17 delete m_pAllEList; 18 m_pAllEList = pTempNode; 19 } 20 while (m_pFreeEList != NULL) 21 { pTempNode = m_pFreeEList->pNext; 22 delete m_pFreeEList; 23 m_pFreeEList = pTempNode; 24 } 25 26 m_pObserverThread->ExitInstance(); 27 delete m_pObserverThread; 28 } 29 void CMThreadedJobQ::pause() 30 { 31 this->m_pause = TRUE; 32 } 33 void CMThreadedJobQ::resume() 34 { 35 this->m_pause = FALSE; 36 this->m_pObserverThread->ResumeThread(); 37 } 38 UINT CMThreadedJobQ::JobObserverThreadFunction(LPVOID pParam) 39 { 40 CMThreadedJobQ *pMTJQ = (CMThreadedJobQ *)pParam; 41 CJobExecuter *pJExecuter; 42 while(TRUE) 43 { 44 Sleep(100); 45 if(pMTJQ->m_pause != TRUE) 46 { 47 while(!pMTJQ->m_jobQList.IsEmpty() ) 48 { 49 pJExecuter = pMTJQ->getJobExecuter(); 50 if( pJExecuter!=NULL) 51 { 52 pMTJQ->m_cs.Lock(); 53 pJExecuter->execute(pMTJQ->m_jobQList.GetHead()); 54 pMTJQ->m_jobQList.RemoveHead(); 55 AfxGetApp()->m_pMainWnd->PostMessage(REFRESH_LIST); 56 pMTJQ->m_cs.Unlock(); 57 } 58 else 59 { 60 break; 61 } 62 if(pMTJQ->m_pause == TRUE) 63 break; 64 } 65 } 66 pMTJQ->m_pObserverThread->SuspendThread(); 67 } 68 return 0; 69 } 70 int CMThreadedJobQ::getNoOfExecuter() 71 { 72 return this->m_NoOfExecuter; 73 } 74 75 int CMThreadedJobQ::getMaxNoOfExecuter() 76 { 77 return this->m_MaxNoOfExecuter; 78 } 79 void CMThreadedJobQ::addJob(CJob *pJob) 80 { 81 CJob * pTempJob; 82 CSingleLock sLock(&this->m_cs); 83 sLock.Lock(); 84 POSITION pos,lastPos; 85 pos = this->m_jobQList.GetHeadPosition(); 86 lastPos = pos; 87 if(pos != NULL) 88 pTempJob =this->m_jobQList.GetHead(); 89 while(pos != NULL ) 90 { 91 if( pJob->getPriority() > pTempJob->getPriority()) 92 break; 93 lastPos = pos; 94 pTempJob = this->m_jobQList.GetNext(pos); 95 } 96 if(pos == NULL) 97 this->m_jobQList.AddTail(pJob); 98 else 99 this->m_jobQList.InsertBefore(lastPos,pJob); 100 this->m_pObserverThread->ResumeThread(); 101 sLock.Unlock(); 102 } 103 void CMThreadedJobQ::addFreeJobExecuter(CJobExecuter *pEx) 104 { 105 m_cs.Lock(); 106 THNODE* node = new THNODE; 107 node->pExecuter = pEx; 108 node->pNext = this->m_pFreeEList; 109 this->m_pFreeEList = node; 110 m_cs.Unlock(); 111 } 112 CJobExecuter* CMThreadedJobQ::getJobExecuter() 113 { 114 THNODE *pTemp; 115 CJobExecuter *pEx=NULL; 116 m_cs.Lock(); 117 if(this->m_pFreeEList != NULL) //有空闲CJobExecuter,就返回 118 { 119 pTemp = this->m_pFreeEList; 120 this->m_pFreeEList = this->m_pFreeEList->pNext; 121 pEx = pTemp->pExecuter; 122 delete pTemp ; 123 m_cs.Unlock(); 124 return pEx; 125 } 126 if(this->m_NoOfExecuter < this->m_MaxNoOfExecuter) //没有空闲CJobExecuter,并且当前CJobExecuter小于最大值,就生成一个新的CJobExecuter 127 { 128 pEx = new CJobExecuter(this); 129 this->addJobExecuter(pEx); 130 this->m_NoOfExecuter++; 131 m_cs.Unlock(); 132 return pEx; 133 } 134 m_cs.Unlock(); 135 return NULL; 136 } 137 void CMThreadedJobQ::addJobExecuter(CJobExecuter *pEx) 138 { 139 m_cs.Lock(); 140 THNODE* node = new THNODE; 141 node->pExecuter= pEx; 142 node->pNext = this->m_pAllEList; 143 this->m_pAllEList = node; 144 m_cs.Unlock(); 145 } 146 void CMThreadedJobQ::setMaxNoOfExecuter(int value) 147 { 148 this->m_cs.Lock(); 149 if(value >1 && value <11) 150 this->m_MaxNoOfExecuter = value; 151 m_pObserverThread->ResumeThread(); 152 this->m_cs.Unlock(); 153 } 154 void CMThreadedJobQ::deleteJobExecuter(CJobExecuter *pEx) 155 { 156 THNODE* pNode,*pNodeP; 157 CSingleLock singleLock(&m_cs); 158 singleLock.Lock(); 159 if(this->m_pAllEList != NULL) 160 { 161 pNode = this->m_pAllEList; 162 if(pNode->pExecuter == pEx ) 163 { 164 this->m_pAllEList = pNode->pNext; 165 delete pNode; 166 } 167 else 168 { 169 pNodeP =pNode; 170 pNode = pNode->pNext ; 171 while(pNode != NULL ) 172 { 173 if(pNode->pExecuter== pEx ) break; 174 pNodeP = pNode; 175 pNode = pNode->pNext ; 176 } 177 if(pNode!= NULL) 178 { 179 pNodeP->pNext = pNode->pNext; 180 delete pNode; 181 } 182 } 183 } 184 this->m_NoOfExecuter--; 185 singleLock.Unlock(); 186 pEx->stop(); 187 Sleep(1); 188 delete pEx; 189 }