算法实现
基本方法是维持一个“槽”(slot),这个槽是保持交换对象的结点的引用,同时也是一个等待填满的“洞”(hole)。如果一个即将到来的“占领”(occupying)线程发现槽为空,然后它就会CAS(compareAndSet)一个结点到这个槽并且等待另外一个线程调用exchange方法。第二个“匹配”(fulfilling)线程发现槽为非空,则CAS它为空,并且通过CAS洞来交换对象,另外如果占领线程被阻塞,则会一并唤醒占领线程。在每个例子里,CAS都可能由于槽一开始为非空但在CAS的时候为空,或者反之等情况而失败,所以线程需要重试这些动作。
在只有少量线程使用Exchanger的时候,这个简单的方法效果不错,但是在比较多线程使用同一个Exchanger的时候,由于CAS在同一个槽上竞争,性能就会急剧下降。因此我们使用一个“区域”(arena);总的来说,就是一个槽数量可以动态变化的哈希表,其中任意一个槽都可以被线程用来交换。到来的线程就可以用基于它们的线程id的哈希值来选择槽。如果到来的线程在选择槽上CAS失败来,它就会选择另外一个槽。类似地,如果一条线程成功CAS进去一个槽,但是没有其它线程到来,它也会尝试另外一个槽,直到第0槽,即使表缩小的时候第0槽也会一直存在。这个特别的机制如下:
等待(Waiting):第0槽特别在于没有竞争的时候它是唯一存在的槽。当单条线程占领了第0槽后,如果没有线程匹配,那么该线程会在短暂的自旋之后阻塞。在其它情况下,占领线程最终会放弃并且尝试另外的槽。在阻塞(如果是第0槽)或者放弃(其它的槽)或者重新开始的时候,等待线程都会自旋片刻(比上下文切换时间稍微短的一段时间)。除非不大可能有其它线程的存在,否则没有理由让线程阻塞。为了避免内存竞争,所以竞争者会在静静地轮询一段比阻塞然后唤醒稍短的时间。由于缺少其它线程,非0槽会等待自旋时间结束,大概每次尝试都会浪费一次额外的上下文切换时间,平均依然比另外的方法(阻塞然后唤醒)快很多。
改变大小(Sizing):通常,使用少量槽能够减少竞争。特别地当在少量线程时,使用太多槽会导致和使用太少槽的一样的糟糕性能,还有会导致空间不足的错误。变量“max”维持实际使用的槽的数量。当一条线程发现太多CAS失败的时候会增加“max”(这个类似于常规的基于一个目标载入因子来改变大小的哈希表,在这里不同的是,增长的速度是加一而不是按比例)。增长需要在每个槽上三次的失败竞争才会发生。需要多次失败才会增长可以处理这样的情况,一些CAS的失败并非由于竞争,可能在两条线程简单的竞争或者在读取和CAS过程中有线程抢先运行。同时,非常短暂的高峰竞争可能会大大高于平均可忍受的程度。当非0槽等待超时没有被p匹配的时候,就会尝试减少最大槽数量(max)限制。线程经历了超时等待会移动到更加接近第0槽,所以即使由于不活跃导致表大小缩减,但最终也会发现存在(或者未来)的线程。这个增长和缩减的选择机制和阀值从本质上讲都会在交换代码里卷入索引和哈希,而且无法很好地抽象出去。
哈希(Hashing):每条线程都会选择与简单的哈希码一直的初始槽来使用。对于任意指定线程,每次相遇的顺序都是相同的,但实际上对于线程是随机的。使用区域会遇到经典的哈希表的成本与质量权衡问题(cost vs quality tradeoffs)。这里,我们使用基于当前线程的Thread.getId()返回值的one-step FNV-1a哈希值,还加上一个低廉的近似模数(mod)操作去选择一个索引。以这样的方式来优化索引选择的缺陷是需要硬编码去使用一个最大为32的最大表大小。但是这个值足以超过已知的平台。
探查(Probing):在侦查到已选的槽的竞争后,我们会按顺序探查整个表,类似与哈希表在冲突中的线性探查。(循环地移动,按照相反的顺序,可以最好地配合表增长和缩减规则——表的增长和缩减都是从尾部开始,头部0槽保持不变)除了为了最小化错报和缓存失效的影响,我们会对第一个选择的槽进行两次探查。
填充(Padding):即使有了竞争管理,槽还是会被严重竞争,所以利用缓存填充(cache-jpadding)去避免糟糕的内存性能。由于这样,槽只有在使用的时候延迟构造,避免浪费不必要的空间。当内存地址不是程序的优先问题的时候,随着时间消逝,垃圾回收器执行压缩,槽非常可能会被移动到互相联结,除非使用了填充,否则会导致大量在多个内核上的高速缓存行无效。
算法实现主要为了优化高竞争条件下的吞吐量,所以增加了较多的特性来避免各种问题,初始看上去较为复杂,因此建议先大致看一下流程,然后再看看源码实现,再反过来看会有更加深刻的理解。
源码实现
Exchanger主要目的是不同线程间交换对象,因此exchange方法是Exchanger唯一的public方法。exchange方法有两个版本,一个是只抛出InterruptedException异常的无超时版本,一个是抛出InterruptedException, TimeoutException的有超时版本。先来看看无超时版本的实现
public V exchange(V x) throws InterruptedException { if (!Thread.interrupted()) { Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0); if (v == NULL_ITEM) return null; if (v != CANCEL) return (V)v; Thread.interrupted(); // Clear interrupt status on IE throw } throw new InterruptedException(); }函数首先判断当前线程是否已经被中断,如果是则抛出IE异常,否则调用doExchange函数,调用函数之前,为了防止传入交换对象的参数x为null,因此会当null时会传入NULL_ITEM,一个预定义的作为标识的Object作为参数,另外,根据doExchange返回的对象来判断槽中的对象为null或者当前操作被中断,如果被中断则doExchange返回CANCEL对象,这样exchange就会抛出IE异常。
private static final Object CANCEL = new Object(); private static final Object NULL_ITEM = new Object();我们再来看看doExchange方法的实现。
private Object doExchange(Object item, boolean timed, long nanos) { Node me = new Node(item); // Create in case occupying int index = hashIndex(); // Index of current slot int fails = 0; // Number of CAS failures for (;;) { Object y; // Contents of current slot Slot slot = arena[index]; if (slot == null) // Lazily initialize slots createSlot(index); // Continue loop to reread else if ((y = slot.get()) != null && // Try to fulfill slot.compareAndSet(y, null)) { Node you = (Node)y; // Transfer item if (you.compareAndSet(null, item)) { LockSupport.unpark(you.waiter); return you.item; } // Else cancelled; continue } else if (y == null && // Try to occupy slot.compareAndSet(null, me)) { if (index == 0) // Blocking wait for slot 0 return timed ? awaitNanos(me, slot, nanos) : await(me, slot); Object v = spinWait(me, slot); // Spin wait for non-0 if (v != CANCEL) return v; me = new Node(item); // Throw away cancelled node int m = max.get(); if (m > (index >>>= 1)) // Decrease index max.compareAndSet(m, m - 1); // Maybe shrink table } else if (++fails > 1) { // Allow 2 fails on 1st slot int m = max.get(); if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) index = m + 1; // Grow on 3rd failed slot else if (--index < 0) index = m; // Circularly traverse } } }函数首先利用当前要交换对象作为参数构造Node变量me,类Node定义如下
private static final class Node extends AtomicReference<Object> { public final Object item; public volatile Thread waiter; public Node(Object item) { this.item = item; } }内部类Node继承于AtomicReference,并且内部拥有两个成员对象item,waiter。假设线程1和线程2需要进行对象交换,类Node把线程1中需要交换的对象作为参数传递给Node构造函数,然后线程2如果在槽中发现此Node,则会利用CAS把当前原子引用从null变为需要交换的item对象,然后返回Node的成员变量item对象,构造Node的线程1调用get()方法发现原子引用非null的时候,就返回此对象。这样线程1和线程2就顺利交换对象。类Node的成员变量waiter一般在线程1如果需要阻塞和唤醒的情况下使用。
我们顺便看看槽Slot以及其相关变量的定义
private static final int CAPACITY = 32; private static final class Slot extends AtomicReference<Object> { // Improve likelihood of isolation on <= 128 byte cache lines. // We used to target 64 byte cache lines, but some x86s (including // i7 under some BIOSes) actually use 128 byte cache lines. long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe; } private volatile Slot[] arena = new Slot[CAPACITY]; private final AtomicInteger max = new AtomicInteger();内部类Slot也是继承于AtomicReference,其内部变量一共定义了15个long型成员变量,这15个long成员变量的作用就是缓存填充(cache padding),这样可以避免在大量CAS的时候减轻cache的影响。arena定义为大小为CAPACITY的数组,而max就是arena实际使用的数组大小,一般max会根据情况进行增长或者缩减,这样避免同时对一个槽进行CAS带来的性能下降影响。
我们看回doExchange函数,函数接着调用hashIndex根据线程Id获取对应槽的索引。
private final int hashIndex() { long id = Thread.currentThread().getId(); int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193; int m = max.get(); int nbits = (((0xfffffc00 >> m) & 4) | // Compute ceil(log2(m+1)) ((0x000001f8 >>> m) & 2) | // The constants hold ((0xffff00f2 >>> m) & 1)); // a lookup table int index; while ((index = hash & ((1 << nbits) - 1)) > m) // May retry on hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m return index; }hashIndex主要根据当前线程的id根据one-step FNV-1a的算出对应的哈希值,并且利用一个快速的模数估算来把哈希值限制在[0, max)之间(max是槽实际使用大小),具体实现涉及各种运算,有兴趣可以自行研究,此处略去。
doExchange函数接着会进入一个循环中,循环内部便是真正的算法逻辑,一共有4个判断,每个判断完之后如果没有返回再需要再次重新判断。首先从arena获取当前选中的Slot,由于hashIndex保证小于max值,因此不会数组越界。我们来看第一个判断,当第一次使用Slot的时候,该Slot为null,因此调用createSlot进行初始化。
private void createSlot(int index) { Slot newSlot = new Slot(); Slot[] a = arena; synchronized (a) { if (a[index] == null) a[index] = newSlot; } }createSlot的实现很简单,只是根据index参数把数组中的对应位置添加引用。但要注意并发问题,因此在给数组赋值的时候还要利用synchronized关键字进行同步。
接着看回doExchange循环。来看看第二个判断,如果选择的slot已经初始化,则调用当前slot.get()方法尝试获取Node节点,如果当前Node节点非null,则表明之前已有线程占领此Slot,则此时继续尝试CAS此slot为null,如果成功,则表示当前线程已经和此前的占领线程进行了匹配,接下来则CAS替换Node的原子引用为交换对象item,然后唤醒Node的占领线程waiter,接着返回Node.item完成了交换。
第三个判断中,如果获取槽中的Node为null,则表明选中的槽没有被占领,于是CAS把当前槽从null变为一开始以交换对象item构造的Node结点me,如果CAS成功,则要按照选择的槽索引分为两种处理,首先对于第0槽,需要进行阻塞等待,由于我们这里是非超时等待,因此调用await函数。
private static final int NCPU = Runtime.getRuntime().availableProcessors(); private static final int SPINS = (NCPU == 1) ? 0 : 2000; private static Object await(Node node, Slot slot) { Thread w = Thread.currentThread(); int spins = SPINS; for (;;) { Object v = node.get(); if (v != null) return v; else if (spins > 0) // Spin-wait phase --spins; else if (node.waiter == null) // Set up to block next node.waiter = w; else if (w.isInterrupted()) // Abort on interrupt tryCancel(node, slot); else // Block LockSupport.park(node); } }首先看看SPINS变量的定义,SPINS表示的是在阻塞或者等待匹配中超时放弃前需要自旋轮询变量的次数,在当只有单个CPU时为0,否则为2000。SPINS在多核CPU上能够在交换中,如果其中一条线程由于GC或者被抢占等原因暂停时,能够只等待短暂的轮询后即可重新进行交换操作。来看看await的实现,同样在循环里有四个判断:
第一个判断,调用Node的get方法,如果非null,则证明已经有线程成功交换对象又或者因为线程中断被取消了此次等待,因此直接返回对象v;
第二个判断,则get方法返回null,则要进行自旋等待,自旋的值是根据SPINS来决定;
第三个判断,此时自旋已经完结,因此需要进入阻塞状态,阻塞之前,首先把node.waiter赋值为当前线程,这样等后面有线程进行交换的时候可以唤醒此线程;
第四个判断,在最后进入阻塞前,如果发现当前线程已经被中断,则需要调用tryCancel取消此次等待
最后,调用LockSupport.park进入阻塞。
private static boolean tryCancel(Node node, Slot slot) { if (!node.compareAndSet(null, CANCEL)) return false; if (slot.get() == node) // pre-check to minimize contention slot.compareAndSet(node, null); return true; }tryCancel的实现很简单,首先需要CAS把当前结点的原子引用从null变为CANCEL对象,如果CAS失败,则有可能已经有线程顺利与当前结点进行匹配,并且调用CAS进行了交换。否则的话,再调用CAS把node所在的slot修改为null。如果这里CAS成功,则CANCEL对象会被返回到exchange方法里,让exchange方法判断后,抛出InterruptedException异常。
接着我们看回doExchange第三个判断,如果选择的是非0槽,则会调用spinWait进行自旋等待。
private static Object spinWait(Node node, Slot slot) { int spins = SPINS; for (;;) { Object v = node.get(); if (v != null) return v; else if (spins > 0) --spins; else tryCancel(node, slot); } }spinWait的实现与await类似,但稍有不同,主要逻辑是如果经过SPINS次自旋以后,仍然无法被匹配,则会调用tryCancel把当前结点调用tryCancel取消,这样返回doExchange的时候,如果发现当前结点已经被取消,则重新构造一个新结点Node,并且把index的值右移一位(即整除2),另外此处还需要考虑把槽的数量减少,于是判断如果max的值比整除后的index要大,则通过CAS把max值减去一。
doExchange的第四个判断里,如果前三个判断都失败,则表明CAS失败,CAS的失败有可能只是因为两条线程之间的竞争,也有可能大量线程的并发,因此我们先把fails值加一记录此次的失败,然后继续循环前面的判断;如果连续两次都失败,则大量线程并发的可能性较大,此时如果失败次数大于3次,并且max仍然小于FULL(定义max的最大值),则尝试CAS把max增加1,如果成功的话,则把index赋值为m+1,下次选择的槽则为新分配的索引;如果失败次数还不够3次,则把当前索引减去一,循环遍历整个Slot表。
于是doExchange大致逻辑便是如此,exchange的超时版本大体逻辑类似,在调用doExchange传入对应超时参数,这样在第0槽需要等待的时候会调用另外的函数awaitNanos。
private Object awaitNanos(Node node, Slot slot, long nanos) { int spins = TIMED_SPINS; long lastTime = 0; Thread w = null; for (;;) { Object v = node.get(); if (v != null) return v; long now = System.nanoTime(); if (w == null) w = Thread.currentThread(); else nanos -= now - lastTime; lastTime = now; if (nanos > 0) { if (spins > 0) --spins; else if (node.waiter == null) node.waiter = w; else if (w.isInterrupted()) tryCancel(node, slot); else LockSupport.parkNanos(node, nanos); } else if (tryCancel(node, slot) && !w.isInterrupted()) return scanOnTimeout(node); } }awaitNanos大体逻辑基本与await相同,但添加了一些关于超时判断的逻辑。其中最主要的是在超时之后,会尝试调用scanOnTimeout函数。
private Object scanOnTimeout(Node node) { Object y; for (int j = arena.length - 1; j >= 0; --j) { Slot slot = arena[j]; if (slot != null) { while ((y = slot.get()) != null) { if (slot.compareAndSet(y, null)) { Node you = (Node)y; if (you.compareAndSet(null, node.item)) { LockSupport.unpark(you.waiter); return you.item; } } } } } return CANCEL; }scanOnTimeout把整个槽表都扫描一次,如果发现有线程在另外的槽位中,则进行CAS交换。这样就可以减少超时的可能性。注意CAS替换的是node.item,并不是get()方法返回的先前在tryCancel中被CAS掉的原子引用。
总结
Exchanger使用了无锁算法,使用了一个可以在多线程下两组线程相互交换对象引用的同步器。该同步器在激烈竞争的环境下,做了大量的优化,并在对于CAS的内存竞争也采用了padding来避免cache带来的影响。其中的无锁算法以及其优化值得仔细品味和理解。