Java并发编程(您不知道的线程池操作), 最受欢迎的 8 位 Java 大师,Java并发包中的同步队列SynchronousQueue实现原理

Java_并发编程培训

java并发程序设计教程

JUC Exchanger

一、概述

Exchanger 可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。

二、算法描述

基本想法是维护一个槽指向一个结点,结点包含一个准备提供(出去)的item和一个等待填充的洞。如果一个到来的“occupying”线程看到槽是空的,CAS结点到那里并等待另一个线程发起交换。第二个“fulfilling”线程看到槽是非空的,把它CAS回空,也通过CAS洞交换item,加上唤醒occupying线程,如果它是阻塞的。在每种情况下CAS可能失败,因为一个槽一开始看起来是非空的,但在CAS时不是,或者相反,所以线程需要重试这些动作。

这个简单方法在只有少量线程使用Exchanger时工作得很好,当大量线程使用单个Exchanger时性能快速恶化,由于CAS竞争。因此我们使用一个“arena, 竞技场”,基本上是一个哈希表,有动态可变数量的槽,每一个槽都可被任意线程执行交换。进入线程基于自己的线程ID选择槽,如果一个进入在它自己选择的槽上CAS失败,它选择一个供替代的槽。如果一个线程成功CAS到一个槽但没有其他线程到达,它尝试其他,前往 0 号槽,那里总是存在如果表收缩的话。控制这个的特殊机制如下:

  • Waiting:0 号槽的特殊之处在于当没有竞争时它是唯一存在的槽。一个占据 0 号槽的线程在短暂自旋后如果没有线程满足它 将阻塞。在其他情况下,占据线程最终放弃并尝试其他槽。等待线程在阻塞(如果是0号槽)或放弃(如果是其他槽)并重新开始之前会自旋等待一会儿(周期小于典型的上下文切换时间)。没有理由让线程阻塞,除非不大可能出现其他线程。占据者主要避免内存竞争,所以坐在静静地那里短暂地轮询,然后它会阻塞和解除阻塞。非0号槽等待那消失,因为缺少其他线程,在每次尝试会浪费一个额外的上下文切换,在平均上这仍然比可选的方法快。

  • Sizing:通常,仅使用少量的槽足以减少竞争。特别是少量线程时,使用太多的槽可能导致和使用太少的槽一样的低性能,且那对于错误没有太多空间。变量“max”维持了实际使用的槽的最大数量。它在当一个线程看到很多CAS失败时增加。(这类似于基于目标的负载值resing一个常规的哈希表只不过这里的增长步骤仅仅是一个接一个而不是按比例的。)增长要求每个槽上竞争失败的次数达到3次。要求多次失败是因为一些CAS失败不是因为竞争,而是两个线程之间的竟态race或出现读和CAS的线程优先级。而且,突然的竞争高峰可以比平均可接受的水平高很多。当一个非0号槽的等待由于没有等到满足而流逝elapses时,尝试减小max上限。经历过等待流逝elapsed的线程移往0号槽,所以最终(或将)找到存在存在线程,即使表由于不活跃而缩减。用于增长和缩减而选择的机制和阀值都是固定地与交换代码里的索引indexing和哈希hashing混在一起的,不能很好地抽取出来。

  • Hashing:每个线程选择它的的初始槽来使用,与一个简单的哈希码一致。给定的会交会的线程的序号是一样的,但高效地随机分布在线程间。使用竞技场arenas遭遇了所有哈希表的典型开销 vs 质量之间的平衡。这里,我们使用一步 FNV-1a 哈希码,基于当前线程的ID和一个廉价的大约相当于取模的操作(用于选择下标)。用这种方式优化下标选择的一个负面是哈希码是固定的hardwired,使用了表大小的最高32位。但这个值对于已知的平台和应用的满足的。

  • Probing:在感觉到选择的槽上的竞争时,我们顺序探测表,在表里出现哈希碰撞后类似于顺序探测。(我们周期性地移动,以反序方式,来吻合表的增长和收缩规则。)此外为了减少虚假警报(false-alarms)和缓存颠簸(cache trashing)的影响,我们在移动前两次尝试初次选择的槽。

  • Padding:即使了有了竞争管理,槽的竞争仍然很严重,所以我们使用 缓存填充(cache-padding)来避免低效的内存性能。由于这个,槽只在使用时延迟实例化,来避免不必要的空间浪费。位置的隔离一开始在一个应用程序里不会成为一个议题issue,随着时间推移和GC实行压缩,槽很可能被移到临近的,这可以导致在多核处理器是那个的缓存行颠簸,除非使用了填充。

