需要有一定的JUC基础来进行观看。
文章目录
AQS源码
图解大概描述了过程,使用state,类似于操作系统的信号量的操作。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E000uBCQ-1645527324567)(C:\Users\wang\AppData\Roaming\Typora\typora-user-images\1645521733726.png)]
初始化lock对象。这里默认是非公平锁,也就是true。以下讲解都会以非公平锁讲解,后面会讲其区别
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
一段同步代码是从lock()方法开始。
Lock.lock
public void lock() {
sync.acquire(1);
}
compareAndSetState(0, 1)指用CAS的方法将state由0变为1,底层是Unsafe.compareAndSwapInt()(后面章节会讲述)。
acquire开始
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
Java实现的中断(调用Thread.interrupt()方法),实际就是从线程外界,修改线程内部的一个标志变量,或者让线程中的一些阻塞方法,抛出InterruptedException。以此”通知“线程去做一些事情, 至于做什么,做不做,实际完全是由线程内的业务代码自己决定的。不过一般都是释放资源并结束线程。
这里补充interrupt方法:如果线程处于被阻塞状态,那么线程将立即退出被阻塞状态,并抛出一个InterruptedException异常。仅此而已。如果线程处于正常活动状态,那么会将该线程的中断标志设置为 true,仅此而已。被设置中断标志的线程将继续正常运行,不受影响。
interrupt() 并不能真正的中断线程,需要被调用的线程自己进行配合才行。
Thread thread = new Thread(() -> {
while (!Thread.interrupted()) {
// do more work.
}
});
thread.start();
// 一段时间以后
thread.interrupt();
例如,这里在执行了thread.interrupt()方法后,while循环将退出。
acquire方法中if语句里需要分为三个部分来看
tryAcquire(arg)(一)
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//尝试抢占锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 重入锁的情况
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// false 表示抢占失败
return false;
}
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))(二)
抢占锁失败后,进入队列。
addWaiter方法
private Node addWaiter(Node mode) {
//mode有两种:EXCLUSIVE(独占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//尝试快速方式直接放到队尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//失败则通过enq入队。
enq(node);
return node;
}
enq方法
private Node enq(final Node node) {
//自旋,直到成功加入队尾
for (;;) {
Node t = tail;
// 队列为空,则创建一个空的标志结点作为head结点,并将tail也指向它。
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
//放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued方法(三)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//是否成功拿到资源
try {
boolean interrupted = false;//等待过程中是否被中断过
//自旋
for (;;) {
final Node p = node.predecessor();//拿到前驱
//如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是第一个非head节点释放完资源唤醒自己,也可能被interrupt)。
if (p == head && tryAcquire(arg)) {
setHead(node);//拿到资源后,将head指向该结点。
p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
failed = false; // 成功获取资源
return interrupted;//返回等待过程中是否被中断过。下面会对其进行记录。
}
//如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
}
} finally {
if (failed) // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
cancelAcquire(node);
}
}
acquireQueued中的shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前驱的状态
if (ws == Node.SIGNAL)
//如果已经告诉前驱获取资源后后通知自己,那就可以安心挂起
return true;
if (ws > 0) {
//如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
// 循环向前查找取消节点,把取消节点从队列中剔除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//设置前任节点等待状态为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
acquireQueued中的parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
//挂起自己
LockSupport.park(this);
//返回当前线程的中断状态,并清除标记
return Thread.interrupted();
}
acquire结束
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
selfInterrupt();中
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
至此加锁过程完成
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VcGEGr8f-1645527324568)(C:\Users\wang\AppData\Roaming\Typora\typora-user-images\1645526165642.png)]
总结:
1.调用tryAcquire()尝试直接去获取资源,如果成功则直接返回;
2.没成功,则addWaiter()将该线程加入等待队列的尾部(正常节点的后面,并清除不正常的被取消的节点),并标记为独占模式;
3.acquireQueued()是一个循环,线程如果可以拿到资源则返回,否则在等待队列中休息,当被千亿节点唤醒(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
4.如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。但并不会对其运行产生影响。具体可看上面介绍。
Lock.unlock
public void unlock() {
sync.release(1);
}
release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒等待队列里的下一个线程
return true;
}
return false;
}
tryRelease
// 方法返回当前锁是不是没有被线程持有
protected final boolean tryRelease(int releases) {
// 减少可重入次数
int c = getState() - releases;
// 当前线程不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor
private void unparkSuccessor(Node node) {
//node为当前线程所在的结点。
int ws = node.waitStatus;
if (ws < 0)//置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一个需要唤醒的结点s
如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。
if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}
至此就全部结束
下篇会讲解公平和非公平,可重入和不可重入,共享和排他锁的细节。