j.u.c系列(06)---之锁条件:Condition

  

写在前面

  在没有Lock之前,我们使用synchronized来控制同步,配合Object的wait()、notify()系列方法可以实现等待/通知模式。在Java SE5后,Java提供了Lock接口,相对于Synchronized而言,Lock提供了条件Condition,对线程的等待、唤醒操作更加详细和灵活。

Condition简介

  Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。

void await()
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
boolean await(long time, TimeUnit unit)
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
long awaitNanos(long nanosTimeout)
// 造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】
void awaitUninterruptibly()
// 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
boolean awaitUntil(Date deadline)
// 唤醒一个等待线程。
void signal()
// 唤醒所有等待线程。
void signalAll()

Condtion的实现

  获取一个Condition必须要通过Lock的newCondition()方法。该方法定义在接口Lock下面,返回的结果是绑定到此 Lock 实例的新 Condition 实例。Condition为一个接口,其下仅有一个实现类ConditionObject,由于Condition的操作需要获取相关的锁,而AQS则是同步锁的实现基础,所以ConditionObject则定义为AQS的内部类。定义如下:

public abstract class AbstractQueuedLongSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {}
}

等待队列

  每个Condition对象都包含着一个FIFO队列,该队列是Condition对象通知/等待功能的关键。在队列中每一个节点都包含着一个线程引用,该线程就是在该Condition对象上等待的线程。我们看Condition的定义就明白了:

public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L; //头节点
private transient Node firstWaiter;
//尾节点
private transient Node lastWaiter; public ConditionObject() {
} /** 省略方法 **/
}

  从上面代码可以看出Condition拥有首节点(firstWaiter),尾节点(lastWaiter)。当前线程调用await()方法,将会以当前线程构造成一个节点(Node),并将节点加入到该队列的尾部。结构如下:

j.u.c系列(06)---之锁条件:Condition

  Node里面包含了当前线程的引用。Node定义与AQS的CLH同步队列的节点使用的都是同一个类(AbstractQueuedSynchronized.Node静态内部类)。

  Condition的队列结构比CLH同步队列的结构简单些,新增过程较为简单只需要将原尾节点的nextWaiter指向新增节点,然后更新lastWaiter即可。