这是对论文 “A Scalable Elimination-based Exchange Channel[http://hdl.handle.net/1802/2104]” 描述的算法的一个改进版,作者是 William Scherer, Doug Lea, and Michael Scott 。

三、简单的Exchanger

根据算法描述里的一部分,做了个简单的实现。未测试。

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
publicclassSimpleExchanger{
     staticclassSlot{
           finalObjectoffer;
           finalThread waiter;
           volatile Objectother;
 
           publicSlot(Thread waiter,Objectoffer){
               this.offer=offer;
               this.waiter=waiter;
          }
     }
 
     privatevolatile AtomicReference<Slot>refer=newAtomicReference<>();
 
     publicObjectexchange(Objectoffer){
          Slot me=newSlot(Thread.currentThread(),offer);
 
          Slot you;
           for(;;){
               if((you=refer.get())==null){
                    if(refer.compareAndSet(null,me)){
                        LockSupport.park();
                         returnme.other;
                   }
 
              }elseif(refer.compareAndSet(you,null)){
                   you.other=offer;
                   LockSupport.unpark(you.waiter);
                    returnyou.offer;
              }
          }
     }
}
 

四、Exchanger 核心源码

内部类和属性

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
<br/>/**
* arena(竞技场)的容量。设置为一个提供了足够空间处理竞争的值。在小机器上,
* 大多数槽都不会使用,但它仍然不是浪费的,因为额外的空间提供了一些机器层面的地址填充,
* 来最小化大量CAS操作的槽的地址的干扰。
* 在每个大机器上,性能最终受限于内存的带宽,不是线程/CPU数量。
*
* 这个常量在没有修改 索引(indexing)和哈希(hashing)算法时不能修改。
*/
privatestaticfinalintCAPACITY=32;
 
/**
* 槽数组。元素在需要时延迟初始化。
* 声明为volatile是为了确保 double -checked延迟构建有效。
*/
privatevolatile Slot[]arena=newSlot[CAPACITY];
 
/**
* `max` 的值将在没有竞争的情况下持有所有线程。当这个值小于 CAPACITY 时,能避免一些膨胀。
*/
privatestaticfinalintFULL=
    Math.max(0,Math.min(CAPACITY,NCPU/2)-1);
 
/**
* 在阻塞或放弃等待填充之前自旋(什么都不做,除了轮询内存位置)的次数。
* 在单处理器上应当是0。在多处理器上,这个值应当足够大以便两个线程
* 尽可能快地交换元素,仅在其中一个失速(stalled,由于GC或或被取代)
* 时阻塞,但也不能太长,以免浪费CPU资源。
*
* 考虑各种差异,这个值比大多数系统上下文切换时间的平均值的一半大一点。
*/
privatestaticfinalintSPINS=(NCPU==1)?0:2000;
 
 
privatestaticfinalintTIMED_SPINS=SPINS/20;
 
 
privatestaticfinalObjectCANCEL=newObject();
 
 
privatestaticfinalObjectNULL_ITEM=newObject();
 
 
/**
* 被使用的槽序号的最大值。这个值在一个线程经历太多CAS竞争时增加,
* 在自旋等待 elapses时减少。变化只能通过compareAndSet来实施,
* 避免由于线程在设置前碰巧拖延stall而导致致数据腐化stale。
*/
privatefinalAtomicInteger max=newAtomicInteger();
 
 
/**
* Node 持有交换数据的一部分。这个类继承自AtomicReference来表示一个洞。
* `get()` 返回洞(里的值),`compareAndSet` CAS 值到洞里。
*/
privatestaticfinalclassNode extendsAtomicReference&lt;Object&gt;{
     // 注意两个属性分别用final和volatile来保证内存可视性
    publicfinalObjectitem;
    publicvolatile Thread waiter;
 
    publicNode(Objectitem){
        this.item=item;
    }
}
 
/**
* Slot是一个具有缓存行填充的AtomicReference。由于填充显著地增加了空间(开销),
* 所有的槽都是按需创建的,这样,在能够提升吞吐量时会有不止一个槽,比使用额外空间更有价值。
*/
privatestaticfinalclassSlot extendsAtomicReference&lt;Object&gt;{
    // Improve likelihood of isolation on &lt;= 64 byte cache lines
    longq0,q1,q2,q3,q4,q5,q6,q7,q8,q9,qa,qb,qc,qd,qe;
}
 

hashIndex 方法

此方法用于定位线程的默认槽。

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
privatefinalinthashIndex(){
    longid=Thread.currentThread().getId();
    inthash=(((int)(id^(id&gt;&gt;&gt;32)))^0x811c9dc5)*0x01000193;
 
    intm=max.get();
    intnbits=(((0xfffffc00  &gt;&gt;m)&amp;4)|// Compute ceil(log2(m+1))
                 ((0x000001f8&gt;&gt;&gt;m)&amp;2)|// The constants hold
                 ((0xffff00f2&gt;&gt;&gt;m)&amp;1));// a lookup table
    intindex;
    while((index=hash&amp;((1&lt;&lt;nbits)-1))&gt;m)       // May retry on
        hash=(hash&gt;&gt;&gt;nbits)|(hash&lt;&lt;(33-nbits));// non-power-2 m
    returnindex;
}
 

spinWait 方法

此方法用于自旋等待,主要是默认槽是非0号槽的线程在没有线程可交换时使用。

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
privatestaticObjectspinWait(Node node,Slot slot){
    intspins=SPINS;
    for(;;){
        Objectv=node.get();
        if(v!=null)
            returnv;
        elseif(spins&gt;0)
            --spins;
        else
            tryCancel(node,slot);
    }
}
 
// 注意,tryCancel并没有处理CAS失败的问题,CAS失败后重试是由调用者方法负责的。
privatestaticbooleantryCancel(Node node,Slot slot){
    if(!node.compareAndSet(null,CANCEL))
        returnfalse;
    if(slot.get()==node)// 提前检查以减少CAS竞争
        slot.compareAndSet(node,null);
    returntrue;
}
 

阻塞等待

当线程在 0 号槽也没有线程可交换时就会先自旋一段时间,如果仍然没有可交换线程则进入等待。

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
privatestaticObjectawait(Node node,Slot slot){
     Threadw=Thread.currentThread();
      intspins=SPINS;
      for(;;){
          Objectv=node.get();
           if(v!=null)
               returnv;
           elseif(spins&gt;0)// 自旋等待阶段
              --spins;
           elseif(node.waiter==null)// 先设置好,下次循环阻塞。
              node.waiter=w;
           elseif(w.isInterrupted())// 中断时终止
              tryCancel(node,slot);
           else
               // Block 阻塞
              LockSupport.park(node);
     }
}
 

doExchange 方法

doExchange 方法是Exchanger类的核心方法,所以exchange方法极其变形最终都是调用这个方法。它的实现完全遵循前面的算法描述。

 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
privateObjectdoExchange(Objectitem,booleantimed,longnanos){
    Node me=newNode(item);                 // 创建一个以防需要 occupying
    intindex=hashIndex();                  // 当前槽的序号
    intfails=0;                            // CAS 失败计数
 
    for(;;){
        Objecty;                             // 当前槽的内容
        Slot slot=arena[index];
        if(slot==null)                     // 延迟初始化槽
            createSlot(index);                // 继续循环来重新读取
 
        elseif((y=slot.get())!=null&amp;&amp;  // 尝试满足,也即已有线程在等待交换
                 slot.compareAndSet(y,null)){
           // 已有线程在等待交换,尝试交换
            Node you=(Node)y;               // 传输元素
            if(you.compareAndSet(null,item)){
               // CAS 设置对象到等待者,考虑与其他线程 竞争,等待者取AtomicReference.value
                LockSupport.unpark(you.waiter);  // 成功交换,唤醒对方。
                returnyou.item;   // 返回对方的对象,完成交换者取等待者Node的item。
            }
            // 否则取消(由于与其他线程竞争交换失败),继续
        }
 
        elseif(y==null&amp;&amp;                 // 尝试占住槽
                 slot.compareAndSet(null,me)){
           // 没有线程在等待交换,当前等待其他线程来交换。
            if(index==0)                   // 0号槽特殊处理:阻塞等待
                returntimed?
                    awaitNanos(me,slot,nanos):
                    await(me,slot);
 
            // 非0号槽自旋等待,方法返回时要么成功交换,要么被取消
            Objectv=spinWait(me,slot);
            if(v!=CANCEL)// 被fulfilled
                returnv;
 
            // 取消等待,也就是等待流逝elapses。
            me=newNode(item);              // 丢弃已取消的结点
            intm=max.get();
            if(m&gt;(index&gt;&gt;&gt;=1))           // 减小序号
                max.compareAndSet(m,m-1);  // 可能缩减表
        }
 
        elseif(++fails&gt;1){               // 每个槽上允许2次失败。
           // CAS 失败处理,达到3次则增大 表,也就是增加CAS的槽。
            intm=max.get();
            if(fails&gt;3&amp;&amp;m&lt;FULL&amp;&amp;max.compareAndSet(m,m+1))
                index=m+1;                // 第三次失败时增加槽
 
            elseif(--index&lt;0)
                index=m;                    // 循环遍历
        }
    }
}
 
 

Java并发编程(您不知道的线程池操作)

这几篇博客,一直在谈线程,设想一下这个场景,如果并发的线程很多,然而每个线程如果执行的时间很多的话,这样的话,就会大量的降低系统的效率。这时候就可以采用线程池的操作,来缓存我们并发操作的线程。

而对于java中的线程池,大家需要理解好ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor这几个类之间的关系即可。

Java并发编程(您不知道的线程池操作), 最受欢迎的 8 位 Java 大师,Java并发包中的同步队列SynchronousQueue实现原理

通过看上面的这个关系图,可以知道,最核心的一个类是ThreadPoolExecutor,我们下面来具体的看一下这个类的操作。

ThreadPoolExecutor类提供了四个构造函数,如下所示

<span style="font-family:Comic Sans MS;font-size:18px;"><span style="font-family:Comic Sans MS;"><span style="font-family:Comic Sans MS;font-size:18px;">public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
} public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
} public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
} public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0
|| maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}</span></span></span>

