Java并发编程实践 目录
并发编程 04—— 闭锁CountDownLatch 与 栅栏CyclicBarrier
并发编程 06—— CompletionService : Executor 和 BlockingQueue
并发编程 10—— 任务取消 之 关闭 ExecutorService
并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性
并发编程 13—— 线程池的使用 之 配置ThreadPoolExecutor 和 饱和策略
并发编程 20—— AbstractQueuedSynchronizer 深入分析
概述
第2 部分 java.util.concurrent 同步器类中的AQS
第1 部分 锁和同步器
先看下java.util.concurrent.locks大致结构
上图中,LOCK的实现类其实都是构建在AbstractQueuedSynchronizer上,为何图中没有用UML线表示呢,这是每个Lock实现类都持有自己内部类Sync的实例,而这个Sync就是继承AbstractQueuedSynchronizer(AQS)。为何要实现不同的Sync呢?这和每种Lock用途相关。另外还有AQS的State机制。下文会举例说明不同同步器内的Sync与state实现。
同步器是实现锁的关键,利用同步器将锁的语义实现,然后在锁的实现中聚合同步器。可以这样理解:锁的API是面向使用者的,它定义了与锁交互的公共行为,而每个锁需要完成特定的操作也是透过这些行为来完成的(比如:可以允许两个线程进行加锁,排除两个以上的线程),但是实现是依托给同步器来完成;同步器面向的是线程访问和资源控制,它定义了线程对资源是否能够获取以及线程的排队等操作。锁和同步器很好的隔离了二者所需要关注的领域,严格意义上讲,同步器可以适用于除了锁以外的其他同步设施上(包括锁)。
第2 部分 java.util.concurrent 同步器类中的AQS
什么是state机制
提供 volatile 变量 state; 用于同步线程之间的共享状态。通过 CAS 和 volatile 保证其原子性和可见性。对应源码里的定义:
/**
* 同步状态
*/
private volatile int state; /**
*cas
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
不同实现类的Sync与State
基于AQS构建的Synchronizer包括ReentrantLock,Semaphore,CountDownLatch, ReetrantRead WriteLock,FutureTask等,这些Synchronizer实际上最基本的东西就是原子状态的获取和释放,只是条件不一样而已。
2.1 ReentrantLock
需要记录当前线程获取原子状态的次数,如果次数为零,那么就说明这个线程放弃了锁(也有可能其他线程占据着锁从而需要等待),如果次数大于1,也就是获得了重进入的效果,而其他线程只能被park住,直到这个线程重进入锁次数变成0而释放原子状态。以下为ReetranLock的FairSync的tryAcquire实现代码解析。
//公平获取锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果当前重进入数为0,说明有机会取得锁
if (c == 0) {
//如果是第一个等待者,并且设置重进入数成功,那么当前线程获得锁
if (isFirst(current) &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前线程本身就持有锁,那么叠加重进入数,并且继续获得锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//以上条件都不满足,那么线程进入等待队列。
return false;
}
2.2 Semaphore
则是要记录当前还有多少次许可可以使用,到0,就需要等待,也就实现并发量的控制,Semaphore一开始设置许可数为1,实际上就是一把互斥锁。以下为Semaphore的FairSync实现
protected int tryAcquireShared(int acquires) {
Thread current = Thread.currentThread();
for (;;) {
Thread first = getFirstQueuedThread();
//如果当前等待队列的第一个线程不是当前线程,那么就返回-1表示当前线程需要等待
if (first != null && first != current)
return -1;
//如果当前队列没有等待者,或者当前线程就是等待队列第一个等待者,那么先取得semaphore还有几个许可证,并且减去当前线程需要的许可证得到剩下的值
int available = getState();
int remaining = available - acquires;
//如果remining<0,那么反馈给AQS当前线程需要等待,如果remaining>0,并且设置availble成功设置成剩余数,那么返回剩余值(>0),也就告知AQS当前线程拿到许可,可以继续执行。
if (remaining < 0 ||compareAndSetState(available, remaining))
return remaining;
}
}
2.3 CountDownLatch
闭锁则要保持其状态,在这个状态到达终止态之前,所有线程都会被park住,闭锁可以设定初始值,这个值的含义就是这个闭锁需要被countDown()几次,因为每次CountDown是sync.releaseShared(1),而一开始初始值为10的话,那么这个闭锁需要被countDown()十次,才能够将这个初始值减到0,从而释放原子状态,让等待的所有线程通过。
//await时候执行,只查看当前需要countDown数量减为0了,如果为0,说明可以继续执行,否则需要park住,等待countDown次数足够,并且unpark所有等待线程
public int tryAcquireShared(int acquires) {
return getState() == 0? 1 : -1;
} //countDown 时候执行,如果当前countDown数量为0,说明没有线程await,直接返回false而不需要唤醒park住线程,如果不为0,得到剩下需要 countDown的数量并且compareAndSet,最终返回剩下的countDown数量是否为0,供AQS判定是否释放所有await线程。
public boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
2.4 FutureTask
需要记录任务的执行状态,当调用其实例的get方法时,内部类Sync会去调用AQS的acquireSharedInterruptibly()方法,而这个方法会反向调用Sync实现的tryAcquireShared()方法,即让具体实现类决定是否让当前线程继续还是park,而FutureTask的tryAcquireShared方法所做的唯一事情就是检查状态,如果是RUNNING状态那么让当前线程park。而跑任务的线程会在任务结束时调用FutureTask 实例的set方法(与等待线程持相同的实例),设定执行结果,并且通过unpark唤醒正在等待的线程,返回结果。
//get时待用,只检查当前任务是否完成或者被Cancel,如果未完成并且没有被cancel,那么告诉AQS当前线程需要进入等待队列并且park住
protected int tryAcquireShared(int ignore) {
return innerIsDone()? 1 : -1;
} //判定任务是否完成或者被Cancel
boolean innerIsDone() {
return ranOrCancelled(getState()) && runner == null;
} //get时调用,对于CANCEL与其他异常进行抛错
V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
if (!tryAcquireSharedNanos(0,nanosTimeout))
throw new TimeoutException();
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
} //任务的执行线程执行完毕调用(set(V v))
void innerSet(V v) {
for (;;) {
int s = getState();
//如果线程任务已经执行完毕,那么直接返回(多线程执行任务?)
if (s == RAN)
return;
//如果被CANCEL了,那么释放等待线程,并且会抛错
if (s == CANCELLED) {
releaseShared(0);
return;
}
//如果成功设定任务状态为已完成,那么设定结果,unpark等待线程(调用get()方法而阻塞的线程),以及后续清理工作(一般由FutrueTask的子类实现)
if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
done();
return;
}
}
}
参考
1.《java并发编程实战》 构建自定义的同步工具
3. AbstractQueuedSynchronizer的介绍和原理分析
4. 深度解析Java 8:JDK1.8 AbstractQueuedSynchronizer的实现分析(上)