}
public static void main(String[] args) {
final Mutex mutex = new Mutex();
new Thread(() -> {
System.out.println(“thread1 acquire mutex”);
mutex.acquire(1);
// 获取资源后sleep保持
try {
TimeUnit.SECONDS.sleep(5);
} catch(InterruptedException ignore) {
}
mutex.release(1);
System.out.println(“thread1 release mutex”);
}).start();
new Thread(() -> {
// 保证线程2在线程1启动后执行
try {
TimeUnit.SECONDS.sleep(1);
} catch(InterruptedException ignore) {
}
// 等待线程1 sleep结束释放资源
mutex.acquire(1);
System.out.println(“thread2 acquire mutex”);
mutex.release(1);
}).start()
}
}
复制代码
示例代码简单通过AQS
实现一个互斥操作,线程1获取mutex
后,线程2的acquire
陷入阻塞,直到线程1释放。其中tryAcquire/acquire/tryRelease/release
的arg
参数可按实现逻辑自定义传入值,无具体要求。
@param arg the acquire argument. This value is conveyed to {@link #tryAcquire} but is otherwise uninterpreted and can represent anyting you like.
AQS核心结构
Node
前文提到,在AQS
中如果线程获取资源失败,会包装成一个节点挂载到CLH
队列上,AQS
中定义了Node
类用于包装线程。
Node
主要包含5个核心字段:
-
waitStatus
:当前节点状态,该字段共有5种取值: -
CANCELLED = 1
。节点引用线程由于等待超时或被打断时的状态。 -
SIGNAL = -1
。后继节点线程需要被唤醒时的当前节点状态。当队列中加入后继节点被挂起(block)
时,其前驱节点会被设置为SIGNAL
状态,表示该节点需要被唤醒。 -
CONDITION = -2
。当节点线程进入condition
队列时的状态。(见ConditionObject
) -
PROPAGATE = -3
。仅在释放共享锁releaseShared
时对头节点使用。(见共享锁分析) -
0
。节点初始化时的状态。 -
prev
:前驱节点。 -
next
:后继节点。 -
thread
:引用线程,头节点不包含线程。 -
nextWaiter
:condition
条件队列。(见ConditionObject
)
独占锁分析
acquire
public final void acquire(int arg) {
// tryAcquire需实现类处理
// 如获取资源成功,直接返回
if (!tryAcquire(arg) &&
// 如获取资源失败,将线程包装为Node添加到队列中阻塞等待
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如阻塞线程被打断
selfInterrupt();
}
复制代码
acquire
核心为tryAcquire
、addWaiter
和acquireQueued
三个函数,其中tryAcquire
需具体类实现。 每当线程调用acquire
时都首先会调用tryAcquire
,失败后才会挂载到队列,因此acquire
实现默认为非公平锁。
addWaiter
将线程包装为独占节点,尾插式加入到队列中,如队列为空,则会添加一个空的头节点。值得注意的是addWaiter
中的enq
方法,通过CAS+自旋
的方式处理尾节点添加冲突。
acquireQueue
在线程节点加入队列后判断是否可再次尝试获取资源,如不能获取则将其前驱节点标志为SIGNAL
状态(表示其需要被unpark
唤醒)后,则通过park
进入阻塞状态。
参照流程图,acquireQueued
方法核心逻辑为for(;;)
和shouldParkAfterFailedAcquire
。tail
节点默认初始状态为0,当新节点被挂载到队列后,将其前驱即原tail
节点状态设为SIGNAL
,表示该节点需要被唤醒,返回true
后即被park
陷入阻塞。for
循环直到节点前驱为head
后才尝试进行资源获取。
release
release
流程较为简单,尝试释放成功后,即从头结点开始唤醒其后继节点,如后继节点被取消,则转为从尾部开始找阻塞的节点将其唤醒。阻塞节点被唤醒后,即进入acquireQueued
中的for(;;)
循环开始新一轮的资源竞争。
共享锁分析
acquireShared & releaseShared
public final void acquireShared(int arg) {
// 负数表示获取共享锁失败,不同于tryAcquire的bool返回
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
复制代码
acquireShared
和releaseShared
整体流程与独占锁类似,tryAcquireShared
获取失败后以Node.SHARED
挂载到队尾阻塞,直到队头节点将其唤醒。在doAcquireShared
与独占锁不同的是,由于共享锁是可以被多个线程获取的,因此在首个阻塞节点被唤醒后,会通过setHeadAndPropagate
传递唤醒后续的阻塞节点。
// doAcquireShared核心代码
final Node node = addWaiter(Node.SHARED);
…
for (;