下面来分析下这四个构造函数中每个参数的具体意义,其实主要是一些初始化的操作

corePoolSize:表明这个线程池的大小

     maximumPoolSize:表明这个线程池中,最多可以容纳多少个线程执行

     TimeUnit:线程存活的时间单位,有秒、小时、分钟等

     keepAliveTime:用来表示一个线程多长时间不执行任务后,就被废弃掉

     workQueue:阻塞队列,表示如何线程多余线程池的话,用来存储等待执行的任务,具体值如下


1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;

       2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

       3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

 threadFactory:一个线程工厂,用来创建线程

     handler:用来表示拒绝执行任务的策略。

下面通过举个例子来理解上述的表示参数。

假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。 因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;当10
个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措
施,比如重新招4个临时工人进来;然后就将任务也分配给这4个临时工人做;如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新
的任务或者抛弃前面的一些任务了。当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10
个工人,毕竟请额外的工人是要花钱的。

这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。

下面通过一个例子来了解上述的构造函数。

<span style="font-family:Comic Sans MS;font-size:18px;"><span style="font-family:Comic Sans MS;"><span style="font-family:Comic Sans MS;font-size:18px;"><span style="font-family:Comic Sans MS;font-size:18px;">package com.test;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; public class TestThreadPool { public static void main(String[] args) { //创建一个线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 6, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); //通过循环来开启20个任务操作
for (int i = 1; i <= 20; i++) { threadPool.execute(new ThreadPoolTask()); }
}
} //创建 ThreadPoolTask类: class ThreadPoolTask implements Runnable {
public void run() {
try {
System.out.println("开始执行任务:" + Thread.currentThread().getName());
Thread.sleep(100);
}
catch(Exception e){
e.printStackTrace();
}
} }</span></span></span></span>

