std::lock_guard
std::lock_guard是RAII模板类
的简单实现,功能简单。
- std::lock_guard 在构造函数中进行加锁,析构函数中进行解锁。
- 锁在多线程编程中,使用较多,因此c++11提供了lock_guard模板类;在实际编程中,我们也可以根据自己的场景编写
resource_guard
RAII类,避免忘掉释放资源。
std::unique_lock
- 类 unique_lock 是通用互斥包装器,允许
延迟锁定、锁定的有时限尝试、递归锁定、所有权转移和与条件变量一同使用
。 - unique_lock比lock_guard使用更加灵活,功能更加强大。
- 使用unique_lock需要付出更多的时间、性能成本。
std::condition_variable
C++11中引入了条件变量,其相关内容均在<condition_variable>中。这里主要介绍std::condition_variable类。
条件变量std::condition_variable用于多线程之间的通信,它可以阻塞一个或同时阻塞多个线程。std::condition_variable需要与std::unique_lock配合使用。
当std::condition_variable对象的某个wait函数被调用的时候,它使用std::unique_lock(通过std::mutex)来锁住当前线程。当前线程会一直被阻塞,直到另外一个线程在相同的std::condition_variable对象上调用了notification函数来唤醒当前线程。
std::condition_variable对象通常使用std::unique_lock<std::mutex>来等待,如果需要使用另外的lockable类型,可以使用std::condition_variable_any类。
需要维护两个计数器,分别是生产者已生产产品的数目和消费者已取走产品的数目。另外也需要保护产品库在多个生产者和多个消费者互斥地访问。
#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
static const int kItemRepositorySize = 4; // Item buffer size.
static const int kItemsToProduce = 10; // How many items we plan to produce.
struct ItemRepository {
int item_buffer[kItemRepositorySize];
size_t read_position;
size_t write_position;
size_t produced_item_counter;
size_t consumed_item_counter;
std::mutex mtx;
std::mutex produced_item_counter_mtx;
std::mutex consumed_item_counter_mtx;
std::condition_variable repo_not_full;
std::condition_variable repo_not_empty;
} gItemRepository;
typedef struct ItemRepository ItemRepository;
void ProduceItem(ItemRepository *ir, int item)
{
std::unique_lock<std::mutex> lock(ir->mtx);
while(((ir->write_position + 1) % kItemRepositorySize)
== ir->read_position) { // item buffer is full, just wait here.
std::cout << "Producer is waiting for an empty slot...\n";
(ir->repo_not_full).wait(lock);
}
(ir->item_buffer)[ir->write_position] = item;
(ir->write_position)++;
if (ir->write_position == kItemRepositorySize)
ir->write_position = 0;
(ir->repo_not_empty).notify_all();
lock.unlock();
}
int ConsumeItem(ItemRepository *ir)
{
int data;
std::unique_lock<std::mutex> lock(ir->mtx);
// item buffer is empty, just wait here.
while(ir->write_position == ir->read_position) {
std::cout << "Consumer is waiting for items...\n";
(ir->repo_not_empty).wait(lock);
}
data = (ir->item_buffer)[ir->read_position];
(ir->read_position)++;
if (ir->read_position >= kItemRepositorySize)
ir->read_position = 0;
(ir->repo_not_full).notify_all();
lock.unlock();
return data;
}
void ProducerTask()
{
bool ready_to_exit = false;
while(1) {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::unique_lock<std::mutex> lock(gItemRepository.produced_item_counter_mtx);
if (gItemRepository.produced_item_counter < kItemsToProduce) {
++(gItemRepository.produced_item_counter);
ProduceItem(&gItemRepository, gItemRepository.produced_item_counter);
std::cout << "Producer thread " << std::this_thread::get_id()
<< " is producing the " << gItemRepository.produced_item_counter
<< "^th item" << std::endl;
} else ready_to_exit = true;
lock.unlock();
if (ready_to_exit == true) break;
}
std::cout << "Producer thread " << std::this_thread::get_id()
<< " is exiting..." << std::endl;
}
void ConsumerTask()
{
bool ready_to_exit = false;
while(1) {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::unique_lock<std::mutex> lock(gItemRepository.consumed_item_counter_mtx);
if (gItemRepository.consumed_item_counter < kItemsToProduce) {
int item = ConsumeItem(&gItemRepository);
++(gItemRepository.consumed_item_counter);
std::cout << "Consumer thread " << std::this_thread::get_id()
<< " is consuming the " << item << "^th item" << std::endl;
} else ready_to_exit = true;
lock.unlock();
if (ready_to_exit == true) break;
}
std::cout << "Consumer thread " << std::this_thread::get_id()
<< " is exiting..." << std::endl;
}
void InitItemRepository(ItemRepository *ir)
{
ir->write_position = 0;
ir->read_position = 0;
ir->produced_item_counter = 0;
ir->consumed_item_counter = 0;
}
int main()
{
InitItemRepository(&gItemRepository);
std::thread producer1(ProducerTask);
std::thread producer2(ProducerTask);
std::thread producer3(ProducerTask);
std::thread producer4(ProducerTask);
std::thread consumer1(ConsumerTask);
std::thread consumer2(ConsumerTask);
std::thread consumer3(ConsumerTask);
std::thread consumer4(ConsumerTask);
producer1.join();
producer2.join();
producer3.join();
producer4.join();
consumer1.join();
consumer2.join();
consumer3.join();
consumer4.join();
}