java.util.concurrent包详细分析--转

原文地址:http://blog.csdn.net/windsunmoon/article/details/36903901

概述

Java.util.concurrent 包含许多线程安全、测试良好、高性能的并发构建块。不客气地说,创建java.util.concurrent 的目的就是要实现 Collection 框架对数据结构所执行的并发操作。通过提供一组可靠的、高性能并发构建块,开发人员可以提高并发类的线程安全、可伸缩性、性能、可读性和可靠性。

此包包含locks,concurrent,atomic 三个包。

Atomic:原子数据的构建。

Locks:基本的锁的实现,最重要的AQS框架和lockSupport

Concurrent:构建的一些高级的工具,如线程池,并发队列等。

其中都用到了CAS(compare-and-swap)操作。CAS 是一种低级别的、细粒度的技术,它允许多个线程更新一个内存位置,同时能够检测其他线程的冲突并进行恢复。它是许多高性能并发算法的基础。在 JDK 5.0 之前,Java 语言中用于协调线程之间的访问的惟一原语是同步,同步是更重量级和粗粒度的。公开 CAS 可以开发高度可伸缩的并发 Java 类。这些更改主要由 JDK 库类使用,而不是由开发人员使用。

CAS操作都封装在java 不公开的类库中,sun.misc.Unsafe。此类包含了对原子操作的封装,具体用本地代码实现。本地的C代码直接利用到了硬件上的原子操作。

Atomic原子数据

这个包里面提供了一组原子变量类。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解。实际上是借助硬件的相关指令来实现的,不会阻塞线程(或者说只是在硬件级别上阻塞了)。可以对基本数据、数组中的基本数据、对类中的基本数据进行操作。原子变量类相当于一种泛化的volatile变量,能够支持原子的和有条件的读-改-写操作。

java.util.concurrent.atomic中的类可以分成4组:

标量类(Scalar):AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference

数组类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray

更新器类:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater

复合变量类:AtomicMarkableReference,AtomicStampedReference

标量类

第一组AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference这四种基本类型用来处理布尔,整数,长整数,对象四种数据,其内部实现不是简单的使用synchronized,而是一个更为高效的方式CAS (compare and swap) + volatile和native方法,从而避免了synchronized的高开销,执行效率大为提升。

他们的实现都是依靠 真正的值为volatile 类型,通过Unsafe 包中的原子操作实现。最基础就是CAS,他是一切的基础。如下 。其中offset是 在内存中 value相对于基地址的偏移量。(它的获得也由Unsafe 本地代码获得)。关于加锁的原理见附录。

核心代码如下,其他都是在compareAndSet基础上构建的。

1. private static final Unsafe unsafe = Unsafe.getUnsafe();

2. private volatile int value;

3. public final int get() {

4.         return value;

5. }

6. public final void set(int newValue) {

7.         value = newValue;

8. }

9. public final boolean compareAndSet(int expect, int update) {

10.    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);

11.}

void set()和void lazySet():set设置为给定值,直接修改原始值;lazySet延时设置变量值,这个等价于set()方法,但是由于字段是volatile类型的,因此次字段的修改会比普通字段(非volatile字段)有稍微的性能延时(尽管可以忽略),所以如果不是想立即读取设置的新值,允许在“后台”修改值,那么此方法就很有用。

getAndSet( )方法,利用compareAndSet循环自旋实现。

原子的将变量设定为新数据,同时返回先前的旧数据。

其本质是get( )操作,然后做set( )操作。尽管这2个操作都是atomic,但是他们合并在一起的时候,就不是atomic。在Java的源程序的级别上,如果不依赖synchronized的机制来完成这个工作,是不可能的。只有依靠native方法才可以。

Java代码  

1.  public final int getAndSet(int newValue) {

2.      for (;;) {

3.          int current = get();

4.          if (compareAndSet(current, newValue))

5.              return current;

6.      }

7.  }

对于 AtomicInteger、AtomicLong还提供了一些特别的方法。贴别是如,

getAndAdd( ):以原子方式将给定值与当前值相加, 相当于线程安全的t=i;i+=delta;return t;操作。

以实现一些加法,减法原子操作。(注意 --i、++i不是原子操作,其中包含有3个操作步骤:第一步,读取i;第二步,加1或减1;第三步:写回内存)

数组类

第二组AtomicIntegerArray,AtomicLongArray还有AtomicReferenceArray类进一步扩展了原子操作,对这些类型的数组提供了支持。这些类在为其数组元素提供 volatile 访问语义方面也引人注目,这对于普通数组来说是不受支持的。

他们内部并不是像AtomicInteger一样维持一个valatile变量,而是全部由native方法实现,如下
AtomicIntegerArray的实现片断:

Java代码  

1.  private static final Unsafe unsafe = Unsafe.getUnsafe();

2.  private static final int base = unsafe.arrayBaseOffset(int[].class);  //数组基地址

3.  private static final int scale = unsafe.arrayIndexScale(int[].class);  //数组元素占的大小精度

4.  private final int[] array;

5.  public final int get(int i) {

6.          return unsafe.getIntVolatile(array, rawIndex(i));

7.  }

8.  public final void set(int i, int newValue) {

9.          unsafe.putIntVolatile(array, rawIndex(i), newValue);

10. }

11.

12.  private longrawIndex(int i) {//获取具体某个元素的偏移量

13.         if (i <0 || i >= array.length)

14.             thrownew IndexOutOfBoundsException("index " + i);

15.         return base+ (long) i * scale;

16. }

更新器类

第三组AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater基于反射的实用工具,可以对指定类的指定 volatile 字段进行原子更新。API非常简单,但是也是有一些约束:

(1)字段必须是volatile类型的

