c++ 高并发队列的实现

JAVA如何进行CAS

讲到java的队列时,讲到java中的CAS操作
回顾下java中的cas,主要采用compareAndSet方法,如AtomicReference中所使用的:
AtomicRefrence.java

/**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(V expect, V update) {
        return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
    }

unsafe.java


    /**
     * Atomically update Java variable to <tt>x</tt> if it is currently
     * holding <tt>expected</tt>.
     * @return <tt>true</tt> if successful
     */
    public final native boolean compareAndSwapObject(Object o, long offset,
                                                     Object expected,
                                                     Object x);
  • obj :包含要修改的字段对象;
  • offset :字段在对象内的偏移量;
  • expect : 字段的期望值;
  • update : 如果该字段的值等于字段的期望值,用于更新字段的新值;

compareAndSwapObject是一个本地方法,调用的c++的实现

// Unsafe.h
virtual jboolean compareAndSwapObject(::java::lang::Object *, jlong, ::java::lang::Object *, ::java::lang::Object *);

// natUnsafe.cc
static inline bool
compareAndSwap (volatile jobject *addr, jobject old, jobject new_val)
{
	jboolean result = false;
	spinlock lock;
  
  	// 如果字段的地址与期望的地址相等则将字段的地址更新
	if ((result = (*addr == old)))
    	*addr = new_val;
	return result;
}

// natUnsafe.cc
jboolean
sun::misc::Unsafe::compareAndSwapObject (jobject obj, jlong offset,
                     jobject expect, jobject update)
{
	// 获取字段地址并转换为字符串
	jobject *addr = (jobject*)((char *) obj + offset);
	// 调用 compareAndSwap 方法进行比较
    return compareAndSwap (addr, expect, update);
}

这段代码主要做的是:
1、通过对象的首地址跟字段在对象内的偏移量来获取字段的地址
2、判断字段地址是否与我们期望的地址相同,如果相同即更新新的地址

在java中,地址相同说明两个对象相同。

注意到c++源码中使用的 spinlock,其实现:


// Use a spinlock for multi-word accesses
class spinlock
{
  static volatile obj_addr_t lock;

public:

spinlock ()
  {
    while (! compare_and_swap (&lock, 0, 1))
      _Jv_ThreadYield ();
  }
  ~spinlock ()
  {
    release_set (&lock, 0);
  }
};
  
// This is a single lock that is used for all synchronized accesses if
// the compiler can't generate inline compare-and-swap operations.  In
// most cases it'll never be used, but the i386 needs it for 64-bit
// locked accesses and so does PPC32.  It's worth building libgcj with
// target=i486 (or above) to get the inlines.
volatile obj_addr_t spinlock::lock;

volatile关键字让编译器不进行优化,从而lock值每次都从内存中读取。

C++如何进行CAS

通过前半文的了解,我们知道c 提供的函数 compare_and_swap

bool compare_and_swap ( int *memory_location, int expected_value, int new_value)
{
    if (*memory_location == expected_value)
    {
        *memory_location = new_value;
        return true;
    }
    return false;
}

(1)GGC对CAS支持
GCC4.1+版本中支持CAS原子操作。

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...);
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);

(2)Windows对CAS支持
Windows中使用Windows API支持CAS。

LONG InterlockedCompareExchange(
  LONG volatile *Destination,
  LONG          ExChange,
  LONG          Comperand
);

(3)C11对CAS支持
C11 STL中atomic函数支持CAS并可以跨平台。

template< class T >
bool atomic_compare_exchange_weak( std::atomic* obj,T* expected, T desired );

template< class T >
bool atomic_compare_exchange_weak( volatile std::atomic* obj,T* expected, T desired );

atomic_compare_exchange_weak当存储值时会发生失败情况,且返回false,所以通常采用while直至比对交换成功,相比使用aotmic_compare_exchange_strong会有更高的性能,而使用while也即是通常所说的自旋效果。

已有的c++ 无锁队列

