要讲AbstractQueuedSynchronizer,我觉得最好还是实际使用一个重入锁,来看内部实现,这里使用ReentrantLock
ReentrantLock NonFairLock = new ReentrantLock();
ReentrantLock fairLock = new ReentrantLock(true);
可以看到ReentrantLock构造函数有两种实现,一个是默认的非公平锁,另一个是非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
看一下ReentrantLock内部构造
//首先实际lock,调用的是一个Sync类,Sync继承了AQS
abstract static class Sync extends AbstractQueuedSynchronizer {}
//具体的非公平锁,继承了Sync类
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
非公平锁
加锁
以下是nonFairLock.lock()方法的大致流程
ReentrantLock.lock()
public void lock() {
sync.lock();
}
这里使用了ReentrantLock的类常量,private final Sync sync,sync调用它子类NonfairSync.lock()方法
NonfairSync.lock()
final void lock() {
//初次获取锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//判断是重入锁还是去竞争别人的锁
acquire(1);
}
重点关注下,当使用CAS更改state变量失败后(也即是没有初次获取到锁,如果是重入锁,也会失败),AQS.acuire(1)
AQS.acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
//addWaiter(独占锁模式)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个方法首先会去调用子类实现的tryAcquire(int arg)方法,在非公平锁中会调用Sync的nonfairTryAcquire(int acquires)方法
ReentrantLock.Sync.nonfairTryAcquire(int acquires)
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取当前的state
int c = getState();
//如果当前state == 0,那么使用CAS修改state,即获取锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//当前线程已经获取了锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//获取锁失败,返回false,调用后续的函数
return false;
}
获取锁成功,!tryacquire(arg)相当于是false,直接返回
如果获取锁失败,!tryacquire(arg)相当于是true,需要执行if判断中的acquireQueued(final Node node, int arg)函数
这里看一看获取锁失败后的流程
首先将当前线程封装成Node节点,进入队列(实际是链表)
AQS.addWaiter(Node mode)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//直接获取连边末尾的节点,尝试进行快速插入
//这里的条件是链表中已经有了等待节点,也即是链表初始化了
//当采用一次CAS设置末尾节点的时候,还是会调用enq方法,进行for死循环插入队列
Node pred = tail;
if (pred != null) {
node.prev = pred;
//看,这里又用到了CAS
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果CAS设置失败
//或者链表还没有初始化,则进行初始化
enq(node);
return node;
}
看一下enq(final Node node) 方法
AQS.enq(final Node node)
private Node enq(final Node node) {
//这里采用了for(;;)循环的方式
for (;;) {
Node t = tail;
//链表没有初始化,此时会生成一个dummy虚头节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//链表已经初始化了
//将新的node节点放入末尾
node.prev = t;
//再次采用CAS设置,如果失败,则会反复for循环
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
将等待线程封装成节点后,调用acquireQueued(final Node node, int arg)方法
AQS.acquireQueued(final Node node, int arg)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//再次for(;;)循环
for (;;) {
//获取到当前节点的前继
final Node p = node.predecessor();
//当前继为头结点的时候,可以尝试使用tryAcuqire(arg)方法获取锁
//在非公平锁中会调用Sync的nonfairTryAcquire(int acquires)方法
if (p == head && tryAcquire(arg)) {
//获取锁成功,将当前节点设置为头结点
setHead(node);
/**
AQS.setHead(node)方法
private void setHead(Node node) {
head = node;
//将头结点中的线程属性设位null
node.thread = null;
node.prev = null;
}
*/
p.next = null; // help GC
failed = false;
return interrupted;
}
//当该节点的前继不是head节点,或者获取锁失败
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//如果挂起线程,检测到被中断,那么将中断标识设置为true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在for循环中,如果节点的前继不是head,或者前继是head,但是获取锁失败(前继没有完全释放,state != 0),会在if()判断中调用shouldParkAfterFailedAcquire(p, node) 和 parkAndCheckInterrupt()方法
注意,如果should()方法返回是false,不会执行park()方法
AQS.shouldParkAfterFailedAcquire(p, node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
//前继的等待状态已经设置为SIGNAL,该节点可以调用LockSupport.park()方法挂起
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
//前继节点已经取消了,此时需要找新的前继节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//前继节点为0或者PROPAGARE,采用CAS的方式更改为SIGNAL,并返回false
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
如果should()返回true,说明前继的等待状态已经是SIGNAL了,则执行parkAndCheckInterrupt()挂起当前线程
AQS.parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
//判断当前线程是否被中断
return Thread.interrupted();
}
解锁
因为解锁的线程首先要求获取到了锁,所以不存在锁竞争,调用链相对简单
ReentrantLock.unlock()
public void unlock() {
sync.release(1);
}
调用了sync.release(int arg)方法,也即是父类AQS的release(int arg)方法
AQS.release(int arg)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先会执行tryRelease(1)方法
ReentrantLock.Sync.tryRelease(int releases)
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
//如果当前线程没有获得锁,抛出异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//如果state被释放为0,则标记free,锁完全释放
free = true;
//设置当前锁没有被任何线程获取
setExclusiveOwnerThread(null);
}
//更新state
setState(c);
//返回free标记,用于AQS.release()流程判断
return free;
}
如果Sync.tryRelease()方法完全释放了锁,则AQS.release()方法中会执行unparkSuccessor(head)方法,去唤醒后继节点
AQS.unparkSuccessor(Node node)
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
//如果node的后继节点为空或者被取消,则从后往前找下一个可以唤醒的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒后继节点(线程)
LockSupport.unpark(s.thread);
}
LockSupport.unpark()
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
至此,ReentrantLock的非公平加锁和解锁,流程看完了