(2)字段的描述类型(修饰符public/protected/default/private)是与调用者与操作对象字段的关系一致。也就是说 调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。

(3)只能是实例变量,不能是类变量,也就是说不能加static关键字。

(4)只能是可修改变量,不能使final变量,因为final的语义就是不可修改。实际上final的语义和volatile是有冲突的,这两个关键字不能同时存在。

(5)对于AtomicIntegerFieldUpdater 和AtomicLongFieldUpdater 只能修改int/long类型的字段,不能修改其包装类型(Integer/Long)。如果要修改包装类型就需要使用AtomicReferenceFieldUpdater 。

复合变量类

防止ABA问题出现而构造的类。如什么是ABA问题呢,当某些流程在处理过程中是顺向的,也就是不允许重复处理的情况下,在某些情况下导致一个数据由A变成B,再中间可能经过0-N个环节后变成了A,此时A不允许再变成B了,因为此时的状态已经发生了改变,他们都是对atomicReference的进一步包装,AtomicMarkableReferenceAtomicStampedReference功能差不多,有点区别的是:它描述更加简单的是与否的关系,通常ABA问题只有两种状态,而AtomicStampedReference是多种状态,那么为什么还要有AtomicMarkableReference呢,因为它在处理是与否上面更加具有可读性。

Lcoks 锁

此包中实现的最基本的锁,阻塞线程的LockSupport。核心是AQS框架(AbstractQueuedSynchronizer),是J U C(Javautil concurrent) 最复杂的一个类。

Lock 和Synchronized

J U C 中的Lock和synchronized具有同样的语义和功能。不同的是,synchronized 锁在退出块时自动释放。而Lock 需要手动释放,且Lock更加灵活。Syschronizd 是 java 语言层面的,是系统关键字;Lock则是java 1.5以来提供的一个类。

Synchronized 具有以下缺陷,它无法中断一个正在等候获得锁的线程;也无法通过投票得到锁,如果不想等下去,也就没法得到锁;同步还要求锁的释放只能在与获得锁所在的堆栈帧相同的堆栈帧中进行。

而Lock(如ReentrantLock )除了与Synchronized 具有相同的语义外,还支持锁投票定时锁等候可中断锁等候(就是说在等待锁的过程中,可以被中断)的一些特性。

Lock. lockInterruptibly ,调用后,或者获得锁,或者被中断后抛出异常。优先响应异常。这点可以用 类似以下代码测试。

Thread a = new Thread(task1, "aa");

Thread b = new Thread(task1, "bb");

a.start();

b.start();

b.interrupt();

LockSupport 和java内置锁

在LockSupport出现之前,如果要block/unblock某个Thread,除了使用Java语言内置的monitor机制之外,只能通过Thread.suspend()和Thread.resume()。然而Thread.suspend()和Thread.resume()基本上不可用,除了可能导致死锁之外,它们还存在一个无法解决的竞争条件:如果在调用Thread.suspend()之前调用了Thread.resume(),那么该Thread.resume()调用没有任何效果。LockSupport最主要的作用,便是通过一个许可(permit)状态,解决了这个问题。LockSupport 只能阻塞当前线程,但是可以唤醒任意线程。

