【从入门到放弃-Java】并发编程-JUC-SynchronousQueue

前言

上文【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue,我们介绍了基于链表的有界阻塞队列LinkedBlockingQueue,它是Executors.newFixedThreadPool中workQueue使用的阻塞队列。
本文我们来学习ExecutorService.newCachedThreadPool中使用的阻塞队列:SynchronousQueue。

SynchronousQueue

【从入门到放弃-Java】并发编程-JUC-SynchronousQueue
如图和LinkedBlockingQueue一样,都是继承了AbstractQueue类,实现了BlockingQueue和Serializable接口

SynchronousQueue

/**
 * Creates a {@code SynchronousQueue} with nonfair access policy.
 */
public SynchronousQueue() {
    this(false);
}

/**
 * Creates a {@code SynchronousQueue} with the specified fairness policy.
 *
 * @param fair if true, waiting threads contend in FIFO order for
 *        access; otherwise the order is unspecified.
 */
public SynchronousQueue(boolean fair) {
    //公平模式下使用队列,实现先进先出,非公平模式下使用栈,先进后出
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

因为SynchronousQueue的put、offer、take、poll方法全是调用了Transferer的transfer方法,我们一起来看下这个transfer到底是何方神圣。

Transferer

/**
 * Shared internal API for dual stacks and queues.
 */
abstract static class Transferer<E> {
    /**
     * Performs a put or take.
     *
     * @param e if non-null, the item to be handed to a consumer;
     *          if null, requests that transfer return an item
     *          offered by producer.
     * @param timed if this operation should timeout
     * @param nanos the timeout, in nanoseconds
     * @return if non-null, the item provided or received; if null,
     *         the operation failed due to timeout or interrupt --
     *         the caller can distinguish which of these occurred
     *         by checking Thread.interrupted.
     */
    abstract E transfer(E e, boolean timed, long nanos);
}

Transferer是一个抽象类,只有一个抽象方法transfer。可以从注释中看到:

  • e是元素根据e是否为null来控制是生产者还是消费者。
  • timed是布尔值,控制是否使用超时机制。
  • nanos是超时时间。

transfer的具体实现有两个,在Transferer的两个实现类:TransferQueue和TransferStack中

TransferQueue::transfer

E transfer(E e, boolean timed, long nanos) {
    /* Basic algorithm is to loop trying to take either of
     * two actions:
     *
     * 1. If queue apparently empty or holding same-mode nodes,
     *    try to add node to queue of waiters, wait to be
     *    fulfilled (or cancelled) and return matching item.
     *
     * 2. If queue apparently contains waiting items, and this
     *    call is of complementary mode, try to fulfill by CAS'ing
     *    item field of waiting node and dequeuing it, and then
     *    returning matching item.
     *
     * In each case, along the way, check for and try to help
     * advance head and tail on behalf of other stalled/slow
     * threads.
     *
     * The loop starts off with a null check guarding against
     * seeing uninitialized head or tail values. This never
     * happens in current SynchronousQueue, but could if
     * callers held non-volatile/final ref to the
     * transferer. The check is here anyway because it places
     * null checks at top of loop, which is usually faster
     * than having them implicitly interspersed.
     */

    QNode s = null; // constructed/reused as needed
    //判断是消费者还是生产者,如果e为null则消费者,e不为null是生产者
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        //tail和head是队列的尾部和头部,是一个item为空的QNode,如果队列被其它线程改动了,则continue重新处理
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        //如果队列为空,或者处于same-mode模式
        if (h == t || t.isData == isData) { // empty or same-mode
            //如果不是最后一个节点,则继续寻找最后一个有数据的节点
            QNode tn = t.next;
            if (t != tail)                  // inconsistent read
                continue;
            //如果已经tn不为null,则尝试通过CAS把tn置为尾结点,然后重新执行
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
                continue;
            }
            //如果超时,直接返回null
            if (timed && nanos <= 0L)       // can't wait
                return null;
            //如果新节点还未创建,则创建一个新的QNode来承载元素e
            if (s == null)
                s = new QNode(e, isData);
            //尝试通过CAS将t的下一个节点设置为s,如果设置失败,则说明t的下一个节点已经被添加了元素,则需要从头开始处理
            if (!t.casNext(null, s))        // failed to link in
                continue;

            //尝试将tail设置为新建的节点s
            advanceTail(t, s);              // swing tail and wait
            //加入队列后阻塞,把等待线程设置为当前线程,等待唤醒处理
            Object x = awaitFulfill(s, e, timed, nanos);
            //如果超时中断则删除这个节点并返回null
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            //如果不是tail节点,则判断是否是head节点,
            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                //如果返回的x不为null,则设置item为自身
                if (x != null)              // and forget fields
                    s.item = s;
                //把等待线程设置为null
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            //x为null说明已经被消费
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                //通过cas将首节点设置为e(null)
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            advanceHead(h, m);              // successfully fulfilled
            //唤醒节点设置的线程
            LockSupport.unpark(m.waiter);
            //返回获取到的item
            return (x != null) ? (E)x : e;
        }
    }
}
  • 先判断队列是否为空,或者尾结点与当前节点模式相同,则将节点加入队列尾部
  • 等待线程被唤醒(put被take唤醒,take被put唤醒)处理
  • 如果队列不为空,或者尾结点与当前节点模式不相同,则唤醒头部节点,取出数据,并把头部节点移除

