What
Semaphore标识信号量,允许指定数量的线程同时访问某个资源
How
通过以下两部实现信号量:
-
acquire
方法用于获得准入许可(如果没有获得许可,则进行等待,直到有线程释放许可而获得许可为止) -
release
用于释放准入许可
应用场景
- 实现某种资源池限制,类似于数据库连接池
- 对容器施加边界,比如一个集合中最多只能添加5个元素
- 资源并发访问数量限制
- 当作普通的锁使用(信号量为1时相当于普通的锁 信号量大于1时共享锁)
Semaphore代码示例
import java.util.concurrent.Semaphore;
public class SemaphoreDemo implements Runnable {
Semaphore semaphore = new Semaphore(5);
public static void main(String[] args) {
SemaphoreDemo semaphoreDemo = new SemaphoreDemo();
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(semaphoreDemo);
thread.start();
}
}
@Override
public void run() {
try {
// 获得准入许可(如何没有获得成功,则进行等待,直到有线程释放许可而获得该许可为止)
semaphore.acquire();
Thread.sleep(1000);
System.out.println(System.currentTimeMillis() + ", " + Thread.currentThread().getName() + ", 执行完毕!");
// 释放准入许可
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
示例中,有10条线程对信号量为5的资源进行争用,但每次只有5个线程拿到许可,另外的线程需要等待拿到许可的线程释放许可后才能拿到许可
Semaphore源码解析
关键方法如下:
- 构造方法:new Semaphore(5);
- 获取许可:semaphore.acquire();
- 释放许可:semaphore.release();
接下来我们从这三个方法入手进行源码解析
1. 构造方法:new Semaphore(5)
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
默认构造方法为非公平共享锁,可以通过构造参数fair
来选择公平或非公平,类似于ReentantLock
2. 获取许可:semaphore.acquire()
public void acquire() throws InterruptedException {
// 共享式获取AQS的同步状态
sync.acquireSharedInterruptibly(1);
}
调用的是AQS的acquireSharedInterruptibly
方法:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (tryAcquireShared(arg) < 0) {
doAcquireSharedInterruptibly(arg);
}
}
其中tryAcquireShared
依赖于Sync实现,在Semaphore中有AQS的实现Sync类,方法如下:
// 尝试获取共享锁
protected int tryAcquireShared(int acquires) {
for (; ; ) {
// 队列中存在等待线程则返回-1
if (hasQueuedPredecessors())
return -1;
int available = getState(); // 可用许可数量
int remaining = available - acquires; // 剩余许可数量
if (remaining < 0 || compareAndSetState(available, remaining))
// 返回可用的余量
return remaining;
}
}
这是FairSync的tryAcquireShared方法,在NonfairSync中,没有hasQueuedPredecessors()
判断,其余一样。
在方法中可以看出,最终返回的是剩余的许可数量,有如下几种情况:
-
如果剩余许可数量<0,则执行
doAcquireSharedInterruptibly
方法让线程自旋等待,这里是等待别的线程释放许可后线程被唤醒去尝试获取private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 线程进入同步队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (; ; ) { // 自旋 final Node p = node.predecessor(); if (p == head) { // 当前节点的前置节点是AQS的头节点 即自己是AQS同步队列的第一个节点 int r = tryAcquireShared(arg); // 再去获取信号量 if (r >= 0) { setHeadAndPropagate(node, r); // 退出自旋 p.next = null; // help GC failed = false; return; } } // 判断是否应该挂起该线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { throw new InterruptedException(); } } } finally { if (failed) { cancelAcquire(node); // 获取失败 就取消获取 } } }
-
否则就是拿到了许可数量,继续正常执行,不阻塞
3. 释放许可:semaphore.release()
public void release() {
sync.releaseShared(1);
}
同样,调用的是AQS的releaseShared
方法,看下代码:
public final boolean releaseShared(int arg) {
// 调用AQS实现类的tryReleaseShared
if (tryReleaseShared(arg)) {
// 唤醒后续的线程节点
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared
交由子类Sync实现,代码如下:
protected final boolean tryReleaseShared(int releases) {
for (; ; ) {
int current = getState(); // 当前信号量许可数
int next = current + releases; // 当前信号量许可数+释放的信号量许可数
if (next < current) // overflow 一般不会走进来
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) // CAS更新当前信号量许可数
return true;
}
}
释放许可成功则继续调用AQS的doReleaseShared
方法来唤醒后续节点可以来争取许可了
private void doReleaseShared() {
for (; ; ) { // 自旋等待
Node h = head;
// 有头节点且头节点和尾节点不是同一个
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 设置status为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
continue; // 循环检查
}
// 唤醒节点的后续节点
unparkSuccessor(h);
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) { //
continue; // 失败则继续循环
}
}
if (h == head) {
break;
}
}
}
总结
Semaphore使用AQS同步状态来保存信号量的计数器。acquireSharedInterruptibly
会减少计数(获取许可),当计数为非正值的时候阻塞线程,否则不会阻塞线程releaseShared
方法会增加计数(释放许可),在计数不超过信号量限制时会解除线程的阻塞(获取到许可的线程)