那么LockSupport和Java语言内置的monitor机制有什么区别呢?它们的语义是不同的。LockSupport是针对特定Thread来进行block/unblock操作的;wait()/notify()/notifyAll()是用来操作特定对象的等待集合的。为了防止知识生锈,在这里简单介绍一下Java语言内置的monitor机制(详见:http://whitesock.iteye.com/blog/162344 )。正如每个Object都有一个锁,每个Object也有一个等待集合(wait set),它有wait、notify、notifyAll和Thread.interrupt方法来操作。同时拥有锁和等待集合的实体,通常被成为监视器(monitor)。每个Object的等待集合是由JVM维护的。等待集合一直存放着那些因为调用对象的wait方法而被阻塞的线程。由于等待集合和锁之间的交互机制,只有获得目标对象的同步锁时,才可以调用它的wait、notify和notifyAll方法。这种要求通常无法靠编译来检查,如果条件不能满足,那么在运行的时候调用以上方法就会导致其抛出IllegalMonitorStateException。

wait() 方法被调用后,会执行如下操作:

·        如果当前线程已经被中断,那么该方法立刻退出,然后抛出一个InterruptedException异常。否则线程会被阻塞。

·        JVM把该线程放入目标对象内部且无法访问的等待集合中。

·        目标对象的同步锁被释放,但是这个线程锁拥有的其他锁依然会被这个线程保留着。当线程重新恢复质执行时,它会重新获得目标对象的同步锁。

notify()方法被调用后,会执行如下操作:

·        如果存在的话,JVM会从目标对象内部的等待集合中任意移除一个线程T。如果等待集合中的线程数大于1,那么哪个线程被选中完全是随机的。

·        T必须重新获得目标对象的同步锁,这必然导致它将会被阻塞到调用Thead.notify()的线程释放该同步锁。如果其他线程在T获得此锁之前就获得它,那么T就要一直被阻塞下去。

·        T从执行wait()的那点恢复执行。

notifyAll()方法被调用后的操作和notify()类似,不同的只是等待集合中所有的线程(同时)都要执行那些操作。然而等待集合中的线程必须要在竞争到目标对象的同步锁之后,才能继续执行。

在标准的Sun jdk 中,Locksupport的实现基于Unsafe,都是本地代码,Android的实现不全是本地代码。

一个线程调用park阻塞之后,如果被其他线程调用interrupt(),那么他它会响应中断,解除阻塞,但是不会抛出interruption 异常。这点在构造可中断获取锁的时候用到了。

AbstractQueuedSynchronizer

AQS框架是 J U C包的核心。是构建同步、锁、信号量和自定义锁的基础。也是构建高级工具的基础。

从上图可以看到,锁,信号量的实现内部都有两个内部类,都继承AQS。

由于AQS的构建上采用模板模式(Template mode),即 AQS定义一些框架,而它的实现延迟到子类。如tryAcquire()方法。由于这个模式,我们如果直接看AQS源码会比较抽象。所以从某个具体的实现切入简单易懂。这里选泽ReentrantLock ,它和Synchronized具有同样的语义。

简单说来,AbstractQueuedSynchronizer会把所有的请求线程构成一个CLH队列,当一个线程执行完毕(lock.unlock())时会激活自己的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程全 部处于阻塞状态,经过调查线程的显式阻塞是通过调用LockSupport.park()完成,而LockSupport.park()则调用 sun.misc.Unsafe.park()本地方法,再进一步,HotSpot在Linux中中通过调用pthread_mutex_lock函数把 线程交给系统内核进行阻塞。

ReentrantLock

从ReentrantLock(可重入锁)开始,分析AQS。首先需要知道这个锁和java 内置的同步Synchronized具有同样的语义。如下代码解释重入的意思

Lock lock = new ReentrantLock();

public void test() {

lock.lock();

System.out.print("I am test1");

test(); // 递归调用 ……………………………1 递归调用不会阻塞,因为已经获得了锁,这就是重入的含义

// test2();// 调用test2 ………………………2

lock.unlock();// 这里应该放在finally 块中,这里简单省略,以后一样。

}

public void test2() {

lock.lock();

System.out.println("I am test1");

test2();//

lock.unlock();

}

重入的意思就是,如果已经获得了锁,如果执行期间还需要获得这个锁的话,会直接获得所,不会被阻塞,获得锁的次数加1;每执行一次unlock,持有锁的次数减1,当为0时释放锁。这点,Synchronized 具有同样语义。

查看源码,可以看到ReentrantLock 对Lock接口的实现,把所有的操作都委派给一个叫Sync的类,如下源码:

其中Sync的定义如右图

所以这个Syc类是关键。而Sync 基础AQS。Sync又有两个子类,

final static class NonfairSync extends Sync

final static class FairSync extends Sync

显然是为了支持公平锁和非公平锁而定义,默认情况下为非公平锁。

先理一下Reentrant.lock()方法的调用过程(默认非公平锁):

这 些讨厌的Template模式导致很难直观的看到整个调用过程,其实通过上面调用过程及AbstractQueuedSynchronizer的注释可以发现,AbstractQueuedSynchronizer中抽象了绝大多数Lock的功能,而只把tryAcquire方法延迟到子类中实现。 tryAcquire方法的语义在于用具体子类判断请求线程是否可以获得锁,无论成功与否AbstractQueuedSynchronizer都将处理后面的流程。

NonfairSync 和 FairSync 不同的是执行lock时做的操作,如下为 NonfairSync 的操作,其中compareAndSetState(intexpect, int des) 为AQS的方法,设置同步状态,NonfairSync 通过修改同步状态获得锁,锁定不成功才执行acquire(1),此方法也在AQS中定义。而 FairSync.lock 直接执行acquire(1)。

final void lock() {

if (compareAndSetState(0, 1))

setExclusiveOwnerThread(Thread.currentThread());

else

acquire(1);

}

AQS中的Acquire(int)方法调用子类中的tryAcquire(int)实现,这里正是模板模式。如下面的源码。自此已经进入到了AQS的实现。

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

其他方法的调用顺序类似,如unlock 调用AQS的release ,release 调用Sync的tryRelease()。

下面看NonfairSync.tryAcquire,它调用Sync.nonfairTryAcquire。以下为实现,首先获取同步状态c,o代表锁没有线程正在竞争锁。如果c=0,那么尝试用CAS操作获得锁;或者c!=0,但是锁被当前线程拥有,那么获得锁的次数增加 acquires 次,这就是重入的概念。以上两种情况都成功获得锁,返回真。如果不是以上两种情况,就没有获得锁,返回假。

final boolean nonfairTryAcquire(int acquires) {

final Thread current = Thread.currentThread();

int c = getState();

if (c == 0) {

if (compareAndSetState(0, acquires)) {

setExclusiveOwnerThread(current);

return true;

}

}

else if (current == getExclusiveOwnerThread()) {

int nextc = c + acquires;

if (nextc < 0) // overflow

throw new Error("Maximum lock count exceeded");

setState(nextc);

return true;

}

return false;

}

如果没有获得锁,即NonfairSync.tryAcqiuer()返回假,那么可以看出 AQS.acquire 将执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg);将此线程追加到等待队列的队尾。其中Node是AQS的一个内部类,他是等待队列节点的抽象。

