(原创)C++ 同步队列
同步队列作为一个线程安全的数据共享区,经常用于线程之间数据读取,比如半同步半异步线程池的同步队列。
其实做起来比较简单,要用到list、锁和条件变量,条件变量的作用是在队列满了或者空了的时候等待通知。先看一个简单的同步队列:
#include <thread> #include <condition_variable> #include <mutex> #include <list> #include <iostream> using namespace std; template<typename T> class SimpleSyncQueue { public: void Put(const T& x) { std::lock_guard<std::mutex> locker(m_mutex); m_queue.push_back(x); m_notEmpty.notify_one(); } void Take(T& x) { std::unique_lock<std::mutex> locker(m_mutex); m_notEmpty.wait(locker, [this]{return !m_queue.empty();}); x = m_queue.front(); m_queue.pop_front(); } private: std::list<T> m_queue; std::mutex m_mutex; std::condition_variable_any m_notEmpty; };
再看一个带上限的同步队列:
#include<list> #include<mutex> #include<thread> #include<condition_variable> #include <iostream> using namespace std; template<typename T> class SyncQueue { public: SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false) { } void Put(const T&x) { Add(x); } void Put(T&&x) { Add(std::forward<T>(x)); } void Take(std::list<T>& list) { std::unique_lock<std::mutex> locker(m_mutex); m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); }); if (m_needStop) return; list = std::move(m_queue); m_notFull.notify_one(); } void Take(T& t) { std::unique_lock<std::mutex> locker(m_mutex); m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); }); if (m_needStop) return; t = m_queue.front(); m_queue.pop_front(); m_notFull.notify_one(); } void Stop() { { std::lock_guard<std::mutex> locker(m_mutex); m_needStop = true; } m_notFull.notify_all(); m_notEmpty.notify_all(); } bool Empty() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.empty(); } bool Full() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.size() == m_maxSize; } size_t Size() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.size(); } int Count() { return m_queue.size(); } private: bool NotFull() const { bool full = m_queue.size() >= m_maxSize; if (full) cout << "full, waiting,thread id: " << this_thread::get_id() << endl; return !full; } bool NotEmpty() const { bool empty = m_queue.empty(); if (empty) cout << "empty,waiting,thread id: " << this_thread::get_id() << endl; return !empty; } template<typename F> void Add(F&&x) { std::unique_lock< std::mutex> locker(m_mutex); m_notFull.wait(locker, [this]{return m_needStop || NotFull(); }); if (m_needStop) return; m_queue.push_back(std::forward<F>(x)); m_notEmpty.notify_one(); } private: std::list<T> m_queue; //缓冲区 std::mutex m_mutex; //互斥量和条件变量结合起来使用 std::condition_variable m_notEmpty;//不为空的条件变量 std::condition_variable m_notFull; //没有满的条件变量 int m_maxSize; //同步队列最大的size bool m_needStop; //停止的标志 };
测试代码比较简单,就不写了。实际的运用在后面的线程池中会用到。
一点梦想:尽自己一份力,让c++的世界变得更美好!