c++11多线程

文章目录


c++11多线程

对多线程的支持

我们来看一个稍微复杂一点的例子。

在 C++11 之前,由于 C++98/03 本身缺乏对线程和线程同步原语的支持,我们要写一个生产者消费者逻辑要这么写。

在 Windows 上

代码

 /**
  * RecvMsgTask.h
  */
 class CRecvMsgTask : public CThreadPoolTask
 {
 public:
     CRecvMsgTask(void);
     ~CRecvMsgTask(void);
 
 public:
     virtual int Run();
     virtual int Stop();
     virtual void TaskFinish();
 
     BOOL AddMsgData(CBuffer* lpMsgData);
 
 private:
     BOOL HandleMsg(CBuffer* lpMsg);
 
 private:
     HANDLE                m_hEvent;
     CRITICAL_SECTION      m_csItem;
     HANDLE                m_hSemaphore;
     std::vector<CBuffer*> m_arrItem;
 };
 
 /**
  * RecvMsgTask.cpp
  */
 CRecvMsgTask::CRecvMsgTask(void)
 {
     ::InitializeCriticalSection(&m_csItem);
     m_hSemaphore = ::CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
     m_hEvent = ::CreateEvent(NULL, TRUE, FALSE, NULL);
 }
 
 CRecvMsgTask::~CRecvMsgTask(void)
 {
     ::DeleteCriticalSection(&m_csItem);
 
     if (m_hSemaphore != NULL)
     {
         ::CloseHandle(m_hSemaphore);
         m_hSemaphore = NULL;
     }
 
     if (m_hEvent != NULL)
     {
         ::CloseHandle(m_hEvent);
         m_hEvent = NULL;
     }
 }
 
 int CRecvMsgTask::Run()
 {
     HANDLE hWaitEvent[2];
     DWORD dwIndex;
     CBuffer * lpMsg;
 
     hWaitEvent[0] = m_hEvent;
     hWaitEvent[1] = m_hSemaphore;
 
     while (1)
     {
         dwIndex = ::WaitForMultipleObjects(2, hWaitEvent, FALSE, INFINITE);
 
         if (dwIndex == WAIT_OBJECT_0)
             break;
 
         lpMsg = NULL;
 
         ::EnterCriticalSection(&m_csItem);
         if (m_arrItem.size() > 0)
         {
             //消费者从队列m_arrItem中取出任务执行
             lpMsg = m_arrItem[0];
             m_arrItem.erase(m_arrItem.begin() + 0);
         }
         ::LeaveCriticalSection(&m_csItem);
 
         if (NULL == lpMsg)
             continue;
 
         //处理任务
         HandleMsg(lpMsg);
 
         delete lpMsg;
     }
 
     return 0;
 }
 
 int CRecvMsgTask::Stop()
 {
     m_HttpClient.SetCancalEvent();
     ::SetEvent(m_hEvent);
     return 0;
 }
 
 void CRecvMsgTask::TaskFinish()
 {
 }
 
 //生产者调用这个方法将Task放入队列m_arrItem中
 BOOL CRecvMsgTask::AddMsgData(CBuffer * lpMsgData)
 {
     if (NULL == lpMsgData)
         return FALSE;
 
     ::EnterCriticalSection(&m_csItem);
     m_arrItem.push_back(lpMsgData);
     ::LeaveCriticalSection(&m_csItem);
 
     ::ReleaseSemaphore(m_hSemaphore, 1, NULL);
 
     return TRUE;
 }

在 Linux 下

代码

