文章目录
c++11多线程
对多线程的支持
我们来看一个稍微复杂一点的例子。
在 C++11 之前,由于 C++98/03 本身缺乏对线程和线程同步原语的支持,我们要写一个生产者消费者逻辑要这么写。
在 Windows 上
代码
/**
* RecvMsgTask.h
*/
class CRecvMsgTask : public CThreadPoolTask
{
public:
CRecvMsgTask(void);
~CRecvMsgTask(void);
public:
virtual int Run();
virtual int Stop();
virtual void TaskFinish();
BOOL AddMsgData(CBuffer* lpMsgData);
private:
BOOL HandleMsg(CBuffer* lpMsg);
private:
HANDLE m_hEvent;
CRITICAL_SECTION m_csItem;
HANDLE m_hSemaphore;
std::vector<CBuffer*> m_arrItem;
};
/**
* RecvMsgTask.cpp
*/
CRecvMsgTask::CRecvMsgTask(void)
{
::InitializeCriticalSection(&m_csItem);
m_hSemaphore = ::CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
m_hEvent = ::CreateEvent(NULL, TRUE, FALSE, NULL);
}
CRecvMsgTask::~CRecvMsgTask(void)
{
::DeleteCriticalSection(&m_csItem);
if (m_hSemaphore != NULL)
{
::CloseHandle(m_hSemaphore);
m_hSemaphore = NULL;
}
if (m_hEvent != NULL)
{
::CloseHandle(m_hEvent);
m_hEvent = NULL;
}
}
int CRecvMsgTask::Run()
{
HANDLE hWaitEvent[2];
DWORD dwIndex;
CBuffer * lpMsg;
hWaitEvent[0] = m_hEvent;
hWaitEvent[1] = m_hSemaphore;
while (1)
{
dwIndex = ::WaitForMultipleObjects(2, hWaitEvent, FALSE, INFINITE);
if (dwIndex == WAIT_OBJECT_0)
break;
lpMsg = NULL;
::EnterCriticalSection(&m_csItem);
if (m_arrItem.size() > 0)
{
//消费者从队列m_arrItem中取出任务执行
lpMsg = m_arrItem[0];
m_arrItem.erase(m_arrItem.begin() + 0);
}
::LeaveCriticalSection(&m_csItem);
if (NULL == lpMsg)
continue;
//处理任务
HandleMsg(lpMsg);
delete lpMsg;
}
return 0;
}
int CRecvMsgTask::Stop()
{
m_HttpClient.SetCancalEvent();
::SetEvent(m_hEvent);
return 0;
}
void CRecvMsgTask::TaskFinish()
{
}
//生产者调用这个方法将Task放入队列m_arrItem中
BOOL CRecvMsgTask::AddMsgData(CBuffer * lpMsgData)
{
if (NULL == lpMsgData)
return FALSE;
::EnterCriticalSection(&m_csItem);
m_arrItem.push_back(lpMsgData);
::LeaveCriticalSection(&m_csItem);
::ReleaseSemaphore(m_hSemaphore, 1, NULL);
return TRUE;
}
在 Linux 下
代码
#include <pthread.h>
#include <errno.h>
#include <unistd.h>
#include <list>
#include <semaphore.h>
#include <iostream>
class Task
{
public:
Task(int taskID)
{
this->taskID = taskID;
}
void doTask()
{
std::cout << "handle a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl;
}
private:
int taskID;
};
pthread_mutex_t mymutex;
std::list<Task*> tasks;
pthread_cond_t mycv;
void* consumer_thread(void* param)
{
Task* pTask = NULL;
while (true)
{
pthread_mutex_lock(&mymutex);
while (tasks.empty())
{
//如果获得了互斥锁,但是条件不合适的话,pthread_cond_wait会释放锁,不往下执行。
//当发生变化后,条件合适,pthread_cond_wait将直接获得锁。
pthread_cond_wait(&mycv, &mymutex);
}
pTask = tasks.front();
tasks.pop_front();
pthread_mutex_unlock(&mymutex);
if (pTask == NULL)
continue;
pTask->doTask();
delete pTask;
pTask = NULL;
}
return NULL;
}
void* producer_thread(void* param)
{
int taskID = 0;
Task* pTask = NULL;
while (true)
{
pTask = new Task(taskID);
pthread_mutex_lock(&mymutex);
tasks.push_back(pTask);
std::cout << "produce a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl;
pthread_mutex_unlock(&mymutex);
//释放信号量,通知消费者线程
pthread_cond_signal(&mycv);
taskID ++;
//休眠1秒
sleep(1);
}
return NULL;
}
int main()
{
pthread_mutex_init(&mymutex, NULL);
pthread_cond_init(&mycv, NULL);
//创建5个消费者线程
pthread_t consumerThreadID[5];
for (int i = 0; i < 5; ++i)
pthread_create(&consumerThreadID[i], NULL, consumer_thread, NULL);
//创建一个生产者线程
pthread_t producerThreadID;
pthread_create(&producerThreadID, NULL, producer_thread, NULL);
pthread_join(producerThreadID, NULL);
for (int i = 0; i < 5; ++i)
pthread_join(consumerThreadID[i], NULL);
pthread_cond_destroy(&mycv);
pthread_mutex_destroy(&mymutex);
return 0;
}
怎么样?上述代码如果对于新手来说,望而却步。
为了实现这样的功能在 Windows 上你需要掌握线程如何创建、线程同步对象 CriticalSection、Event、Semaphore、WaitForSingleObject/WaitForMultipleObjects 等操作系统对象和 API。
在 Linux 上需要掌握线程创建,你需要了解线程创建、互斥体、条件变量。
对于需要支持多个平台的开发,需要开发者同时熟悉上述原理并编写多套适用不同平台的代码。
C++11 的线程库改变了这个现状,现在你只需要掌握 std::thread、std::mutex、std::condition_variable 少数几个线程同步对象即可,同时使用这些对象编写出来的代码也可以跨平台。示例如下:
c++11多线程
代码
#include <thread>
#include <mutex>
#include <condition_variable>
#include <list>
#include <iostream>
class Task
{
public:
Task(int taskID)
{
this->taskID = taskID;
}
void doTask()
{
std::cout << "handle a task, taskID: " << taskID << ", threadID: " << std::this_thread::get_id() << std::endl;
}
private:
int taskID;
};
std::mutex mymutex;
std::list<Task*> tasks;
std::condition_variable mycv;
void* consumer_thread()
{
Task* pTask = NULL;
while (true)
{
std::unique_lock<std::mutex> guard(mymutex);
while (tasks.empty())
{
//如果获得了互斥锁,但是条件不合适的话,pthread_cond_wait会释放锁,不往下执行。
//当发生变化后,条件合适,pthread_cond_wait将直接获得锁。
mycv.wait(guard);
}
pTask = tasks.front();
tasks.pop_front();
if (pTask == NULL)
continue;
pTask->doTask();
delete pTask;
pTask = NULL;
}
return NULL;
}
void* producer_thread()
{
int taskID = 0;
Task* pTask = NULL;
while (true)
{
pTask = new Task(taskID);
//使用括号减小guard锁的作用范围
{
std::lock_guard<std::mutex> guard(mymutex);
tasks.push_back(pTask);
std::cout << "produce a task, taskID: " << taskID << ", threadID: " << std::this_thread::get_id() << std::endl;
}
//释放信号量,通知消费者线程
mycv.notify_one();
taskID ++;
//休眠1秒
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return NULL;
}
int main()
{
//创建5个消费者线程
std::thread consumer1(consumer_thread);
std::thread consumer2(consumer_thread);
std::thread consumer3(consumer_thread);
std::thread consumer4(consumer_thread);
std::thread consumer5(consumer_thread);
//创建一个生产者线程
std::thread producer(producer_thread);
producer.join();
consumer1.join();
consumer2.join();
consumer3.join();
consumer4.join();
consumer5.join();
return 0;
}