深入学习Lock锁(5)——Condition接口应用与分析

1.对比Object对象的监视器方法

    任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、 wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以 实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等 待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

深入学习Lock锁(5)——Condition接口应用与分析

2.Condition接口API与使用

    Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到 Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创 建出来的,换句话说,Condition是依赖Lock对象的。Condition的使用方式比较简单,需要注意在调用方法前获取锁。

public class AwaitAndSignal {
	static boolean flag = true;
	static Lock lock = new ReentrantLock();
	static Condition con=lock.newCondition();
	public static void main(String[] args) throws Exception {
		Thread AwaitThread = new Thread(new Await(), "AwaitThread");
		AwaitThread.start();
		TimeUnit.SECONDS.sleep(1);
		Thread SignalThread = new Thread(new Signal(), "SignalThread");
		SignalThread.start();
	}

	static class Await implements Runnable {
		public void run() {
			try{
				lock.lock();
				while(flag&&!Thread.interrupted()){
					try {
						System.out.println(Thread.currentThread() + " flag is true. wait"
								+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
						con.await();
						
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
				System.out.println(Thread.currentThread() + " flag is false. running"
						+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
			}finally {
				lock.unlock();
			}
		}
	}

	static class Signal implements Runnable {
		public void run() {
			try{
				lock.lock();
				System.out.println(Thread.currentThread() + " hold lock. notify"
						+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
				con.signalAll();
				flag = false;
			}finally {
				lock.unlock();
			}
			// 再次加锁
			try{
				lock.lock();
				System.out.println(Thread.currentThread() + " hold lock again. sleep"
						+ new SimpleDateFormat("HH:mm:ss").format(new Date()));
			}finally {
				lock.unlock();
			}
		}
	}
}

    一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。

3.实现分析

    ConditionObject是同步器AbstractQueuedSynchronizer的内部类,是Condition接口的实现类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。

1.等待队列

    等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。事实上,节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类 AbstractQueuedSynchronizer.Node。

    一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点 (lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列。Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter 指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用CAS保证,原因在于调用 await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。

    在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列。

2.等待

    调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释 放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。

    如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();//将当前线程加入等待队列
            int savedState = fullyRelease(node);//释放线程持有的锁
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

    调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。 当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出 InterruptedException。

3.通知

    调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

    调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了 isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。

    通过调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移动到同步队列。当节点移动到同步队列后,当前线程再使用LockSupport唤醒该节点的线程。被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node node)方法 返回true,节点已经在同步队列中),进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。 成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功地获取了锁。

    而Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效 果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。

上一篇:RocketMQ与MYSQL事务消息整合


下一篇:多线程并发工具类