[源码]解析 SynchronousQueue 上界,下界.. 数据保存和数据传递. 堵塞队列. 有无频繁await?

 简析SynchronousQueue。LinkedBlockingQueue(两个locker,更快),ArrayBlockingQueue(一个locker,读写都竞争)
    三者都是blockingQueue.
    对于blockingQueue的堵塞和非堵塞方法对注记方案:
        * oppo(oppo手机)是一对,offer和poll不堵塞
        * ppt是一对.put和take都堵塞.
解析源码之前先实战看下SynchronousQueue.

public static void main(String[] args) throws InterruptedException {
          final SynchronousQueue<Long> workQueue = new SynchronousQueue<Long>();
        boolean offer = workQueue.offer(2L); // offer不能放入,前面无堵塞线程,本身也不堵塞不会放入到堵塞线程队列中
        System.out.println("main thread: offer=" + offer);
        Long poll = workQueue.poll(); // 不能放poll出元素为null,前面无堵塞线程,本身也不堵塞不会放入到堵塞线程队列中
        System.out.println("main thread: poll=" + poll);
        ExecutorService newCachedThreadPool = Executors.newFixedThreadPool(4); //
内部源代码实现是:new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); 
     
        System.out.println("/**=====================以下是队列是 take类型========*/");
        newCachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out
                            .println("take thread 1: begin take and thread will be blocked by call  park(await) method");
                    Long take = workQueue.take();
                    System.out
                            .println("take thread 1:   take suceffull take object="
                                    + take);
                } catch (InterruptedException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
            }
        });
        newCachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out
                            .println("take thread 2: begin take and thread will be blocked by call  park(await) method");
                    Long take = workQueue.take();
                    System.out
                            .println("take thread 2:   take suceffull take object="
                                    + take);
                } catch (InterruptedException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
            }
        });
        Thread.sleep(1000);
         poll = workQueue.poll();
        System.out.println("main thread: 队列是take类型. 同类型操作分两种:1.堵塞take会堵塞,增加队列中 2.非堵塞操作poll失败. Queue接口poll失败,  poll=" + poll);
         offer = workQueue.offer(2L);  // 
        System.out.println("main thread: 队列是take类型. 非同类型无论堵塞不堵塞都成功. 非同类型非堵塞操作Queue接口 offer 成功? " + offer);
        Thread.sleep(2000);
        long object = 123L;
        System.out.println("put thead: begin put " + object);
        try {
            workQueue.put(object);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out
                .println("put thead: 队列是take类型. 非同类型无论堵塞不堵塞都成功.  blockingQueue 堵塞接口 put调用未堵塞,调用成功 SynchronousQueue will unpark(notify) the take thread ");
    
        System.out.println("/**=====================以下是 Put类型========*/");
    
        System.out.println("/*先堵塞两个put*/");
        newCachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                long object = 123L;
                System.out.println("put thead 1: begin put " + object);
                try {
                    workQueue.put(object);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out
                        .println("put thead 1: finish put sucefully , SynchronousQueue will unpark(notify) the take thread ");
            }
        });
        newCachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                long object = 123L;
                System.out.println("put thead 2: begin put " + object);
                try {
                    workQueue.put(object);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out
                        .println("put thead 2: finish put sucefully , SynchronousQueue will unpark(notify) the take thread ");
            }
        });
        Thread.sleep(2000);
        System.out.println("/*试图offer*/");
         offer = workQueue.offer(2L);  // 
            System.out.println("main thread: 队列是put类型.  同类型操作分两种:1.堵塞put会堵塞,增加队列中 2.非堵塞操作offer失败.  非堵塞操作Queue接口 offer 成功? " + offer);
            
              poll = workQueue.poll();
            System.out.println("main thread: 队列是put类型. 非同类型无论堵塞不堵塞都成功.  poll=" + poll);
            
    Long take = workQueue.take();
    System.out
            .println("main thread: 队列是put类型. 非同类型无论堵塞不堵塞都成功. BlockingQueue堵塞接口take调用未堵塞, take suceffull take object="
                    + take);
        newCachedThreadPool.shutdown();

}

    
输出:
main thread: offer=false
main thread: poll=null
/**=====================以下是队列是 take类型========*/
take thread 1: begin take and thread will be blocked by call  park(await) method
take thread 2: begin take and thread will be blocked by call  park(await) method
main thread: 队列是take类型. 同类型操作分两种:1.堵塞take会堵塞,增加队列中 2.非堵塞操作poll失败. Queue接口poll失败,  poll=null
main thread: 队列是take类型. 非同类型无论堵塞不堵塞都成功. 非同类型非堵塞操作Queue接口 offer 成功?

