SynchronousQueue源码解析(非公平模式)

上篇文章,介绍了 SynchronousQueue 的公平模式(源码分析)。

这篇文章,从源码入手,解析 非公平模式

如果你对SynchronousQueue不熟悉,可以先看我的这篇文章(图解SynchronousQueue)。

一、初始化


	SynchronousQueue<Integer> queue = new SynchronousQueue<>();

    public SynchronousQueue() {
        this(false);
    }

    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
    

也就是,不传参数,或传一个false时,就会实现非公平模式。

内部类 TransferStack 没有带参构造器,看下它的几个属性


        static final int REQUEST    = 0; // 取元素标识
        
        static final int DATA       = 1; // 放元素标识
     
        static final int FULFILLING = 2; // 已匹配标识
        
         // TransferStack的内部类SNode 的几个属性
         
            volatile SNode next;        //  栈中下一个元素
            volatile SNode match;       // 与其匹配的元素
            volatile Thread waiter;     // 对应的线程
            Object item;                // 元素的值
            int mode;

二、调用 put() 后阻塞

用 图解 SynchronousQueue 那篇文章中的例子。 t1, t2, t3 先后往队列中放 10, 20, 30

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

也就是说 put(e) 方法,会调用 transferer.transfer(e, false, 0) 。

顺便说下,取元素与放元素都会调用这个 transfer 方法,只是参数不同而已。

SynchronousQueue源码解析(非公平模式)

 	E transfer(E e, boolean timed, long nanos) {
            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;
            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }

t1、 t2、t3 三个线程,调用 transferer.transfer(e, false, 0),

第三行代码 int mode = (e == null) ? REQUEST : DATA; ,所以 mode 都是 1。

第六行代码 if (h == null || h.mode == mode) {}这个条件,都是返回 true。

所以 for () {} 这个循环中,t1, t2, t3 会走同一分支的代码,精简如下。

 E transfer(E e, boolean timed, long nanos) {    
            SNode s = null; 
            int mode = (e == null) ? REQUEST : DATA; // t1, t2, t3 对应都是1
            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) { // t1 - head ==null,  t2和t3,符合 h.mode == mode
                    if (timed && nanos <= 0) {      // timed 是 false
                       // ...
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos); // 阻塞在这个方法里
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    // ...
                } else {                            // help a fulfiller
                   // ...
                }
            }
        }

整个代码不是很难理解,

casHead(h, s = snode(s, e, h, mode))这句,是先生成 SNode,然后入栈。

如果入栈失败,则会重头开始,再次执行(for() 这是无限循环)

调用 awaitFulfill(s, false, 0)时,会阻塞,即 t1, t2, t3 这三个线程,都阻塞在这儿了。


        SNode awaitFulfill(SNode s, boolean timed, long nanos) {         
            final long deadline = timed ? System.nanoTime() + nanos : 0L; // timed是false
            Thread w = Thread.currentThread(); // 当前线程
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();
                SNode m = s.match; // 此次调用,s.match都是null
                if (m != null)
                    return m;
                if (timed) { // 不会进
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel();
                        continue;
                    }
                }
                if (spins > 0) // spins > 0 ,从头再执行直至spins 减小到 0
                    spins = shouldSpin(s) ? (spins-1) : 0;
                else if (s.waiter == null)
                    s.waiter = w; // s 节点标属于哪个线程
                else if (!timed) 
                    LockSupport.park(this); // 阻塞
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

这个方法,现在调用最终都会执行 LockSupport.park(this); 即阻塞。

之后 还会再说这方法,那是有取元素的线程,修改了s.match,并唤醒线程,此方法可以跳出。

好的,放元素的三个线程全部阻塞,等待取元素线程工作。结合上面的那张图,整理好思路。

三、调用 take() 取元素


    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

t4 线程取元素,调用 transfer(null, false, 0) 方法,和之前讲的,只是一个参数不同而已。

第三行代码 int mode = (e == null) ? REQUEST : DATA; ,所以 mode 是 0。

h == null || h.mode == mode,这个判断返回false,看下一个判断

else if (!isFulfilling(h.mode)) 这个判断返回true


static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
// 即 1 & 2, 位运算结果是0

所以,此次调用 transfer(null, false, 0) 方法,会进入第二个分支,精简如下


        E transfer(E e, boolean timed, long nanos) {
            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;
            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  
                   // ...
                } else if (!isFulfilling(h.mode)) { 
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m 是 s 的匹配结点
                            if (m == null) {        // 此次不会进,忽略这段
                                casHead(s, null);   
                                s = null;           
                                break;              
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) { // 本例中,m是t3线程那个节点,s是本次新生成的节点。
                                casHead(s, mn);     // head 移位,即 s和m都出栈
                                return (E) ((mode == REQUEST) ? m.item : s.item); // 返回
                            } else                  // 出现了并发,退出人头再来
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                           
					// ...
                }
            }
        }

casHead(h, s=snode(s, e, h, FULFILLING|mode)) 这行代码,是生成新SNode入栈。效果如图。
SynchronousQueue源码解析(非公平模式)

看下for () {} 是匹配出栈的过程。

m.tryMatch(s) m是t3线程那个节点,s是本次新生节点。


            boolean tryMatch(SNode s) {
            // 将 t3线程那个节点的match,从null,改为 s 
                if (match == null &&
                    UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                    Thread w = waiter;
                    if (w != null) {    // 只唤醒一次
                        waiter = null;
                        LockSupport.unpark(w); // 唤醒t3线程
                    }
                    return true;
                }
                return match == s;
            }

SynchronousQueue源码解析(非公平模式)
最终 t4 线程,把 t3线程放的元素 30 给取走了,t4 返回了。

同时,t4 线程,将 t3线程的match属性,由null设置为s节点,并唤醒 t3 线程,最终 t3 也返回了。

也就是 取元素的线程,放元素的线程,相互作用,同时成功。

四、put() 方法被唤醒

前面说了,t1 线程的那个节点,match属性,由null设置为s节点

往上翻看下 awaitFulfill 方法,被唤醒的 t1 线程,从头执行,

 // awaitFulfill 方法由此退出
            SNode m = s.match;
            if (m != null)
                return m;
            SNode m = awaitFulfill(s, timed, nanos);
            if (m == s) {               // 此次不相等
                clean(s);
                return null;
            }
            if ((h = head) != null && h.next == s)
                casHead(h, s.next);     // help s's fulfiller
            return (E) ((mode == REQUEST) ? m.item : s.item); // 返回s.item,即返回 10, 

最终返回s.item,即返回 10,不等于null,外层put() 方法结束。

至此,非公平模式的源码解析完毕

由于 put() 方法和 take()方法调用的是同一个 transfer() 方法,代码理解起来就不容易。

多看几遍,多画图,大概的流程是可以弄明白的。

另外,本篇示例是先 put(),后 take(),来说明。其实也可以先take(),后 put() 来说明,

有兴趣的话,可以这么试试!

相关文章

ArrayBlockingQueue 源码解析

LinkedBlockingQueue 源码解析

PriorityBlockingQueue 源码解析

上一篇:java.net.SocketException:不是多播地址


下一篇:左神算法书籍《程序员代码面试指南》——2_01在单链表和双链表中删除倒数第k个字节