上面就是开启20个任务,来让线程池处理,如果线程池满了的话,就会放入到阻塞对列中进行等待操作

不过在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:

Executors.newCachedThreadPool();        //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE

    Executors.newSingleThreadExecutor();   //创建容量为1的缓冲池
    Executors.newFixedThreadPool(int);    //创建固定容量大小的缓冲池


     下面是这三个静态方法的具体实现;
<span style="font-family:Comic Sans MS;font-size:18px;"><span style="font-family:Comic Sans MS;"><span style="font-family:Comic Sans MS;font-size:18px;">public static ExecutorService newFixedThreadPool(int nThreads) {

    return new ThreadPoolExecutor(nThreads, nThreads,

                                  0L, TimeUnit.MILLISECONDS,

                                  new LinkedBlockingQueue<Runnable>());

}

public static ExecutorService newSingleThreadExecutor() {

    return new FinalizableDelegatedExecutorService

        (new ThreadPoolExecutor(1, 1,

                                0L, TimeUnit.MILLISECONDS,

                                new LinkedBlockingQueue<Runnable>()));

}

public static ExecutorService newCachedThreadPool() {

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                  60L, TimeUnit.SECONDS,

                                  new SynchronousQueue<Runnable>());

}</span></span></span>
     从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

 newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

            newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;

  
       
 newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为
Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。

     参考资料:
       http://www.cnblogs.com/dolphin0520/p/3932921.html
      http://blog.csdn.net/hejingyuan6/article/details/47058189
 
 
 
 
 
 

