Semaphore源码分析

Semaphore介绍

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,他通过协调各个线程,以保证合理的使用公共资源。

Semaphore的简单示例

使用Semaphore来简单模拟数据库连接池

public class Pool {
//可同时访问资源的最大线程数
private static final int MAX_AVAILABLE = 100;
//信号量 表示:可获取的对象通行证
private final Semaphore available = new Semaphore(MAX_AVAILABLE,true);
//共享资源,可以想象成 items 数组内存储的都是Connection对象 模拟是连接池
protected Object[] items = new Object[MAX_AVAILABLE];
//共享资源占用情况,与items数组一一对应,比如:items[0]对象被外部线程占用,那么 used[0] == true,否则used[0] == false
protected boolean[] used = new boolean[MAX_AVAILABLE];

/**
* 获取一个空闲对象
* 如若无空闲对象则等待,直到有空闲对象为止
*/
public Object getItem() throws InterruptedException{
available.acquire();
return getNextAvailableItem();
}

/**
* 获取池中的一个空闲对象,获取成功后返回Object,失败返回null
* 成功后将对应的used[i] 设置为true
*/
private synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; i++) {
if(!used[i]) {
used[i] = true;
return items[i];
}
}
return null;
}

/**
* 归还对象到池中
*/
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}

/**
* 归还对象到池中,归还成功返回true
* 归还失败:
* 1、池中不存在该对象的引用,返回false
* 2、池中含有该对象的引用,但该对象目前状态为空闲状态,也返回false
*/
private synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; i++) {
if(item == items[i]) {
if(used[i]) {
used[i] = false;
return true;
}else {
return false;
}
}
}
return false;
}
}

 Semaphore的源码分析

 1、Semaphore的构造方法

/**
* permits:通行证的个数
* fair: 是否公平的获取锁
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

/**
* 默认是非公平锁
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
 

 2、Semaphore公平锁模式下的acquire()方法

public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
/**
* 共享式地获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,
* 在同一时刻可以有多个线程获取到同步状态,该方法可以响应中断
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//条件成立:说明当前调用acquire方法的线程 已经是 中断状态了,直接抛出异常..
if (Thread.interrupted())
throw new InterruptedException();
//如果获取同步状态失败,返回值小于0
if (tryAcquireShared(arg) < 0)
     //可共享的中断模式下尝试获取锁
doAcquireSharedInterruptibly(arg);
}
/**
* 尝试获取通行证,获取成功返回 >= 0的值;
* 获取失败 返回 < 0 值
*/
protected int tryAcquireShared(int acquires) {
for (;;) {
// //判断当前 AQS 阻塞队列内 是否有等待者线程,如果有直接返回-1,表示当前aquire操作的线程需要进入到队列等待..
if (hasQueuedPredecessors())
return -1;
//执行到这里,有哪几种情况?
//1.调用aquire时 AQS阻塞队列内没有其它等待者
//2.当前节点 在阻塞队列内是headNext节点

//获取state ,state这里表示 通行证
int available = getState();
//remaining 表示当前线程 获取通行证完成之后,semaphore还剩余数量
int remaining = available - acquires;
//条件一:remaining < 0 成立,说明线程获取通行证失败..
//条件二:前置条件,remaning >= 0, CAS更新state 成功,说明线程获取通行证成功,CAS失败,则自旋。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//将调用semaphore.await()方法的线程 包装成node加入到 AQS的阻塞队列当中。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取当前线程节点的前驱节点
final Node p = node.predecessor();
//条件成立,说明当前线程对应的节点 为 head.next节点
if (p == head) {
int r = tryAcquireShared(arg);
//说明还有剩余的通行证
if (r >= 0) {
//设置当前节点为 head节点,并且向后传播!(依次唤醒!)
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//shouldParkAfterFailedAcquire 会给当前线程找一个好爸爸,最终给爸爸节点设置状态为 signal(-1),返回true
//parkAndCheckInterrupt 挂起当前节点对应的线程...
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
//如果发生了中断,取消该节点竞争共享锁
if (failed)
cancelAcquire(node);
}
}
/**
* 设置当前节点为 head节点,并且向后传播!(依次唤醒!)
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//将当前节点设置为 新的 head节点。
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//获取当前节点的后继节点..
Node s = node.next;
//条件一:s == null 什么时候成立呢? 当前node节点已经是 tail了,条件一会成立。 doReleaseShared() 里面会处理这种情况..
//条件二:前置条件,s != null , 要求s节点的模式必须是 共享模式。 latch.await() -> addWaiter(Node.SHARED)
if (s == null || s.isShared())
//基本上所有情况都会执行到 doReleasseShared() 方法。
doReleaseShared();
}
}
 /**
* 唤醒获取资源失败的线程
*
* Semaphore版本
* 都有哪几种路径会调用到doReleaseShared方法呢?
* 1、semaphore.release() -> sync.releaseShared(1) -> tryReleaseShared() -> doReleaseShared
* 2、被唤醒的线程 ->doAcquireSharedInterruptibly() parkAndCheckInterrupt() 唤醒 -> setHeadAndPropagate() -> doReleaseShared()
*/
private void doReleaseShared() {
for (;;) {
//获取当前AQS 内的 头结点
Node h = head;
//条件一:h != null 成立,说明阻塞队列不为空..
//条件二:h != tail 成立,说明当前阻塞队列内,除了head节点以外 还有其他节点。

if (h != null && h != tail) {
//执行到if里面,说明当前head 一定有 后继节点!

int ws = h.waitStatus;
//当前head状态 为 signal 说明 后继节点并没有被唤醒过呢...
if (ws == Node.SIGNAL) {
//唤醒后继节点前 将head节点的状态改为 0
//这里为什么,使用CAS呢? 回头说...
//当doReleaseShared方法 存在多个线程 唤醒 head.next 逻辑时,
//CAS 可能会失败...
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
     //false 没有人执行setHead(node)方法,有可能是没有线程需要唤醒了,或者唤醒线程已经更新过了head节点,那么执行自旋,在唤醒接下来的节点
     //true 唤醒线程还没有执行setHead(node)方法,唤醒线程唤醒以后执行doReleasedShared()方法
if (h == head) // loop if head changed
break;
}
}

 3、Semaphore非公平锁模式下的acquire()方法(tryAcquireShared方法不同)

static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//获取当前的通行证数目
int available = getState();
int remaining = available - acquires;
//如果当前通行证不够,或者够的情况下cas设置通行证数目成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
//返回剩余的通行证数目
return remaining;
}
}
 

 4、Semaphore的release()方法

public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//条件成立:表示当前线程释放资源成功,释放资源成功后,去唤醒获取资源失败的线程..
if (tryReleaseShared(arg)) {
//唤醒获取资源失败的线程...
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
 
 

 

上一篇:semaphore


下一篇:POJ1163(简单的DP)