Java并发48:并发集合系列-基于CAS算法的非阻塞双向*队列ConcurrentLinkedDueue

原文地址:https://www.jianshu.com/p/602b3240afaf

ConcurrentLinkedDeque 是双向链表结构的*并发队列,从JDK 7开始加入到J.U.C的行列中,使用CAS实现并发安全。

与 ConcurrentLinkedQueue 的区别是该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除)。

ConcurrentLinkedDeque 适合“多生产,多消费”的场景。

内存一致性遵循:对 ConcurrentLinkedDeque 的插入操作先行发生于(happen-before)访问或移除操作。

相较于 ConcurrentLinkedQueue,ConcurrentLinkedDeque 由于是双端队列,所以在操作和概念上会更加复杂,来一起看下。

概述

ConcurrentLinkedDeque(后面称CLD) 的实现方式继承了 ConcurrentLinkedQueue 和 LinkedTransferQueue的思想。

ConcurrentLinkedDeque 在非阻塞算法的实现方面与 ConcurrentLinkedQueue 基本一致。

关于 ConcurrentLinkedQueue,请参考笔者的上一篇文章:Java并发47:并发集合系列-基于CAS算法的非阻塞单向*队列ConcurrentLinkedQueue

数据结构

Java并发48:并发集合系列-基于CAS算法的非阻塞双向*队列ConcurrentLinkedDueue

重要属性:

//头节点
private transient volatile Node<E> head;
//尾节点
private transient volatile Node<E> tail;
//终止节点
private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR;
//移除节点时更新链表属性的阀值
private static final int HOPS = 2;

和ConcurrentLinkedQueue一样,CLD 内部也只维护了head和tail属性,对 head/tail 节点也使用了“不变性”和“可变性”约束,不过跟 ConcurrentLinkedQueue 有些许差异,我们来看一下:

head/tail 的不变性:

  • 第一个节点总是能以O(1)的时间复杂度从 head 通过 prev 链接到达;
  • 最后一个节点总是能以O(1)的时间复杂度从 tail 通过 next 链接到达;
  • 所有live节点(item不为null的节点),都能从第一个节点通过调用 succ() 方法遍历可达;
  • 所有live节点(item不为null的节点),都能从最后一个节点通过调用 pred() 方法遍历可达;
  • head/tail 不能为 null;
  • head 节点的 next 域不能引用到自身;
  • head/tail 不会是GC-unlinked节点(但它可能是unlink节点)。

head/tail的可变性:

  • head/tail 节点的 item 域可能为 null,也可能不为 null;
  • head/tail 节点可能从first/last/tail/head 节点访问时不可达;
  • tail 节点的 next 域可以引用到自身。

除此之外,再来看看CLD中另外两个属性:

  • PREV_TERMINATOR:prev的终止节点,next指向自身,即PREV_TERMINATOR.next = PREV_TERMINATOR。在 first 节点出列后,会把first.next指向自身(first.next=first),然后把prev设为PREV_TERMINATOR。
  • NEXT_TERMINATOR:next的终止节点,prev指向自身,即NEXT_TERMINATOR.pre = NEXT_TERMINATOR。在 last 节点出列后,会把last.prev指向自身(last.prev=last),然后把next设为NEXT_TERMINATOR。

添加(入列)

CLD的添加方法包括:offer(E)、add(E)、push(E)、addFirst(E)、addLast(E)、offerFirst(E)、offerLast(E)。

所有这些操作都是通过linkFirst(E)或linkLast(E)来实现的。

linkFirst(E) / linkLast(E)

/**
 * Links e as first element.
 */