等待(await)

  调用Condition的await()方法会使当前线程进入等待状态,同时会加入到Condition等待队列同时释放锁。当从await()方法返回时,当前线程一定是获取了Condition相关连的锁。

    public final void await() throws InterruptedException {
// 当前线程中断
if (Thread.interrupted())
throw new InterruptedException();
//当前线程加入等待队列
Node node = addConditionWaiter();
//释放锁
long savedState = fullyRelease(node);
int interruptMode = 0;
/**
* 检测此节点的线程是否在同步队上,如果不在,则说明该线程还不具备竞争锁的资格,则继续等待
* 直到检测到此节点在同步队列上
*/
while (!isOnSyncQueue(node)) {
//线程挂起
LockSupport.park(this);
//如果已经中断了,则退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//竞争同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//清理下条件队列中的不是在等待条件的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

  此段代码的逻辑是:首先将当前线程新建一个节点同时加入到条件队列中,然后释放当前线程持有的同步状态。然后则是不断检测该节点代表的线程释放出现在CLH同步队列中(收到signal信号之后就会在AQS队列中检测到),如果不存在则一直挂起,否则参与竞争同步状态。

  加入条件队列(addConditionWaiter())源码如下:

    private Node addConditionWaiter() {
Node t = lastWaiter; //尾节点
//Node的节点状态如果不为CONDITION,则表示该节点不处于等待状态,需要清除节点
if (t != null && t.waitStatus != Node.CONDITION) {
//清除条件队列中所有状态不为Condition的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
//当前线程新建节点,状态CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
/**
* 将该节点加入到条件队列中最后一个位置
*/
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

  该方法主要是将当前线程加入到Condition条件队列中。当然在加入到尾节点之前会清楚所有状态不为Condition的节点。

  fullyRelease(Node node),负责释放该线程持有的锁。

    final long fullyRelease(Node node) {
boolean failed = true;
try {
//节点状态--其实就是持有锁的数量
long savedState = getState();
//释放锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

  isOnSyncQueue(Node node):如果一个节点刚开始在条件队列上,现在在同步队列上获取锁则返回true

    final boolean isOnSyncQueue(Node node) {
//状态为Condition,获取前驱节点为null,返回false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//后继节点不为null,肯定在CLH同步队列中
if (node.next != null)
return true; return findNodeFromTail(node);
}

  unlinkCancelledWaiters():负责将条件队列中状态不为Condition的节点删除

        private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}

通知(signal)

  调用Condition的signal()方法,将会唤醒在等待队列中等待最长时间的节点(条件队列里的首节点),在唤醒节点前,会将节点移到CLH同步队列中。

    public final void signal() {
//检测当前线程是否为拥有锁的独
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//头节点,唤醒条件队列中的第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first); //唤醒
}

  该方法首先会判断当前线程是否已经获得了锁,这是前置条件。然后唤醒条件队列中的头节点。

  doSignal(Node first):唤醒头节点

    private void doSignal(Node first) {
do {
//修改头结点,完成旧头结点的移出工作
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

  doSignal(Node first)主要是做两件事:1.修改头节点,2.调用transferForSignal(Node first) 方法将节点移动到CLH同步队列中。transferForSignal(Node first)源码如下:

     final boolean transferForSignal(Node node) {
//将该节点从状态CONDITION改变为初始状态0,
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false; //将节点加入到syn队列中去,返回的是syn队列中node节点前面的一个节点
Node p = enq(node);
int ws = p.waitStatus;
//如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

整个通知的流程如下:

  1. 判断当前线程是否已经获取了锁,如果没有获取则直接抛出异常,因为获取锁为通知的前置条件。
  2. 如果线程已经获取了锁,则将唤醒条件队列的首节点
  3. 唤醒首节点是先将条件队列中的头节点移出,然后调用AQS的enq(Node node)方法将其安全地移到CLH同步队列中
  4. 最后判断如果该节点的同步状态是否为Cancel,或者修改状态为Signal失败时,则直接调用LockSupport唤醒该节点的线程。

总结

  一个线程获取锁后,通过调用Condition的await()方法,会将当前线程先加入到条件队列中,然后释放锁,最后通过isOnSyncQueue(Node node)方法不断自检看节点是否已经在CLH同步队列了,如果是则尝试获取锁,否则一直挂起。当线程调用signal()方法后,程序首先检查当前线程是否获取了锁,然后通过doSignal(Node first)方法唤醒CLH同步队列的首节点。被唤醒的线程,将从await()方法中的while循环中退出来,然后调用acquireQueued()方法竞争同步状态。

synchronized原理

  synchronized原理在java中,每一个对象有且仅有一个同步锁。这也意味着,同步锁是依赖于对象而存在。
当我们调用某对象的synchronized方法时,就获取了该对象的同步锁。例如,synchronized(obj)就获取了“obj这个对象”的同步锁。
不同线程对同步锁的访问是互斥的。也就是说,某时间点,对象的同步锁只能被一个线程获取到!通过同步锁,我们就能在多线程中,实现对“对象/方法”的互斥访问。 例如,现在有两个线程A和线程B,它们都会访问“对象obj的同步锁”。假设,在某一时刻,线程A获取到“obj的同步锁”并在执行一些操作;而此时,线程B也企图获取“obj的同步锁” —— 线程B会获取失败,它必须等待,直到线程A释放了“该对象的同步锁”之后线程B才能获取到“obj的同步锁”从而才可以运行。

synchronized基本规则

  synchronized基本规则我们将synchronized的基本规则总结为下面3条,并通过实例对它们进行说明。
  第一条: 当一个线程访问“某对象”的“synchronized方法”或者“synchronized代码块”时,其他线程对“该对象”的该“synchronized方法”或者“synchronized代码块”的访问将被阻塞。
  第二条: 当一个线程访问“某对象”的“synchronized方法”或者“synchronized代码块”时,其他线程仍然可以访问“该对象”的非同步代码块。
  第三条: 当一个线程访问“某对象”的“synchronized方法”或者“synchronized代码块”时,其他线程对“该对象”的其他的“synchronized方法”或者“synchronized代码块”的访问将被阻塞。

实例锁 -- 锁在某一个实例对象上。如果该类是单例,那么该锁也具有全局锁的概念。
               实例锁对应的就是synchronized关键字。

全局锁 -- 该锁针对的是类,无论实例多少个对象,那么线程都共享该锁。
               全局锁对应的就是static synchronized(或者是锁在该类的class或者classloader对象上)。

Condtion的实现

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[5];
int putptr, takeptr, count; public void put(Object x) throws InterruptedException {
lock.lock(); //获取锁
try {
// 如果“缓冲已满”,则等待;直到“缓冲”不是满的,才将x添加到缓冲中。
while (count == items.length)
notFull.await();
// 将x添加到缓冲中
items[putptr] = x;
// 将“put统计数putptr+1”;如果“缓冲已满”,则设putptr为0。
if (++putptr == items.length) putptr = 0;
// 将“缓冲”数量+1
++count;
// 唤醒take线程,因为take线程通过notEmpty.await()等待
notEmpty.signal(); // 打印写入的数据
System.out.println(Thread.currentThread().getName() + " put "+ (Integer)x);
} finally {
lock.unlock(); // 释放锁
}
} public Object take() throws InterruptedException {
lock.lock(); //获取锁
try {
// 如果“缓冲为空”,则等待;直到“缓冲”不为空,才将x从缓冲中取出。
while (count == 0)
notEmpty.await();
// 将x从缓冲中取出
Object x = items[takeptr];
// 将“take统计数takeptr+1”;如果“缓冲为空”,则设takeptr为0。
if (++takeptr == items.length) takeptr = 0;
// 将“缓冲”数量-1
--count;
// 唤醒put线程,因为put线程通过notFull.await()等待
notFull.signal(); // 打印取出的数据
System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x);
return x;
} finally {
lock.unlock(); // 释放锁
}
}
} public class ConditionTest2 {
private static BoundedBuffer bb = new BoundedBuffer(); public static void main(String[] args) {
// 启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);
// 启动10个“读线程”,从BoundedBuffer中不断的读数据。
for (int i=0; i<10; i++) {
new PutThread("p"+i, i).start();
new TakeThread("t"+i).start();
}
} static class PutThread extends Thread {
private int num;
public PutThread(String name, int num) {
super(name);
this.num = num;
}
public void run() {
try {
Thread.sleep(1); // 线程休眠1ms
bb.put(num); // 向BoundedBuffer中写入数据
} catch (InterruptedException e) {
}
}
} static class TakeThread extends Thread {
public TakeThread(String name) {
super(name);
}
public void run() {
try {
Thread.sleep(10); // 线程休眠1ms
Integer num = (Integer)bb.take(); // 从BoundedBuffer中取出数据
} catch (InterruptedException e) {
}
}
}
}
p1 put
p4 put
p5 put
p0 put
p2 put
t0 take
p3 put
t1 take
p6 put
t2 take
p7 put
t3 take
p8 put
t4 take
p9 put
t5 take
t6 take
t7 take
t8 take
t9 take

(01) BoundedBuffer 是容量为5的缓冲,缓冲中存储的是Object对象,支持多线程的读/写缓冲。多个线程操作“一个BoundedBuffer对象”时,它们通过互斥锁lock对缓冲区items进行互斥访问;而且同一个BoundedBuffer对象下的全部线程共用“notFull”和“notEmpty”这两个Condition。
       notFull用于控制写缓冲,notEmpty用于控制读缓冲。当缓冲已满的时候,调用put的线程会执行notFull.await()进行等待;当缓冲区不是满的状态时,就将对象添加到缓冲区并将缓冲区的容量count+1,最后,调用notEmpty.signal()缓冲notEmpty上的等待线程(调用notEmpty.await的线程)。 简言之,notFull控制“缓冲区的写入”,当往缓冲区写入数据之后会唤醒notEmpty上的等待线程。
       同理,notEmpty控制“缓冲区的读取”,当读取了缓冲区数据之后会唤醒notFull上的等待线程。
(02) 在ConditionTest2的main函数中,启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);同时,也启动10个“读线程”,从BoundedBuffer中不断的读数据。

上一篇:读 Zepto 源码系列


下一篇:Android布局样式