private Node addWaiter(Node mode) {

Node node = new Node(Thread.currentThread(), mode);

// Try the fast path of enq; backup to full enq on failure

Node pred = tail;

if (pred != null) {

node.prev = pred;

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

enq(node);

return node;

}

其中mode指的是模式,NULL 为独占,否则为共享锁。RetranLock为独占锁。首先把线程包装为一个节点。然后获取等待队列的尾,如果不为NULL的话(这说明有其他线程在待队列中行),就把初始化node的前驱为pred.( node.prev = pred) 然后通过CAS操作把node 设置为新的队尾,如果成功则设置pred的后继为 node.至此 快速进队完成。

但是如果pred为null(此时没有线程在等待,一开始tail 就是null) ,或者CAS设置队尾失败。则需要执行下面的入队流程。 这里可能是整个阻塞队列的初始化过程。Tail 为null

private Node enq(final Node node) {

for (;;) {

Node t = tail;

if (t == null) { // Must initialize

Node h = new Node(); // Dummy header

h.next = node;

node.prev = h;

if (compareAndSetHead(h)) {

tail = node;

return h;

}

}

else {

node.prev = t;

if (compareAndSetTail(t, node)) {

t.next = node;

return t;

}

}

}

}

该方法就是循环调用CAS,即使有高并发的场景,无限循环将会最终成功把当前线程追加到队尾(或设置队头)。总而言之,addWaiter的目的就是通过CAS把当前现在追加到队尾,并返回包装后的Node实例。

把线程要包装为Node对象的主要原因,除了用Node构造供虚拟队列外,还用Node包装了各种线程状态,这些状态被精心设计为一些数字值:

SIGNAL(-1) :线程的后继线程正/已被阻塞,当该线程release或cancel时要重新这个后继线程(unpark)

CANCELLED(1):因为超时或中断,该线程已经被取消

CONDITION(-2):表明该线程被处于条件队列,就是因为调用了Condition.await而被阻塞。

PROPAGATE(-3):传播共享锁

0:0代表无状态

接下来执行acquireQueued(Node)方法。acquireQueued的主要作用是把已经追加到队列的线程节点(addWaiter方法返回值)进行阻塞,但阻塞前又通过tryAccquire重试是否能获得锁,如果重试成功能则无需阻塞,直接返回。

final boolean acquireQueued(final Node node, int arg) {

try {

boolean interrupted = false;

for (;;) {

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

return interrupted;

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

catch (RuntimeException ex) {

cancelAcquire(node);

throw ex;

}

}

以上的循环不会无限进行,因为接下来线程会被阻塞。这由parkAndCheckInterrupt()方法实现,但是它只有在shouldParkAfterFailedAcquire 方法返回 true 的时候后才会继续执行进而阻塞。所以看 shouldParkAfterFailedAcquire方法,从方法的名字看 意思是,当获取锁失败的时候是否应该阻塞。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

int ws = pred.waitStatus;

if (ws == Node.SIGNAL)

/*

* This node has already set status asking a release

* to signal it, so it can safely park

*/

return true;

if (ws > 0) {

/*

* Predecessor was cancelled. Skip over predecessors and

* indicate retry.

*/

do {

node.prev = pred = pred.prev;

while (pred.waitStatus > 0);

pred.next = node;

else {

/*

* waitStatus must be 0 or PROPAGATE. Indicate that we

* need a signal, but don't park yet. Caller will need to

* retry to make sure it cannot acquire before parking.

*/

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

}

return false;

}

此方法的作用是根据它的前驱节点决定本节点做什么样的操作。前面已经说过Node的节点的waitState 表示它个后继节点 需要做什么操作。这里就是对线程状态的检查,所有这个方法参数中有前驱节点。

检查原则在于:

规则1:如果前继的节点状态为SIGNAL,表明当前节点需要unpark,则返回成功,此时acquireQueued方法的第12行(parkAndCheckInterrupt)将导致线程阻塞

规则2:如果前继节点状态为CANCELLED(ws>0),说明前置节点已经被放弃,则回溯到一个非取消的前继节点,返回false,acquireQueued方法的无限循环将递归调用该方法,直至规则1返回true,导致线程阻塞

规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,返回false后进入acquireQueued的无限循环,与规则2同

总体看来,shouldParkAfterFailedAcquire就是靠前继节点判断当前线程是否应该被阻塞,如果前继节点处于CANCELLED状态,则顺便删除这些节点重新构造队列。

至此,获取锁完毕。

请求锁不成功的线程会被挂起在acquireQueued方法的第12行,12行以后的代码必须等线程被解锁锁才能执行,假如被阻塞的线程得到解锁,则执行第13行,即设置interrupted = true,之后又进入无限循环。

解锁的过程相对简单一些。

调用关系如下顺序 ReentrantLock.unlock()    AQS.release()  --Synx.tryRealse()

从无限循环的代码可以看出,并不是得到解锁的线程一定能获得锁,必须在第6行中调用tryAccquire重新竞争,因为锁是非公平的,有可能被新加入的线程获得,从而导致刚被唤醒的线程再次被阻塞,这个细节充分体现了“非公平”的精髓。此可以看到,把tryAcquire方法延迟到子类中实现的做法非常精妙并具有极强的可扩展性,令人叹为观止!当然精妙的不是这个Templae设计模式,而是Doug Lea对锁结构的精心布局。

public void unlock() {

sync.release(1);

}

release的语义在于:如果可以释放锁,则唤醒队列第一个线程(Head.next)。release先调用tryRelease调用是否解锁成功,解锁成长才进行下一步操作。

public final boolean release(int arg) {

if (tryRelease(arg)) {

Node h = head;

if (h != null && h.waitStatus != 0)

unparkSuccessor(h);

return true;

}

return false;

}

tryRelease与tryAcquire语义相同,把如何释放的逻辑延迟到子类中。tryRelease语义很明确:如果线程多次锁定,则进行多次释放,直至status==0则真正释放锁,所谓释放锁即设置status为0,因为无竞争所以没有使用CAS。如下源代码

protected final boolean tryRelease(int releases) {

int c = getState() - releases;

if (Thread.currentThread() != getExclusiveOwnerThread())

throw new IllegalMonitorStateException();

boolean free = false;

if (c == 0) {

free = true;

setExclusiveOwnerThread(null);

}

setState(c);

return free;

}

下面的源代码是唤醒队列的第一个线程。但是其可能被取消,当被取消的时候,从队尾往前找线程。(不从对头开始的原因是,队尾一直在变化,不容易判断)

private void unparkSuccessor(Node node) {

/*

* If status is negative (i.e., possibly needing signal) try

* to clear in anticipation of signalling. It is OK if this

* fails or if status is changed by waiting thread.

*/

int ws = node.waitStatus;

if (ws < 0)

compareAndSetWaitStatus(node, ws, 0);

/*

* Thread to unpark is held in successor, which is normally

* just the next node.  But if cancelled or apparently null,

* traverse backwards from tail to find the actual

* non-cancelled successor.

*/

Node s = node.next;

if (s == null || s.waitStatus > 0) {

s = null;

for (Node t = tail; t != null && t != node; t = t.prev)

if (t.waitStatus <= 0)

s = t;

}

if (s != null)

LockSupport.unpark(s.thread);

}

可中断锁的实现:本质是调用 AQS. 他在响应中断后直接跳出循环,抛出异常,而正常额Lock 忽略这个中断,只是简单的记录下,然后继续循环。

private void doAcquireInterruptibly(int arg)

throws InterruptedException {

final Node node = addWaiter(Node.EXCLUSIVE);

try {

for (;;) {

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

return;

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

break;

}

catch (RuntimeException ex) {

cancelAcquire(node);

throw ex;

}

// Arrive here only if interrupted

cancelAcquire(node);

throw new InterruptedException();

}

超时锁的实现基本类似,就是阻塞一段时间后自己恢复,如果有中断则抛出异常。

private boolean doAcquireNanos(int arg, long nanosTimeout)

throws InterruptedException {

long lastTime = System.nanoTime();

final Node node = addWaiter(Node.EXCLUSIVE);

try {

for (;;) {

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

return true;

}

if (nanosTimeout <= 0) {

cancelAcquire(node);

return false;

}

if (nanosTimeout > spinForTimeoutThreshold &&

shouldParkAfterFailedAcquire(p, node))

LockSupport.parkNanos(this, nanosTimeout);

long now = System.nanoTime();

nanosTimeout -= now - lastTime;

lastTime = now;

if (Thread.interrupted())

break;

}

catch (RuntimeException ex) {

cancelAcquire(node);

throw ex;

}

// Arrive here only if interrupted

cancelAcquire(node);

throw new InterruptedException();

}

Condition

Condition 实现了与java内容monitor 类似的功能。提供 await,signal,signalall 等操作,与object . wait等一系列操作对应。不同的是一个condition 可以有多个条件队列。这点内置monitor 是做不到的。另外还支持 超时、取消等更加灵活的方式。

和内置的Monitor一样,调用 Condition。aWait 等操作,需要获得锁,也就是 Condition 是和一个锁绑定在一起的。它的实现 是在AQS中,基本思想如下:一下内容抄自博客:http://www.nbtarena.com/Html/soft/201308/2429.html

public final void await() throws InterruptedException {

// 1.如果当前线程被中断,则抛出中断异常

if (Thread.interrupted())

throw newInterruptedException();

// 2.将节点加入到Condition队列中去,这里如果lastWaiter是cancel状态,那么会把它踢出Condition队列。

Node node = addConditionWaiter();

// 3.调用tryRelease,释放当前线程的锁

long savedState =fullyRelease(node);

int interruptMode = 0;

// 4.为什么会有在AQS的等待队列的判断?

// 解答:signal*作会将Node从Condition队列中拿出并且放入到等待队列中去,在不在AQS等待队列就看signal是否执行了

// 如果不在AQS等待队列中,就park当前线程,如果在,就退出循环,这个时候如果被中断,那么就退出循环

while (!isOnSyncQueue(node)) {

LockSupport.park(this);

if ((interruptMode =checkInterruptWhileWaiting(node)) != 0)

break;

}

// 5.这个时候线程已经被signal()或者signalAll()*作给唤醒了,退出了4中的while循环

// 自旋等待尝试再次获取锁,调用acquireQueued方法

if (acquireQueued(node,savedState) && interruptMode != THROW_IE)

interruptMode = REINTERRUPT;

if (node.nextWaiter != null)

unlinkCancelledWaiters();

if (interruptMode != 0)

reportInterruptAfterWait(interruptMode);

}

整个await的过程如下:

  1.将当前线程加入Condition锁队列。特别说明的是,这里不同于AQS的队列,这里进入的是Condition的FIFO队列。进行2。

  2.释放锁。这里可以看到将锁释放了,否则别的线程就无法拿到锁而发生死锁。进行3。

  3.自旋(while)挂起,直到被唤醒或者超时或者CACELLED等。进行4。

  4.获取锁(acquireQueued)。并将自己从Condition的FIFO队列中释放,表明自己不再需要锁(我已经拿到锁了)。

可以看到,这个await的*作过程和Object.wait()方法是一样,只不过await()采用了Condition队列的方式实现了Object.wait()的功能。

signal和signalAll方法

await*()清楚了,现在再来看signal/signalAll就容易多了。按照signal/signalAll的需求,就是要将Condition.await*()中FIFO队列中第一个Node唤醒(或者全部Node)唤醒。尽管所有Node可能都被唤醒,但是要知道的是仍然只有一个线程能够拿到锁,其它没有拿到锁的线程仍然需要自旋等待,就上上面提到的第4步(acquireQueued)。

Java Code

public final void signal() {

if (!isHeldExclusively())

throw newIllegalMonitorStateException();

Node first = firstWaiter;

if (first != null)

doSignal(first);

}

这里先判断当前线程是否持有锁,如果没有持有,则抛出异常,然后判断整个condition队列是否为空,不为空则调用doSignal方法来唤醒线程,看看doSignal方法都干了一些什么:

Java Code

private void doSignal(Node first) {

do {

if ( (firstWaiter =first.nextWaiter) == null)

lastWaiter = null;

first.nextWaiter = null;

} while(!transferForSignal(first) &&

(first = firstWaiter)!= null);

}

上面的代*很容易看出来,signal就是唤醒Condition队列中的第一个非CANCELLED节点线程,而signalAll就是唤醒所有非CANCELLED节点线程。当然了遇到CANCELLED线程就需要将其从FIFO队列中剔除。

Java Code

final boolean transferForSignal(Node node) {

/*

* 设置node的waitStatus:Condition->0

*/

if(!compareAndSetWaitStatus(node, Node.CONDITION, 0))

return false;

/*

* 加入到AQS的等待队列,让节点继续获取锁

* 设置前置节点状态为SIGNAL

*/

Node p = enq(node);

int c = p.waitStatus;

if (c > 0 ||!compareAndSetWaitStatus(p, c, Node.SIGNAL))

LockSupport.unpark(node.thread);

return true;

}

上面就是唤醒一个await*()线程的过程,根据前面的介绍,如果要unpark线程,并使线程拿到锁,那么就需要线程节点进入AQS的队列。所以可以看到在LockSupport.unpark之前调用了enq(node)*作,将当前节点加入到AQS队列。

signalAll和signal方法类似,主要的不同在于它不是调用doSignal方法,而是调用doSignalAll方法:

Java Code

private void doSignalAll(Node first) {

lastWaiter = firstWaiter  = null;

do {

Node next =first.nextWaiter;

first.nextWaiter = null;

transferForSignal(first);

first = next;

} while (first != null);

}

这个方法就相当于把Condition队列中的所有Node全部取出插入到等待队列中去。

线程池

JUC 中提供了线程池的实现,其基于一系列的抽象和接口。接下里一步一步解开线程池的神秘面纱。

首先应该了解线程池的使用。J U C 提供了一个 构造线程池的 工厂类。java.util.concurrent.Executors 。此工厂提供了构造各种不同类型线程池的静态方法。如固定线程池,单一工作线程池,和缓存线程池等。

如下代码构造了一个具有2个固定工作线程的线程池。

ExecutorService ser = Executors.newFixedThreadPool(2);

经过跟踪,此构造函数最终调用如下,其参数解释如下:

corePoolSize - 池中所保存的线程数,包括空闲线程。

maximumPoolSize - 池中允许的最大线程数。

keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。

unit - keepAliveTime 参数的时间单位。

workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。

threadFactory - 执行程序创建新线程时使用的工厂。

handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler) {

if (corePoolSize < 0 ||

maximumPoolSize <= 0 ||

maximumPoolSize < corePoolSize ||

keepAliveTime < 0)

throw new IllegalArgumentException();

if (workQueue == null || threadFactory == null || handler == null)

throw new NullPointerException();

this.corePoolSize = corePoolSize;

this.maximumPoolSize = maximumPoolSize;

this.workQueue = workQueue;

this.keepAliveTime = unit.toNanos(keepAliveTime);

this.threadFactory = threadFactory;

this.handler = handler;

}

我们构造的线程池的类型是 ExecutorService,ThreadPoolExecutor继承AbstractExecutorService,其总体类图如下,可以看到最初的抽象是Exector。

接口Executor

该接口只有一个方法,JDK解释如下

执行已提交的Runnable 任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。

不过,Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用者的线程中立即运行已提交的任务:

class DirectExecutor implements Executor {

public void execute(Runnable r) {

r.run();

}

}

更常见的是,任务是在某个不是调用者线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。

class ThreadPerTaskExecutor implements Executor {

public void execute(Runnable r) {

new Thread(r).start();

}

}

方法介绍如下:

void execute(Runnable command)

在未来某个时间执行给定的命令。该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由Executor实现决定。

接口ExecutorService

ExecutorService 是对 Executor 的扩展,JDK文档解释如下:

Executor 提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。

可以关闭ExecutorService,这将导致其拒绝新任务。提供两个方法来关闭 ExecutorService。shutdown() 方法在终止前允许执行以前提交的任务,而 shutdownNow()方法阻止等待任务启动并试图停止当前正在执行的任务。在终止时,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的 ExecutorService 以允许回收其资源。

通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法 submit 扩展了基本方法Executor.execute(java.lang.Runnable)。方法 invokeAny 和 invokeAll 是批量执行的最常用形式,它们执行任务 collection,然后等待至少一个,或全部任务完成(可使用 ExecutorCompletionService 类来编写这些方法的自定义变体)。

此接口中的关键是三个submit 方法,接受一个任务,并返回结果Future。

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

其中Callable 就是带返回结果的Runnable。定义如下:

public interface Callable<V> {

/**

* Computes a result, or throws an exception if unable to do so.

*

@return computed result

@throws Exception if unable to compute a result

*/

V call() throws Exception;

}

精彩的是返回一个表示任务的未决结果的 Future。该 Future 的get 方法在成功完成时将会返回该任务的结果。注意这些过程是异步的。

接口Future

JDK解释如下:

Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。

它的方法简介如下:

 boolean

cancel(boolean mayInterruptIfRunning) 
          试图取消对此任务的执行。

 V

get() 
          如有必要,等待计算完成,然后获取其结果。

 V

get(long timeout, TimeUnit unit) 
          如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。

 boolean

isCancelled() 
          如果在任务正常完成前将其取消,则返回 true

 boolean

isDone() 
          如果任务已完成,则返回 true

Submit后发生的事情

有了以上的一些基本了解,接下来看当任务提交之后发生的一系列过程。

Submit 的实际代码位于AbstractExecutorService,继承ExecutorService。来观察其三个submit方法。

构造RunnableFuture

public Future<?> submit(Runnable task) {

if (task == nullthrow new NullPointerException();

RunnableFuture<Object> ftask = newTaskFor(task, null);

execute(ftask);

return ftask;

}

public <T> Future<T> submit(Runnable task, T result) {

if (task == nullthrow new NullPointerException();

RunnableFuture<T> ftask = newTaskFor(task, result);

execute(ftask);

return ftask;

}

public <T> Future<T> submit(Callable<T> task) {

if (task == nullthrow new NullPointerException();

RunnableFuture<T> ftask = newTaskFor(task);

execute(ftask);

return ftask;

}

可以看出,不论submit 方法的参数是什么,都是先构造一个RunnableFuture ,然偶执行它,并返回它。执行和返回的都是RunnableFuture。所以RunnableFuture实现了future 接口和runnnable接口。注意这点的类型是RunnableFuture,所有接下来的execute方法执行的run方法是RunnableFuture 的具体实现类FutureTask的run方法。

来看RunnableFuture,其代码如下:

/**

* A {@link Future} that is {@link Runnable}. Successful execution of

* the <tt>run</tt> method causes completion of the <tt>Future</tt>

* and allows access to its results.

@see FutureTask

@see Executor

@since 1.6

@author Doug Lea

@param<V> The result type returned by this Future's <tt>get</tt> method

*/

public interface RunnableFuture<V> extends Runnable, Future<V> {

/**

* Sets this Future to the result of its computation

* unless it has been cancelled.

*/

void run();

}

作为 Runnable 的 Future。成功执行 run 方法可以完成 Future 并允许访问其结果。以下代码可以看出 返回的实际上是FutureTask,为RunnableFuture的实现类。

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {

return new FutureTask<T>(runnable, value);}

关于 FutureTask  JDK对其介绍如下:

可取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对Future 的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。

可使用FutureTask 包装 Callable 或 Runnable 对象。因为 FutureTask 实现了 Runnable,所以可将 FutureTask 提交给 Executor 执行。

除了作为一个独立的类外,此类还提供了 protected 功能,这在创建自定义任务类时可能很有用。

先看其构造函数。可以看出其构造函数主要是一个 同步器的构造。同步器接受一个Callable类型的参数。

public FutureTask(Callable<V> callable) {

if (callable == null)

throw new NullPointerException();

sync = new Sync(callable);

}

public FutureTask(Runnable runnable, V result) {

sync = new Sync(Executors.callable(runnable, result));

}

对于参数是Runnable 类型时,经过转化为Callable 类型,转化代码如下,本质上就是在Callable 的call方法中调用Runnable的run方法:

public static <T> Callable<T> callable(Runnable task, T result) {

if (task == null)

throw new NullPointerException();

return new RunnableAdapter<T>(task, result);

}

static final class RunnableAdapter<T> implements Callable<T> {

final Runnable task;

final T result;

RunnableAdapter(Runnable  task, T result) {

this.task = task;

this.result = result;

}

public T call() {

task.run();

return result;

}

}

FutureTask的关键逻辑都由他的一个内部类Sync 实现。我们先暂且不管其具体实现,留在后面说。

执行

接下来看 执行任务。Execute 方法实现在ThreadPoolExecutor 类中,这是具体的线程池。其Execute 方法如下:

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

if (runState == RUNNING && workQueue.offer(command)) {

if (runState != RUNNING || poolSize == 0)

ensureQueuedTaskHandled(command);

}

else if (!addIfUnderMaximumPoolSize(command))//这里是再给一次机会

reject(command); // is shutdown or saturated

}

}

具体的逻辑如下描述:

首先判断空;

如果当前池大小 小于 核心池大小(初始就是这样),那么会执行 addIfUnderCorePoolSize这个方法。这个方法就会创建新的工作线程,且把当前任务 command 设置为他的第一个任务,并开始执行,返回true。整个execute方法结束。(1)否则加入到等待队列中。(2)

执行情况1

先看情况1:如下代码,只有当前池大小小于核心池大小的时候,且线程池处于RUNNING状态的时候才增加新的工作线程,并把传进来的任务作为第一个任务并开始执行。此时返回真,否则返回假。

/**

* Creates and starts a new thread running firstTask as its first

* task, only if fewer than corePoolSize threads are running

* and the pool is not shut down.

@param firstTask the task the new thread should run first (or

* null if none)

@return true if successful

*/

private boolean addIfUnderCorePoolSize(Runnable firstTask) {

Thread t = null;

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

if (poolSize < corePoolSize && runState == RUNNING)

t = addThread(firstTask);

finally {

mainLock.unlock();

}

if (t == null)

return false;

t.start();

return true;

}

/**

* Creates and returns a new thread running firstTask as its first

* task. Call only while holding mainLock.

*

@param firstTask the task the new thread should run first (or

* null if none)

@return the new thread, or null if threadFactory fails to create thread

*/

private Thread addThread(Runnable firstTask) {

Worker w = new Worker(firstTask);//工作线程,

Thread t = threadFactory.newThread(w);//封装成线程

if (t != null) {

w.thread = t;

workers.add(w);

int nt = ++poolSize;

if (nt > largestPoolSize)

largestPoolSize = nt;

}

return t;

}

执行情况2

如果当前池大小 大于核心池的大小,或者添加新的工作线程失败(这可能是多线程环境下,竞争锁,被阻塞,其他线程已经创建好了工作线程)。那么当前任务进入到等待队列。

如果队列满,或者线程池已经关闭,那么拒绝该任务。

工作线程worker

对工作线程的封装是类Worker,它实现了Runnable接口。addThread方法把Worker 组成线程(用threadFactory),并加入线程池,重新设置线程池 大小的 达到的最大值。

重点研究下worker的run方法,首先运行第一个任务,以后通过getTask()获取新的任务,如果得不到,工作线程会自动结束,在结束前 会执行一些工作,见后面。

public void run() {

try {

Runnable task = firstTask;

firstTask = null;

while (task != null || (task = getTask()) != null) {

runTask(task);

task = null;

}

finally {

workerDone(this);

}

}

执行提交的任务,执行任务前 后可以 各进行 一些处理,目前默认实现是什么也不做,扩展的类可以实现它。

private void runTask(Runnable task) {

final ReentrantLock runLock = this.runLock;

runLock.lock();

try {

/*

* Ensure that unless pool is stopping, this thread

* does not have its interrupt set. This requires a

* double-check of state in case the interrupt was

* cleared concurrently with a shutdownNow -- if so,

* the interrupt is re-enabled.

*/

if (runState < STOP &&

Thread.interrupted() &&

runState >= STOP)

thread.interrupt();

/*

* Track execution state to ensure that afterExecute

* is called only if task completed or threw

* exception. Otherwise, the caught runtime exception

* will have been thrown by afterExecute itself, in

* which case we don't want to call it again.

*/

boolean ran = false;

beforeExecute(thread, task);

try {

task.run();

ran = true;

afterExecute(task, null);

++completedTasks;

catch (RuntimeException ex) {

if (!ran)

afterExecute(task, ex);

throw ex;

}

finally {

runLock.unlock();

}

}

下面的方法是 工作线程销毁钱调用的方法,是在run中调用的。当池大小为0的时候,调用tryterminate 方法。

/*

*Performs bookkeeping for an exiting worker thread.

@param w the worker 此方法在ThreadPoolExecutor 中

*/

void workerDone(Worker w) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

completedTaskCount += w.completedTasks;

workers.remove(w);

if (--poolSize == 0)

tryTerminate();

finally {

mainLock.unlock();

}

}

这个方法只有在线程池的状态是是stop 或者shutdown的时候才会真正的关闭整个线程池。另外shutdown也会调用这个方法。

/**

* Transitions to TERMINATED state if either (SHUTDOWN and pool

* and queue empty) or (STOP and pool empty), otherwise unless

* stopped, ensuring that there is at least one live thread to

* handle queued tasks.

*

* This method is called from the three places in which

* termination can occur: in workerDone on exit of the last thread

* after pool has been shut down, or directly within calls to

* shutdown or shutdownNow, if there are no live threads.

*/

private void tryTerminate() {

if (poolSize == 0) {

int state = runState;

if (state < STOP && !workQueue.isEmpty()) {

state = RUNNING; // disable termination check below

Thread t = addThread(null);

if (t != null)

t.start();

}

if (state == STOP || state == SHUTDOWN) {

runState = TERMINATED;

termination.signalAll();

terminated();

}

}

}

FutureTask

此类是RunnableFuture的实现类。线程池执行的run方法是它的run方法。它委托给Sync实现,SYNC 继承AQS。

/**

* Sets this Future to the result of its computation

* unless it has been cancelled.

*/

public void run() {

sync.innerRun();

}

重点看Sync。对具体任务的调用发生在innerSet(callable.call());这句调用,innerSet的方法 作用是 设置get方法的返回值。

void innerRun() {

if (!compareAndSetState(0, RUNNING))

return;

try {

runner = Thread.currentThread();

if (getState() == RUNNING) // recheck after setting thread

innerSet(callable.call());

else

releaseShared(0); // cancel

catch (Throwable ex) {

innerSetException(ex);

}

}

//设置后才释放锁。

void innerSet(V v) {

for (;;) {

int s = getState();

if (s == RAN)

return;

if (s == CANCELLED) {

// aggressively release to set runner to null,

// in case we are racing with a cancel request

// that will try to interrupt runner

releaseShared(0);

return;

}

if (compareAndSetState(s, RAN)) {

result = v;

releaseShared(0);

done();

return;

}

}

}

而get方法是需要获取锁的,所以在具体的任务没有执行完前,调用get方法会进入到阻塞状态。

V innerGet() throws InterruptedException, ExecutionException {

acquireSharedInterruptibly(0);

if (getState() == CANCELLED)

throw new CancellationException();

if (exception != null)

throw new ExecutionException(exception);

return result;

}

参考

http://www.cnblogs.com/sarafill/archive/2011/05/18/2049461.html框架介绍,比较广泛

http://chenzehe.iteye.com/blog/1759884原子类

http://blog.sina.com.cn/s/blog_75f0b54d0100r7af.html锁的操作系统原理

http://www.infoq.com/cn/articles/atomic-operation此人是淘宝大神,原子操作的实现

http://agapple.iteye.com/blog/970055   java线程阻塞中断和LockSupport的常见问题

http://blog.csdn.net/hintcnuie/article/details/11022049                      对比synchronized与java.util.concurrent.locks.Lock的异同

http://www.blogjava.net/xylz/archive/2010/07/06/325390.htmlAQS

http://www.open-open.com/lib/view/open1352431606912.html  比较清晰的解释AQS的实现。  这点给我的启示是,看源代码的时候,如果碰到 抽象类,那么跟它的实现类 结合一起看,搞清楚调用关系(这里肯定是模板模式,调用关系单单看抽象类看不明白)

http://whitesock.iteye.com/blog/162344java内置的锁机制

http://whitesock.iteye.com/blog/1336409  Inside AbstractQueuedSynchronizer 系列,写的非常精彩

http://www.nbtarena.com/Html/soft/201308/2429.html  有关condition 的讲解。

上一篇:AJAX-快速上手(四个步骤)


下一篇:配置VS2008下的Qt开发环境有感