一、简介
Exchanger类允许在两个线程之间定义同步点,当两个线程都到达同步点时,它们交换数据。也就是第一个线程的数据进入到第二个线程中,第二线程的数据进入到第一个线程中。
Exchanger可以用于校对工作的场景。
Exchanger只有一个构造函数:
public Exchanger() { participant = new Participant(); }
这个类提供对外的接口非常简洁,两个重载的范型exchange方法:
// 除非当前线程被中断,否则一直等待另一个线程到达这个交换点,然后将交换的数据 x传输给它,并收到另一个线程传过来的数据。 public V exchange(V x) throws InterruptedException // 和上一个方法功能基本一样,只不过这个方法增加了等待超时时间 public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
1.1、Exchanger源码详解
Exchanger算法的核心是通过一个可交换数据的slot,以及一个可以带有数据item的参与者。源码中的描述如下:
for (;;) { if (slot is empty) { // offer place item in a Node; if (can CAS slot from empty to node) { wait for release; return matching item in node; } } else if (can CAS slot from node to empty) { // release get the item in node; set matching item in node; release waiting thread; } // else retry on CAS failure }
Exchanger中定义了如下几个重要的成员变量:
/** * Per-thread state */ private final Participant participant; /** * Elimination array; null until enabled (within slotExchange). * Element accesses use emulation of volatile gets and CAS. */ private volatile Node[] arena; /** * Slot used until contention detected. */ private volatile Node slot;
participant的作用是为每个线程关联一个Node对象。Participant继承自ThreadLocal:
/** The corresponding thread local class */ static final class Participant extends ThreadLocal<Node> { public Node initialValue() { return new Node(); } }
Node类定义如下:
@sun.misc.Contended static final class Node { int index; // Arena index int bound; // Last recorded value of Exchanger.bound int collides; // Number of CAS failures at current bound int hash; // Pseudo-random for spins Object item; // This thread's current item volatile Object match; // Item provided by releasing thread volatile Thread parked; // Set to this thread when parked, else null }
index:arena的下标,多个槽位的时候利用;
bound:上一次记录的Exchanger.bound;
collides:在当前bound下CAS失败的次数;
hash:伪随机数,用于自旋;
item:这个线程的当前项,也就是需要交换的数据;
match:交换的数据;
parked:挂起时设置线程值,其他情况下为null;
看exchange(V x)方法。
exchange(V x)方法
如果一个线程先执行exchange方法,那么它会同步等待另一个线程也执行exchange方法,这个时候两个线程就都达到了同步点,两个线程就可以交换数据。该方法源码如下:
public V exchange(V x) throws InterruptedException { Object v; Object item = (x == null) ? NULL_ITEM : x; // translate null args if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || // disambiguates null return (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V)v; }
exchange(V x)方法主要步骤如下:
1. 判断程序要交换的数据是否为空指针,若为空指针,则将NULL_ITEM设置为要交换的数据,NULL_ITEM是一个来代替空指针的对象,它定义为:
private static final Object NULL_ITEM = new Object();
2. 若arena为null,则通过slotExchange(Object item, boolean timed, long ns)方法来交换数据;否则,若arena不为null,则运行下一步骤。
3. 判断程序中断状态,若程序没有被中断,则运行arenaExchange(Object item, boolean timed, long ns)方法来交换数据;否则,抛出InterruptedException异常。
4. 返回交换后的数据,若数据为NULL_ITEM,则将其转换为空指针null。
在整个过程中,最主要的就是那两个数据交换方法,我们先来看一看slotExchange(Object item, boolean timed, long ns)方法。
slotExchange(Object item, boolean timed, long ns)方法
该方法源码如下:
private final Object slotExchange(Object item, boolean timed, long ns) { // 获取与线程相关联的Node对象 Node p = participant.get(); // 获取当前线程对象 Thread t = Thread.currentThread(); // 判断线程中断状态 if (t.isInterrupted()) // preserve interrupt status so caller can recheck return null; // 进入自旋 for (Node q;;) { // 如果slot不为null,表明已经有其他线程等待交换数据 if ((q = slot) != null) { // 通过CAS交换数据信息,成功则返回交换数据 if (U.compareAndSwapObject(this, SLOT, q, null)) { // 获取其他线程交换的数据 Object v = q.item; // 槽位内值被改为参数item,这是等待线程需要的数据 q.match = item; // 获取等待线程 Thread w = q.parked; // 等待线程不为null,则将其唤醒 if (w != null) U.unpark(w); // 返回拿到的数据 return v; } // CAS操作失败,则创建arena用于竞争 if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[(FULL + 2) << ASHIFT]; } // 如果arena不为null,方法返回null,随后进入arenaExchange方法 else if (arena != null) return null; // caller must reroute to arenaExchange // 否则,q(slot)为空,通过CAS尝试将slot设置为p,失败之后自旋重试,成功则跳出自旋,进入spin+block模式 else { p.item = item; // 将slot设置为占据该slot线程所对应的Node if (U.compareAndSwapObject(this, SLOT, null, p)) break; p.item = null; } } // 等待release // 若exchange操作有时间限制,则先计算结束时间和自旋次数,进入自旋+阻塞 int h = p.hash; long end = timed ? System.nanoTime() + ns : 0L; int spins = (NCPU > 1) ? SPINS : 1; Object v; // 直到成功交换到数据 while ((v = p.match) == null) { // 自旋 if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) h = SPINS | (int)t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); } // 其它线程来交换数据了,另一个线程修改了solt,但是还没有设置match数据,这时可以再稍等一会 else if (slot != p) spins = SPINS; // 需要阻塞当前线程,等待其它线程来交换数据 else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); // 设置Node节点中的parked属性为当前线程,当其他线程要交换数据时,需要通过parked属性来唤醒该线程 p.parked = t; // 阻塞当前线程 if (slot == p) U.park(false, ns); // 当前线程被唤醒之后,做一些清除操作 p.parked = null; U.putObject(t, BLOCKER, null); } // 交换失败,重置slot else if (U.compareAndSwapObject(this, SLOT, p, null)) { v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } // 清除match信息 U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; // 返回交换得到的数据,若失败,则返回值为null return v; }
slotExchange(Object item, boolean timed, long ns)方法的整个业务逻辑如下所示:
当一个线程来交换数据时,槽位(slot)有两种状态:
如果发现槽位(solt)没有数据,即slot为null,这时,当前线程就可以通过CAS操作占据slot,若CAS操作成功,则slot就已经被当前线程占据。如果失败,则有可能其他线程抢先了占据了slot,当前线程需要重头开始循环。占据slot成功的线程,需要等待其它线程来进行数据交换,此时,当前线程需要进行一段时间的自旋:
若在线程自旋期间,有其他线程来交换数据,则获取交换数据后,直接返回数据,而不用阻塞该进程。
若在线程自旋期间,没有其他线程来交换数据,那么就需要阻塞当前线程,在阻塞之前,还需要进行一次槽位判断,若槽位发生了变化,说明有其它线程来交换数据了,此时会延长当前线程的自旋时间,可能数据交换马上就完成;若槽位没有发生变化,则直接挂起当前线程,等待其他线程来交换数据,在另一个线程交换数据完成之后,另一个线程会唤醒与之配对交换的线程(即前面被挂起的线程),被唤醒的线程,继续执行,拿到交换的数据之后,直接返回,若出现了超时、被中断的情况,则返回值为null。
如果发现槽位(solt)已有数据,即slot不为null,这表明已经有其它线程占据了槽位,正在等待交换数据,那么当前线程就可以尝试进行数据交换,首先要通过CAS操作设置slot变量值,若CAS成功,则表示当前线程可以进行数据交换,否则,若CAS失败,则表示有其他线程抢先交换了数据,那这时,多个线程产生了竞争,然后,当前线程就会创建arena数组来避免竞争,用于后续的数据交换。
slotExchange(Object item, boolean timed, long ns)方法的整个执行步骤就是这样了,下面我们再看一看arenaExchange(Object item, boolean timed, long ns)方法的执行步骤。
arenaExchange(Object item, boolean timed, long ns)方法
在介绍该方法之前,我们需要先来了解一下与之相关的数据结构
在前面介绍的Node类,被加上了一个@sun.misc.Contended注解,这个是用来避免伪共享的,关于伪共享的详解,可以看这篇博客。在Exchanger类中,ASHIFT就是用来避免伪共享的:
/** * The byte distance (as a shift value) between any two used slots * in the arena. 1 << ASHIFT should be at least cacheline size. */ private static final int ASHIFT = 7;
对ASHIFT进行详细说明,下面看一看arenaExchange(Object item, boolean timed, long ns)方法:
private final Object arenaExchange(Object item, boolean timed, long ns) { // Node数组,具有多个槽位 Node[] a = arena; // 获取与线程相关联的Node对象 Node p = participant.get(); // p.index初始值为0 for (int i = p.index;;) { // access slot at i int b, m, c; long j; // j is raw array offset // 在数组中,根据索引i取出数据,j相当于该线程要访问的第一个槽位 Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); // 该槽位有数据,即已经有线程在此槽位等待交换数据 if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 进行数据交换 Object v = q.item; // release q.match = item; Thread w = q.parked; if (w != null) U.unpark(w); return v; } // bound是最大的有效位置,和MMASK相与,得到真正存储数据的最大索引值 // 如果i小于最大索引,且对应槽位为空 else if (i <= (m = (b = bound) & MMASK) && q == null) { // 将需要交换的数据赋值给p p.item = item; // offer // 通过CAS来设置该槽位的数据,等待其他线程来交换 if (U.compareAndSwapObject(a, j, null, p)) { long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); // wait // 自旋 for (int h = p.hash, spins = SPINS;;) { Object v = p.match; // 有其他线程来和该线程交换数据 if (v != null) { U.putOrderedObject(p, MATCH, null); p.item = null; // clear for next use p.hash = h; return v; } else if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift if (h == 0) // initialize hash h = SPINS | (int)t.getId(); else if (h < 0 && // approx 50% true (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // two yields per wait } // 其它线程来交换数据了,另一个线程修改了槽位数据,但是还没有设置match数据,这时可以再稍等一会 else if (U.getObjectVolatile(a, j) != p) spins = SPINS; // releaser hasn't set match yet // m == 0表明已经到达arena数组中最小的存储数据槽位,当前线程需要阻塞在这里 else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); // emulate LockSupport p.parked = t; // minimize window // 再次检查槽位,看看在阻塞前,有没有线程来交换数据 if (U.getObjectVolatile(a, j) == p) U.park(false, ns); p.parked = null; U.putObject(t, BLOCKER, null); } // 当前这个槽位一直没有线程来交换数据,可以换个槽位试试 else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { // 更新bound if (m != 0) // try to shrink U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); p.item = null; p.hash = h; // 减小索引值,往最小的存储数据槽位的方向挪动 i = p.index >>>= 1; // descend if (Thread.interrupted()) return null; // 超时 if (timed && m == 0 && ns <= 0L) return TIMED_OUT; break; // expired; restart } } } // 占据槽位失败 else p.item = null; // clear offer } // i不在有效索引范围内,或者对应槽位已经被其它线程抢先交换了 else { // 更新p.bound if (p.bound != b) { // stale; reset p.bound = b; // bound的CAS失败次数初始为0 p.collides = 0; // i如果到达最大值,就递减 i = (i != m || m == 0) ? m : m - 1; } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { // 更新bound的CAS失败次数 p.collides = c + 1; i = (i == 0) ? m : i - 1; // cyclically traverse } // 递增i else i = m + 1; // grow // 更新index p.index = i; } } }
在slotExchange方法中,当存在竞争时,会创建arena数组:
arena = new Node[(FULL + 2) << ASHIFT];
在创建arena数组之前,会先设置bound为SEQ(SEQ=MMASK + 1),即bound的初始值为256:
/** * The maximum supported arena index. The maximum allocatable * arena size is MMASK + 1. Must be a power of two minus one, less * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices * for the expected scaling limits of the main algorithms. */ private static final int MMASK = 0xff; /** * Unit for sequence/version bits of bound field. Each successful * change to the bound also adds SEQ. */ private static final int SEQ = MMASK + 1;
arena的大小为(FULL + 2) << ASHIFT,因为1 << ASHIFT 是用于避免伪共享的,因此实际有效的Node 只有FULL + 2 个。
然后通过以下代码来获取arena中的节点:
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
ABASE定义如下:
Class<?> ak = Node[].class; // ABASE absorbs padding in front of element 0 ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
U.arrayBaseOffset(ak)就是计算存放第一个元素的内存地址,相对于数组对象起始地址的内存偏移量。可以看出,ABASE是计算出了一个新的起始地址,其前面的(1 << ASHIFT)位置都没有被利用。
当一个线程来交换数据时,若计算出的槽位索引有效,那对应的槽位有两种状态:
如果发现槽位(solt)没有数据,即slot为null,这时,当前线程就可以通过CAS操作占据slot,
若CAS操作成功
则slot就已经被当前线程占据。然后该线程会采用自旋+阻塞的方式进行等待交换数据。只有当槽位是第一个(m==0,i <= m)时,线程才会阻塞,否则,若长时间没有其他线程来交换数据,当前线程会换个槽位等待,首先,线程会将旧槽位的值通过CAS置为null,然后更新bound,索引值减半(i = p.index >>>= 1),如果设置了超时,需要进行超时判断,若发生超时,则直接返回。
若CAS操作失败
则有可能其他线程抢先了占据了slot,则将p.item设置为null,重新自旋。
如果发现槽位(solt)已有数据,即slot不为null,这表明已经有其它线程占据了槽位,正在等待交换数据,那么当前线程就可以尝试进行数据交换,首先要通过CAS操作设置slot变量值,若CAS成功,则表示当前线程可以进行数据交换,否则,若CAS失败,则表示有其他线程抢先交换了数据,那这时,多个线程产生了竞争,那么就更新bound和p.index。
arenaExchange(Object item, boolean timed, long ns)方法的运行逻辑总结如下:
当一个线程来交换的时候,如果”第一个”槽位是空的,那么自己就在那里等待,如果发现”第一个”槽位有等待线程,那么就直接交换,如果交换失败,说明其它线程在进行交换,那么就往后挪一个槽位,如果有数据就交换,没数据就等一会,但是不会阻塞在这里,在这里等了一会,发现还没有其它线程来交换数据,那么就往“第一个”槽位的方向挪,如果反复这样过后,挪到了第一个槽位,没有线程来交换数据了,那么自己就在”第一个”槽位阻塞等待。 第一个槽位并不是指的数组中的第一个,而是逻辑第一个,因为存在伪共享,多槽位中,部分空间没有被利用。