1、生产级队列ConcurrencyQueue
这是我在生产中使用的,因为消费者可以采用wait方式监听队列消息进行消费,所以蛮方便的。

2、boost方案:
boost提供了三种无锁方案,分别适用不同使用场景。
boost::lockfree::queue是支持多个生产者和多个消费者线程的无锁队列。
boost::lockfree::stack是支持多个生产者和多个消费者线程的无锁栈。
boost::lockfree::spsc_queue是仅支持单个生产者和单个消费者线程的无锁队列,比boost::lockfree::queue性能更好。
Boost无锁数据结构的API通过轻量级原子锁实现lock-free,不是真正意义的无锁。
Boost提供的queue可以设置初始容量,添加新元素时如果容量不够,则总容量自动增长;但对于无锁数据结构,添加新元素时如果容量不够,总容量不会自动增长。

因为boost太大了,所以生产比较少用

自己造*

当我们能把一样东西做出来,说明我们才是真正的了解,所以多造*对自己有帮助。

RingBuffer 环形队列

c++ 高并发队列的实现

数据结构

我们采用数组的线性空间来实现环形接口,当数据到达尾部时将其转回到0的位置重写入。
环形结构的容量位置从数组q[0] 到 q[max - 1]。
head表示队列头,tail表示队列尾,当(tail + 1) % max 即表示队列已满

算法

%max 取余 可以 通过位运算  head &  (max - 1),需要保证max是2的幂次方

RingBuffer实现

单消费者,单生产者

#pragma once

template<typename T>
class RingBuffer {
private:
	//队列大小
	unsigned int _size;
	//队列头部索引
	int _front;
	//队列尾部索引
	int _tail;
	//数据缓冲区
	T* _data;
public:
	RingBuffer(unsigned int size) :_size(size),_front(0), _tail(0) {
		_data = new T[size];
	}

	~RingBuffer() {
		delete[]T;
		_data = nullptr;
	}

	inline bool isEmpty() {
		return _front == _tail;
	}

	inline bool isFull() {
		//当尾部距离头部间隔1时队列已满
		return _front == (_tail + 1) % _size;
	}

	inline bool push(const T& v) {
		if (isFull()) {
			return false;
		}
		//先在tail的位置写上数据,再移动
		_data[_tail] = v;
		_tail = (_tail + 1) % _size;
		return true;
	}

	inline bool push(const T* v) {
		return push(*v);
	}

	inline bool pop(T& v) {
		if (isEmpty()) {
			return false;
		}
		v = _data[_front];
		_front = (_front + 1) % _size;
		return true;
	}

	inline unsigned int front() {
		return _front;
	}

	inline unsigned int tail() {
		return _tail;
	}

	inline unsigned int size() {
		return _size;
	}
};

上文的head,tail并不是线程安全的,当多生产者多消费者,需要进行加锁,而这个时候,我们便可以使用atomic_compare_exchange_weak了,当然了,我们也可以使用c++11 提供的atomic

LockFreeQueue实现

LockFreeQueue.hpp

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdbool.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/mman.h>

#define SHM_NAME_LEN 128
#define MIN(a, b) ((a) > (b) ? (b) : (a))
#define IS_POT(x) ((x) && !((x) & ((x)-1)))
#define MEMORY_BARRIER __sync_synchronize()

template <class T>
class LockFreeQueue
{
protected:
    typedef struct
    {
	int m_lock;
	inline void spinlock_init()
	{
	    m_lock = 0;
	}
	inline void spinlock_lock()
	{
	    while(!__sync_bool_compare_and_swap(&m_lock, 0, 1)) {}
	}
	inline void spinlock_unlock()
	{
	    __sync_lock_release(&m_lock);
	}
    } spinlock_t;
public:
    // size:队列大小
    // name:共享内存key的路径名称,默认为NULL,使用数组作为底层缓冲区。
    LockFreeQueue(unsigned int size, const char* name = NULL)
    {
	memset(shm_name, 0, sizeof(shm_name));
	createQueue(name, size);
    }
    ~LockFreeQueue()
    {
	if(shm_name[0] == 0)
	{
	    delete [] m_buffer;
	    m_buffer = NULL;
	}
	else
	{
	    if (munmap(m_buffer, m_size * sizeof(T)) == -1) {
		perror("munmap");
	    }
	    if (shm_unlink(shm_name) == -1) {
		perror("shm_unlink");
	    }
	}
    }
    
