Semaphore源码解读
目录
前言
Semaphore字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目,底层依赖AQS的状态State,是在生产当中比较常用的一个工具类。构造方法:
public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
permits 表示许可线程的数量fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程 。
主要方法:
public void acquire() throws InterruptedException
public void release()
tryAcquire(long timeout, TimeUnit unit)
acquire() 表示阻塞并获取许可,release() 表示释放许可,tryAcquire尝试获取,不会阻塞,超时后返回。具体的使用方法非常简单,就不细讲了,下面直接进入源码。
源码解读
和其他juc中的组件一样,Semaphore内部同样维护了一个Sync继承AbstractQueuedSynchronizer作为同步器的实现,下面看获得许可acquire():
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
会调用AbstractQueuedSynchronizer.acquireSharedInterruptibly(1):
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); //如果线程由中断唤醒,直接抛出中断异常
if (tryAcquireShared(arg) < 0) //尝试获取许可,如果返回值不小于0.则说明获取成功
doAcquireSharedInterruptibly(arg);
}
进入tryAcquireShared(arg),这个方法有不同的实现,这里看非公平版本,tryAcquireShared(int acquires)--->Semaphore.Sync.nonfairTryAcquireShared(int acquires):
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState(); //获取state资源状态
int remaining = available - acquires; //计算剩余资源
/**
*如果剩余资源小于0,说明无剩余资源,直接返回。否则cas修改资源状态,只要还有剩余资源,则会无限自旋,不断获取
*/
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
如果上面的方法返回值小于0,则说明无剩余资源,进入AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(arg),这个方法的代码和Reentranlock中acquireQueued()大同小异(传送门:Reentantlock源码解读_w7sss的博客-CSDN博客),下面就挑不一样的讲讲:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //入队,同Reentranlock
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg); //如果前驱节点是头节点再尝试一次获取资源
if (r >= 0) {
setHeadAndPropagate(node, r); //成功获得资源,修改节点状态为propagate
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //这边线程会正式阻塞,被唤醒后如果判断是由中断唤醒的会抛出中断异常,这里和Reentranlock不一样,Reentranlock的lock方法不会响应中断
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这里主要和Reentranlock不同的是Semaphore是共享锁,区别在这个方法:AbstractQueuedSynchronizer.setHeadAndPropagate(node, r):
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
这个方法的主要逻辑在if中,这里
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0)
同时出现了一串判断,我把Doug Lea的原文注释贴上了,也说明这个if确实没那么好理解,我们暂且先看if中的逻辑,这要有助于理解这个if判断:
if (s == null || s.isShared())
doReleaseShared(); //这里如果node节点的下一个节点是共享节点,就继续Release,即在接下来的代码有可能继续能唤醒后继节点,这在propagate > 0的时候是理所应当的
进入 doReleaseShared():
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); //如果头节点waitStatus是SIGNAL,则唤醒后继节点,这种情况是最好理解的,相信大家自己按正常流程跟代码(不用考虑并发),就能走到这里
}
/**
* 如果头节点waitStatus是0,则cas修改为PROPAGATE,这里就要考虑并发了,在一个线程
*正在获取资源的时候,又有一个线程释放了资源,也走到这个方法,执行unparkSuccessor(),
*head的waitStatus被改为0,说明啥?说明即将又有资源被释放了,所以在这里把waitStatus改为PROPAGATE,
*告诉执行到setHeadAndPropagate的线程,你可以进入doReleaseShared()了,因为我马上要把坑空出来了,这时如果有新线程入队,就可以有机会马上被唤醒。某种意义上这有自旋的含义
*/
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head) // loop if head changed
break;
}
}
讲到这里,那setHeadAndPropagate中的那个if判断就好理解了:
先看第一个h.waitStatus < 0,因为在doReleaseShared()中有线程释放资源把head的waitStatus改为PROPAGATE<0,表明有剩余资源了,所以这里可以判断进入doReleaseShared()操作直接唤醒可能马上就会入队的线程。
再看第二个h.waitStatus < 0,这时第一个h.waitStatus == 0,说明执行到第一个的时候还没有线程释放资源,但是到了第二个的时候h又被赋值成当前的head了,此时一般h.waitStatus ==0,但是如果此时又发生了在doReleaseShared()中有线程释放资源把head的waitStatus改为PROPAGATE<0的情况,那么也因该判断进入doReleaseShared()操作唤醒可能马上就会入队的线程。
当然了,这里是有可能导致不必要的唤醒的,因为h.waitStatus同样可能是-1,这里这么写的目的估计也是为了让资源尽可能地被充分利用。这段代码要结合多个线程并发的情况看。个人感觉关键还是要看
在doReleaseShared()中有线程释放资源把head的waitStatus改为PROPAGATE<0的情况
是发生在 setHead(node)之前还是之后,这个对应了两处h.waitStatus < 0的判断。
笔者讲的可能也不是太清楚,如有错误请指正。
好了acquire操作讲完了,下面是release操作:
public void release() {
sync.releaseShared(1);
}
进入
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
我想这个就不用讲了,都是之前的代码。
再次申明:如有错误,欢迎指正!