true

take thread 2:   take suceffull take object=2
put thead: begin put 123
take thread 1:   take suceffull take object=123
put thead: 队列是take类型. 非同类型无论堵塞不堵塞都成功.  blockingQueue 堵塞接口 put调用未堵塞,调用成功 SynchronousQueue will unpark(notify) the take thread 
/**=====================以下是 Put类型========*/
/*先堵塞两个put*/
put thead 1: begin put 123
put thead 2: begin put 123
/*试图offer*/
main thread: 队列是put类型.  同类型操作分两种:1.堵塞put会堵塞,增加队列中 2.非堵塞操作offer失败.  非堵塞操作Queue接口 offer 成功? false
main thread: 队列是put类型. 非同类型无论堵塞不堵塞都成功.  poll=123
main thread: 队列是put类型. 非同类型无论堵塞不堵塞都成功. BlockingQueue堵塞接口take调用未堵塞, take suceffull take object=123
put thead 2: finish put sucefully , SynchronousQueue will unpark(notify) the take thread 
put thead 1: finish put sucefully , SynchronousQueue will unpark(notify) the take thread 


      依据上面的代码能够简单总结SynchronousQueue 的queue特点:
1.
queue有三种类型: 空类型,take以及put类型,
       分别说明.
        1.1. 空类型时, take和put都会被堵塞, 非堵塞offer和poll都不会堵塞. 无法成功操作.
        1.2. 队列是某种类型时,  
    同类型操作: 分两种: 1.堵塞method会堵塞,增加堵塞队列中 2.非堵塞method操作失败
.
     非同类型: 无论堵塞不堵塞method都成功.
              详细说来:
                 1.2.1. 队列是take类型时, . 
                    同类型操作:堵塞take操作会堵塞,非堵塞 poll()失败, 
                    非同类型: 异类的put和offer可以成功.
                 1.2.2. 队列是put类型., 
            同类型操作:堵塞put操作会被堵塞,非堵塞offer操作失败. 
            非同类型: 异类的take,poll会成功2.
关于队列长度,界:
   一个线程堵塞队列要么全是take线程,要么全是put线程. 后来的互补的操作将会匹配对头的线程.使退出队列.
     1. 里面没有不论什么数据.调用offer()无法成功,返回flase,表示填充失败.不会被堵塞.
     2. 先take,被堵塞,直到有一个线程来offer. 两个不同的互补碰撞发生匹配完毕(fullfill). 之前的take的线程被唤醒获得offer的提供的数据.
再来解析SynchronousQueue 源码,
SynchronousQueue 源码凝视中关键术语解析:
 Node类型: 一共分层两种. 一种是 isDate=true. (相应offer , put 函数) 一种是isDate=false (相应 take函数)
