说明
如果在使用过程中发现 BUG，请一定告诉我：可以在下面留言，或者给我发邮件 (sawpara at 126 dot com)。
使用boost::thread库来实现生产者消费者模型中的缓冲区!
- 仓库内最多可以存放 capacity 个产品。
- 条件变量 _condition_put 标记是否可以往仓库中存放一个产品。
- 条件变量 _condition_get 标记是否可以从仓库中取出一个产品。
- 互斥量 _mutexer 用于保证同一时刻只有一个线程可以访问仓库。
实现
#include <queue>
#include "boost/thread/condition.hpp"
#include "boost/thread/mutex.hpp"
template<class Product>
class Repository
{
public:
Repository() : _capacity(2), _emptynum(_capacity){}
Repository(int capacity) : _emptynum(capacity), _capacity(capacity){}
// success : when new_capacity > _capacity - _emptynum
bool set_capacity(int new_capacity){
boost::mutex::scoped_lock lock();
if (_capacity - _emptynum < new_capacity ){
_emptynum = new_capacity - _capacity + _emptynum;
_capacity = new_capacity;
return true;
}
return false;
}
bool empty(){
boost::mutex::scoped_lock lock(_mutexer);
return _is_empty();
}
void put(Product &product){
{
boost::mutex::scoped_lock lock(_mutexer);
while (_is_full()){
// unlock _mutexer and blocked until _condition_put.notifiy_one/all is called.
_condition_put.wait(_mutexer);
}
_products.push(product); // need implement the copy constructor and =
_emptynum--; // decrease empty position
}
_condition_get.notify_one(); // have put one, one another thread can get product now
}
void get(Product &product){
{
// lock this repository
boost::mutex::scoped_lock lock(_mutexer);
while (_is_empty()){
// unlock _mutexer and blocked until _condition_put.notifiy_one/all is called.
_condition_get.wait(_mutexer);
}
product = _products.front(); // need implement the copy constructor and =
_products.pop();
_emptynum++; // increase empty position
}
// have removed one product, one another thread can put product now
_condition_put.notify_one();
}
private:
inline bool _is_full (){ return _emptynum == 0; } // if have empty position
inline bool _is_empty(){ return _emptynum == _capacity; } //
private:
int _capacity; // capacity of this repository
int _emptynum; // number of empty position
std::queue<Product> _products; // products in a FIFO queue
boost::mutex _mutexer; // race access
boost::condition _condition_put;
boost::condition _condition_get;
};
实验
#include <iostream>
#include "boost/thread.hpp"
boost::mutex g_io_mutexer;
boost::mutex g_mutexer;
bool g_is_finished = false;
Repository<int> g_repository(4);
void producing(){
for (int i = 0; i < 100; i++){
{
boost::mutex::scoped_lock lock(g_io_mutexer);
std::cout << "Producing product : " << i << std::endl;
}
g_repository.put(i);
}
boost::mutex::scoped_lock lock(g_mutexer);
g_is_finished = true;
}
void consuming(){
int product;
while (true){
{
boost::mutex::scoped_lock lock(g_mutexer);
if (g_is_finished && g_repository.empty()){
break;
}
}
g_repository.get(product);
{
boost::mutex::scoped_lock lock(g_io_mutexer);
std::cout << "Consuming product : " << product << std::endl;
}
}
}
int main(){
boost::thread producer(producing);
boost::thread consumer_1(consuming);
boost::thread consumer_2(consuming);
producer.join();
consumer_1.join();
consumer_2.join();
return 0;
}
all the code
为方便理解代码,下面的代码中,三个空行一个意群。
#include <iostream>
#include <queue>
#include "boost/thread.hpp"
#include "boost/thread/condition.hpp"
#include "boost/thread/mutex.hpp"
template<class Product>
class Repository
{
public:
Repository() : _capacity(2), _emptynum(_capacity){}
Repository(int capacity) : _emptynum(capacity), _capacity(capacity){}
// success : when new_capacity > _capacity - _emptynum
bool set_capacity(int new_capacity){
boost::mutex::scoped_lock lock();
if (_capacity - _emptynum < new_capacity ){
_emptynum = new_capacity - _capacity + _emptynum;
_capacity = new_capacity;
return true;
}
return false;
}
bool empty(){
boost::mutex::scoped_lock lock(_mutexer);
return _is_empty();
}
void put(Product &product){
{
boost::mutex::scoped_lock lock(_mutexer);
while (_is_full()){
// unlock _mutexer and blocked until _condition_put.notifiy_one/all is called.
_condition_put.wait(_mutexer);
}
_products.push(product); // need implement the copy constructor and =
_emptynum--; // decrease empty position
}
_condition_get.notify_one(); // have put one, one another thread can get product now
}
void get(Product &product){
{
// lock this repository
boost::mutex::scoped_lock lock(_mutexer);
while (_is_empty()){
// unlock _mutexer and blocked until _condition_put.notifiy_one/all is called.
_condition_get.wait(_mutexer);
}
product = _products.front(); // need implement the copy constructor and =
_products.pop();
_emptynum++; // increase empty position
}
// have removed one product, one another thread can put product now
_condition_put.notify_one();
}
private:
inline bool _is_full (){ return _emptynum == 0; } // if have empty position
inline bool _is_empty(){ return _emptynum == _capacity; } //
private:
int _capacity; // capacity of this repository
int _emptynum; // number of empty position
std::queue<Product> _products; // products in a FIFO queue
boost::mutex _mutexer; // race access
boost::condition _condition_put;
boost::condition _condition_get;
};
boost::mutex g_io_mutexer;
boost::mutex g_mutexer;
bool g_is_finished = false;
Repository<int> g_repository(4);
void producing(){
for (int i = 0; i < 100; i++){
{
boost::mutex::scoped_lock lock(g_io_mutexer);
std::cout << "Producing product : " << i << std::endl;
}
g_repository.put(i);
}
boost::mutex::scoped_lock lock(g_mutexer);
g_is_finished = true;
}
void consuming(){
int product;
while (true){
{
boost::mutex::scoped_lock lock(g_mutexer);
if (g_is_finished && g_repository.empty()){
break;
}
}
g_repository.get(product);
{
boost::mutex::scoped_lock lock(g_io_mutexer);
std::cout << "Consuming product : " << product << std::endl;
}
}
}
int main(){
boost::thread producer(producing);
boost::thread consumer_1(consuming);
boost::thread consumer_2(consuming);
producer.join();
consumer_1.join();
consumer_2.join();
return 0;
}