每一个Java工程师应该都或多或少了解过AQS,我自己也是前前后后,反反复复研究了很久,看了忘,忘了再看,每次都有不一样的体会。这次趁着写博客,打算重新拿出来系统的研究下它的源码,总结成文章,便于以后复习。
原文地址:http://www.jianshu.com/p/71449a7d01af
AbstractQueuedSynchronizer(以下简称AQS)作为java.util.concurrent包的基础,它提供了一套完整的同步编程框架,开发人员只需要实现其中几个简单的方法就能*的使用诸如独占,共享,条件队列等多种同步模式。我们常用的比如ReentrantLock,CountDownLatch等等基础类库都是基于AQS实现的,足以说明这套框架的强大之处。鉴于此,我们开发人员更应该了解它的实现原理,这样才能在使用过程中得心应手。
总体来说个人感觉AQS的代码非常难懂,本文就其中的独占锁实现原理进行分析。
一、执行过程概述
首先先从整体流程入手,了解下AQS独占锁的执行逻辑,然后再一步一步深入分析源码。
获取锁的过程:
- 当线程调用acquire()申请获取锁资源,如果成功,则进入临界区。
- 当获取锁失败时,则进入一个FIFO等待队列,然后被挂起等待唤醒。
- 当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则进入临界区,否则继续挂起等待。
释放锁过程:
- 当线程调用release()进行锁资源释放时,如果没有其他线程在等待锁资源,则释放完成。
- 如果队列中有其他等待锁资源的线程需要唤醒,则唤醒队列中的第一个等待节点(先入先出)。
二、源码深入分析
基于上面所讲的独占锁获取释放的大致过程,我们再来看下源码实现逻辑:
首先来看下获取锁的方法acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
代码虽然短,但包含的逻辑却很多,一步一步看下:
- 首先是调用开发人员自己实现的tryAcquire() 方法尝试获取锁资源,如果成功则整个acquire()方法执行完毕,即当前线程获得锁资源,可以进入临界区。
- 如果获取锁失败,则开始进入后面的逻辑,首先是addWaiter(Node.EXCLUSIVE)方法。来看下这个方法的源码实现:
//注意:该入队方法的返回值就是新创建的节点
private Node addWaiter(Node mode) {
//基于当前线程,节点类型(Node.EXCLUSIVE)创建新的节点
//由于这里是独占模式,因此节点类型就是Node.EXCLUSIVE
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//这里为了提搞性能,首先执行一次快速入队操作,即直接尝试将新节点加入队尾
if (pred != null) {
node.prev = pred;
//这里根据CAS的逻辑,即使并发操作也只能有一个线程成功并返回,其余的都要执行后面的入队操作。即enq()方法
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//完整的入队操作
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//如果队列还没有初始化,则进行初始化,即创建一个空的头节点
if (t == null) {
//同样是CAS,只有一个线程可以初始化头结点成功,其余的都要重复执行循环体
if (compareAndSetHead(new Node()))
tail = head;
} else {
//新创建的节点指向队列尾节点,毫无疑问并发情况下这里会有多个新创建的节点指向队列尾节点
node.prev = t;
//基于这一步的CAS,不管前一步有多少新节点都指向了尾节点,这一步只有一个能真正入队成功,其他的都必须重新执行循环体
if (compareAndSetTail(t, node)) {
t.next = node;
//该循环体唯一退出的操作,就是入队成功(否则就要无限重试)
return t;
}
}
}
}
上面的入队操作有两点需要说明:
一、初始化队列的触发条件就是当前已经有线程占有了锁资源,因此上面创建的空的头节点可以认为就是当前占有锁资源的节点(虽然它并没有设置任何属性)。
二、注意整个代码是处在一个死循环中,知道入队成功。如果失败了就会不断进行重试。
- 经过上面的操作,我们申请获取锁的线程已经成功加入了等待队列,通过文章最一开始说的独占锁获取流程,那么节点现在要做的就是挂起当前线程,等待被唤醒,这个逻辑是怎么实现的呢?来看下源码:
通过上面的分析,该方法入参node就是刚入队的包含当前线程信息的节点
final boolean acquireQueued(final Node node, int arg) {
//锁资源获取失败标记位
boolean failed = true;
try {
//等待线程被中断标记位
boolean interrupted = false;
//这个循环体执行的时机包括新节点入队和队列中等待节点被唤醒两个地方
for (;;) {
//获取当前节点的前置节点
final Node p = node.predecessor();
//如果前置节点就是头结点,则尝试获取锁资源
if (p == head && tryAcquire(arg)) {
//当前节点获得锁资源以后设置为头节点,这里继续理解我上面说的那句话
//头结点就表示当前正占有锁资源的节点
setHead(node);
p.next = null; //帮助GC
//表示锁资源成功获取,因此把failed置为false
failed = false;
//返回中断标记,表示当前节点是被正常唤醒还是被中断唤醒
return interrupted;
}
如果没有获取锁成功,则进入挂起逻辑
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//最后会分析获取锁失败处理逻辑
if (failed)
cancelAcquire(node);
}
}
挂起逻辑是很重要的逻辑,这里拿出来单独分析一下,首先要注意目前为止,我们只是根据当前线程,节点类型创建了一个节点并加入队列中,其他属性都是默认值。
//首先说明一下参数,node是当前线程的节点,pred是它的前置节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前置节点的waitStatus
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果前置节点的waitStatus是Node.SIGNAL则返回true,然后会执行parkAndCheckInterrupt()方法进行挂起
return true;
if (ws > 0) {
//由waitStatus的几个取值可以判断这里表示前置节点被取消
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//这里我们由当前节点的前置节点开始,一直向前找最近的一个没有被取消的节点
//注,由于头结点head是通过new Node()创建,它的waitStatus为0,因此这里不会出现空指针问题,也就是说最多就是找到头节点上面的循环就退出了
pred.next = node;
} else {
//根据waitStatus的取值限定,这里waitStatus的值只能是0或者PROPAGATE,那么我们把前置节点的waitStatus设为Node.SIGNAL然后重新进入该方法进行判断
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
上面这个方法逻辑比较复杂,它是用来判断当前节点是否可以被挂起,也就是唤醒条件是否已经具备,即如果挂起了,那一定是可以由其他线程来唤醒的。该方法如果返回false,即挂起条件没有完备,那就会重新执行acquireQueued方法的循环体,进行重新判断,如果返回true,那就表示万事俱备,可以挂起了,就会进入parkAndCheckInterrupt()方法看下源码:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
//被唤醒之后,返回中断标记,即如果是正常唤醒则返回false,如果是由于中断醒来,就返回true
return Thread.interrupted();
}
看acquireQueued方法中的源码,如果是因为中断醒来,那么就把中断标记置为true。不管是正常被唤醒还是由与中断醒来,都会去尝试获取锁资源。如果成功则返回中断标记,否则继续挂起等待。
注:Thread.interrupted()方法在返回中断标记的同时会清除中断标记,也就是说当由于中断醒来然后获取锁成功,那么整个acquireQueued方法就会返回true表示是因为中断醒来,但如果中断醒来以后没有获取到锁,继续挂起,由于这次的中断已经被清除了,下次如果是被正常唤醒,那么acquireQueued方法就会返回false,表示没有中断。
最后我们回到acquireQueued方法的最后一步,finally模块。这里是针对锁资源获取失败以后做的一些善后工作,翻看上面的代码,其实能进入这里的就是tryAcquire()方法抛出异常,也就是说AQS框架针对开发人员自己实现的获取锁操作如果抛出异常,也做了妥善的处理,一起来看下源码:
//传入的方法参数是当前获取锁资源失败的节点
private void cancelAcquire(Node node) {
// 如果节点不存在则直接忽略
if (node == null)
return;
node.thread = null;
// 跳过所有已经取消的前置节点,跟上面的那段跳转逻辑类似
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
//这个是前置节点的后继节点,由于上面可能的跳节点的操作,所以这里可不一定就是当前节点,仔细想一下。^_^
Node predNext = pred.next;
//把当前节点waitStatus置为取消,这样别的节点在处理时就会跳过该节点
node.waitStatus = Node.CANCELLED;
//如果当前是尾节点,则直接删除,即出队
//注:这里不用关心CAS失败,因为即使并发导致失败,该节点也已经被成功删除
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
//这里的判断逻辑很绕,具体就是如果当前节点的前置节点不是头节点且它后面的节点等待它唤醒(waitStatus小于0),
//再加上如果当前节点的后继节点没有被取消就把前置节点跟后置节点进行连接,相当于删除了当前节点
compareAndSetNext(pred, predNext, next);
} else {
//进入这里,要么当前节点的前置节点是头结点,要么前置节点的waitStatus是PROPAGATE,直接唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
上面就是独占模式获取锁的核心源码,确实非常难懂,很绕,就这几个方法需要反反复复看很多遍,才能慢慢理解。
接下来看下释放锁的过程:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease()方法是用户自定义的释放锁逻辑,如果成功,就判断等待队列中有没有需要被唤醒的节点(waitStatus为0表示没有需要被唤醒的节点),一起看下唤醒操作:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
//把标记为设置为0,表示唤醒操作已经开始进行,提高并发环境下性能
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//如果当前节点的后继节点为null,或者已经被取消
if (s == null || s.waitStatus > 0) {
s = null;
//注意这个循环没有break,也就是说它是从后往前找,一直找到离当前节点最近的一个等待唤醒的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//执行唤醒操作
if (s != null)
LockSupport.unpark(s.thread);
}
相比而言,锁的释放操作就简单很多了,代码也比较少。
三、总结
以上就是AQS独占锁的获取与释放过程,大致思想很简单,就是尝试去获取锁,如果失败就加入一个队列中挂起。释放锁时,如果队列中有等待的线程就进行唤醒。但如果一步一步看源码,会发现细节非常多,很多地方很难搞明白,我自己也是反反复复学习很久才有点心得,但也不敢说已经研究通了AQS,甚至不敢说我上面的研究成果就是对的,只是写篇文章总结一下,跟同行交流交流心得。
除了独占锁,后面还会产出AQS一系列的文章,包括共享锁,条件队列的实现原理等。