C++11实现一个读写自旋锁-2

在上一篇文章中介绍的读写自旋锁方案,写者有可能饿死,本文介绍一种写者不会饿死的实现方案。

上文说到被饿死的原因是当写者正在等待读者释放锁时,它无法阻止排在它后面的读者继续成功申请到锁,这样就导致在它后面的读者都插队到它的前面去了。为了避免出现这种现象,可以在写者准备申请锁时,先设置一个标记,告诉后面到来的读者:我要准备申请锁了,你们后面来的统统排在我后面。只要这个标记没有被清除,读者就无法申请到锁,这样就能保证只要前面的读者释放锁了,写者就可以获取到锁。

那么,这个标记什么时候清除呢?当然,可以在写者获取得到锁后,清除这个标记,但是按照读写锁的语义,只要写者没有释放锁,读者和其它写者是不可能获取锁的,因此清除这个标记在写锁被释放之后才真正有意义,既然如此,可以把它作为获取和释放写锁的标记,写锁和读锁就不再共享同一个计数器了。当设置这个标记时,既可以表示写者准备申请锁,也可以表示写者正在持有锁,是写锁在申请过程中不同阶段的状态。

这样,在读写锁中有两个成员,一个用于标记是否有写者准备申请写锁或者正在占有写锁,另一个用于存放获取读锁的读者数量。

先看一下类的定义:

class rw_spin_lock {
public:
	rw_spin_lock() = default;
	~rw_spin_lock() = default;

	rw_spin_lock(const rw_spin_lock&) = delete;
	rw_spin_lock &operator=(const rw_spin_lock&) = delete;
	
	void lock_reader() noexcept;
	bool try_lock_reader() noexcept;
	void unlock_reader() noexcept;
	void lock_writer() noexcept;
	bool try_lock_writer() noexcept;
	void unlock_writer() noexcept;

private:
	atomic_bool write_flag{false}; // 标记是否正在申请或者正在占有写锁
	atomic_int read_counter{0}; // 标记持有读锁的读者个数
};

有两个数据成员,atomic_bool类型的write_flag用于标记是否有写者正准备申请写锁或者正占有写锁,如果等于true,表示有,如果等于false,表示没有;atomic_int类型的read_counter用于表示持有读锁的读者数量,如果为0,表示没有读者持有锁。

下面看一下各个函数的实现:

1、申请读锁

void rw_spin_lock::lock_reader() noexcept {
	while (true) {
		while (write_flag) {// 位置A,等待写锁释放,一旦写锁设置write_flag之后,获取读锁时只能等着,因为读写之间要互斥 
			pause_cpu();
		}

		read_counter.fetch_add(1); // 位置B,写锁已经释放,读者计数器加1 
		if (write_flag) {// 位置C,如果写锁释放后马上又被获取了 
			read_counter.fetch_sub(1); // 读锁计数器回滚到原状态
		} else {
			break;
		}
	}
}

读者先自旋等待write_flag为false,因为当write_flag为true时,说明要么写者已经持有写锁了,因为读写操作要互斥,读者只好等待;要么写者正准备申请写锁,因为希望写者优先,读者也只好等待。当write_flag为false后,说明已经没有写者要申请写锁了,就在第7行位置B处让read_counter加1,表示申请读锁成功,当然此处是个乐观机制,读者乐观地认为自己能够成功获得锁。不过,从判断write_flag为false后从while循环退出(A处),与让read_counter加1(B处),不是一个原子操作,这个过程在高度竞争中有可能另一个写者抢先获得了锁。因此,读者还要进一步判断,在read_counter成功加1之前,还有没有别的写者抢先获得锁了(C处),如果获得了,根据写者优先,读者会放弃读锁,即让read_counter减1,回滚为初值。

可见,获取读锁的过程分为三步,第一步,等待写锁释放,第二步,乐观加锁,第三步,判断是否成功锁定,若失败,重回第一步。

2、尝试申请读锁

bool rw_spin_lock::try_lock_reader() noexcept {
	if (write_flag) {// 写锁已被申请,因为读写之间要互斥,申请失败
		return false;
	}

	read_counter.fetch_add(1); // 乐观认为没有写锁,读者计数器加1 
	if (write_flag) {// 如果写锁抢先被申请了
		read_counter.fetch_sub(1); // 读锁回滚
		return false;
	} else {
		return true;
	}
}