private void linkFirst(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    restartFromHead:
    for (;;)
        //从head节点往前寻找first节点
        for (Node<E> h = head, p = h, q;;) {
            if ((q = p.prev) != null &&
                (q = (p = q).prev) != null)
                // Check for head updates every other hop.
                // If p == q, we are sure to follow head instead.
                //如果head被修改,返回head重新查找
                p = (h != (h = head)) ? h : q;
            else if (p.next == p) // 自链接节点,重新查找
                continue restartFromHead;
            else {
                // p is first node
                newNode.lazySetNext(p); // CAS piggyback
                if (p.casPrev(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this deque,
                    // and for newNode to become "live".
                    if (p != h) // hop two nodes at a time 跳两个节点时才修改head
                        casHead(h, newNode);  // Failure is OK.
                    return;
                }
                // Lost CAS race to another thread; re-read prev
            }
        }
}

/**
 * Links e as last element.
 */
private void linkLast(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    restartFromTail:
    for (;;)
        //从tail节点往后寻找last节点
        for (Node<E> t = tail, p = t, q;;) {
            if ((q = p.next) != null &&
                (q = (p = q).next) != null)
                // Check for tail updates every other hop.
                // If p == q, we are sure to follow tail instead.
                //如果tail被修改,返回tail重新查找
                p = (t != (t = tail)) ? t : q;
            else if (p.prev == p) // 自链接节点,重新查找
                continue restartFromTail;
            else {
                // p is last node
                newNode.lazySetPrev(p); // CAS piggyback
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this deque,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time 跳两个节点时才修改tail
                        casTail(t, newNode);  // Failure is OK.
                    return;
                }
                // Lost CAS race to another thread; re-read next
            }
        }
}

说明:

linkFirst是插入新节点到队列头的主函数,执行流程如下:

  • 首先从 head 节点开始向前循环找到 first 节点(p.prev==null&&p.next!=p);
  • 然后通过lazySetNext设置新节点的 next 节点为 first;
  • 然后 CAS 修改 first 的 prev 为新节点。

注意这里 CAS 指令成功后会判断 first 节点是否已经跳了两个节点,只有在跳了两个节点才会 CAS 更新 head,这也是为了节省 CAS 指令执行开销。

linkLast是插入新节点到队列尾,执行流程与linkFirst一致,不多赘述,具体见源码。


获取(出列)

CLD的获取方法分两种:

  • 获取节点:peek、peekFirst 、peekLast、getFirst、getLast,都是通过peekFirst 、peekLast实现。
  • 获取并移除节点: poll、pop、remove、pollFirst、pollLast、removeFirst、removeLast,都是通过pollFirst、pollLast实现。

pollFirst、pollLast包括了peekFirst 、peekLast的实现,都是找到并返回 first/last 节点。

不同的是,pollFirst、pollLast比peekFirst 、peekLast多了unlink这一步。

所以这里我们只对pollFirst和pollLast两个方法进行解析。

首先来看一下pollFirst() :

/**获取并移除队列首节点*/
public E pollFirst() {
    for (Node<E> p = first(); p != null; p = succ(p)) {
        E item = p.item;
        if (item != null && p.casItem(item, null)) {
            unlink(p);
            return item;
        }
    }
    return null;
}

说明:

pollFirst()用于找到链表中首个 item 不为 null 的节点,并返回节点的item。

注意并不是first节点,因为first节点的item可以为null。

涉及的内部方法较多,不过都很简单,我们通过穿插代码方式分析:

1.首先通过first()方法找到first节点,first 节点必须为 active 节点(p.prev==null&&p.next!=p)。first()源码如下:

Node<E> first() {
    restartFromHead:
    for (;;)
        //从head开始往前找
        for (Node<E> h = head, p = h, q;;) {
            if ((q = p.prev) != null &&
                (q = (p = q).prev) != null)
                // Check for head updates every other hop.
                // If p == q, we are sure to follow head instead.
                //如果head被修改则返回新的head重新查找,否则继续向前(prev)查找
                p = (h != (h = head)) ? h : q;
            else if (p == h
                     // It is possible that p is PREV_TERMINATOR,
                     // but if so, the CAS is guaranteed to fail.
                    //找到的节点不是head节点,CAS修改head
                     || casHead(h, p))
                return p;
            else
                continue restartFromHead;
        }
}

2.如果first.item==null(这里是允许的,具体见上面我们对 first/last 节点的介绍),则继续调用succ方法寻找后继节点。succ源码如下:

/**返回指定节点的的后继节点,如果指定节点的next指向自己,返回first节点*/
final Node<E> succ(Node<E> p) {
    // TODO: should we skip deleted nodes here?
    Node<E> q = p.next;
    return (p == q) ? first() : q;
}

3.CAS 修改节点的 item 为 null(即 “逻辑删除-logical deletion”),然后调用unlink(p)方法解除节点链接,最后返回 item。

小结

本章与 ConcurrentLinkedQueue 一篇中的非阻塞算法基本一致,只是为双端操作定义了几个可供操作的节点类型。

上一篇:CONDOR电源维修GPMP900-48概述


下一篇:交互简历案例(48)