Doug Lea是JDK中concurrent工具包的作者,这位大神是谁可以自行google。
本文浅析ReentrantLock(可重入锁)的原理
Lock接口
Lock接口定义了这几个方法:
- lock()
用来获取锁,如果锁已经被其他线程占有,则进行等待,直到抢占到锁;该方法在发送异常时不会自动释放锁,所以在使用时需要在finall块中释放锁; - tryLock()和tryLock(long time, TimeUnit unit)
尝试获得锁,如果锁已经被其他线程占有,返回false,成功获取锁返回true;该方法不会等待,立即返回;而带有参数的tryLock在等待时长内拿到锁返回true,超时或者没拿到锁返回false;带参数的方法还支持响应中断; - lockInterruptibly()
支持中断的lock(); - unlock()
释放锁; - newCondition()
新建Condition
,Condition以后会分析;
ReentrantLock可重入锁
ReentrantLock实现了Lock接口,ReentrantLock中有一个重要的成员变量,同步器
sync继承了AbstractQueuedSynchronizer
简称AQS
,我们先介绍AQS
;
AQS用一个队列(结构是一个FIFO队列)来管理同步状态,当线程获取同步状态失败时,会将当前线程包装成一个Node
放入队列,当前线程进入阻塞状态;当同步状态释放时,会从队列去出线程获取同步状态。
AQS里定义了head、tail、state,他们都是volatile修饰的,head指向队列的第一个元素,tail指向队列的最后一个元素,state表示了同步状态,这个状态非常重要,在ReentrantLock中,state为0的时候代表锁被释放,state为1时代表锁已经被占用;
看下面代码:
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
这一段静态初始化代码初始了state、head、tail等变量的在内存中的偏移量;Unsafe
类是sun.misc下的类,不属于java标准。Unsafe
让java可以像C语言一样操作内存指针,其中就提供了CAS
的一些原子操作和park、unpark
对线程挂起与恢复的操作;关于CAS
是concurrent工具包的基础,以后会单独介绍,其主要作用就是在硬件级别提供了compareAndSwap
的功能,从而实现了比较和交换的原子性操作。
AQS还有一个内部类叫Node,它将线程封装,利用prev和next可以将Node串连成双向链表,这就是一开始说的FIFO的结构;
ReentrantLock提供了公平锁和非公平锁,我们这里从非公平锁分析AQS的应用;
Lock调用lock()方法时调用了AQS的lock()方法,我们来看这个非公平锁NonfairSync
的lock方法:
final void lock() {
//首先调用CAS抢占同步状态state,如果成功则将当前线程设置为同步器的独占线程,
//这也是非公平的体现,因为新来的线程没有马上加入队列尾部,而是先尝试抢占同步状态。
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//抢占同步状态失败,调用AQS的acquire
acquire(1);
}
瞄一眼acquire方法:
public final void acquire(int arg) {
//在这里还是先试着抢占一下同步状态
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire调用的是NonfairSync
的实现,然后又调用了Sync
的nonfairTryAcquire方法:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//和之前一样,利用CAS抢占同步状态,成功则设置当前线程为独占线程并且返回true
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前线程已经是独占线程,即当前线程已经获得了同步状态则将同步状态state加1,
//这里是可重入锁的体现
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//没有抢占到同步状态返回false
return false;
}
再看addWaiter方法:
private Node addWaiter(Node mode) {
//新建一个Node,封装了当前线程和模式,这里传入的是独占模式Node.EXCLUSIVE
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果tail不为空就不需要初始化node队列了
if (pred != null) {
//将node作为队列最后一个元素入列
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
//返回新建的node
return node;
}
}
//如果tail为空则表示node队列还没有初始化,此时初始化队列
enq(node);
return node;
}
瞄一眼enq方法:
private Node enq(final Node node) {
//无限loop直到CAS成功,其他地方也大量使用了无限loop
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//队列尾部为空,必须初始化,head初始化为一个空node,不包含线程,tail = head
if (compareAndSetHead(new Node()))
tail = head;
} else {
//队列已经初始化,将当前node加在列尾
node.prev = t;
//将当前node设置为tail,CAS操作,enqueue安全
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
拿到新建的node后传给acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//标记是否中断状态
boolean interrupted = false;
for (;;) {
//拿到当前node的前驱
final Node p = node.predecessor();
//如果前驱正好为head,即当前线程在列首,马上tryAcquire抢占同步状态
if (p == head && tryAcquire(arg)) {
//抢占成功后,将当前节点的thread、prev清空作为head
setHead(node);
p.next = null; // help GC 原来的head等待GC回收
failed = false;
return interrupted;
}
//没有抢占成功后,判断是否要park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
瞄一眼shouldParkAfterFailedAcquire方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果前驱node的状态为SIGNAL,说明当前node可以park
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
//如果前驱的状态大于0说明前驱node的thread已经被取消
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
//从前驱node开始,将取消的node移出队列
//当前节点之前的节点不会变化,所以这里可以更新prev,而且不必用CAS来更新。
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//前驱node状态等于0或者为PROPAGATE(以后会介绍)
//将前驱node状态设置为SIGNAL,返回false,表示当前node暂不需要park,
//可以再尝试一下抢占同步状态
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
看一下parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() {
//阻塞当前线程
LockSupport.park(this);
//返回当前线程是否设置中断标志,并清空中断标志
return Thread.interrupted();
}
这里解释一下为什么要保存一下中断标志:中断会唤醒被park的阻塞线程,但被park的阻塞线程不会响应中断,所以这里保存一下中断状态并返回,如果状态为true说明发生过中断,会补发一次中断,即调用interrupt()方法
在acquireQueued中发生异常时执行cancelAcquire:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
//清空node的线程
node.thread = null;
// Skip cancelled predecessors
//移除被取消的前继node,这里只移动了node的prev,没有改变next
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
//获取前继node的后继node
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
//设置当前node等待状态为取消,其他线程检测到取消状态会移除它们
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
//如果当前node为tail,将前驱node设置为tail(CAS)
//设置前驱node(即现在的tail)的后继为null(CAS)
//此时,如果中间有取消的node,将没有引用指向它,将被GC回收
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//如果当前node既不是head也不是tail,设置前继node的后继为当前node后继
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//唤醒当前node后继
unparkSuccessor(node);
}
//当前node的next设置为自己
//注意现在当前node的后继的prev还指向当前node,所以当前node还未被删除,prev是在移除取消节点时更新的
//这里就是为什么在前面要从后往前找可换新的node原因了,next会导致死循环
node.next = node; // help GC
}
}
画图描述解析一下cancelAcquire:
首先看如何跳过取消的前驱
这时,前驱被取消的node并没有被移出队列,前驱的前驱的next还指向前驱;
如果当前node是tail的情况:
这时,没有任何引用指向当前node;
如果当前node既不是tail也不是head:
这时,当前node的前驱的next指向当前node的后继,当前node的next指向自己,pre都没有更新;
如果当前node是head的后继:
这时,只是简单的将当前node的next指向自己;
到这里,当线程抢占同步状态的时候,会进入FIFO队列等待同步状态被释放。在unlock()方法中调用了同步器的release方法;看一下release方法:
public final boolean release(int arg) {
//判断是否释放同步状态成功
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//如果head不为null,且head的等待状态不为0,
//唤醒后继node的线程
unparkSuccessor(h);
return true;
}
return false;
}
再来看一下tryRelease方法(在Sync类中实现):
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//当前thread不是独占模式的那个线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//如果同步状态state为0,释放成功,将独占线程设置为null
free = true;
setExclusiveOwnerThread(null);
}
//更新同步状态state
setState(c);
return free;
}
继续看unparkSuccessor(唤醒后继node的tread)方法:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
//head的等待状态为负数,设置head的等待状态为0
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
//如果head的后继node不存在或者后继node等待状态大于0(即取消)
//从尾部往当前node迭代找到等待状态为负数的node,unpark
//因为会有取消的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
总结
介绍完ReentrantLock后,我们大体了解了AQS的工作原理。AQS主要就是使用了同步状态和队列实现了锁的功能。有了CAS这个基础,AQS才能发挥作用,使得在enqueue、dequeque、节点取消和异常时能够保证队列在多线程下的完整性。