最受欢迎的 8 位 Java 大师

Java并发编程(您不知道的线程池操作), 最受欢迎的 8 位 Java 大师,Java并发包中的同步队列SynchronousQueue实现原理 oschina 发布于2013年07月22日 收藏 160 评论 67

Github 访问速度慢?不稳定?问题反馈响应不及时?该怎么办!!>>>  Java并发编程(您不知道的线程池操作), 最受欢迎的 8 位 Java 大师,Java并发包中的同步队列SynchronousQueue实现原理

Java并发编程(您不知道的线程池操作), 最受欢迎的 8 位 Java 大师,Java并发包中的同步队列SynchronousQueue实现原理

下面是8位Java牛人,他们为Java社区编写框架、产品、工具或撰写书籍改变了Java编程的方式。

P.S 以下排名纯属个人喜好。

1. Tomcat & Ant创始人

Java并发编程(您不知道的线程池操作), 最受欢迎的 8 位 Java 大师,Java并发包中的同步队列SynchronousQueue实现原理
James Duncan Davidson,当他还是Sun公司 (1997–2001)的一名软件工程师时创立了基于Java的Web服务器Tomcat。直到现在Tomcat 仍然被用于很多Java Web项目。此外他还编写了Ant构建工具,采用XML描述构建过程和依赖管理成为了构建基于Java Web程序的实际标准。

相关链接

  1. James Duncan Davidson Twitter
  2. James Duncan Davidson Wiki
  3. James Duncan Davidson个人博客
  4. Apache Ant
  5. Apache Tomcat

2. 测试驱动开发和JUnit创始人

Java并发编程(您不知道的线程池操作), 最受欢迎的 8 位 Java 大师,Java并发包中的同步队列SynchronousQueue实现原理
Kent Beck创立了 极限编程和测试驱动软件开发方法。此外,他和Erich Gamma编写了JUnit,一个简单测试框架成为了构建基于Java Web程序测试的实际标准。JUnit和测试驱动开发组合改变了传统的Java编程方式,许多Java者对此并不感冒。

相关链接

  1. Kent Beck Twitter
  2. Kent Beck Wiki
  3. Kent Beck博客
  4. JUnit测试框架
  5. 极限编程Wiki
  6. 测试驱动开发Wiki

新闻和访谈

  1. Kent Beck: “我认为我们是在飞机上编程”
  2. 采访Kent Beck和Martin Fowler
  3. 与Kent Beck谈极限编程

Kent Beck著作

  1. Extreme Programming Explained: Embrace Change (2nd Edition)
  2. Refactoring: Improving the Design of Existing Code
  3. JUnit Pocket Guide

3. Java Collections框架

Java并发编程(您不知道的线程池操作), 最受欢迎的 8 位 Java 大师,Java并发包中的同步队列SynchronousQueue实现原理
Joshua Bloch领导设计并实现了众多Java平台特性,包括JDK5.0语言改进和广受赞誉的Java Collection框架。2004年6月,他离开了Sun公司成为Google首席Java架构师。接下来他凭借“ Effective Java”一书赢得了著名的Jolt大奖,该书也是受到争议的Java必读书籍。

相关链接

  1. Joshua Bloch Twitter
  2. Joshua Bloch Wiki

新闻和访谈

  1. Effective Java: Joshua Bloch访谈
  2. 超级明星Josh Bloch