TransferStack::transfer

E transfer(E e, boolean timed, long nanos) {
    /*
     * Basic algorithm is to loop trying one of three actions:
     *
     * 1. If apparently empty or already containing nodes of same
     *    mode, try to push node on stack and wait for a match,
     *    returning it, or null if cancelled.
     *
     * 2. If apparently containing node of complementary mode,
     *    try to push a fulfilling node on to stack, match
     *    with corresponding waiting node, pop both from
     *    stack, and return matched item. The matching or
     *    unlinking might not actually be necessary because of
     *    other threads performing action 3:
     *
     * 3. If top of stack already holds another fulfilling node,
     *    help it out by doing its match and/or pop
     *    operations, and then continue. The code for helping
     *    is essentially the same as for fulfilling, except
     *    that it doesn't return the item.
     */

    //如果e是null,则是REQUEST模式,不为null则是DATA模式
    SNode s = null; // constructed/reused as needed
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        //如果是队列不为空
        if (h == null || h.mode == mode) {  // empty or same-mode
            //如果超时
            if (timed && nanos <= 0L) {     // can't wait
                if (h != null && h.isCancelled())
                    //cas方式移除头部节点
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            //从头部插入数据
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                //等待节点被内的元素被处理完毕或等待超时
                SNode m = awaitFulfill(s, timed, nanos);
                //如果是中断,则清除节点s并返回null
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                //从头部取出数据并移除节点
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        //如果节点h没有被处理完
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    //尝试唤醒节点中保存的线程
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                //尝试唤醒节点中保存的线程
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}
  • 先判断队列是否为空,或者尾结点与当前节点模式相同,则将节点加入队列头部
  • 等待线程被唤醒(put被take唤醒,take被put唤醒)处理
  • 如果队列不为空,或者尾结点与当前节点模式不相同,则唤醒头部节点,取出数据,并把头部节点移除

总结

SynchronousQueue是一个无空间的队列即不可以通过peek来获取数据或者contain判断数据是否在队列中。
当队列为空时,队列执行take或put操作都会调用transfer,使线程进入阻塞,等待一个与tail节点模式互补(即put等take、take等put)的请求。
如果新请求与队列tail节点的模式相同,则将请求加入队列,模式不同,则可进行消费从队列中移除节点。
TransferStack:非公平的栈模式,先进后出(头进头出)
TransferQueue:公平的队列模式,先进先出(尾进头出)

我的理解:

  • SynchronousQueue不存储数据,只存储请求
  • 当生产或消费请求到达时,如果队列中没有互补的请求,则将会此请求加入队列中,线程进入阻塞 等待互补的请求到达。
  • 若是互补的请求到达时,则唤醒队列中的线程,消费请求使用生产请求中的数据内容。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

上一篇:【从入门到放弃-Java】码出高效-计算机基础-二进制和浮点数


下一篇:【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock