Java并发之AQS详解

}

public static void main(String[] args) {

final Mutex mutex = new Mutex();

new Thread(() -> {

System.out.println(“thread1 acquire mutex”);

mutex.acquire(1);

// 获取资源后sleep保持

try {

TimeUnit.SECONDS.sleep(5);

} catch(InterruptedException ignore) {

}

mutex.release(1);

System.out.println(“thread1 release mutex”);

}).start();

new Thread(() -> {

// 保证线程2在线程1启动后执行

try {

TimeUnit.SECONDS.sleep(1);

} catch(InterruptedException ignore) {

}

// 等待线程1 sleep结束释放资源

mutex.acquire(1);

System.out.println(“thread2 acquire mutex”);

mutex.release(1);

}).start()

}

}

复制代码

示例代码简单通过AQS实现一个互斥操作,线程1获取mutex后,线程2的acquire陷入阻塞,直到线程1释放。其中tryAcquire/acquire/tryRelease/releasearg参数可按实现逻辑自定义传入值,无具体要求。

@param arg the acquire argument. This value is conveyed to {@link #tryAcquire} but is otherwise uninterpreted and can represent anyting you like.

AQS核心结构


Node

前文提到,在AQS中如果线程获取资源失败,会包装成一个节点挂载到CLH队列上,AQS中定义了Node类用于包装线程。

Java并发之AQS详解

Node主要包含5个核心字段:

  • waitStatus:当前节点状态,该字段共有5种取值:

  • CANCELLED = 1。节点引用线程由于等待超时或被打断时的状态。

  • SIGNAL = -1。后继节点线程需要被唤醒时的当前节点状态。当队列中加入后继节点被挂起(block)时,其前驱节点会被设置为SIGNAL状态,表示该节点需要被唤醒。

  • CONDITION = -2。当节点线程进入condition队列时的状态。(见ConditionObject)

  • PROPAGATE = -3。仅在释放共享锁releaseShared时对头节点使用。(见共享锁分析)

  • 0。节点初始化时的状态。

  • prev:前驱节点。

  • next:后继节点。

  • thread:引用线程,头节点不包含线程。

  • nextWaitercondition条件队列。(见ConditionObject)

独占锁分析

acquire

public final void acquire(int arg) {

// tryAcquire需实现类处理

// 如获取资源成功,直接返回

if (!tryAcquire(arg) &&

// 如获取资源失败,将线程包装为Node添加到队列中阻塞等待

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

// 如阻塞线程被打断

selfInterrupt();

}

复制代码

acquire核心为tryAcquireaddWaiteracquireQueued三个函数,其中tryAcquire需具体类实现。 每当线程调用acquire时都首先会调用tryAcquire,失败后才会挂载到队列,因此acquire实现默认为非公平锁

addWaiter将线程包装为独占节点,尾插式加入到队列中,如队列为空,则会添加一个空的头节点。值得注意的是addWaiter中的enq方法,通过CAS+自旋的方式处理尾节点添加冲突。

Java并发之AQS详解

acquireQueue在线程节点加入队列后判断是否可再次尝试获取资源,如不能获取则将其前驱节点标志为SIGNAL状态(表示其需要被unpark唤醒)后,则通过park进入阻塞状态。

Java并发之AQS详解

参照流程图,acquireQueued方法核心逻辑为for(;;)shouldParkAfterFailedAcquiretail节点默认初始状态为0,当新节点被挂载到队列后,将其前驱即原tail节点状态设为SIGNAL,表示该节点需要被唤醒,返回true后即被park陷入阻塞。for循环直到节点前驱为head后才尝试进行资源获取。

release

release流程较为简单,尝试释放成功后,即从头结点开始唤醒其后继节点,如后继节点被取消,则转为从尾部开始找阻塞的节点将其唤醒。阻塞节点被唤醒后,即进入acquireQueued中的for(;;)循环开始新一轮的资源竞争。

Java并发之AQS详解

共享锁分析

acquireShared & releaseShared

public final void acquireShared(int arg) {

// 负数表示获取共享锁失败,不同于tryAcquire的bool返回

if (tryAcquireShared(arg) < 0)

doAcquireShared(arg);

}

public final boolean releaseShared(int arg) {

if (tryReleaseShared(arg)) {

doReleaseShared();

return true;

}

return false;

}

复制代码

acquireSharedreleaseShared整体流程与独占锁类似,tryAcquireShared获取失败后以Node.SHARED挂载到队尾阻塞,直到队头节点将其唤醒。在doAcquireShared与独占锁不同的是,由于共享锁是可以被多个线程获取的,因此在首个阻塞节点被唤醒后,会通过setHeadAndPropagate传递唤醒后续的阻塞节点。

// doAcquireShared核心代码

final Node node = addWaiter(Node.SHARED);

for (;

上一篇:ruby-安装RVM时出错


下一篇:JUC同步框架详解