1.引子
StampedLock类,在JDK1.8引入,是对ReenrantReadWriteLock的补充增强,它不是依赖AQS框架实现的锁,但还是等待队列这种并发实现机制,和ReenrantReadWriteLock有很多相似之处。StampedLock增加了一些新的功能,如读/写锁的转换、还能实现乐观锁,在竞争少的情况下能提高并发的性能。
如果能熟练灵活使用,搭配其他并发组件,可提高系统性能。使用此类需要对它的实现有一定了解,否则可能出现副作用,出现死锁和其他的问题。另外要注意StampedLock不支持锁重入。
2.结构组成
1)常量
/** The period for yielding when waiting for overflow spinlock */ private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1 /** The number of bits to use for reader count before overflowing */ private static final int LG_READERS = 7; // Values for lock state and stamp operations private static final long RUNIT = 1L; //,CAS更新state的增加量 private static final long WBIT = 1L << LG_READERS; // 128 二进制形式:0b10000000 private static final long RBITS = WBIT - 1L;// 127 二进制形式:0b01111111 private static final long RFULL = RBITS - 1L;// 126 二进制形式:0b01111110 private static final long ABITS = RBITS | WBIT; // 255 二进制形式:0b11111111 private static final long SBITS = ~RBITS; // -128 二进制形式:0b11111111_11111111_11111111_10000000 // Initial value for lock state; avoid failure value zero private static final long ORIGIN = WBIT << 1; // 256 二进制形式:0b100000000 // Special value from cancelled acquire methods so caller can throw IE private static final long INTERRUPTED = 1L; // Values for node status; order matters private static final int WAITING = -1; private static final int CANCELLED = 1; // Modes for nodes (int not boolean to allow arithmetic) private static final int RMODE = 0; private static final int WMODE = 1;
RUNIT: 值为1,更新读锁状态会用到的常量,“next = s + RUNIT”一个新线程获取获取到了读锁那么state将增加1.
ABITS:值为255,计算写锁状态时会用到的常量,位运算“((s = state) & ABITS)”来取state的低8位。
WBIT: 值为123,更新写锁状态会用到的常量,“next = s + WBIT”一个线程获取到写锁就将state属性加128。
RBITS:值为127,计算读锁状态会到的常量,“s & RBITS”用来取state的低7位。
RFULL:值为126,获取到读锁的最大线程数,“(readers = s & RBITS) >= RFULL”。
SBITS:计算写锁变化量会用到的常量,“(s & SBITS)”将s的低7位全部置0,只关注s的高25位。
ORIGIN:state的初始值,二进制形式只有第9位为1,其他位全为0。主要是不让statet等于0,只有尝试获取锁失败时,state才能为0.
2)成员变量
/** Head of CLH queue */ private transient volatile WNode whead; //等待队列头节点 /** Tail (last) of CLH queue */ private transient volatile WNode wtail; //等待队列尾节点 // views transient ReadLockView readLockView; //读锁视图 transient WriteLockView writeLockView;//写锁视图 transient ReadWriteLockView readWriteLockView;//读写锁视图 /** Lock sequence/state */ private transient volatile long state; //锁状态 /** extra reader count when state read count saturated */ private transient int readerOverflow; //当读锁线程数超过RFULL时,额外记录超出的的线程数
上面有一个特别重要的成员变量,它就是state,它表示了锁的状态。state是int类型变量,它有32个比特位,它的二进制形式的低7位表示读锁状态,它的二进制形式的第8位表示当前是否持有写锁(1表示持有写锁,0表示没持有写锁)。一个线程获取到读锁,其低7位就要加1,一个线程释放读锁,其低7位就要减1,因此在StampedLock的一些方法中常见“ s - RUNIT”和“s - RUNIT”这样的代码片段。
使用位运算"state&0b11111111"可取出state二进制的低8位,而常量ABITS的值则是0b11111111,所以在StampedLock的方法中有很多“(s = state) & ABITS”的代码片段。当取出state的低8位是0b10000000时表示当前持有写锁,而当取出的state的低8位小于0b10000000且不为0时表示当前至少持有1个读锁。
另外state的高24位并不是不用它,每获取或释放一次写锁,都是在state的第8位上加1,加1之后可能就需要向高位进位。如当前是持有写锁的,那么state二进制第8位是1,而此刻要释放写锁,就要在state二进制第8位加1,之前第8位就已经是1了,所以state第9位要进位,而进位后第8位就是0了。此时锁已被释放了,而state的第8位也是0,这也与预设规定“state的第8位若为1表示持有写锁,若为0表示没持有写锁”一致。不得不说,这个类的作者对位运算的特点运用得炉火纯青。
由此可看出,state的高25位都是描述写锁状态的,不管是获取还是释放一次写锁,其高25位都会在原基础上加1。这其实可以看作写锁的一个版本号,这有点类似于集合类的modCount属性(只要结构被修改了,modCount自增1),它本为就是为了解决ABA的问题而产生的。在读锁的释放和锁的转换时均要先比较写锁的这个版本号,通过位运算“state&SBITS”来取state的高25位,若版本号不一致,表明这个期间写锁的状态可能被改变过,这是不正常的状态,读锁的释放或锁的转换直接失败。
3)静态内部类WNode
WNode类是记录等待节点的内部类,与AQS的Node节点有些类似,最大不同之处在于有cowait这个读模式节点的链表。
static final class WNode { //前驱节点 volatile WNode prev; //后继节点 volatile WNode next; //等待的读模式节点 volatile WNode cowait; // list of linked readers //当前等待节点对应的线程 volatile Thread thread; // non-null while possibly parked //记录节点状态 volatile int status; // 0, WAITING, or CANCELLED //记录节点读、写模式 final int mode; // RMODE or WMODE WNode(int m, WNode p) { mode = m; prev = p; } }
4)实现Lock接口的相关成员内部类
ReadLockView 、WriteLockView和ReadWriteLockView均是成员内部员,ReadLockView 、WriteLockView的实现基本上是委托外部类StampedLock的相应方法完成的,而ReadWriteLockView又完全是委托ReadLockView 、WriteLockView这两上类实现的。值得注意的是ReadLockView 、WriteLockView均不支持等待条件Condition,执行newCondition方法都要直接抛出异常。
final class ReadLockView implements Lock { public void lock() { readLock(); } public void lockInterruptibly() throws InterruptedException { readLockInterruptibly(); } public boolean tryLock() { return tryReadLock() != 0L; } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return tryReadLock(time, unit) != 0L; } public void unlock() { unstampedUnlockRead(); } public Condition newCondition() {//不支持条件 throw new UnsupportedOperationException(); } } final class WriteLockView implements Lock { public void lock() { writeLock(); } public void lockInterruptibly() throws InterruptedException { writeLockInterruptibly(); } public boolean tryLock() { return tryWriteLock() != 0L; } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return tryWriteLock(time, unit) != 0L; } public void unlock() { unstampedUnlockWrite(); } public Condition newCondition() {//不支持条件 throw new UnsupportedOperationException(); } } final class ReadWriteLockView implements ReadWriteLock { public Lock readLock() { return asReadLock(); } //返回一个ReadLockView public Lock writeLock() { return asWriteLock(); }//返回一个WriteLockView }
5)构造方法
public StampedLock() { state = ORIGIN;//256,state二进制的第9位设为0,避免state为0,在stampedLock中0这种返回值表示相关操作失败。 }
3.写锁
1)获取写锁
writeLock方法的基本逻辑:
①若写/读锁均没有被任何线程持有,当前线程可以进行CAS更新state后,CAS更新成功就抢锁成功。②若CAS更新失败则调用acquireWrite完整地CAS更新state,acquireWrite方法会阻塞线程直到抢锁成功才能返回。③写/读锁任何之一被某些线程持有,直接调用acquireWrite阻塞当前线程,直到写锁被其他线程释放后且当前线程抢锁成功方法才得以返回
public long writeLock() { long s, next; // bypass acquireWrite in fully unlocked case only /** * ABITS的低8位全为1,二进制形式0b011111111,这里(s&ABITS)位运算就是获取state的低8位。 * 尝试CAS更新state(在state进制位的第8位加1)成功则返回新的state, * * state的低8位是表示读锁和写锁的状态,低8位是0表明写锁和读锁均没有被任何线程持有。 * 写/读锁均没有被任何线程持有,当前线程可以进行CAS更新state后,CAS更新成功就抢锁成功, * 若CAS更新失败则调用acquireWrite完整地CAS更新state,直到更新成功acquireWrite的死循环自旋才能返回. * 写/读锁任何之一被某些线程持有,直接调用acquireWrite会阻塞当前线程,直到写锁被其他线程释放后且当前线程抢锁成功方法才得以返回. */ return ((((s = state) & ABITS) == 0L && U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? next : acquireWrite(false, 0L)); }
2)释放写锁
unlockWrite方法的基本流程:
①先检查锁状态是否正常,若不正常则抛出异常 。②无条件更新state,将表示写锁是否持有的state二进制第8位置为0。 ③唤醒等待队列头节点的后继节点
public void unlockWrite(long stamp) { WNode h; //state!=stamp表示预期的state与实际的state不等,表明状态不对 // (stamp & WBIT) == 0L 表示写/读锁均没有被任何线程持有。写锁都没被获取到就释放锁显然不是正常的状态。 if (state != stamp || (stamp & WBIT) == 0L) throw new IllegalMonitorStateException(); //无条件更新state(在state进制位的第8位加1,因为之前第8位就是1,更新后在第9位进1位,而第8位变成0). //若更新后的state是0,则将其默认初始值ORIGIN(二进制的第9位为1,其他位全是0). //这里state不能使用0,是因为state为0时表示获取锁失败的state值, state = (stamp += WBIT) == 0L ? ORIGIN : stamp; //“(h = whead) != null”头节点为空表明等待队列中还有等待节点,即有线程在等待获取锁 //status为零表示对应节点代表的线程刚释放锁。 if ((h = whead) != null && h.status != 0) //等待队列中有线程在等待抢锁,且此头节点不是刚释放锁 // 调用release方法,更新头节点的status为0,并唤醒下一个等待节点(线程) release(h); }
4.读锁
1)获取读锁
readLock方法的基本逻辑:
①若写锁未被任何线程持有且读模式线程数小于规定的最大值,尝试CAS更新state(将state加1)。②若CAS更新失败则调用acquireRead完整地CAS更新state,acquireRead方法会阻塞线程直到抢锁成功才能返回。③若写锁已被某线程持有且读模式线程数大于等于规定的最大值,直接调用acquireWrite阻塞当前线程,直到抢锁成功才能返回。
public long readLock() { long s = state, next; // bypass acquireRead on common uncontended case //state的第8位是写锁状态,state的低7位是读锁状态,而“(s & ABITS)”是取state的低8位, // 则可以包含了写锁的状态。但实际情况是,“(s & ABITS) < RFULL”一定能确定没有写锁被获取到。 // 如果此时有写锁,那么state的第8位是1。(s & ABITS)的结果是128(常量WBIT),而RFULL的值是126。 //要想表达式“(s & ABITS) < RFULL”成立,state的第8位不能是1,只能是0. //而state的第8位是0,就表明当前写锁没有被任何线程持有。 return ((whead == wtail && (s & ABITS) < RFULL && U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? //若写锁未被任何线程持有且获取到读锁的线程数小于规定的最大值RFULL, //可以进行CAS更新state(将state加1),若CAS更新state成功,就获取到读锁,然后返回新的state //若CAS更新state失败,就进入acquireRead完整的获取读锁,直到获取锁成功,acquireRead方法才能返回 next : acquireRead(false, 0L)); }
2)释放读锁
unlockRead方法释放读锁的基本流程:
①先检查锁状态是否正常,若不正常则抛出异常 。②若获取到读锁的线程数小于规定的最大值时,CAS更新state(将state减1),若读模式线程数为0就唤醒等待队列头节点的后继节点。③若获取到读锁的线程数大于等于规定的最大值,调用tryDecReaderOverflow方法,尝试减少成员变量readerOverflow的值,并将其减少的值增加到state上。
public void unlockRead(long stamp) { long s, m; WNode h; for (;;) { //(s = state) & SBITS) != (stamp & SBITS)获取读锁到释放读锁期间,写锁状态变化了。这是不正常状态 // (stamp & ABITS) == 0L表明没有任何线程获取到了读/写锁,没得到锁就释放锁,这也是不正常状态 //"(m = s & ABITS) == 0L || m == WBIT"表明没有线程获取到读/写锁或有线程获取到了写锁,这还是不正常状态。 if (((s = state) & SBITS) != (stamp & SBITS) || (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)//m=WBIT表明有线程获取了写锁 throw new IllegalMonitorStateException();//不正常状态需要抛出异常 if (m < RFULL) { //获取到读锁的线程数小于规定的最大值。 //CAS更新state(将state减1)。此处与写锁不同,不能无条件更新state, // 因为此时可能有多个线程同时释放读锁,而写锁在某一刻只可能有唯一的一个线程释放锁。 if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { if (m == RUNIT && (h = whead) != null && h.status != 0) //CAS更新成功,state已减至1(马上为就为0),且等待队列中还有可用的等待线程 // 调用release方法,更新头节点的status为0,并唤醒下一个等待节点(线程) release(h); break; } } //获取到读锁的线程数大于等于规定的最大值 //调用tryDecReaderOverflow方法,尝试减少readerOverflow的值,并将其减少的值增加到state上。 else if (tryDecReaderOverflow(s) != 0L) break; } }
tryDecReaderOverflow方法基本逻辑:
①若获取到读锁的线程数等于规定的最大值 ,将成员变量readerOverflow的值减少1,将成员变量state的值加1,最终使得state二进制的低7位全为1,返回最新的state.②若获取到读锁的线程数大于规定的最大值,尝试减少readerOverflow失败,返回0
private long tryDecReaderOverflow(long s) { // assert (s & ABITS) >= RFULL; if ((s & ABITS) == RFULL) { //如果获取到读锁的线程数恰好达到规定的最大值 //CAS更新state,这里不太好理解,如果能联系这些常量的二进制值班就好容易理解些。 //"(s & ABITS) == RFULL"成立,可得出s的低8位是:01111110,而RBITS二进制是01111111 //那么这里的位运算“s | RBITS”的效果就是将s的最低位设为1、其他位不变,即s自增1. if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) { //CAS更新state成功,更新后state的二进制的低7位全为1 int r; long next; // readerOverflow大于0,表明之前读锁线程数超过了RFULL, // 并将超出的数量放在了readerOverflow成员变量中。 if ((r = readerOverflow) > 0) { //之前已经将state加1了,这里将readerOverflow减1,一加一减刚好平衡 readerOverflow = r - 1; next = s; } else //readerOverflow 本身就为0,它没有记录读锁线程超出数, //不能将readerOverflow减1,只能将state减1.这前对state加1,这里又对state减1,也能加减平衡 next = s - RUNIT;//对state无条件减1 state = next;// return next; } } //获取到读锁的线程数已经大于规定的最大值了,不能decReaderOverflow else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0) Thread.yield();//随机种子合适,当前线程礼让 return 0L;//获取到读锁的线程数已经大于规定的最大值,tryDecReaderOverflow失败,返回常量0 }
5.乐观读锁
tryOptimisticRead()的基本逻辑是:只要当前写锁被未被任何线程获取到,就返回一个有效的stamp,这是stamp是写锁的累计变化次数。若写锁被某线程获取到了,就返回一个失败状态的零。
public long tryOptimisticRead() { long s; //当写锁被某线程持有时,state二进制形式的第8位是1,反之第8位是0。 // 而WBIT的二进制形式(只有第8位是1,其他位均是0)10000000。 //因此只要state的第8位是0,表达式”(s = state) & WBIT“的结果就是0 //当前没有写锁被任何线程持有时,state的第8位是0,可以获取乐观读锁, //“(s = state) & WBIT”为零,使用“s & SBITS”(将s的低7位全部置0)计算出stamp. //当前已有线程获取了写锁时,state的第8位是0,不能获取乐观读锁,“(s = state) & WBIT”非零, //尝试获取乐观读锁失败,返回0. return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;//(s&SBITS)是写锁的变化次数 }
tryOptimisticRead方法常与validate结合使用,我们可以分析下validate如何实现的。
validate方法的逻辑很简单,就是检查从获取到stamp到当前这期间内,写锁的stamp是写锁的累计变化次数是否改变了。
public boolean validate(long stamp) { U.loadFence();//内存屏障,确保此屏障之前的读操作都已完成。 //当成功获取乐观读锁时,方法tryOptimisticRead返回(s&SBITS), //而传入的参数stamp是方法tryOptimisticRead的返回值,即stamp=(s&SBITS), //因数字SBITS的特殊性而有等式(stamp&SBITS)=(s&SBITS)&SBITS=(s&SBITS) 此时下面条件成立返回true. //当获取乐观读锁失败时,方法tryOptimisticRead返回常量0, //而传入的参数stamp为0时,此时下面条件不成立返回false return (stamp & SBITS) == (state & SBITS); }
6.各种锁的转化方法
1)尝试转化为乐观读锁
tryConvertToOptimisticRead()方法:
所有返回正常戳记的前提都是,传入的参数stamp与实际的state相匹配,不匹配直接返回0. ①首先检查写锁的累计变化次数是否改变了,若发生改变了直接返回0。 ②若当前state表示未持有任何锁,则直接返回当前的state。 ③若当前state表示持有写锁,则无条件更新state,并唤醒后继节点,再返回最新的state 。④若当前state表示持有读锁,且持有读锁的线程数小于规定的最大值,则CAS更新state(将state减1),返回写锁的累计变化次数,若state为1时还需唤醒后继节点 。⑤获取到读锁的线程数大于等于规定的最大值,则调用tryDecReaderOverflow尝试减少readerOverflow的值,若尝试成功就返回写锁的累计变化次数,若尝试失败就返回0。
public long tryConvertToOptimisticRead(long stamp) { //a表示期望的state的低8位,m实际的state的低8位,s表示当前的state,h是等待队列的头节点 long a = stamp & ABITS, m, s, next; WNode h; U.loadFence();//内存屏障,确保此屏障之前的读操作都已完成。 for (;;) { //写锁的变化量不一致 if (((s = state) & SBITS) != (stamp & SBITS)) break; //返回0,将失败 if ((m = s & ABITS) == 0L) {//实际上state的低8位为0,写/读锁均没有被任何线程持有 if (a != 0L) //实际的state的低8位是0,而预期的state的低8位不为0.两者状态不一致,返回0 break; //期望与实际的state的低8位相同,均为0,写/读锁均没有被任何线程持有. // 既然现在没有任何锁,可以马上获取任何锁,返回状态state return s; } else if (m == WBIT) { //表示当前写锁被某线程持有了 if (a != m)//预期与实际状态不一致,将返回0 break; //无条件更新state,state的第8位加1,因为此时之前state的第8位已经是1了, // 所以在更新后state的第9位将进一位,而第8位就变成0了. state = next = (s += WBIT) == 0L ? ORIGIN : s; if ((h = whead) != null && h.status != 0) //有可用的等待节点,调用release方法更新头节点的status为0,并唤醒下一个等待节点(线程) release(h); return next; } //实际的state的低8位不为0也不为WBIT时 else if (a == 0L || a >= WBIT) //预期的state的低8位是0,状态不一致 //a的理论最大值是WBIT,参数stamp不正确。 //将返回0 break; else if (m < RFULL) {//获取到读锁的线程数小于规定的最大值,还可以继续获取读锁。 //CAS更新state(将state减1)。此处与写锁不同,不能无条件更新state, // 因为此时可能有多个线程同时释放读锁,而写锁在某一刻只可能有唯一的一个线程释放锁。 if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT)) { if (m == RUNIT && (h = whead) != null && h.status != 0) //CAS更新成功,state已减至1(马上为就为0),且等待队列中还有可用的等待线程 // 调用release方法,更新头节点的status为0,并唤醒下一个等待节点(线程) release(h); return next & SBITS; //返回写锁的累计变化次数 } } //获取到读锁的线程数大于等于规定的最大值 //调用tryDecReaderOverflow方法,尝试减少readerOverflow的值,并将其减少的值增加到state上。 else if ((next = tryDecReaderOverflow(s)) != 0L) //尝试减少成员变量readerOverflow的值成功 //返回写锁的累计变化次数 return next & SBITS; } return 0L; }
2)尝试转化为读锁
tryConvertToReadLock方法的主要逻辑:
①所有返回正常戳记的前提都是,传入的参数stamp与实际的state相匹配。②若当前state表示未持有任何锁,则获取一个读锁,再返回最新的state 。③若当前的state表示持有写锁,则释放写锁并获得读锁,返回最新的state。④ 若当前的state表示持有读锁,则将直接将这个传入的stamp返回。 ⑤在所有其他情况下,此方法均返回零
public long tryConvertToReadLock(long stamp) { long a = stamp & ABITS, m, s, next; WNode h; //写锁的变化量没变,即期间没有获取/释放写锁,传入的参数stamp与实际的state相匹配才尝试转为读锁 while (((s = state) & SBITS) == (stamp & SBITS)) { // if ((m = s & ABITS) == 0L) {//实际上state的低8位为0,写/读锁均没有被任何线程持有 if (a != 0L) ////实际的state的低8位是0,而预期的state的低8位不为0.两者状态不一致,返回0 break; else if (m < RFULL) {//这是一个BUG,此时m=0,m一定小于RFULL // 既然现在没有任何锁,可以马上获取任何锁, if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) //CAS更新成功,获取了一个读锁,返回最新的state return next; } else if ((next = tryIncReaderOverflow(s)) != 0L) return next; } else if (m == WBIT) {//表示当前写锁被某线程持有了 if (a != m) break;//预期与实际状态不一致,将返回0 //释放写锁并获取一个读锁 state = next = s + (WBIT + RUNIT); //写锁状态加1(在state的第8位加1),读锁状态也加1 if ((h = whead) != null && h.status != 0) //有可用的等待节点,调用release方法更新头节点的status为0,并唤醒下一个等待节点(线程) release(h); return next;//返回最新的state } else if (a != 0L && a < WBIT) //已经是读锁了,直接返回这个读锁 return stamp; else break; } return 0L;//写锁的变化量变了,锁转换失败,返回0 }
3)尝试转化为写锁
tryConvertToWriteLock的主要逻辑:
所有返回正常戳记的前提都是,传入的参数stamp与实际的state相匹配 。① 若当前state表示未持有任何锁(可读取乐观)CAS更新state,获取一个读锁,返回最新的state 。② 若当前state表示持有写锁,则直接将传入的参数stamp返回。③若state表示持有读锁,则CAS更新state(同时释放该读锁状态再获取写锁状态),返回最新的state。④在所有其他情况下,此方法均返回零。
public long tryConvertToWriteLock(long stamp) { long a = stamp & ABITS, m, s, next; while (((s = state) & SBITS) == (stamp & SBITS)) {//写锁的变化量没变,即期间没有获取/释放写锁,才尝试转为读锁 if ((m = s & ABITS) == 0L) {//实际上(当前)state的低8位为0,写/读锁均没有被任何线程持有 if (a != 0L) break;//预期与实际状态不一致,将返回0 if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) //CAS更新成功,获取了一个写锁,返回最新的state return next; } else if (m == WBIT) { //表示当前写锁被某线程持有了 if (a != m) break;//预期与实际状态不一致,将返回0 return stamp;//直接返回这个写锁 } else if (m == RUNIT && a != 0L) {//有一个读锁了 if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT + WBIT))//释放一个读锁(减RUNT)再获取一个写锁(加WBIT) //CAS更新state成功,返回新的state return next; } else break; } return 0L; }