Joshua Bloch著作

  1. Effective Java (2nd Edition)
  2. Java Concurrency in Practice

4. JBoss创始人

Java并发编程(您不知道的线程池操作), 最受欢迎的 8 位 Java 大师,Java并发包中的同步队列SynchronousQueue实现原理
Marc Fleury在2001年创立了JBoss开源Java应用服务器。JBoss是带有争议的基于Java Web应用程序实际标准。接下来,他将JBoss卖给了RedHat并加盟RedHat继续JBoss开发。2007年2月9日,他决定离开RedHat 追求个人爱好,比如教学、研究生物学、音乐以及和家人在一起。

相关链接

  1. Marc Fleury Wiki
  2. Marc Fleury博客
  3. JBoss应用服务器

新闻和访谈

  1. Red Hat会失去JBoss创始人吗?
  2. JBoss创始人Marc Fleury离开Red Hat,现在该怎么办?
  3. JBoss’s Marc Fleury在SOA、ESB和OSS
  4. 复活Marc Fleury

5. Struts创始人

Java并发编程(您不知道的线程池操作), 最受欢迎的 8 位 Java 大师,Java并发包中的同步队列SynchronousQueue实现原理
Craig Mcclanahan是一个流行的MVC框架Struts的创建者,富有争议的是每个Java开发者都知道如何编写Struts代码。由于在早期获得了巨大的成功,基本上每个较早的Java Web应用程序都采用了Struts实现。

相关链接

  1. Craig Mcclanahan Wiki
  2. Craig Mcclanahan Blog
  3. Apache Struts

新闻和访谈

  1. Craig McClanahan访谈
  2. Struts还是JSF?

6. Spring创始人

Java并发编程(您不知道的线程池操作), 最受欢迎的 8 位 Java 大师,Java并发包中的同步队列SynchronousQueue实现原理
Rod Johnson是Java开源应用框架Spring的创始人。他是Spring的创建者和SpringSource的CEO。此外,Rod的著作Expert One-on-One J2EE Design and Development (2002)是最具 影响力的J2EE书籍。

相关链接

  1. Rod Johnson Twitter
  2. Rod Johnson博客
  3. SpringSource
  4. Spring框架Wiki

新闻和访谈

  1. VMware.com : VMware to acquire SpringSource
  2. Rod Johnson : VMware to acquire SpringSource
  3. Rod Johnson访谈 – CEO – Interface21
  4. Rod Johnson关于Spring维护策略改变问答
  5. Expert One-on-One J2EE Design and Development:Rod Johnson访谈

Rod Johnson著作

  1. Expert One-on-One J2EE Design and Development (Programmer to Programmer)
  2. Expert One-on-One J2EE Development without EJB

7. Hibernate 创始人

Java并发编程(您不知道的线程池操作), 最受欢迎的 8 位 Java 大师,Java并发包中的同步队列SynchronousQueue实现原理
Gavin King是一个流行对象/关系持久化Java解决方案Hibernate项目的创始人,同时也是Java EE5应用框架Seam的作者。此外,他还为EJB 3.0和JPA设计作出了重大贡献。

相关链接

  1. Gavin King博客
  2. Hibernate Wiki
  3. Hibernate框架
  4. JBoss seam

新闻和访谈

  1. Tech Chat: Gavin King谈Contexts和依赖注入Weld, Java EE 6
  2. JPT : Gavin King访谈Hibernate
  3. JavaFree : Hibernate创始人Gavin King访谈
  4. Gavin King深度解析Seam

Gavin King著作

  1. Java Persistence with Hibernate
  2. Hibernate in Action (In Action series)

8. Java语言之父

Java并发编程(您不知道的线程池操作), 最受欢迎的 8 位 Java 大师,Java并发包中的同步队列SynchronousQueue实现原理
James Gosling在1994年发明了Java。他完成了Java的最初设计、编译器和虚拟机。由于他的贡献,他被推举为美国国家工程院院士。2010年4月 2日他离开了Sun公司,后者最近被Oracle收购。谈到为什么会离开,Gosling在博客中写道:“更确切地说我的离开比留下更有意义。”

相关链接

  1. James Gosling博客
  2. James Gosling Wiki

新闻和访谈

  1. Dennis Ritchie、Bjarne Stroustrup和James Gosling访谈
  2. James Gosling访谈,“Java之父”
  3. 开发者访谈:James Gosling