dual queue:dual的含义就好理解了,由于仅仅有两类,能够当isDate=true和isDate=false遇到时会匹配.可翻译为成双的,对偶的. 对偶队列.
same mode: 同样的模式(isDate都=true,或者isDate都=false).比方take产生的Node和前面已经放入到队列中的take动作Node就属于同一个模式
complementary :互补的.比方先take,放到队列中.后面来一个offer动作就是complementary (互补).反之亦然.
fulfill: 字面英文翻译,完毕.详细到算法里的含义是一个动作和之前的complementary(译为互补)的动作得到匹配.
=============
SynchronousQueue以下的一个部分凝视部分翻译.
/*
   * A dual queue (and similarly stack) is one that at any given
     * time either holds "data" -- items provided by put operations,
     * or "requests" -- slots representing take operations, or is
     * empty. A call to "fulfill" (i.e., a call requesting an item
     * from a queue holding data or vice versa) dequeues a
     * complementary node.  The most interesting feature of these
     * queues is that any operation can figure out which mode the

* queue is in, and act accordingly without needing locks.

不论什么一个操作 put/take都会产生一个节点,抓住数据(假设是take,数据为null). 一个实现"匹配"的调用将会将互补的节点退出队列.
最有趣的是不论什么一个操作都能指出眼下的队列处于何种类型(注:队列里要么全是take线程,要么全是put线程).而且不须要锁.
     
     // 以下的凝视比較了java实现的算法和借鉴的算法(见凝视中网址)有何差别
      * The algorithms here differ from the versions in the above paper
     * in extending them for use in synchronous queues, as well as
     * dealing with cancellation. The main differences include:

     *  1. The original algorithms used bit-marked pointers, but
     *     the ones here use mode bits in nodes, leading to a number
     *     of further adaptations.
     *  2. SynchronousQueues must block threads waiting to become
     *     fulfilled. 必须等待匹配的线程. fulfilled-完毕的意思.
     *  3. Support for cancellation via timeout and interrupts,
     *     including cleaning out cancelled nodes/threads
     *     from lists to avoid garbage retention and memory depletion. //可以取消或者中断
     *
=============再来看看SynchronousQueue.TransferQueue.transfer以下的凝视.=========
/**
  当队列是空,或者是同一种Mode时,直接放入到列队尾.不会完毕(fullfill)
             * 2. If queue apparently contains waiting items, and this
我一開始没理解的一点是:
    当一个head是 isDate=false , tail是isDate=true时, 一个线程进来的操作是isDate=false时.
不会进入①,进入②后看代码又无法和head完毕匹配(fullfill).
后来想明确了,这样的情况不会发生.由于tail是isDate=true,这个会与head完毕匹配(fullfill).换句话说.
队列里tail和head肯定是same mode.所以当①推断失败,进入②后,
 
/**
         * Puts or takes an item.
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /* Basic algorithm is to loop trying to take either of
             * two actions:
             *
             * 1. If queue apparently empty or holding same-mode nodes,
             *    try to add node to queue of waiters, wait to be
             *    fulfilled (or cancelled) and return matching item.
             *   // same-mode指的是take还是put..
假设空或者是同类型,那么就放入队列堵塞等待
             * 2. If queue apparently contains waiting items, and this
             *    call is of complementary mode, try to fulfill by CAS'ing
             *    item field of waiting node and dequeuing it, and then
             *    returning matching item.
             *   // 假设是不同,刚好是互补(complementary)节点,那么刚好可以匹配(fulfill )
             * In each case, along the way, check for and try to help
             * advance head and tail on behalf of other stalled/slow
             * threads.
             *
             * The loop starts off with a null check guarding against
             * seeing uninitialized head or tail values. This never
             * happens in current SynchronousQueue, but could if
             * callers held non-volatile/final ref to the
             * transferer. The check is here anyway because it places
             * null checks at top of loop, which is usually faster
             * than having them implicitly interspersed.
             */
            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);
            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin
                if (h == t || t.isData == isData) { // empty or same-mode 队列里假设都是take线程.此线程还是take调用,进入该分支
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;
                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }
                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null)?

x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill//进入该分支,由于整个队列的节点类型都一样,肯定能和head完毕匹配(fulfill)
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read
                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS  //这一步非常非常重要,成功把被匹配线程的数据节点改成了自己的节点,实现了传输数据
                        advanceHead(h, m);          // dequeue and retry 
                        continue;
                    }
                    advanceHead(h, m);              // successfully fulfilled // 完毕匹配,将被匹配的线程退出原队列
                    LockSupport.unpark(m.waiter);   //