    bool isFull()const
    {
#ifdef USE_POT
        return m_head == (m_tail + 1) & (m_size - 1);
#else
	return m_head == (m_tail + 1) % m_size;
#endif
    }

    bool isEmpty()const
    {
	return m_head == m_tail;
    }
    unsigned int front()const
    {
	return m_head;
    }

    unsigned int tail()const
    {
	return m_tail;
    }

    bool push(const T& value)
    {
#ifdef USE_LOCK
        m_spinLock.spinlock_lock();
#endif
        if(isFull())
        {
#ifdef USE_LOCK
            m_spinLock.spinlock_unlock();
#endif
            return false;
        }
        memcpy(m_buffer + m_tail, &value, sizeof(T));
#ifdef USE_MB
        MEMORY_BARRIER;
#endif

#ifdef USE_POT
        m_tail = (m_tail + 1) & (m_size - 1);
#else
        m_tail = (m_tail + 1) % m_size;
#endif

#ifdef USE_LOCK
        m_spinLock.spinlock_unlock();
#endif
        return true;
    }

    bool pop(T& value)
    {
#ifdef USE_LOCK
        m_spinLock.spinlock_lock();
#endif
        if (isEmpty())
        {
#ifdef USE_LOCK
            m_spinLock.spinlock_unlock();
#endif
            return false;
        }
        memcpy(&value, m_buffer + m_head, sizeof(T));
#ifdef USE_MB
        MEMORY_BARRIER;
#endif

#ifdef USE_POT
        m_head = (m_head + 1) & (m_size - 1);
#else
        m_head = (m_head + 1) % m_size;
#endif

#ifdef USE_LOCK
        m_spinLock.spinlock_unlock();
#endif
        return true;
    }
protected:
    virtual void createQueue(const char* name, unsigned int size)
    {
#ifdef USE_POT
        if (!IS_POT(size))
        {
            size = roundup_pow_of_two(size);
        }
#endif
        m_size = size;
	m_head = m_tail = 0;
	if(name == NULL)
	{
	    m_buffer = new T[m_size];
	}
        else
        {
		int shm_fd = shm_open(name, O_CREAT | O_RDWR, 0666);
		if (shm_fd < 0)
		{
		    perror("shm_open");
		}
		if (ftruncate(shm_fd, m_size * sizeof(T)) < 0)
		{
		    perror("ftruncate");
		    close(shm_fd);
		}
		void *addr = mmap(0, m_size * sizeof(T), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
		if (addr == MAP_FAILED)
		{
		    perror("mmap");
		    close(shm_fd);
		}
		if (close(shm_fd) == -1)
		{
		    perror("close");
		    exit(1);
		}
		m_buffer = static_cast<T*>(addr);
		memcpy(shm_name, name, SHM_NAME_LEN - 1);
        }
#ifdef USE_LOCK
	spinlock_init(m_lock);
#endif
    }
    inline unsigned int roundup_pow_of_two(size_t size)
    {
	size |= size >> 1;
	size |= size >> 2;
	size |= size >> 4;
	size |= size >> 8;
	size |= size >> 16;
	size |= size >> 32;
	return size + 1;
    }

protected:
    char shm_name[SHM_NAME_LEN];
    volatile unsigned int m_head;
    volatile unsigned int m_tail;
    unsigned int m_size;
#ifdef USE_LOCK
    spinlock_t m_spinLock;
#endif
    T* m_buffer;
};

#define USE_LOCK
开启spinlock锁,多生产者多消费者场景
#define USE_MB
开启Memory Barrier
#define USE_POT
开启队列大小的2的幂对齐

kfifo内核队列

kfifo是Linux内核的一个FIFO数据结构,采用环形循环队列的数据结构来实现,提供一个无边界的字节流服务,并且使用并行无锁编程技术,即单生产者单消费者场景下两个线程可以并发操作,不需要任何加锁行为就可以保证kfifo线程安全。

kfifo的实现

struct kfifo
{
	//首地址
    unsigned char *buffer;
    //队列大小
    unsigned int size;
    //头
    unsigned int in;
    //尾
    unsigned int out;
    //自旋锁
    spinlock_t *lock;
};

// 创建队列
struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
    struct kfifo *fifo;
    // 判断是否为2的幂,用于位运算
    BUG_ON(!is_power_of_2(size));
    fifo = kmalloc(sizeof(struct kfifo), gfp_mask);
    if (!fifo)
        return ERR_PTR(-ENOMEM);
    fifo->buffer = buffer;
    fifo->size = size;
    //队列位空
    fifo->in = fifo->out = 0;
    fifo->lock = lock;