原文链接: javatyro 翻译: ImportNew.com唐尤华
译文链接: http://www.importnew.com/5575.html

相关链接

Java_并发编程培训

Java并发包中的同步队列SynchronousQueue实现原理

作者:一粟

介绍

Java 6的并发编程包中的SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样。

不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。

SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

实现原理

阻塞队列的实现方法有许多:

阻塞算法实现

阻塞算法实现通常在内部采用一个锁来保证多个线程中的put()和take()方法是串行执行的。采用锁的开销是比较大的,还会存在一种情况是线程A持有线程B需要的锁,B必须一直等待A释放锁,即使A可能一段时间内因为B的优先级比较高而得不到时间片运行。所以在高性能的应用中我们常常希望规避锁的使用。

01 public class NativeSynchronousQueue<E> {
02     boolean putting = false;
03     E item = null;
04  
05     public synchronized E take() throws InterruptedException {
06         while (item == null)
07             wait();
08         E e = item;
09         item = null;
10         notifyAll();
11         return e;
12     }
13  
14     public synchronized void put(E e) throws InterruptedException {
15         if (e==null) return;
16         while (putting)
17             wait();
18         putting = true;
19         item = e;
20         notifyAll();
21         while (item!=null)
22             wait();
23         putting = false;
24         notifyAll();
25     }
26 }

信号量实现

经典同步队列实现采用了三个信号量,代码很简单,比较容易理解:

01 public class SemaphoreSynchronousQueue<E> {
02     E item = null;
03     Semaphore sync = new Semaphore(0);
04     Semaphore send = new Semaphore(1);
05     Semaphore recv = new Semaphore(0);
06  
07     public E take() throws InterruptedException {
08         recv.acquire();
09         E x = item;
10         sync.release();
11         send.release();
12         return x;
13     }
14  
15     public void put (E x) throws InterruptedException{
16         send.acquire();
17         item = x;
18         recv.release();
19         sync.acquire();
20     }
21 }

在多核机器上,上面方法的同步代价仍然较高,操作系统调度器需要上千个时间片来阻塞或唤醒线程,而上面的实现即使在生产者put()时已经有一个消费者在等待的情况下,阻塞和唤醒的调用仍然需要。

Java 5实现

01 public class Java5SynchronousQueue<E> {
02     ReentrantLock qlock = new ReentrantLock();
03     Queue waitingProducers = new Queue();
04     Queue waitingConsumers = new Queue();
05  
06     static class Node extends AbstractQueuedSynchronizer {
07         E item;
08         Node next;
09  
10         Node(Object x) { item = x; }
11         void waitForTake() { /* (uses AQS) */ }
12            E waitForPut() { /* (uses AQS) */ }
13     }
14  
15     public E take() {
16         Node node;
17         boolean mustWait;
18         qlock.lock();
19         node = waitingProducers.pop();
20         if(mustWait = (node == null))
21            node = waitingConsumers.push(null);
22          qlock.unlock();
23  
24         if (mustWait)
25            return node.waitForPut();
26         else
27             return node.item;
28     }
29  
30     public void put(E e) {
31          Node node;
32          boolean mustWait;
33          qlock.lock();
34          node = waitingConsumers.pop();
35          if (mustWait = (node == null))
36              node = waitingProducers.push(e);
37          qlock.unlock();
38  
39          if (mustWait)
40              node.waitForTake();
41          else
42             node.item = e;
43     }
44 }

Java 5的实现相对来说做了一些优化,只使用了一个锁,使用队列代替信号量也可以允许发布者直接发布数据,而不是要首先从阻塞在信号量处被唤醒。

Java6实现

Java 6的SynchronousQueue的实现采用了一种性能更好的无锁算法 — 扩展的“Dual stack and Dual queue”算法。性能比Java5的实现有较大提升。竞争机制支持公平和非公平两种:非公平竞争模式使用的数据结构是后进先出栈(Lifo Stack);公平竞争模式则使用先进先出队列(Fifo Queue),性能上两者是相当的,一般情况下,Fifo通常可以支持更大的吞吐量,但Lifo可以更大程度的保持线程的本地化。

代码实现里的Dual Queue或Stack内部是用链表(LinkedList)来实现的,其节点状态为以下三种情况:

  1. 持有数据 – put()方法的元素
  2. 持有请求 – take()方法

