Java多线程(8)
AQS
AbstractQueuedSynchronized(AQS),类如其名,抽象的队列式的同步器,AQS定义了套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch…
双向队列
Node节点
AQS是一个FIFO的双向队列,其内部通过节点head和tail记录队首和队尾元素,对列元素的类型为Node,其中Node中的Thread变量用来存放进入AQS对列里面的线程;Node节点内部的SHARED用来标记该线程是获取共享资源时被阻塞挂起后进入AQS队列的;EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS队列的;waitStatus记录当前线程的等待状态,可以为CANCELLED(线程被取消了),SIGNAL(线程需要被唤醒),CONDITION(线程在条件队列里面等待),PROPAGATE(释放共享资源时需要通知其他节点)
看下Node节点结构:
其中这四个常量的意思:
//waitStatus值为1时表示该线程节点已释放(超时、中断),已取消的节点不会再阻塞
static final int CANCELLED = 1;
//waitStatus为-1时表示该线程的后续线程需要阻塞,即只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程
static final int SIGNAL = -1;
//waitStatus为-2时,表示该线程在condition队列中阻塞(Condition有使用)
static final int CONDITION = -2;
//waitStatus为-3时,表示该线程以及后续线程进行无条件传播(CountDownLatch中有使用)共享模式下, PROPAGATE 状态的线程处于可运行状态
static final int PROPAGATE = -3;
这个双向队列本质上就是一个双向链表,Node就是该链表的节点:
5个属性
waitStatus :
waitStatus是当前节点的一个等待状态标志位,该标志位决定了该节点在当前情况下处于何种状态。
prve :
同步线程队列中保存的前置节点的地址
next :
同步线程队列中保存的后续节点的地址
thread :
同步线程队列主要存储的线程信息
nextWaiter:
AQS中条件队列是使用单向列表保存的,用nextWaiter来连接。阻塞队列和条件队列并不是使用的相同的数据结构
nextWaiter实际上标记的就是在该节点唤醒后依据该节点的状态判断是否依据条件唤醒下一个节点(注:比如说当前节点A是共享的,那么它的这个字段是shared,也就是说在这个等待队列中,A节点的后继节点也是shared。如果A节点不是共享的,那么它的nextWaiter就不是一个SHARED常量,即是独占的。)
:
-
SHARED(共享模式):直接唤醒下一个节点
-
EXCLUSIVE(独占模式):等待当前线程执行完成后再唤醒
方法解析
// 构造方法为空参构造,一般用于创建head节点,或者为nextWaiter设置共享标志。
Node() {
}
// 构造方法用于创建一个带有条件队列的节点
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 用于创建一个带有初始等waitStatus的节点
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
// 检查当前节点是否为共享节点
final boolean isShared() {
return nextWaiter == SHARED;
}
// 用来查找前置节点是否存在,相当于为前置节点查空
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
ConditionObject
AQS有一个内部类ConditionObject,用来结合锁实现线程同步,ConditionObject可以直接访问AQS对象内部的变量,比如state状态值和AQS队列。ConditionObject是条件变量,每个条件变量对应一个条件队列(单向链表队列),用来存放调用条件变量的await方法后被阻塞的线程
看下ConditionObject结构:
定义的变量
//条件(等待)队列的第一个节点
private transient Node firstWaiter;
//条件(等待)队列的最后一个节点
private transient Node lastWaiter;
方法
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();//将当前线程加入到链表最后,并返回该节点
//释放当前线程占有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
/*
* while循环通过isOnSyncQueue方法判断节点node是否在同步队列中
*
* 这里有个理解难点,为什么需要判断节点Node是否在同步队列中呢?
* 因为当线程调用signal或signalAll时,会从firstWaiter节点开始,
* 将节点依次从等待队列中移除,并通过enq方法重新添加到同步队列中
*
* 因此当其他线程调用signal或者signalAll方法时,该线程可能从条件(等待)队列中移除,并重新加入到同步队列中
* 1. 如果没有,则阻塞当前线程,同时调用checkInterruptWhileWaiting检测当前线程在等待过程中是否发生中断,
* 设置interruptMode表示中断状态。
* 2. 如果isOnSyncQueue方法判断出当前线程已经处于同步队列中了,则跳出while循环
*/
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//利用acquireQueued方法循环尝试获取同步状态(锁)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();//将等待队列中,不是Node.CONDITION状态的节点移除
if (interruptMode != 0)//判断中断状态,
reportInterruptAfterWait(interruptMode);
}
//将当前线程生成的节点加入到链表末尾,并返回该节点
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果最后一个节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();//这个方法就是将等待队列中不是Node.CONDTION状态的节点从链表中移除
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
//遍历一次CONDITION链表,删除状态为CANCELLED的节点。
private void unlinkCancelledWaiters() {
//首节点
Node t = firstWaiter;
Node trail = null;
while (t != null) {
//下一个节点
Node next = t.nextWaiter;
//如果t的状态是cancelled的,则需要删除t
if (t.waitStatus != Node.CONDITION) {
//清除t的nextWaiter连接
t.nextWaiter = null;
//删除的是首节点
if (trail == null)
firstWaiter = next;
else
//直接将前一个节点的连接指向该节点的下一个节点
trail.nextWaiter = next;
//设置新的尾节点
if (next == null)
lastWaiter = trail;
}
//状态为CONDITION的节点不需要清除
else
trail = t;
t = next;
}
}
//完全释放锁,释放成功则返回,失败则将当前节点的状态设置成cancelled表示当前节点失效
final int fullyRelease(Node node) {
boolean failed = true;
try {
//获取当前锁重入的次数
int savedState = getState();
//释放锁
if (release(savedState)) {
//释放成功
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
//释放锁失败,则当前节点的状态变为cancelled(此时该节点在CONDITION队列中)
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
//尝试释放锁,释放成功则调用unparkSuccessor唤醒后继节点
public final boolean release(int arg) {
//调用tryRelease释放锁。
if (tryRelease(arg)) {
//释放成功,则查看head节点状态,如果不为null且状态不为0(为0表示没有后继或者当前节点已经unparkSuccessor过),则调用unparkSuccessor唤醒后继节点
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//判断该节点是否在CLH队列中
final boolean isOnSyncQueue(Node node) {
//如果该节点的状态为CONDITION(该状态只能在CONDITION队列中出现,CLH队列中不会出现CONDITION状态),或者该节点的prev指针为null,则该节点一定不在CLH队列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果该节点的next(不是nextWaiter,next指针在CLH队列中指向下一个节点)状态不为null,则该节点一定在CLH队列中
if (node.next != null) // If has successor, it must be on queue
return true;
//否则只能遍历CLH队列(从尾节点开始遍历)查找该节点
return findNodeFromTail(node);
}
//从尾节点开始,使用prev指针,遍历整个CLH队列
private boolean findNodeFromTail(Node node) {
Node t = tail;
//从尾节点开始,使用prev指针,开始遍历整个CLH队列
for (;;) {
//找到该节点
if (t == node)
return true;
//遍历完成,没有找到该节点
if (t == null)
return false;
t = t.prev;
}
}
//在等待后发生中断,在此处根据interruptMode统一处理
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
awaitNanos:如果当前线程发生中断,则抛出异常;超时则强制transfer到CLH队列中(但是在CONDITION队列中的nextWaiter连接并没有取消)
与await的区别:
-
(1)超时则强制将该节点从CONDITION队列transfer到CLH队列中。
-
(2)阻塞调用的是LockSupport.parkNanos(this, nanosTimeout),带有时间
-
(3)每次循环都要更新nanosTimeout,如果超时则发生(1)
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//新创建一个CONDITION状态的节点,并加入队列尾部
Node node = addConditionWaiter();
//node(此时处于CLH队列队首)释放占有的锁(在CLH队列中出队了),并且唤醒后继节点
int savedState = fullyRelease(node);
//根据nanosTimeout,计算deadline
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
//检测当前节点是否处于CLH队列中,没有则park当前线程,等待signal唤醒(从而将node节点从CONDITION队列中transfer到CLH队列中)
while (!isOnSyncQueue(node)) {
//如果超时,则调用transferAfterCancelledWait将当前Node强制transfer到CLH队列中
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
//nanosTimeout大于spinForTimeoutThreshold,则调用parkNanos等待nanosTimeout时间
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//park的过程中发生中断,则跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
//更新nanosTimeout
nanosTimeout = deadline - System.nanoTime();
}
//出了while循环,代表线程被唤醒,并且已经将该node从CONDITION队列transfer到了CLH队列中,或者发生中断
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//该节点调用transferAfterCancelledWait添加到CLH队列中的,此时该节点的nextWaiter不为null,需要调用unlinkCancelledWaiters将该节点从CONDITION队列中删除
if (node.nextWaiter != null)
unlinkCancelledWaiters();
//统一处理上面发生的中断或者异常情况。
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
//返回距离超时还剩多长时间
return deadline - System.nanoTime();
}
final boolean transferAfterCancelledWait(Node node) {
//将该节点状态由CONDITION变成0,调用enq将该节点从CONDITION队列添加到CLH队列中(但是在CONDITION队列中的nextWaiter连接并没有取消)
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
//循环检测该node是否已经成功添加到CLH队列中
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
awaitUntil:在deadline时间之前没有被唤醒,则强制transfer到CLH队列中(但是在CONDITION队列中的nextWaiter连接并没有取消),发生中断则抛出异常
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
//获取绝对时间
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
//新new一个node,添加到CLH队列的尾部
Node node = addConditionWaiter();
//释放锁
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
//循环检测该node是否已经成功添加到CLH队列中
while (!isOnSyncQueue(node)) {
//超时,强制transfer到CLH队列中(但是在CONDITION队列中的nextWaiter连接并没有取消)
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
//park到abstime时间
LockSupport.parkUntil(this, abstime);
//发生异常,跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//已经在CLH队列中了,或者抛出了异常
//调用acquireQueued(同步阻塞方法)在CLH队列中获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//该节点调用transferAfterCancelledWait添加到CLH队列中的,此时该节点的nextWaiter不为null,需要调用unlinkCancelledWaiters将该节点从CONDITION队列中删除
if (node.nextWaiter != null)
unlinkCancelledWaiters();
//统一处理上面发生的中断或者异常情况。
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
//返回是否超时
return !timedout;
}
await(long time, TimeUnit unit):中断则抛出异常;超时强制转换到CLH队列中(在CONDITION队列中的nextWaiter连接并没有取消,此时同时处于CLH队列和CONDITION队列中)
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
//新new一个node,添加到CONDITION队列末尾
Node node = addConditionWaiter();
//释放锁(此节点在CLH队列中拥有锁,此时是CLH队列头结点)并唤醒后继节点。
int savedState = fullyRelease(node);
//计算deadline
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
//检测当前节点是否处于CLH队列中,没有则park当前线程,等待signal唤醒(从而将node节点从CONDITION队列中transfer到CLH队列中)
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
//park
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
//更新nanosTimeout
nanosTimeout = deadline - System.nanoTime();
}
//在CLH队列中获取锁,或者发生中断
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
//统一处理上面发生的中断或者异常情况。
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
awaitUninterruptibly:忽略中断的等待
public final void awaitUninterruptibly() {
//将该线程封装成node,新节点的状态为CONDITION,添加到队列尾部
Node node = addConditionWaiter();
//在CLH队列首部释放占有的锁
int savedState = fullyRelease(node);
boolean interrupted = false;
//循环检测该node是否已经成功添加到CLH队列中
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//用interrupted保存中断标志,不抛出异常
if (Thread.interrupted())
interrupted = true;
}
//在CLH队列中获取锁 或者 interrupted发生中断了,则调用selfInterrupt发生中断
//acquireQueued是一个阻塞方法
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
signal:对CONDITION队列中第一个CONDITION状态的节点(将该节点以及前面的CANCELLED状态的节点从CONDITION队列中出队),将该节点从CONDITION队列中添加到CLH队列末尾,同时需要设置该节点在CLH队列中前驱节点的状态(若前驱节点为cancelled状态或者给前驱节点执行CAS操作失败,则需要调用park操作在此处唤醒该线程,否则就是在CLH队列中设置前驱节点的signal状态成功,则不用在此处唤醒该线程,唤醒工作交给前驱节点,可以少进行一次park和unpark操作)
//唤醒CONDITION队列中首部的第一个CONDITION状态的节点
public final void signal() {
//判断锁是否被当前线程独占,如果不是,则当前线程不能signal其他线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//CONDITION队列不为null,则doSignal方法将唤醒CONDITION队列中所有的节点线程
if (first != null)
doSignal(first);
}
//对CONDITION队列中从首部开始的第一个CONDITION状态的节点,执行transferForSignal操作,将node从CONDITION队列中转换到CLH队列中,同时修改CLH队列中原先尾节点的状态
private void doSignal(Node first) {
do {
//当前循环将first节点从CONDITION队列transfer到CLH队列
//从CONDITION队列中删除first节点,调用transferForSignal将该节点添加到CLH队列中,成功则跳出循环
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//两步操作,首先enq将该node添加到CLH队列中,其次若CLH队列原先尾节点为CANCELLED或者对原先尾节点CAS设置成SIGNAL失败,则唤醒node节点;否则该节点在CLH队列总前驱节点已经是signal状态了,唤醒工作交给前驱节点(节省了一次park和unpark操作)
final boolean transferForSignal(Node node) {
//如果CAS失败,则当前节点的状态为CANCELLED
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//enq将node添加到CLH队列队尾,返回node的prev节点p
Node p = enq(node);
int ws = p.waitStatus;
//如果p是一个取消了的节点,或者对p进行CAS设置失败,则唤醒node节点,让node所在线程进入到acquireQueue方法中,重新进行相关操作
//否则,由于该节点的前驱节点已经是signal状态了,不用在此处唤醒await中的线程,唤醒工作留给CLH队列中前驱节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signalAll:将CONDITION队列中所有node出队,逐个添加到CLH队列末尾,同时修改它们在CLH队列中前驱节点的状态,修改为signal成功,则不用在此处唤醒该节点的线程,唤醒工作交给CLH队列中的前驱节点,否则需要在此处park当前线程。
public final void signalAll() {
//查看当前线程是否独占锁,若不是,则当前线程没有权限执行signalAll操作,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//唤醒CONDITION队列中所有节点,同时transfer到CLH队列中
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
//将first节点从CONDITION队列中出队
Node next = first.nextWaiter;
first.nextWaiter = null;
//将first节点在CLH队列中入队,同时可能需要执行unpark操作
transferForSignal(first);
//更新first的指向
first = next;
} while (first != null);
}
参考文章
https://blog.csdn.net/u011470552/article/details/76571472
https://blog.csdn.net/zy1994hyq/article/details/84562475
Coder_py 发布了189 篇原创文章 · 获赞 58 · 访问量 18万+ 私信 关注