    return fifo;
}
// 分配空间
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
    unsigned char *buffer;
    struct kfifo *ret;
    // 判断是否为2的幂
    if (!is_power_of_2(size))
    {
        BUG_ON(size > 0x80000000);
        // 向上扩展成2的幂
        size = roundup_pow_of_two(size);
    }
    buffer = kmalloc(size, gfp_mask);
    if (!buffer)
        return ERR_PTR(-ENOMEM);
    ret = kfifo_init(buffer, size, gfp_mask, lock);

    if (IS_ERR(ret))
        kfree(buffer);
    return ret;
}

void kfifo_free(struct kfifo *fifo)
{
    kfree(fifo->buffer);
    kfree(fifo);
}

// 入队操作
static inline unsigned int kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len)
{
    unsigned long flags;
    unsigned int ret;
    //linux内核提供的锁机制
    spin_lock_irqsave(fifo->lock, flags);
    ret = __kfifo_put(fifo, buffer, len);
    spin_unlock_irqrestore(fifo->lock, flags);
    return ret;
}
// 出队操作
static inline unsigned int kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len)
{
    unsigned long flags;
    unsigned int ret;
    spin_lock_irqsave(fifo->lock, flags);
    ret = __kfifo_get(fifo, buffer, len);
    //当fifo->in == fifo->out时,buufer为空
    if (fifo->in == fifo->out)
        fifo->in = fifo->out = 0;
    spin_unlock_irqrestore(fifo->lock, flags);
    return ret;
}
// 入队操作
unsigned int __kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len)
{
    unsigned int l;
    //buffer中空的长度
    len = min(len, fifo->size - fifo->in + fifo->out);
    // 内存屏障:smp_mb(),smp_rmb(), smp_wmb()来保证对方观察到的内存操作顺序
    smp_mb();
    // 将数据追加到队列尾部
    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
    memcpy(fifo->buffer, buffer + l, len - l);
	//写内存屏障
    smp_wmb();
    //每次累加,到达最大值后溢出,自动转为0
    fifo->in += len;
    return len;
}
// 出队操作
unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len)
{
    unsigned int l;
    //有数据的缓冲区的长度
    len = min(len, fifo->in - fifo->out);
    //读内存屏障
    smp_rmb();
    l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
    memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
    memcpy(buffer + l, fifo->buffer, len - l);
    //内存屏障
    smp_mb();
    fifo->out += len; //每次累加,到达最大值后溢出,自动转为0
    return len;
}

static inline void __kfifo_reset(struct kfifo *fifo)
{
    fifo->in = fifo->out = 0;
}

static inline void kfifo_reset(struct kfifo *fifo)
{
    unsigned long flags;
    spin_lock_irqsave(fifo->lock, flags);
    __kfifo_reset(fifo);
    spin_unlock_irqrestore(fifo->lock, flags);
}

