Java线程同步之一--AQS
线程同步是指两个并发执行的线程在同一时间不同时执行某一部分的程序。同步问题在生活中也很常见,就比如在麦当劳点餐,假设只有一个服务员能够提供点餐服务。每个服务员在同一时刻只能接待一个顾客的点餐,那么除了正在接待的顾客,其他人只能等待排队。当一个点餐服务完成之后,其他顾客就可以上去进行点餐。
从这个例子中可以看到如下几个关注点:
- 点餐服务为临界区域(critical area),其可同时进行的数量,即为有多少人可进入临界区域。
- 排队即为对目前暂时无法取得点餐服务的人的一种处理方式。这种处理方式的特性有公平性(按次序),效率性(接手最快为最好)等。
- 顾客进行排队和从队伍中叫一个顾客来进行服务即为睡眠(park)和唤醒(unpark)机制。
并发中线程同步是重点需关注的问题,线程同步自然也有一定的模式,DougLea就写出了一个简单的框架AQS用来支持一大类线程同步工具,如ReentrantLock,CountdownLatch,Semphaore等。
AQS是concurrent包中的一系列同步工具的基础实现,其提供了状态位,线程阻塞-唤醒方法,CAS操作。基本原理就是根据状态位来控制线程的入队阻塞、出队唤醒来解决同步问题。
入队:
出队:
二、代码分析
下面以ReentrantLock来说明AQS的组成构件的工作情况:
在ReentrantLock中封装了一个同步器Sync,继承了AbstractQueuedSynchronizer,根据对临界区的访问的公平性要求不同,又分为NonfairSync和FairSync。为了简化起见,就取最简单的NonFairSync作为例子来说明:
1. 对于临界区的控制:
java.util.concurrent.locks.ReentrantLock.NonfairSync
1: final void lock() {
2:
3: if (compareAndSetState(0, 1))
4:
5: setExclusiveOwnerThread(Thread.currentThread());
6:
7: else
8:
9: acquire(1);
10:
11: }
12:
从以上代码可以看出,其主要目的是采用cas比较临界区的状态。
1.1. 如果为0,将其设置为1,并记录当前线程(当前线程可进入临界区);
1.2. 如果为1,尝试获取临界区控制
java.util.concurrent.locks.AbstractQueuedSynchronizer
1: public final void acquire(int arg) {
2:
3: if (!tryAcquire(arg) &&
4:
5: acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
6:
7: selfInterrupt();
8:
9: }
10:
1.2.1. NonFairLock的tryAcquire实现为:
1: final boolean nonfairTryAcquire(int acquires) {
2:
3: final Thread current = Thread.currentThread();
4:
5: int c = getState();
6:
7: if (c == 0) {
8:
9: if (compareAndSetState(0, acquires)) {
10:
11: setExclusiveOwnerThread(current);
12:
13: return true;
14:
15: }
16:
17: }
18:
19: else if (current == getExclusiveOwnerThread()) {
20:
21: int nextc = c + acquires;
22:
23: if (nextc < 0) // overflow
24:
25: throw new Error("Maximum lock count exceeded");
26:
27: setState(nextc);
28:
29: return true;
30:
31: }
32:
33: return false;
34:
35: }
36:
上述代码主要是针对大部分线程进入临界区工作时间不会很长而进行的性能优化,第一次尝试失败了,极有可能过一会儿锁就释放了,因此重新去尝试获取锁。
1.2.2. 以下这段代码是锁的精华部分
java.util.concurrent.locks.AbstractQueuedSynchronizer
1: final boolean acquireQueued(final Node node, int arg) {
2:
3: try {
4:
5: boolean interrupted = false;
6:
7: for (;;) {
8:
9: final Node p = node.predecessor();
10:
11: if (p == head && tryAcquire(arg)) {
12:
13: setHead(node);
14:
15: p.next = null; // help GC
16:
17: return interrupted;
18:
19: }
20:
21: if (shouldParkAfterFailedAcquire(p, node) &&
22:
23: parkAndCheckInterrupt())
24:
25: interrupted = true;
26:
27: }
28:
29: } catch (RuntimeException ex) {
30:
31: cancelAcquire(node);
32:
33: throw ex;
34:
35: }
36:
37: }
38:
在无限循环中完成了对线程的阻塞和唤醒。阻塞在parkAndCheckInterrupt()唤醒后从此处进行释放。
算法过程:
- 从加入队列的node开始反向查找,将前一个元素赋值给p;
- 如果p是head,那么试着再获得一次锁tryAcquire(arg),成功则将head指针往后移动,并跳出循环;
- 如果上一步骤尝试失败,那么进行测试是否要park ,如果状态为0,将其标记为SIGNAL,并返回false;
- 再重复检查一次,发现其头部的waitStatus为-1.Node.signal。确认需要park successor; 进行parkAndCheckInterrupt()将当前线程阻塞。
2. 对于临界区的释放
2.1. java.util.concurrent.locks.AbstractQueuedSynchronizer
1: public final boolean release(int arg) {
2:
3: if (tryRelease(arg)) {
4:
5: Node h = head;
6:
7: if (h != null && h.waitStatus != 0)
8:
9: unparkSuccessor(h);
10:
11: return true;
12:
13: }
14:
15: return false;
16:
17: }
18:
2.1.1. java.util.concurrent.locks.ReentrantLock.Sync
1: protected final boolean tryRelease(int releases) {
2:
3: int c = getState() - releases;
4:
5: if (Thread.currentThread() != getExclusiveOwnerThread())
6:
7: throw new IllegalMonitorStateException();
8:
9: boolean free = false;
10:
11: if (c == 0) {
12:
13: free = true;
14:
15: setExclusiveOwnerThread(null);
16:
17: }
18:
19: setState(c);
20:
21: return free;
22:
23: }
24:
将state进行变化-releases,检查当前线程是否是拿住锁的线程,否则掷出异常.如果为0,将持有锁线程标记为null。
从ReentrantLock例子可以看出AQS的工作原理,更为精妙的是,在这几个基本机制作用下衍生了许多种并发工具,以后的介绍中可以看到。