唤醒被匹配的线程
                    return (x != null)?

x : e;

                }
            }

}



0.堵塞有几种?

 1. lock获取不到锁堵塞. 2. 获取到锁可是await堵塞.然后又释放锁(见 <[源代码]Condition的原理,简单案例(ArrayBlockingQueue),复杂案例(LinkedBlockingQueue).>)
1. LinkedBlockingQueue 和 SynchronousQueue是否有界,上界,下界范围是多少? 
2. LinkedBlockingQueue 和 SynchronousQueue 的有几种堵塞线程? 前者一个线程堵塞队列(封装在reentrantLock内,使用者不知),一个条件队列(封装在ConditionObject内,使用者不知). 条件队列signal后把线程会放到堵塞队列里,见wiz<Condition的原理,简单案例(ArrayBlockingQueue),复杂案例(LinkedBlockingQueue).>
. 后者仅仅有一个线程堵塞队列. 其各自的堵塞队列数据结构有何不同?前者无数据,后者有数据. 
2. LinkedBlockingQueue 和 SynchronousQueue怎样在线程之间传递数据?

前者获取锁后放置到数据队列中,然后unpark锁.后者将数据传递给线程节点上的数据引用.使读线程解锁后能读取到.尽管SynchronousQueue没有了数据队列,用每一个线程持有一个数据替代了.

3. LinkedBlockingQueue.先offer,再poll 和 SynchronousQueue 先offer,再poll有何差别?  
4. LinkedBlockingQueue 先put,再take 和 SynchronousQueue 先put,再take有何差别? 前者能够先放在数据队列上.后者没有地方来接受他的数据,必须等待到有一个take线程产生的数据节点来接受数据.



LinkedBlockingQueue,ArrayBlockingQueue:
    有数据队列和堵塞线程队列
    1. 数据队列有最大长度,有界,默认是Integer.Max; 
    2. 数据队列达到上界,下界时,对对应的堵塞方法有影响.
LinkedBlockingQueue 两个locker,更复杂. ArrayBlockingQueue一个locker,两个Condition.
 SynchronousQueue :
  最重要的差别: 
外在: 一个有容量>=1,依赖缓冲区线程之间交换数据. 一个无容量须要线程时时产生数据节点来接受传递数据.
内部:数据读取匹配的方式不同: BlockingQueue是与已存放在队列上的数据配对, SynchronousQueue是与已堵塞的线程配对(一个线程id相应着一个数据,这是 SynchronousQueue特有的特点)
        对于LinkedBlockingQueue。ArrayBlockingQueue,有数据队列,也有线程堵塞队列 .  数据配对即可
        对于SynchronousQueue ,无数据队列,仅仅有线程堵塞队列/stack与已堵塞的线程配对即可.
要理解上面这句话.这几个问题思考下.
1. LinkedBlockingQueue 和 SynchronousQueue是否有界?  前者有上界,下界>=1. 后者上界,下界都是0.(无缓冲层就无法传递数据,将数据巧妙地保存在了由线程调用时产生的节点上(和线程同生共死).)
2. LinkedBlockingQueue 和 SynchronousQueue 的有几种堵塞线程? 其各自的队列数据结构有何不同?
    总结: 前者堵塞队列上无数据,后者堵塞队列上有数据和操作类型.两者在堵塞时都利用堆栈的局部变量来临时保存数据和传递数据.
     前者的利用排它锁,堵塞数据队列上无数据, AbstractQueueSyncronizer( 可能是公平也可能是非公平锁,插入瞬间非公平 ), 堵塞时将数据保存在内存堆栈局部变量上,每次获得锁后将数据传递给数据队列, 
     后者获得匹配后,综合匹配的线程数据,返回非null数据. 并改动匹配线程的数据且唤醒被匹配的堵塞线程.被匹配的堵塞线程依据其堵塞队列上的新数据和原线程内存堆栈上的局部变量数据(重点,难点)综合返回非null数据. 详细见源码凝视.

     第二个问题引出的另外一个话题是因为SynchronousQueue 已经没有了数据队列缓冲区,导致SynchronousQueue 中继承Queue的put方法的语义都量变到质变了.
      BlockingQueue 继承自Queue的put         javaDoc :   Inserts the specified element into this queue, waiting if necessary forspaceto become available.

      SynchronousQueue  继承自Queue的put javaDoc :   Adds the specified element to this queue, waiting if necessary foranother thread