static inline unsigned int __kfifo_len(struct kfifo *fifo)
{
    return fifo->in - fifo->out;
}

static inline unsigned int kfifo_len(struct kfifo *fifo)
{
    unsigned long flags;
    unsigned int ret;
    spin_lock_irqsave(fifo->lock, flags);
    ret = __kfifo_len(fifo);
    spin_unlock_irqrestore(fifo->lock, flags);
    return ret;
}

总结:
Linux内核中有spin_lock、spin_lock_irq和spin_lock_irqsave保证同步。

static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
    preempt_disable();
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

static inline void __raw_spin_lock_irq(raw_spinlock_t *lock)
{
    local_irq_disable();
    preempt_disable();
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

spin_lock比spin_lock_irq速度快,但并不是线程安全的。spin_lock_irq增加调用local_irq_disable函数,即禁止本地中断,是线程安全的,既禁止本地中断,又禁止内核抢占。
spin_lock_irqsave是基于spin_lock_irq实现的一个辅助接口,在进入和离开临界区后,不会改变中断的开启、关闭状态。
如果自旋锁在中断处理函数中被用到,在获取自旋锁前需要关闭本地中断,spin_lock_irqsave实现如下:
A、保存本地中断状态;
B、关闭本地中断;
C、获取自旋锁。
解锁时通过 spin_unlock_irqrestore完成释放锁、恢复本地中断到原来状态等工作。
(3)线性代码结构
代码中没有任何if-else分支来判断是否有足够的空间存放数据,kfifo每次入队或出队只是简单的 +len 判断剩余空间,并没有对kfifo->size 进行取模运算,所以kfifo->in和kfifo->out总是一直增大,直到unsigned in超过最大值时绕回到0这一起始端,但始终满足:kfifo->in - kfifo->out <= kfifo->size。
(4)使用Memory Barrier
mb():适用于多处理器和单处理器的内存屏障。
rmb():适用于多处理器和单处理器的读内存屏障。
wmb():适用于多处理器和单处理器的写内存屏障。
smp_mb():适用于多处理器的内存屏障。
smp_rmb():适用于多处理器的读内存屏障。
smp_wmb():适用于多处理器的写内存屏障。  
Memory Barrier使用场景如下:
A、实现同步原语(synchronization primitives)
B、实现无锁数据结构(lock-free data structures)
C、驱动程序
程序在运行时内存实际访问顺序和程序代码编写的访问顺序不一定一致,即内存乱序访问。内存乱序访问行为出现是为了提升程序运行时的性能。内存乱序访问主要发生在两个阶段:
A、编译时,编译器优化导致内存乱序访问(指令重排)。
B、运行时,多CPU间交互引起内存乱序访问。
Memory Barrier能够让CPU或编译器在内存访问上有序。Memory barrier前的内存访问操作必定先于其后的完成。Memory Barrier包括两类:
A、编译器Memory Barrier。
B、CPU Memory Barrier。
通常,编译器和CPU引起内存乱序访问不会带来问题,但如果程序逻辑的正确性依赖于内存访问顺序,内存乱序访问会带来逻辑上的错误。
在编译时,编译器对代码做出优化时可能改变实际执行指令的顺序(如GCC的O2或O3都会改变实际执行指令的顺序)。
在运行时,CPU虽然会乱序执行指令,但在单个CPU上,硬件能够保证程序执行时所有的内存访问操作都是按程序代码编写的顺序执行的,Memory Barrier没有必要使用(不考虑编译器优化)。为了更快执行指令,CPU采取流水线的执行方式,编译器在编译代码时为了使指令更适合CPU的流水线执行方式以及多CPU执行,原本指令就会出现乱序的情况。在乱序执行时,CPU真正执行指令的顺序由可用的输入数据决定,而非程序员编写的顺序。

上一篇:磁盘I/O流程的场景分类和linux系统中的I/O调度策略


下一篇:Linux中管道、命名管道