这个算法的特点就是任何操作都可以根据节点的状态判断执行,而不需要用到锁。

其核心接口是Transfer,生产者的put或消费者的take都使用这个接口,根据第一个参数来区别是入列(栈)还是出列(栈)。

01 /**
02     * Shared internal API for dual stacks and queues.
03     */
04    static abstract class Transferer {
05        /**
06         * Performs a put or take.
07         *
08         * @param e if non-null, the item to be handed to a consumer;
09         *          if null, requests that transfer return an item
10         *          offered by producer.
11         * @param timed if this operation should timeout
12         * @param nanos the timeout, in nanoseconds
13         * @return if non-null, the item provided or received; if null,
14         *         the operation failed due to timeout or interrupt --
15         *         the caller can distinguish which of these occurred
16         *         by checking Thread.interrupted.
17         */
18        abstract Object transfer(Object e, boolean timed, long nanos);
19    }

TransferQueue实现如下(摘自Java 6源代码),入列和出列都基于Spin和CAS方法:

01 /**
02     * Puts or takes an item.
03     */
04    Object transfer(Object e, boolean timed, long nanos) {
05        /* Basic algorithm is to loop trying to take either of
06         * two actions:
07         *
08         * 1. If queue apparently empty or holding same-mode nodes,
09         *    try to add node to queue of waiters, wait to be
10         *    fulfilled (or cancelled) and return matching item.
11         *
12         * 2. If queue apparently contains waiting items, and this
13         *    call is of complementary mode, try to fulfill by CAS'ing
14         *    item field of waiting node and dequeuing it, and then
15         *    returning matching item.
16         *
17         * In each case, along the way, check for and try to help
18         * advance head and tail on behalf of other stalled/slow
19         * threads.
20         *
21         * The loop starts off with a null check guarding against
22         * seeing uninitialized head or tail values. This never
23         * happens in current SynchronousQueue, but could if
24         * callers held non-volatile/final ref to the
25         * transferer. The check is here anyway because it places
26         * null checks at top of loop, which is usually faster
27         * than having them implicitly interspersed.
28         */
29  
30        QNode s = null// constructed/reused as needed
31        boolean isData = (e != null);
32  
33        for (;;) {
34            QNode t = tail;
35            QNode h = head;
36            if (t == null || h == null)         // saw uninitialized value
37                continue;                       // spin
38  
39            if (h == t || t.isData == isData) { // empty or same-mode
40                QNode tn = t.next;
41                if (t != tail)                  // inconsistent read
42                    continue;
43                if (tn != null) {               // lagging tail
44                    advanceTail(t, tn);
45                    continue;
46                }
47                if (timed &amp;&amp; nanos &lt;= 0)        // can't wait
48                    return null;
49                if (s == null)
50                    s = new QNode(e, isData);
51                if (!t.casNext(null, s))        // failed to link in
52                    continue;
53  
54                advanceTail(t, s);              // swing tail and wait
55                Object x = awaitFulfill(s, e, timed, nanos);
56                if (x == s) {                   // wait was cancelled
57                    clean(t, s);
58                    return null;
59                }
60  
61                if (!s.isOffList()) {           // not already unlinked
62                    advanceHead(t, s);          // unlink if head
63                    if (x != null)              // and forget fields
64                        s.item = s;
65                    s.waiter = null;
66                }
67                return (x != null)? x : e;
68  
69            else {                            // complementary-mode
70                QNode m = h.next;               // node to fulfill
71                if (t != tail || m == null || h != head)
72                    continue;                   // inconsistent read
73  
74                Object x = m.item;
75                if (isData == (x != null) ||    // m already fulfilled
76                    x == m ||                   // m cancelled
77                    !m.casItem(x, e)) {         // lost CAS
78                    advanceHead(h, m);          // dequeue and retry
79                    continue;
80                }
81  
82                advanceHead(h, m);              // successfully fulfilled
83                LockSupport.unpark(m.waiter);
84                return (x != null)? x : e;
85            }
86        }
87    }

参考文章

  1. Javadoc of SynchronousQueue
  2. Scalable Synchronous Queues
  3. Nonblocking Concurrent Data Structures with Condition Synchronization
上一篇:quartz 配置


下一篇:【转】windows下 ADT NDK开发环境配置