to receive it.
   数据存放不同:
     因为 LinkedBlockingQueue。ArrayBlockingQueue代码实现上通过数据队列转发数据的.
故这两者不能设置queye长度的最大值为0.
     两者通过堆内存传递,notifyAll堵塞线程. 假设数据队列是0,数据就无法传递了.
    SynchronousQueue 无数据队列,那么数据怎样传输呢? 代码实现上其数据是直接通过堆内存直接传递给堵塞线程. 线程1被堵塞将数据同一时候存放在线程堆栈上的局部变量以及和线程id绑定的队列节点(是field属性,状态)上. 线程2来匹配,会改变原堵塞线程的堆内存的值,使得原堵塞线程可以获取两份数据.这两份数据中肯定有份是生产者提供的数据,一份是消费者伪造的假数据,通过标示为推断终于得到生产者得到的数据.
     对于SynchronousQueue   最大值是0,也没有其它线程生成数据节点,put时无法存放数据, 让put一開始就进入了堵塞. 
     对于LinkedBlockingQueue,ArrayBlockingQueue, put仅仅有在数据队列满了才会堵塞.

应用场景:
以Executors.newCachedThreadPool()为例,CachedThreadPool特点:
有任务时可以无限制生成线程,无任务时也可以高速回收线程. 用线程不断生成替代了缓冲队列. 该javadoc上,明白说明了适合大批量的小任务. 不适合一下子大量,一下子又无数据. 不太适合生产者生产速率动荡变化,每一个任务都非常长的场景.
</pre><pre name="code" class="java">public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
        利用了SynchronousQueue.offer和take的调用配合实现了CachedThreadPool();相比blockingQueue长处: Syncronize一个堵塞队列.+
死循环cas(compare and swap),不会频繁的线程挂起和唤醒(park和unpark)
   和new
ThreadPoolExecutor时设置coreSize=0,linkedblockingQueue 容量=1 的差别是后者维护一个size=1的堵塞队列,队列常常在满和空之间切换,须要频繁的线程挂起和唤醒(park和unpark)
 
    
  强烈建议打开eclipse相应的源码,.jdk1.6 里的src.zip 源码包
1 public void execute(Runnable command) {
2 if (command == null)
3 throw new NullPointerException();
4 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
5 if (runState == RUNNING && workQueue.offer(command)) {
6 if (runState != RUNNING || poolSize == 0)
7 ensureQueuedTaskHandled(command);
8 }
9 else if (!addIfUnderMaximumPoolSize(command))
10 reject(command); // is shutdown or saturated
11 }
12 }

   刚開始execute(runable),queue.offer()失败,产生新的线程运行任务.一直不停的offer,产生新的线程.到后面老的线程运行完成会调用take()第一次execute(runable),第5行queue.offer()失败,进入第9行产生max配额的新线程运行任务runnable.后面不停的execute(),一直不停的offer失败,产生max配额的新线程去运行runnable.直到第一次的线程运行任务完成后会调用SynchronousQueue.take().
这样假设再有execute(),第5行就能匹配成功新的offer就能配对成功,runnable实例被老的线程获取运行,不会去新建线程.这个实现了动态的线程池.所以java才说适合大批量的小的异步任务(These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.)
      Executors.固定大小的线程的优点是.线程资源是有限的,每一个线程512k -Xss    
每一个线程的Stack大小,默认堆栈.避免有些无限制增加线程池的问题.没有提供可配置的BlockingQueue容量大小.




上一篇:[源码解析] PyTorch 分布式(2) --- 数据加载之DataLoader


下一篇:20145316许心远《Java程序设计》第5周学习总结