消息队列与无锁队列实现

1 解决问题

消息队列在服务器中的位置,一般用于生产消费者模式的,分类两种情况一种非线程安全的,一种在多线程下使用的。例如
线程池中处理任务队列,任务队列就是消息队列。如下图
消息队列与无锁队列实现
当多个线程同时操作一个任务队列或者消息队列或同一临界资源的时候,就可能会遇到线程切换的问题。而对于线程切换是一个开销很大的操作,故就产生了无锁队列的需求。

2 消息队列分类

非线程安全:例如stl中的list
线程安全:可以采用锁和无锁两种方式实现。

线程安全的消息队列:
锁实现方式: 互斥锁,互斥尝试锁,自旋锁,读写锁
无锁方式: 原子操作(__asm__汇编自己实现,gcc提供api实现
开源组件: zeromq(单读单写),ArrayLockFreeQueue(基于循环数组实现的),SimpleLockFreeQueue(基于链表实现的)

3 原子性

1 i++: 对于i++而言,不是原子性cpu不保证;对于它编译器会转化成汇编三条指令
消息队列与无锁队列实现
具体流程:
(1)把变量i从内存(RAM)加载到寄存器;(2)把寄存器的值加1;(3)把寄存器的值写回内存(RAM)。
三条指令在多线程环境下会被打乱执行,得到的值与预期的值不一致。

2 计算器体系结构::
消息队列与无锁队列实现
会产生cache实现等问题。
3 gcc原子api接口
c++编译器提供了一组原子api.

type __sync_fetch_and_add (type *ptr, type value, ...) 
type __sync_fetch_and_sub (type *ptr, type value, ...) 
type __sync_fetch_and_or (type *ptr, type value, ...) 
type __sync_fetch_and_and (type *ptr, type value, ...) 
type __sync_fetch_and_xor (type *ptr, type value, ...) 
type __sync_fetch_and_nand (type *ptr, type value, ...) 
8bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...) 
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_lock_test_and_set (type *ptr, type value, ...)
void __sync_lock_release (type *ptr, ...)

对于c++11也提供了一组atomic接口

4 无锁队列

1 解决问题:
对于消息队列可以采用有锁来实现,多线程操作有锁队列也会引起的问题:
1 同一个线程在不同cpu运行会发生切换 cache损坏(cache trashing)
2 在同步机制上争抢队列block_queue(mutex+condition)
3 动态分配内存(对于高性能队列,开辟释放内存,不是直接调用系统调用来实现,而是采用内存池来实现)

5 开源无锁队列组件

对于无锁队列可以自己采用原子操作来实现一个(汇编实现,gcc提供的api接口实现),但是大多数情况下都是使用开源组件,
对于业界比较有名的实现方式有三种: zeromq组件,ArrayLockFreeQueue,SimpleLockFreeQueue
1 zeromq
原理:主要采用yqueue_t和ypipe_t两个数据结构,还有chunke的设计,在一个结点上一次性开辟N个大小的值,以及spare_chunk局部性原理。
比较难理解的ypipe_t r指针的预读机制,r可以理解为read_end,r并不是读数据的位置索引,⽽是我们可以最多读到哪个位置的索引。读数据的索引位置还是begin_pos。
a 原子指针类

 template <typename T> class atomic_ptr_t 
 {  
public:  
	 inline void set (T *ptr_); //⾮原⼦操作  
	  inline T *xchg (T *val_); //原⼦操作,设置⼀个新的值,然后返回旧的值 
	  inline T *cas (T *cmp_, T *val_);//原⼦操作 
  private: 
	   volatile T *ptr;
 }

b queue_t类
此结点类型就是一个可以装载N个T类型的元素
构造,析构,front,pop,back,push,unpush(回滚)
成员变量:比较有特色的是spare_chunk

template <typename T, int N> class yqueue_t
{
public:
	inline yueue_t(); // 构造函数
	inline ~yueue_t();// 析构函数
	inline T &front(); // 获取队头
	inline T &back();  // 获取队尾
	inline void push(); // 入队,要先back后再写入data,push的时候更新位置索引
	inline void pop(); // 出队,要先通过front取出来,pop更新位置索引
	inline void unpush(); // 回滚
private:
	struct chunk_t
	{
		T values[N};
		chunk_t *prev;
		chunk_t *next;
	};

	chunk_t *begin_chunk;  // 开始块
	int begin_pos;
	chunk_t *back_chunk;   // 当前块
	int back_pos;
	chunk_t *end_chunk;   // 结束块
	int end_pos;

	atomic_ptr_t<chunk_t> spare_chunk; // 空闲块,所有已经出队的块称空闲块, 读写线程的共享变量	
};

c 数据结构逻辑
yqueue_t:每次批量分配一批元素,减少内存的分配和释放,解决不断动态内存分配问题。
内部由一个一个chunk组成,每个chunk保持N个元素。

	struct chunk_t
	{
		T values[N};
		chunk_t *prev;
		chunk_t *next;
	};

消息队列与无锁队列实现
当队列空间不足时,每次分配一个chunk_t,每个能存储N个元素;
数据出队列后,队列有多余空间时候,回收chunk不是妈说释放,而是根据局部性原理先回收到spare_chunk里面,当再次需要分配chunk_t时候,从spare_chunk中获取。

上一篇:mongodb 系列~ chunk原理


下一篇:堆知识--持续