The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks. We instead use them for blocking synchronizers, but use the same basic tactic of holding some of the control information about a thread in the predecessor of its node. A status field in each node keeps track of whether a thread should block. A node is signalled when its predecessor releases. Each node of the queue otherwise serves as a specific-notification-style monitor holding a single waiting thread. The status field does NOT control whether threads are granted locks etc though. A thread may try to acquire if it is first in the queue. But being first does not guarantee success;
* it only gives the right to contend. So the currently released contender thread may need to rewait.
*
* <p>To enqueue into a CLH lock, you atomically splice it in as new tail. To dequeue, you just set the head field.
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
*
* <p>Insertion into a CLH queue requires only a single atomi operation on "tail", so there is a simple atomic point of demarcation from unqueued to queued. Similarly, dequeuing involves only updating the "head". However, it takes a bit more work for nodes to determine who their successors are, in part to deal with possible cancellation due to timeouts and interrupts.
*
The "prev" links (not used in original CLH locks), are mainly needed to handle cancellation. If a node is cancelled, its successor is (normally) relinked to a non-cancelled predecessor. For explanation of similar mechanics in the case of spin locks, see the papers by Scott and Scherer at http://www.cs.rochester.edu/u/scott/synchronization/
*
We also use "next" links to implement blocking mechanics.The thread id for each node is kept in its own node, so a predecessor signals the next node to wake up by traversing next link to determine which thread it is. Determination of successor must avoid races with newly queued nodes to set the "next" fields of their predecessors. This is solved when necessary by checking backwards from the atomicall updated "tail" when a node's successor appears to be null.
* (Or, said differently, the next-links are an optimization so that we don't usually need a backward scan.)
Cancellation introduces some conservatism to the basic algorithms. Since we must poll for cancellation of other nodes, we can miss noticing whether a cancelled node is ahead or behind us. This is dealt with by always unparking successors upon cancellation, allowing them to stabilize on a new predecessor, unless we can identify an uncancelle predecessor who will carry this responsibility.
CLH queues need a dummy header node to get started. But
* we don't create them on construction, because it would be wasted effort if there is never contention. Instead, the node is constructed and head and tail pointers are set upon first contention.
Threads waiting on Conditions use the same nodes, but use an additional link. Conditions only need to link nodes in simple (non-concurrent) linked queues because they are only accessed when exclusively held. Upon await, a node is inserted into a condition queue. Upon signal, the node is transferred to the main queue. A special value of status field is used to mark which queue a node is on.
*
* <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
* Scherer and Michael Scott, along with members of JSR-166
* expert group, for helpful ideas, discussions, and critiques
* on the design of this class.
等待队列是“CLH”(Craig、Landin和Hagersten)锁队列的变体。CLH锁通常用于自旋锁。相反,我们将它们用于阻塞同步器,但使用相同的基本策略,即在其节点的前身中保存关于线程的一些控制信息。每个节点中的status字段用于跟踪线程是否应该阻塞。当它的前身释放时,一个节点被通知。否则,队列中的每个节点都充当一个特定通知样式的监视器,该监视器持有单个等待线程。status字段不控制线程是否被授予锁等等。如果线程是队列中的第一个,那么它可能会尝试获取。但第一并不能保证成功;
它只给人争辩的权利。因此,当前发布的竞争者线程可能需要重新等待。
*
*
要进入CLH锁队列,您需要原子地将它作为新的tail拼接进来。要退出队列,只需设置head字段。
* < >之前
* +------+ prev +-----+ +-----+
*头部| | <---- | | <---- | |尾部
* +------+ +-----+ +-----+
* < / >之前
*
*
插入CLH队列只需要一个“tail”上的原子操作,所以有一个简单的原子点来划分从未排队到排队。类似地,退出队列只涉及更新“头”。但是,节点需要做更多的工作来确定谁是它们的继任者,部分原因是要处理由于超时和中断而可能发生的取消。
*
“prev”链接(在原始CLH锁中不使用)主要用于处理取消操作。如果一个节点被取消,它的后继节点(通常)会重新链接到一个未取消的前节点。关于旋锁的类似力学解释,请参阅http://www.cs.rochester.edu/u/scott/synchronization/上Scott和Scherer的论文
*
我们也使用“下一个”链接去执行阻塞机制。每个节点的线程id保存在它自己的节点中,因此前一个节点通过遍历next链接来确定它是哪个线程,从而向下一个节点发出唤醒信号。确定后继节点必须避免与新排队的节点竞争,以设置它们的前一个节点的“next”字段。当节点的后续节点显示为空时,通过从atomicall更新的“tail”向后检查来解决这个问题。
*(或者,换句话说,下一个链接是一种优化,所以我们通常不需要向后扫描。)
对消算法为基本算法引入了一些保守性。因为我们必须轮询其他节点的取消,我们可能会忽略一个被取消的节点是在我们前面还是后面。这可以通过在取消时总是解除后继器来处理,允许它们稳定在新的前身上,除非我们能够确定将承担此责任的uncancelle前身。
CLH队列需要一个虚拟头节点来启动。但
*我们不会在施工时创建它们,因为如果没有争用,那将是浪费精力。相反,在第一次争用时构造节点并设置头和尾指针。
等待条件的线程使用相同的节点,但使用额外的链接。条件只需要链接简单(非并发)链接队列中的节点,因为它们只在独占持有时才被访问。在await之后,一个节点被插入到条件队列中。收到信号后,节点被转移到主队列。status字段的特殊值用于标记节点所在的队列。
*
*
感谢Dave Dice, Mark Moir, Victor Luchangco, Bill
* Scherer和Michael Scott以及JSR-166成员
*专家组,提供有帮助的想法,讨论和批评
*关于这个类的设计。
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be) blocked (via park), so the current node must unpark its successor when it releases or cancels. To avoid races, acquire methods must first indicate they need a signal, then retry the atomic acquire, and then,on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt. Nodes never leave this state. In particular, a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue. It will not be used as a sync queue node until transferred, at which time the status will be set to 0. (Use of this value here has nothing to do with the other uses of the field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other nodes. This is set (for head node only) in doReleaseShared to ensure propagation continues, even if other operations have since intervened.
* 0: None of the above 。The values are arranged numerically to simplify use. Non-negative values mean that a node doesn't need to signal. So, most code doesn't need to check for particular values, just for sign.The field is initialized to 0 for normal sync nodes, and CONDITION for condition nodes. It is modified using CAS (or when possible, unconditional volatile writes).
/ * *
*状态字段,只接受值:
* SIGNAL:该节点的后继节点被阻塞(或即将被阻塞)(通过park),所以当前节点在释放或取消后继节点时必须解除其后继节点的阻塞。为了避免竞争,acquire方法必须首先表明它们需要一个信号,然后重试原子获取,然后在失败时阻塞。
* CANCELLED:该节点由于超时或中断而被取消。节点永远不会离开这个状态。特别是,一个被取消节点的线程永远不会再次阻塞。
* CONDITION:该节点目前在条件队列中。它将不会被用作同步队列节点,直到传输时,状态将被设置为0。(这里使用这个值与字段的其他用途没有任何关系,只是简化了机制。)
* PROPAGATE:一个releasshared应该传播到其他节点。这是在doreleasshared中设置的(仅针对头节点),以确保传播继续,即使其他操作已经介入。
* 0:以上都不是。数值按数字排列以简化使用。非负值意味着节点不需要发出信号。所以,大多数代码不需要检查特定的值,只需要检查符号。正常同步节点初始化为0,条件节点初始化为CONDITION。它可以使用CAS(或者在可能的情况下,使用无条件的volatile写操作)进行修改。
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
//这段代码的意思是,从队列(Node链表)的末尾开始检查,如果是waitStatus<=0(等待被唤醒,或者等待被加入条件队列)
//这里的写法有些特别 t=t.prev,这其实就是类似i++而已。先执行下面的for循环,再执行t=t.prev。这里的执行结果,因为没有跳出for循环,所以
//会循环到t=null,也就是函数的变量node的下一个node,除非队列里面还有其他的Node=null
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
---------------------------------------------------------------------------------------------------------------------------------------------------------------------
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//如果当前节点需要信号(被唤醒),则使用CAS将节点的继承者都设置为需要被唤醒,直至成功
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); //成功后,则释放继承者
}
else if (ws == 0 && //这里ws==0,0是没有任何意义的,只是方便标记这个状态应该设置成共享值PROGAGATE。这里也是采用CAS设置直至成功
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed //这
//这里的写法比较有意思,如果head有变动,则重新循环释放信号。如果是我们的写法,则是if(h!=head){continue;}
break;
}
}
确保发布的传播,即使有其他正在进行的获取/发布。
如果head需要信号,则按照通常的方式尝试释放它的继承者。但如果没有,则将status设置为PROPAGATE,以确保在发布时继续传播。
此外,我们必须循环,以防止在执行此操作时添加新节点。
另外,与unpark继承者的其他用途不同,我们需要知道CAS重置状态是否失败,如果失败,则重新检查。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
* 设置队列头,并检查是否后来者可能在共享模式下等待,如果是这样传播,如果任何一个> 0或propagate状态设置。
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
/ *
*尝试信号下一个队列节点,如果:
*传播由调用者指示,
* 或者被前一个操作记录(在setHead之前或之后作为h.waitStatus记录)(注意:这个操作使用waitStatus的符号检查,因为传播状态可以转换被唤醒)。
*下一个节点正在共享模式下等待,或者我们不知道,因为它看起来是空的
*
*这两种检查的保守性可能会导致不必要的唤醒,但只有在存在多个自旋获取/释放时才会如此,所以大多数情况下现在或不久就需要被唤醒
*。
* /
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
/**
* Cancels an ongoing attempt to acquire.
* 取消node获取信号
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
// 先跳过前置的节点。这里很有意思,也很巧妙,或者很另类,具体参考下面的图
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
取消节点获取信号,图示如下:
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
* 检查并更新一个获取信号失败的节点的状态。如果线程需要阻塞,则返回true。在所有的请求循环中,这是一个主要的唤醒控制。
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
//这种写法蛮有意思的。意思是,将前驱节点b的前驱节点c赋值给前驱a,最后再将节点a的前驱节点设置为c。也就是,前驱节点b会被覆盖
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}