该成员函数的实现逻辑简单一些,如果write_flag为true,则说明写者要么已经持有写锁,要么正要申请写锁,读锁申请失败,返回false。如果write_flag为false,则暂时让read_counter加1,乐观地认为能够申请到读锁,但是否真正获得锁,还得要进一步检查此期间是否有写者抢占了写锁,如果有,则获取读锁失败,回滚read_counter的值,并返回false,否则说明获取读锁成功,返回true。

3、释放读锁

void rw_spin_lock::unlock_reader() noexcept {
	read_counter.fetch_sub(1);
}

释放读锁非常简单,就是让counter减1,当最后一个持有锁的读者线程成功释放后,counter等于0,说明没有任何读者持有读锁了。

4、申请写锁

void rw_spin_lock::lock_writer() noexcept {
	bool expect;
	do { // 先设置写标记,表示有写请求,此时如果后面有读操作则会申请不到读锁,如果同时有别的写者已设置该标记,就进入自旋中 
		expect = false;
	} while (!write_flag.compare_exchange_strong(expect, true)); // write_flag与expect相等,write_flag更新为true,并返回true,否则返回false

	while (read_counter != 0) { // 等待所有的读者释放读锁 
		pause_cpu();
	}
}

写者在申请锁时由两个循环构成,对应两个操作过程,第一步是宣称要申请锁,第二步是等待真正持有锁。

第一步是先和其它的写者竞争,竞争成功的写者线程把write_flag置为true,不成功的继续自旋等到这个write_flag为false。第二步是竞争成功的写者还得等待获得读锁的存量读者释放锁,也是使用自旋等待的机制。写者在第一步设置write_flag还同时告诉正准备申请读锁的读者,你们要等待这个标记清除之后才能申请,这些读者也处于自旋等待中,详见lock_reader()函数的位置A处。

可见在申请锁的过程中,可能会同时有三种形式的自旋等待,第一种是已经竞争到write_flag的写者在read_counter上自旋,没有竞争到write_flag的写者在write_flag上自旋,正在申请读锁的读者在write_flag上自旋。在read_counter上自旋的只有已经获得write_flag的写者线程自己,没有别的读者或者写者线程和它竞争,可见,只要读者线程释放了锁,这个写者线程就能立即获得锁,从而保证了写者优先。

5、尝试申请写锁

bool rw_spin_lock::try_lock_writer() noexcept {
	bool expect = false; // 假设其它写者没有持有写锁
	if (!write_flag.compare_exchange_strong(expect, true))
		return false;

	if (read_counter == 0)
		return true; // 读者没有持有读锁
	else {// 读者正在持有读锁,回滚write_flag,返回false
		write_flag = false;
		return false;
	}
}

首先使用cas算法尝试把write_flag从false更新为true,如果cas返回true,说明能够成功更新,则有可能成功申请到写锁,进一步判断读锁的占有状态;如果cas返回false,说明没有成功更新,则说明有别的写者正在申请或已经申请到锁了,返回false。当成功设置write_flag为true后,接下来检查read_counter是否为0,如果read_counter为0,说明没有读者持有锁,则成功获取写锁,返回true;如果不为0,说明有读者持有锁,根据写锁和读锁互斥的原则,无法成功获取写锁,回滚write_flag的状态为false,并返回false。

6、释放写锁

void rw_spin_lock::unlock_writer() noexcept {
    write_flag = false;
}

释放锁非常简单,让write_flag等于false即可。

该实现方案保证了写者在申请锁时,不会被饿死,但是如果一个写者正在持有锁,还有一些写者和读者线程也在同时申请锁,当写者释放锁后,这些读者会和写者一起进行竞争。也就是当前面一个写者释放锁(让write_flag为false)的操作,和下一个写者获取锁(让write_flag为true)的操作,不是原子的,中间有时间间隙,在这个间隙期间,当读者和写者一起竞争时,有可能读者会抢先申请到锁,导致写者不能及时获得锁,不过,当这个读者释放锁之后,写者会立即获取到锁,不会饿死。

上一篇:Java第21天总结


下一篇:Linux防止暴力破解密码脚本