ReenTrantLock (公平锁)Lock过程源码剖析
背景:
在JDK1.6之前,sync还是一把不会改变的重量锁,Doug Lea 实现了ReenTrantLock。在现在sync更新后,ReenTrantLock凭借自己特有的API,依旧被广泛使用。此文章,是对ReenTrantLock中的lock方法的简要剖析。其中列举了五种常见情况。
情况一:第一个进入的线程 t1
-
进入lock
-
进入acquire(1);
public final void acquire(int arg) {
if (!tryAcquire(arg) && //tryAcquire尝试获取
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 进入tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();//获取当前线程lock的次数,为0。
if (c == 0) {
if (!hasQueuedPredecessors() &&//hasQueuedPredecessors:判断是否有人排队
compareAndSetState(0, acquires)) { //CAS:比较和设置值状态,尝试加锁,成功返回true,失败返回false。底层为native操作。
setExclusiveOwnerThread(current);//设置当前执行线程,为复制操作。
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
- 进入hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // 获取LinkedList头尾节点,此时没有链表即为null
Node h = head;
Node s;
return h != t && // 此时即h与t同为null,故为false,整个函数返回false
((s = h.next) == null || s.thread != Thread.currentThread());
}
-
CAS为native操作,尝试加锁成功返回true。最后设置当前执行线程后,返回true
-
此时!tryAcquire的值为false,故不会进入后面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)操作,整个函数返回
综上,当第一次执行lock方法时,并未调用unsafe的park()方法。故该情况下的ReenTrantLock是轻量锁。
情况二:当t1正在执行时,再次进入lock()
-
进入lock()
-
进入acquire(1)中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 进入tryAcquire(arg)中,尝试获取线程
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread(); // 获取当前线程t1.
int c = getState(); //获取当前lock次数,此时为1
if (c == 0) {// 进入else
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 判断当前线程是否是执行线程,true,进入if。
int nextc = c + acquires; // acquires值为传参1,c为1,故nextc为2。
if (nextc < 0) //nextc = 2 ,故不进入if
throw new Error("Maximum lock count exceeded");
setState(nextc);// 设置当前lock的次数为2
return true; // 返回true
}
return false;
}
}
- 此时if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))中的!tryAcquire(arg)为false,故不进行之后的判断,t1继续执行。
综上,在t1执行时,t1再次进入lock(),并未调用unsafe的方法。故ReenTrantLock依旧是一把轻量锁。而State的设置也表明了ReenTrantLock的可重入性。
情况三:当t1正在执行时,t2尝试获取Lock
-
t2 进入lock()
-
进入acquire(1);
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 进入tryAcquire(arg)
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
-
进入hasQueuedPredecessors(),因为此时LinkedList还没有创建,故返回false。而整个!hasQueuedPredecessors()为true。故进入compareAndSetState(0, acquires)中。该操作为native操作。尝试加锁,不能加锁,返回false。
-
因为if判断未false,故不会执行setExclusiveOwnerThread(current);。并返回false。
-
此时第一次尝试获取锁失败。故!tryAcquire(arg)值为true,进入后面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)中。
-
先进入addWaiter(Node.EXCLUSIVE), arg)中
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //新建node节点,并把当前线程传入,而后继
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; // 此时,给创建新节点pre,并把List尾部(null)赋给tail。
if (pred != null) { //此时pre为null,故不会进入if中。
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
- 进入enq(node)中,将新结点作为传参传入。
第一次循环:
private Node enq(final Node node) {
for (;;) {
Node t = tail; // 创建新节点,并复制为List尾部,即为null
if (t == null) { // 进入if判断体。
if (compareAndSetHead(new Node()))// 创造空节点,并设置为List头部
tail = head;// 尾部和头部指向空节点。
//此时第一次循环结束
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
第二次循环:
for (;;) {
Node t = tail;
if (t == null) { //此时tail为头结点,即不为null,进入else。
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t; //新结点的前驱指向头结点
if (compareAndSetTail(t, node)) { // 设置尾部
t.next = node;// 将尾部设置为node节点
return t; //返回t,但没有接收。即为新结点前面一个节点。此时为null节点。
}
}
}
-
进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg))中。
第一次循环:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //创建p为node的前继。
if (p == head && tryAcquire(arg)) { //此时p为头结点也就是head,但尝试获取失败,故返回false,不进入if中。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 判断是否应该park线程
parkAndCheckInterrupt())// park线程
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 进入shouldParkAfterFailedAcquire(p, node)中,并将node与node前继传入。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //获取前继的状态,此时为-1。
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { //进入else
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);。
}
return false;
}
- 返回true,并进入parkAndCheckInterrupt()中。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
- 调用Unsafe的park,停止线程。
综上,在有t1获取lock(),并未释放lock时,t2加入,调用了unsafe的park方法,故该情况下ReenTrantLock是一把重量锁。
情况四:t1未执行完成,List中还有t2,此时新线程t3.star()
接上,t3进入队列:
-
进入lock();
-
进入acquire(1)中。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 进入tryAcquire(arg)中,尝试获取lock。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 获取当前线程lock的次数,为0。
if (c == 0) {// 进入if
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
- 进入hasQueuedPredecessors()中,查看队列是否有有排队的线程。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // 此时尾部指向t2,故t = t2(node)
Node h = head; // 此时头部指向null,故h = null(node)
Node s;
return h != t && // t2 != null,故进入后面的判断。
((s = h.next) == null || s.thread != Thread.currentThread());
// s = t2,t2不等于null,故成立。函数返回true。
}
-
在if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires))中,因为!hasQueuedPredecessors() 显示false,故不进行后面的判断以及执行if内部方法。并返回false。
-
回到原函数:
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 此时!tryAcquire(arg)为true,进行后面的判断。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 进入addWaiter(Node.EXCLUSIVE)中。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //为t3创建新node节点。
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;// 新节点pred 为List尾部,即pred = t2(node)
if (pred != null) {// t2不等于null,进入if内部。
node.prev = pred; // 设置t3的前继为t2.
if (compareAndSetTail(pred, node)) { // CAS
pred.next = node; // 设置t2的后继为t3.
return node; // 返回t3(node)
}
}
enq(node);
return node;
}
- 进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg)中。
第一次循环:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // p = t2
if (p == head && tryAcquire(arg)) { // p != head,故不进入if。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 进入shouldParkAfterFailedAcquire(p, node)中,修改前继状态。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //获取前继状态,此时t2状态未改变,为0。
if (ws == Node.SIGNAL)// SIGNAL为-1,故不进入if。
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { // 进入else
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // CAS,设置t2状态为-1.
}
return false; // 返回false
}
-
在if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())中,shouldParkAfterFailedAcquire(p, node)返回false,不进入if,再次循环。
-
第二次循环:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // p = t2
if (p == head && tryAcquire(arg)) { // p != head,故不进入if。
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 进入shouldParkAfterFailedAcquire(p, node)中,进行判断。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 第一次循环修改了t2(node)的state的状态为-1。
if (ws == Node.SIGNAL) //ws = -1。进入if中。
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;// 返回true
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- 在if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())中,shouldParkAfterFailedAcquire(p, node)返回true,故进入后面的 parkAndCheckInterrupt()中,调用unsafe的park方法停止线程。
综上,在t1执行时,t2在队列排队,t3执行lock()时,调用了unsafe的park()方法。故此时的ReenTrantLock是一把重量锁。
情况五:此时t1,t2,t3都已执行完成,AQS内无待执行线程t4.lock()
-
进入lock();
-
进入acquire(1);中。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 进入tryAcquire(arg)中,尝试获取lock。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // c = 0;
if (c == 0) { // 进入if中
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
- 进入hasQueuedPredecessors()中,判断List状态。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // t = 空节点
Node h = head; // h = 空节点
Node s;
return h != t && // t等于h,故为false,返回false。
((s = h.next) == null || s.thread != Thread.currentThread());
}
-
在if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires))中,!hasQueuedPredecessors()显示为true。故执行后面的compareAndSetState(0, acquires),尝试获取锁,并获取成功返回true。进入后面的setExclusiveOwnerThread(current);,设置当前执行线程。并返回true。
-
此时if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))中,!tryAcquire(arg) 显示为false。故不会进入后面的判断,并返回函数。并开始执行t4。
综上,在AQS内的所有线程执行完毕后,t4进入lock(),并未调用unsafe的park方法,故此时的ReenTrantLock依旧是一把轻量锁。