#include <pthread.h>
 #include <errno.h>
 #include <unistd.h>
 #include <list>
 #include <semaphore.h>
 #include <iostream>
 
 class Task
 {
 public:
     Task(int taskID)
     {
         this->taskID = taskID;
     }
     
     void doTask()
     {
         std::cout << "handle a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl; 
     }
     
 private:
     int taskID;
 };
 
 pthread_mutex_t  mymutex;
 std::list<Task*> tasks;
 pthread_cond_t   mycv;
 
 void* consumer_thread(void* param)
 {   
     Task* pTask = NULL;
     while (true)
     {
         pthread_mutex_lock(&mymutex);
         while (tasks.empty())
         {               
             //如果获得了互斥锁,但是条件不合适的话,pthread_cond_wait会释放锁,不往下执行。
             //当发生变化后,条件合适,pthread_cond_wait将直接获得锁。
             pthread_cond_wait(&mycv, &mymutex);
         }
         
         pTask = tasks.front();
         tasks.pop_front();
 
         pthread_mutex_unlock(&mymutex);
         
         if (pTask == NULL)
             continue;
 
         pTask->doTask();
         delete pTask;
         pTask = NULL;       
     }
     
     return NULL;
 }
 
 void* producer_thread(void* param)
 {
     int taskID = 0;
     Task* pTask = NULL;
     
     while (true)
     {
         pTask = new Task(taskID);
             
         pthread_mutex_lock(&mymutex);
         tasks.push_back(pTask);
         std::cout << "produce a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl; 
         
         pthread_mutex_unlock(&mymutex);
         
         //释放信号量,通知消费者线程
         pthread_cond_signal(&mycv);
         
         taskID ++;
 
         //休眠1秒
         sleep(1);
     }
     
     return NULL;
 }
 
 int main()
 {
     pthread_mutex_init(&mymutex, NULL);
     pthread_cond_init(&mycv, NULL);
 
     //创建5个消费者线程
     pthread_t consumerThreadID[5];
     for (int i = 0; i < 5; ++i)
         pthread_create(&consumerThreadID[i], NULL, consumer_thread, NULL);
     
     //创建一个生产者线程
     pthread_t producerThreadID;
     pthread_create(&producerThreadID, NULL, producer_thread, NULL);
 
     pthread_join(producerThreadID, NULL);
     
     for (int i = 0; i < 5; ++i)
         pthread_join(consumerThreadID[i], NULL);
     
     pthread_cond_destroy(&mycv);
     pthread_mutex_destroy(&mymutex);
 
     return 0;
 }

怎么样?上述代码如果对于新手来说,望而却步。

为了实现这样的功能在 Windows 上你需要掌握线程如何创建、线程同步对象 CriticalSection、Event、Semaphore、WaitForSingleObject/WaitForMultipleObjects 等操作系统对象和 API。

在 Linux 上需要掌握线程创建,你需要了解线程创建、互斥体、条件变量。

对于需要支持多个平台的开发,需要开发者同时熟悉上述原理并编写多套适用不同平台的代码。

C++11 的线程库改变了这个现状,现在你只需要掌握 std::thread、std::mutex、std::condition_variable 少数几个线程同步对象即可,同时使用这些对象编写出来的代码也可以跨平台。示例如下:

c++11多线程

代码

#include <thread>
 #include <mutex>
 #include <condition_variable>
 #include <list>
 #include <iostream>
 
 class Task
 {
 public:
     Task(int taskID)
     {
         this->taskID = taskID;
     }
     
     void doTask()
     {
         std::cout << "handle a task, taskID: " << taskID << ", threadID: " << std::this_thread::get_id() << std::endl; 
     }
     
 private:
     int taskID;
 };
 
 std::mutex                mymutex;
 std::list<Task*>          tasks;
 std::condition_variable   mycv;
 
 void* consumer_thread()
 {   
     Task* pTask = NULL;
     while (true)
     {
         std::unique_lock<std::mutex> guard(mymutex);
         while (tasks.empty())
         {               
             //如果获得了互斥锁,但是条件不合适的话,pthread_cond_wait会释放锁,不往下执行。
             //当发生变化后,条件合适,pthread_cond_wait将直接获得锁。
             mycv.wait(guard);
         }
         
         pTask = tasks.front();
         tasks.pop_front();
         
         if (pTask == NULL)
             continue;
 
         pTask->doTask();
         delete pTask;
         pTask = NULL;       
     }
     
     return NULL;
 }
 
 void* producer_thread()
 {
     int taskID = 0;
     Task* pTask = NULL;
     
     while (true)
     {
         pTask = new Task(taskID);
             
         //使用括号减小guard锁的作用范围
         {
             std::lock_guard<std::mutex> guard(mymutex);
             tasks.push_back(pTask);
             std::cout << "produce a task, taskID: " << taskID << ", threadID: " << std::this_thread::get_id() << std::endl; 
         }
         
         //释放信号量,通知消费者线程
         mycv.notify_one();
         
         taskID ++;
 
         //休眠1秒
         std::this_thread::sleep_for(std::chrono::seconds(1));
     }
     
     return NULL;
 }
 
 int main()
 {
     //创建5个消费者线程
     std::thread consumer1(consumer_thread);
     std::thread consumer2(consumer_thread);
     std::thread consumer3(consumer_thread);
     std::thread consumer4(consumer_thread);
     std::thread consumer5(consumer_thread);
     
     //创建一个生产者线程
     std::thread producer(producer_thread);
 
     producer.join();
     consumer1.join();
     consumer2.join();
     consumer3.join();
     consumer4.join();
     consumer5.join();
 
     return 0;
 }

C++并发编程实战

参考

上一篇:linux线程


下一篇:C++同步锁笔记