Java多线程系列--“JUC集合”09之 LinkedBlockingDeque

 

概要

本章介绍JUC包中的LinkedBlockingDeque。内容包括:
LinkedBlockingDeque介绍
LinkedBlockingDeque原理和数据结构
LinkedBlockingDeque函数列表
LinkedBlockingDeque源码分析(JDK1.7.0_40版本)
LinkedBlockingDeque示例

转载请注明出处:http://www.cnblogs.com/skywang12345/p/3503480.html

 

LinkedBlockingDeque介绍

LinkedBlockingDeque是双向链表实现的双向并发阻塞队列。该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除);并且,该阻塞队列是支持线程安全。

此外,LinkedBlockingDeque还是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于Integer.MAX_VALUE。

 

LinkedBlockingDeque原理和数据结构

LinkedBlockingDeque的数据结构,如下图所示:

Java多线程系列--“JUC集合”09之 LinkedBlockingDeque

说明
1. LinkedBlockingDeque继承于AbstractQueue,它本质上是一个支持FIFO和FILO的双向的队列。
2. LinkedBlockingDeque实现了BlockingDeque接口,它支持多线程并发。当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。
3. LinkedBlockingDeque是通过双向链表实现的。
3.1 first是双向链表的表头。
3.2 last是双向链表的表尾。
3.3 count是LinkedBlockingDeque的实际大小,即双向链表中当前节点个数。
3.4 capacity是LinkedBlockingDeque的容量,它是在创建LinkedBlockingDeque时指定的。
3.5 lock是控制对LinkedBlockingDeque的互斥锁,当多个线程竞争同时访问LinkedBlockingDeque时,某线程获取到了互斥锁lock,其它线程则需要阻塞等待,直到该线程释放lock,其它线程才有机会获取lock从而获取cpu执行权。
3.6 notEmptynotFull分别是“非空条件”和“未满条件”。通过它们能够更加细腻进行并发控制。

Java多线程系列--“JUC集合”09之 LinkedBlockingDeque
     -- 若某线程(线程A)要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它某个线程(线程B)向队列中插入了数据之后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。 此外,线程A在执行取操作前,会获取takeLock,在取操作执行完毕再释放takeLock。
     -- 若某线程(线程H)要插入数据时,队列已满,则该线程会它执行notFull.await()进行等待;当其它某个线程(线程I)取出数据之后,会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。 此外,线程H在执行插入操作前,会获取putLock,在插入操作执行完毕才释放putLock。
Java多线程系列--“JUC集合”09之 LinkedBlockingDeque

关于ReentrantLock 和 Condition等更多的内容,可以参考:
    (01) Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock
    (02) Java多线程系列--“JUC锁”03之 公平锁(一)
    (03) Java多线程系列--“JUC锁”04之 公平锁(二)
    (04) Java多线程系列--“JUC锁”05之 非公平锁
    (05) Java多线程系列--“JUC锁”06之 Condition条件

 

LinkedBlockingDeque函数列表

Java多线程系列--“JUC集合”09之 LinkedBlockingDeque
// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingDeque。
LinkedBlockingDeque()
// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingDeque,最初包含给定 collection 的元素,以该 collection 迭代器的遍历顺序添加。
LinkedBlockingDeque(Collection<? extends E> c)
// 创建一个具有给定(固定)容量的 LinkedBlockingDeque。
LinkedBlockingDeque(int capacity)

// 在不违反容量限制的情况下,将指定的元素插入此双端队列的末尾。
boolean add(E e)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的开头;如果当前没有空间可用,则抛出 IllegalStateException。
void addFirst(E e)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的末尾;如果当前没有空间可用,则抛出 IllegalStateException。
void addLast(E e)
// 以原子方式 (atomically) 从此双端队列移除所有元素。
void clear()
// 如果此双端队列包含指定的元素,则返回 true。
boolean contains(Object o)
// 返回在此双端队列的元素上以逆向连续顺序进行迭代的迭代器。
Iterator<E> descendingIterator()
// 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。
int drainTo(Collection<? super E> c)
// 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。
int drainTo(Collection<? super E> c, int maxElements)
// 获取但不移除此双端队列表示的队列的头部。
E element()
// 获取,但不移除此双端队列的第一个元素。
E getFirst()
// 获取,但不移除此双端队列的最后一个元素。
E getLast()
// 返回在此双端队列元素上以恰当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),并在成功时返回 true;如果当前没有空间可用,则返回 false。
boolean offer(E e)
// 将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),必要时将在指定的等待时间内一直等待可用空间。
boolean offer(E e, long timeout, TimeUnit unit)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的开头,并在成功时返回 true;如果当前没有空间可用,则返回 false。
boolean offerFirst(E e)
// 将指定的元素插入此双端队列的开头,必要时将在指定的等待时间内等待可用空间。
boolean offerFirst(E e, long timeout, TimeUnit unit)
// 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的末尾,并在成功时返回 true;如果当前没有空间可用,则返回 false。
boolean offerLast(E e)
// 将指定的元素插入此双端队列的末尾,必要时将在指定的等待时间内等待可用空间。
boolean offerLast(E e, long timeout, TimeUnit unit)
// 获取但不移除此双端队列表示的队列的头部(即此双端队列的第一个元素);如果此双端队列为空,则返回 null。
E peek()
// 获取,但不移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。
E peekFirst()
// 获取,但不移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null。
E peekLast()
// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素);如果此双端队列为空,则返回 null。
E poll()
// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),如有必要将在指定的等待时间内等待可用元素。
E poll(long timeout, TimeUnit unit)
// 获取并移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。
E pollFirst()
// 获取并移除此双端队列的第一个元素,必要时将在指定的等待时间等待可用元素。
E pollFirst(long timeout, TimeUnit unit)
// 获取并移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null。
E pollLast()
// 获取并移除此双端队列的最后一个元素,必要时将在指定的等待时间内等待可用元素。
E pollLast(long timeout, TimeUnit unit)
// 从此双端队列所表示的堆栈中弹出一个元素。
E pop()
// 将元素推入此双端队列表示的栈。
void push(E e)
// 将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),必要时将一直等待可用空间。
void put(E e)
// 将指定的元素插入此双端队列的开头,必要时将一直等待可用空间。
void putFirst(E e)
// 将指定的元素插入此双端队列的末尾,必要时将一直等待可用空间。
void putLast(E e)
// 返回理想情况下(没有内存和资源约束)此双端队列可不受阻塞地接受的额外元素数。
int remainingCapacity()
// 获取并移除此双端队列表示的队列的头部。
E remove()
// 从此双端队列移除第一次出现的指定元素。
boolean remove(Object o)
// 获取并移除此双端队列第一个元素。
E removeFirst()
// 从此双端队列移除第一次出现的指定元素。
boolean removeFirstOccurrence(Object o)
// 获取并移除此双端队列的最后一个元素。
E removeLast()
// 从此双端队列移除最后一次出现的指定元素。
boolean removeLastOccurrence(Object o)
// 返回此双端队列中的元素数。
int size()
// 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),必要时将一直等待可用元素。
E take()
// 获取并移除此双端队列的第一个元素,必要时将一直等待可用元素。
E takeFirst()
// 获取并移除此双端队列的最后一个元素,必要时将一直等待可用元素。
E takeLast()
// 返回以恰当顺序(从第一个元素到最后一个元素)包含此双端队列所有元素的数组。
Object[] toArray()
// 返回以恰当顺序包含此双端队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()
Java多线程系列--“JUC集合”09之 LinkedBlockingDeque

 

LinkedBlockingDeque源码分析(JDK1.7.0_40版本)

LinkedBlockingDeque.java的完整源码如下:

Java多线程系列--“JUC集合”09之 LinkedBlockingDeque
   1 /*
   2  * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
   3  *
   4  *
   5  *
   6  *
   7  *
   8  *
   9  *
  10  *
  11  *
  12  *
  13  *
  14  *
  15  *
  16  *
  17  *
  18  *
  19  *
  20  *
  21  *
  22  *
  23  */
  24 
  25 /*
  26  *
  27  *
  28  *
  29  *
  30  *
  31  * Written by Doug Lea with assistance from members of JCP JSR-166
  32  * Expert Group and released to the public domain, as explained at
  33  * http://creativecommons.org/publicdomain/zero/1.0/
  34  */
  35 
  36 package java.util.concurrent;
  37 
  38 import java.util.AbstractQueue;
  39 import java.util.Collection;
  40 import java.util.Iterator;
  41 import java.util.NoSuchElementException;
  42 import java.util.concurrent.locks.Condition;
  43 import java.util.concurrent.locks.ReentrantLock;
  44 
  45 /**
  46  * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on
  47  * linked nodes.
  48  *
  49  * <p> The optional capacity bound constructor argument serves as a
  50  * way to prevent excessive expansion. The capacity, if unspecified,
  51  * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
  52  * dynamically created upon each insertion unless this would bring the
  53  * deque above capacity.
  54  *
  55  * <p>Most operations run in constant time (ignoring time spent
  56  * blocking).  Exceptions include {@link #remove(Object) remove},
  57  * {@link #removeFirstOccurrence removeFirstOccurrence}, {@link
  58  * #removeLastOccurrence removeLastOccurrence}, {@link #contains
  59  * contains}, {@link #iterator iterator.remove()}, and the bulk
  60  * operations, all of which run in linear time.
  61  *
  62  * <p>This class and its iterator implement all of the
  63  * <em>optional</em> methods of the {@link Collection} and {@link
  64  * Iterator} interfaces.
  65  *
  66  * <p>This class is a member of the
  67  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  68  * Java Collections Framework</a>.
  69  *
  70  * @since 1.6
  71  * @author  Doug Lea
  72  * @param <E> the type of elements held in this collection
  73  */
  74 public class LinkedBlockingDeque<E>
  75     extends AbstractQueue<E>
  76     implements BlockingDeque<E>,  java.io.Serializable {
  77 
  78     /*
  79      * Implemented as a simple doubly-linked list protected by a
  80      * single lock and using conditions to manage blocking.
  81      *
  82      * To implement weakly consistent iterators, it appears we need to
  83      * keep all Nodes GC-reachable from a predecessor dequeued Node.
  84      * That would cause two problems:
  85      * - allow a rogue Iterator to cause unbounded memory retention
  86      * - cause cross-generational linking of old Nodes to new Nodes if
  87      *   a Node was tenured while live, which generational GCs have a
  88      *   hard time dealing with, causing repeated major collections.
  89      * However, only non-deleted Nodes need to be reachable from
  90      * dequeued Nodes, and reachability does not necessarily have to
  91      * be of the kind understood by the GC.  We use the trick of
  92      * linking a Node that has just been dequeued to itself.  Such a
  93      * self-link implicitly means to jump to "first" (for next links)
  94      * or "last" (for prev links).
  95      */
  96 
  97     /*
  98      * We have "diamond" multiple interface/abstract class inheritance
  99      * here, and that introduces ambiguities. Often we want the
 100      * BlockingDeque javadoc combined with the AbstractQueue
 101      * implementation, so a lot of method specs are duplicated here.
 102      */
 103 
 104     private static final long serialVersionUID = -387911632671998426L;
 105 
 106     /** Doubly-linked list node class */
 107     static final class Node<E> {
 108         /**
 109          * The item, or null if this node has been removed.
 110          */
 111         E item;
 112 
 113         /**
 114          * One of:
 115          * - the real predecessor Node
 116          * - this Node, meaning the predecessor is tail
 117          * - null, meaning there is no predecessor
 118          */
 119         Node<E> prev;
 120 
 121         /**
 122          * One of:
 123          * - the real successor Node
 124          * - this Node, meaning the successor is head
 125          * - null, meaning there is no successor
 126          */
 127         Node<E> next;
 128 
 129         Node(E x) {
 130             item = x;
 131         }
 132     }
 133 
 134     /**
 135      * Pointer to first node.
 136      * Invariant: (first == null && last == null) ||
 137      *            (first.prev == null && first.item != null)
 138      */
 139     transient Node<E> first;
 140 
 141     /**
 142      * Pointer to last node.
 143      * Invariant: (first == null && last == null) ||
 144      *            (last.next == null && last.item != null)
 145      */
 146     transient Node<E> last;
 147 
 148     /** Number of items in the deque */
 149     private transient int count;
 150 
 151     /** Maximum number of items in the deque */
 152     private final int capacity;
 153 
 154     /** Main lock guarding all access */
 155     final ReentrantLock lock = new ReentrantLock();
 156 
 157     /** Condition for waiting takes */
 158     private final Condition notEmpty = lock.newCondition();
 159 
 160     /** Condition for waiting puts */
 161     private final Condition notFull = lock.newCondition();
 162 
 163     /**
 164      * Creates a {@code LinkedBlockingDeque} with a capacity of
 165      * {@link Integer#MAX_VALUE}.
 166      */
 167     public LinkedBlockingDeque() {
 168         this(Integer.MAX_VALUE);
 169     }
 170 
 171     /**
 172      * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.
 173      *
 174      * @param capacity the capacity of this deque
 175      * @throws IllegalArgumentException if {@code capacity} is less than 1
 176      */
 177     public LinkedBlockingDeque(int capacity) {
 178         if (capacity <= 0) throw new IllegalArgumentException();
 179         this.capacity = capacity;
 180     }
 181 
 182     /**
 183      * Creates a {@code LinkedBlockingDeque} with a capacity of
 184      * {@link Integer#MAX_VALUE}, initially containing the elements of
 185      * the given collection, added in traversal order of the
 186      * collection‘s iterator.
 187      *
 188      * @param c the collection of elements to initially contain
 189      * @throws NullPointerException if the specified collection or any
 190      *         of its elements are null
 191      */
 192     public LinkedBlockingDeque(Collection<? extends E> c) {
 193         this(Integer.MAX_VALUE);
 194         final ReentrantLock lock = this.lock;
 195         lock.lock(); // Never contended, but necessary for visibility
 196         try {
 197             for (E e : c) {
 198                 if (e == null)
 199                     throw new NullPointerException();
 200                 if (!linkLast(new Node<E>(e)))
 201                     throw new IllegalStateException("Deque full");
 202             }
 203         } finally {
 204             lock.unlock();
 205         }
 206     }
 207 
 208 
 209     // Basic linking and unlinking operations, called only while holding lock
 210 
 211     /**
 212      * Links node as first element, or returns false if full.
 213      */
 214     private boolean linkFirst(Node<E> node) {
 215         // assert lock.isHeldByCurrentThread();
 216         if (count >= capacity)
 217             return false;
 218         Node<E> f = first;
 219         node.next = f;
 220         first = node;
 221         if (last == null)
 222             last = node;
 223         else
 224             f.prev = node;
 225         ++count;
 226         notEmpty.signal();
 227         return true;
 228     }
 229 
 230     /**
 231      * Links node as last element, or returns false if full.
 232      */
 233     private boolean linkLast(Node<E> node) {
 234         // assert lock.isHeldByCurrentThread();
 235         if (count >= capacity)
 236             return false;
 237         Node<E> l = last;
 238         node.prev = l;
 239         last = node;
 240         if (first == null)
 241             first = node;
 242         else
 243             l.next = node;
 244         ++count;
 245         notEmpty.signal();
 246         return true;
 247     }
 248 
 249     /**
 250      * Removes and returns first element, or null if empty.
 251      */
 252     private E unlinkFirst() {
 253         // assert lock.isHeldByCurrentThread();
 254         Node<E> f = first;
 255         if (f == null)
 256             return null;
 257         Node<E> n = f.next;
 258         E item = f.item;
 259         f.item = null;
 260         f.next = f; // help GC
 261         first = n;
 262         if (n == null)
 263             last = null;
 264         else
 265             n.prev = null;
 266         --count;
 267         notFull.signal();
 268         return item;
 269     }
 270 
 271     /**
 272      * Removes and returns last element, or null if empty.
 273      */
 274     private E unlinkLast() {
 275         // assert lock.isHeldByCurrentThread();
 276         Node<E> l = last;
 277         if (l == null)
 278             return null;
 279         Node<E> p = l.prev;
 280         E item = l.item;
 281         l.item = null;
 282         l.prev = l; // help GC
 283         last = p;
 284         if (p == null)
 285             first = null;
 286         else
 287             p.next = null;
 288         --count;
 289         notFull.signal();
 290         return item;
 291     }
 292 
 293     /**
 294      * Unlinks x.
 295      */
 296     void unlink(Node<E> x) {
 297         // assert lock.isHeldByCurrentThread();
 298         Node<E> p = x.prev;
 299         Node<E> n = x.next;
 300         if (p == null) {
 301             unlinkFirst();
 302         } else if (n == null) {
 303             unlinkLast();
 304         } else {
 305             p.next = n;
 306             n.prev = p;
 307             x.item = null;
 308             // Don‘t mess with x‘s links.  They may still be in use by
 309             // an iterator.
 310             --count;
 311             notFull.signal();
 312         }
 313     }
 314 
 315     // BlockingDeque methods
 316 
 317     /**
 318      * @throws IllegalStateException {@inheritDoc}
 319      * @throws NullPointerException  {@inheritDoc}
 320      */
 321     public void addFirst(E e) {
 322         if (!offerFirst(e))
 323             throw new IllegalStateException("Deque full");
 324     }
 325 
 326     /**
 327      * @throws IllegalStateException {@inheritDoc}
 328      * @throws NullPointerException  {@inheritDoc}
 329      */
 330     public void addLast(E e) {
 331         if (!offerLast(e))
 332             throw new IllegalStateException("Deque full");
 333     }
 334 
 335     /**
 336      * @throws NullPointerException {@inheritDoc}
 337      */
 338     public boolean offerFirst(E e) {
 339         if (e == null) throw new NullPointerException();
 340         Node<E> node = new Node<E>(e);
 341         final ReentrantLock lock = this.lock;
 342         lock.lock();
 343         try {
 344             return linkFirst(node);
 345         } finally {
 346             lock.unlock();
 347         }
 348     }
 349 
 350     /**
 351      * @throws NullPointerException {@inheritDoc}
 352      */
 353     public boolean offerLast(E e) {
 354         if (e == null) throw new NullPointerException();
 355         Node<E> node = new Node<E>(e);
 356         final ReentrantLock lock = this.lock;
 357         lock.lock();
 358         try {
 359             return linkLast(node);
 360         } finally {
 361             lock.unlock();
 362         }
 363     }
 364 
 365     /**
 366      * @throws NullPointerException {@inheritDoc}
 367      * @throws InterruptedException {@inheritDoc}
 368      */
 369     public void putFirst(E e) throws InterruptedException {
 370         if (e == null) throw new NullPointerException();
 371         Node<E> node = new Node<E>(e);
 372         final ReentrantLock lock = this.lock;
 373         lock.lock();
 374         try {
 375             while (!linkFirst(node))
 376                 notFull.await();
 377         } finally {
 378             lock.unlock();
 379         }
 380     }
 381 
 382     /**
 383      * @throws NullPointerException {@inheritDoc}
 384      * @throws InterruptedException {@inheritDoc}
 385      */
 386     public void putLast(E e) throws InterruptedException {
 387         if (e == null) throw new NullPointerException();
 388         Node<E> node = new Node<E>(e);
 389         final ReentrantLock lock = this.lock;
 390         lock.lock();
 391         try {
 392             while (!linkLast(node))
 393                 notFull.await();
 394         } finally {
 395             lock.unlock();
 396         }
 397     }
 398 
 399     /**
 400      * @throws NullPointerException {@inheritDoc}
 401      * @throws InterruptedException {@inheritDoc}
 402      */
 403     public boolean offerFirst(E e, long timeout, TimeUnit unit)
 404         throws InterruptedException {
 405         if (e == null) throw new NullPointerException();
 406         Node<E> node = new Node<E>(e);
 407         long nanos = unit.toNanos(timeout);
 408         final ReentrantLock lock = this.lock;
 409         lock.lockInterruptibly();
 410         try {
 411             while (!linkFirst(node)) {
 412                 if (nanos <= 0)
 413                     return false;
 414                 nanos = notFull.awaitNanos(nanos);
 415             }
 416             return true;
 417         } finally {
 418             lock.unlock();
 419         }
 420     }
 421 
 422     /**
 423      * @throws NullPointerException {@inheritDoc}
 424      * @throws InterruptedException {@inheritDoc}
 425      */
 426     public boolean offerLast(E e, long timeout, TimeUnit unit)
 427         throws InterruptedException {
 428         if (e == null) throw new NullPointerException();
 429         Node<E> node = new Node<E>(e);
 430         long nanos = unit.toNanos(timeout);
 431         final ReentrantLock lock = this.lock;
 432         lock.lockInterruptibly();
 433         try {
 434             while (!linkLast(node)) {
 435                 if (nanos <= 0)
 436                     return false;
 437                 nanos = notFull.awaitNanos(nanos);
 438             }
 439             return true;
 440         } finally {
 441             lock.unlock();
 442         }
 443     }
 444 
 445     /**
 446      * @throws NoSuchElementException {@inheritDoc}
 447      */
 448     public E removeFirst() {
 449         E x = pollFirst();
 450         if (x == null) throw new NoSuchElementException();
 451         return x;
 452     }
 453 
 454     /**
 455      * @throws NoSuchElementException {@inheritDoc}
 456      */
 457     public E removeLast() {
 458         E x = pollLast();
 459         if (x == null) throw new NoSuchElementException();
 460         return x;
 461     }
 462 
 463     public E pollFirst() {
 464         final ReentrantLock lock = this.lock;
 465         lock.lock();
 466         try {
 467             return unlinkFirst();
 468         } finally {
 469             lock.unlock();
 470         }
 471     }
 472 
 473     public E pollLast() {
 474         final ReentrantLock lock = this.lock;
 475         lock.lock();
 476         try {
 477             return unlinkLast();
 478         } finally {
 479             lock.unlock();
 480         }
 481     }
 482 
 483     public E takeFirst() throws InterruptedException {
 484         final ReentrantLock lock = this.lock;
 485         lock.lock();
 486         try {
 487             E x;
 488             while ( (x = unlinkFirst()) == null)
 489                 notEmpty.await();
 490             return x;
 491         } finally {
 492             lock.unlock();
 493         }
 494     }
 495 
 496     public E takeLast() throws InterruptedException {
 497         final ReentrantLock lock = this.lock;
 498         lock.lock();
 499         try {
 500             E x;
 501             while ( (x = unlinkLast()) == null)
 502                 notEmpty.await();
 503             return x;
 504         } finally {
 505             lock.unlock();
 506         }
 507     }
 508 
 509     public E pollFirst(long timeout, TimeUnit unit)
 510         throws InterruptedException {
 511         long nanos = unit.toNanos(timeout);
 512         final ReentrantLock lock = this.lock;
 513         lock.lockInterruptibly();
 514         try {
 515             E x;
 516             while ( (x = unlinkFirst()) == null) {
 517                 if (nanos <= 0)
 518                     return null;
 519                 nanos = notEmpty.awaitNanos(nanos);
 520             }
 521             return x;
 522         } finally {
 523             lock.unlock();
 524         }
 525     }
 526 
 527     public E pollLast(long timeout, TimeUnit unit)
 528         throws InterruptedException {
 529         long nanos = unit.toNanos(timeout);
 530         final ReentrantLock lock = this.lock;
 531         lock.lockInterruptibly();
 532         try {
 533             E x;
 534             while ( (x = unlinkLast()) == null) {
 535                 if (nanos <= 0)
 536                     return null;
 537                 nanos = notEmpty.awaitNanos(nanos);
 538             }
 539             return x;
 540         } finally {
 541             lock.unlock();
 542         }
 543     }
 544 
 545     /**
 546      * @throws NoSuchElementException {@inheritDoc}
 547      */
 548     public E getFirst() {
 549         E x = peekFirst();
 550         if (x == null) throw new NoSuchElementException();
 551         return x;
 552     }
 553 
 554     /**
 555      * @throws NoSuchElementException {@inheritDoc}
 556      */
 557     public E getLast() {
 558         E x = peekLast();
 559         if (x == null) throw new NoSuchElementException();
 560         return x;
 561     }
 562 
 563     public E peekFirst() {
 564         final ReentrantLock lock = this.lock;
 565         lock.lock();
 566         try {
 567             return (first == null) ? null : first.item;
 568         } finally {
 569             lock.unlock();
 570         }
 571     }
 572 
 573     public E peekLast() {
 574         final ReentrantLock lock = this.lock;
 575         lock.lock();
 576         try {
 577             return (last == null) ? null : last.item;
 578         } finally {
 579             lock.unlock();
 580         }
 581     }
 582 
 583     public boolean removeFirstOccurrence(Object o) {
 584         if (o == null) return false;
 585         final ReentrantLock lock = this.lock;
 586         lock.lock();
 587         try {
 588             for (Node<E> p = first; p != null; p = p.next) {
 589                 if (o.equals(p.item)) {
 590                     unlink(p);
 591                     return true;
 592                 }
 593             }
 594             return false;
 595         } finally {
 596             lock.unlock();
 597         }
 598     }
 599 
 600     public boolean removeLastOccurrence(Object o) {
 601         if (o == null) return false;
 602         final ReentrantLock lock = this.lock;
 603         lock.lock();
 604         try {
 605             for (Node<E> p = last; p != null; p = p.prev) {
 606                 if (o.equals(p.item)) {
 607                     unlink(p);
 608                     return true;
 609                 }
 610             }
 611             return false;
 612         } finally {
 613             lock.unlock();
 614         }
 615     }
 616 
 617     // BlockingQueue methods
 618 
 619     /**
 620      * Inserts the specified element at the end of this deque unless it would
 621      * violate capacity restrictions.  When using a capacity-restricted deque,
 622      * it is generally preferable to use method {@link #offer(Object) offer}.
 623      *
 624      * <p>This method is equivalent to {@link #addLast}.
 625      *
 626      * @throws IllegalStateException if the element cannot be added at this
 627      *         time due to capacity restrictions
 628      * @throws NullPointerException if the specified element is null
 629      */
 630     public boolean add(E e) {
 631         addLast(e);
 632         return true;
 633     }
 634 
 635     /**
 636      * @throws NullPointerException if the specified element is null
 637      */
 638     public boolean offer(E e) {
 639         return offerLast(e);
 640     }
 641 
 642     /**
 643      * @throws NullPointerException {@inheritDoc}
 644      * @throws InterruptedException {@inheritDoc}
 645      */
 646     public void put(E e) throws InterruptedException {
 647         putLast(e);
 648     }
 649 
 650     /**
 651      * @throws NullPointerException {@inheritDoc}
 652      * @throws InterruptedException {@inheritDoc}
 653      */
 654     public boolean offer(E e, long timeout, TimeUnit unit)
 655         throws InterruptedException {
 656         return offerLast(e, timeout, unit);
 657     }
 658 
 659     /**
 660      * Retrieves and removes the head of the queue represented by this deque.
 661      * This method differs from {@link #poll poll} only in that it throws an
 662      * exception if this deque is empty.
 663      *
 664      * <p>This method is equivalent to {@link #removeFirst() removeFirst}.
 665      *
 666      * @return the head of the queue represented by this deque
 667      * @throws NoSuchElementException if this deque is empty
 668      */
 669     public E remove() {
 670         return removeFirst();
 671     }
 672 
 673     public E poll() {
 674         return pollFirst();
 675     }
 676 
 677     public E take() throws InterruptedException {
 678         return takeFirst();
 679     }
 680 
 681     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
 682         return pollFirst(timeout, unit);
 683     }
 684 
 685     /**
 686      * Retrieves, but does not remove, the head of the queue represented by
 687      * this deque.  This method differs from {@link #peek peek} only in that
 688      * it throws an exception if this deque is empty.
 689      *
 690      * <p>This method is equivalent to {@link #getFirst() getFirst}.
 691      *
 692      * @return the head of the queue represented by this deque
 693      * @throws NoSuchElementException if this deque is empty
 694      */
 695     public E element() {
 696         return getFirst();
 697     }
 698 
 699     public E peek() {
 700         return peekFirst();
 701     }
 702 
 703     /**
 704      * Returns the number of additional elements that this deque can ideally
 705      * (in the absence of memory or resource constraints) accept without
 706      * blocking. This is always equal to the initial capacity of this deque
 707      * less the current {@code size} of this deque.
 708      *
 709      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
 710      * an element will succeed by inspecting {@code remainingCapacity}
 711      * because it may be the case that another thread is about to
 712      * insert or remove an element.
 713      */
 714     public int remainingCapacity() {
 715         final ReentrantLock lock = this.lock;
 716         lock.lock();
 717         try {
 718             return capacity - count;
 719         } finally {
 720             lock.unlock();
 721         }
 722     }
 723 
 724     /**
 725      * @throws UnsupportedOperationException {@inheritDoc}
 726      * @throws ClassCastException            {@inheritDoc}
 727      * @throws NullPointerException          {@inheritDoc}
 728      * @throws IllegalArgumentException      {@inheritDoc}
 729      */
 730     public int drainTo(Collection<? super E> c) {
 731         return drainTo(c, Integer.MAX_VALUE);
 732     }
 733 
 734     /**
 735      * @throws UnsupportedOperationException {@inheritDoc}
 736      * @throws ClassCastException            {@inheritDoc}
 737      * @throws NullPointerException          {@inheritDoc}
 738      * @throws IllegalArgumentException      {@inheritDoc}
 739      */
 740     public int drainTo(Collection<? super E> c, int maxElements) {
 741         if (c == null)
 742             throw new NullPointerException();
 743         if (c == this)
 744             throw new IllegalArgumentException();
 745         final ReentrantLock lock = this.lock;
 746         lock.lock();
 747         try {
 748             int n = Math.min(maxElements, count);
 749             for (int i = 0; i < n; i++) {
 750                 c.add(first.item);   // In this order, in case add() throws.
 751                 unlinkFirst();
 752             }
 753             return n;
 754         } finally {
 755             lock.unlock();
 756         }
 757     }
 758 
 759     // Stack methods
 760 
 761     /**
 762      * @throws IllegalStateException {@inheritDoc}
 763      * @throws NullPointerException  {@inheritDoc}
 764      */
 765     public void push(E e) {
 766         addFirst(e);
 767     }
 768 
 769     /**
 770      * @throws NoSuchElementException {@inheritDoc}
 771      */
 772     public E pop() {
 773         return removeFirst();
 774     }
 775 
 776     // Collection methods
 777 
 778     /**
 779      * Removes the first occurrence of the specified element from this deque.
 780      * If the deque does not contain the element, it is unchanged.
 781      * More formally, removes the first element {@code e} such that
 782      * {@code o.equals(e)} (if such an element exists).
 783      * Returns {@code true} if this deque contained the specified element
 784      * (or equivalently, if this deque changed as a result of the call).
 785      *
 786      * <p>This method is equivalent to
 787      * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}.
 788      *
 789      * @param o element to be removed from this deque, if present
 790      * @return {@code true} if this deque changed as a result of the call
 791      */
 792     public boolean remove(Object o) {
 793         return removeFirstOccurrence(o);
 794     }
 795 
 796     /**
 797      * Returns the number of elements in this deque.
 798      *
 799      * @return the number of elements in this deque
 800      */
 801     public int size() {
 802         final ReentrantLock lock = this.lock;
 803         lock.lock();
 804         try {
 805             return count;
 806         } finally {
 807             lock.unlock();
 808         }
 809     }
 810 
 811     /**
 812      * Returns {@code true} if this deque contains the specified element.
 813      * More formally, returns {@code true} if and only if this deque contains
 814      * at least one element {@code e} such that {@code o.equals(e)}.
 815      *
 816      * @param o object to be checked for containment in this deque
 817      * @return {@code true} if this deque contains the specified element
 818      */
 819     public boolean contains(Object o) {
 820         if (o == null) return false;
 821         final ReentrantLock lock = this.lock;
 822         lock.lock();
 823         try {
 824             for (Node<E> p = first; p != null; p = p.next)
 825                 if (o.equals(p.item))
 826                     return true;
 827             return false;
 828         } finally {
 829             lock.unlock();
 830         }
 831     }
 832 
 833     /*
 834      * TODO: Add support for more efficient bulk operations.
 835      *
 836      * We don‘t want to acquire the lock for every iteration, but we
 837      * also want other threads a chance to interact with the
 838      * collection, especially when count is close to capacity.
 839      */
 840 
 841 //     /**
 842 //      * Adds all of the elements in the specified collection to this
 843 //      * queue.  Attempts to addAll of a queue to itself result in
 844 //      * {@code IllegalArgumentException}. Further, the behavior of
 845 //      * this operation is undefined if the specified collection is
 846 //      * modified while the operation is in progress.
 847 //      *
 848 //      * @param c collection containing elements to be added to this queue
 849 //      * @return {@code true} if this queue changed as a result of the call
 850 //      * @throws ClassCastException            {@inheritDoc}
 851 //      * @throws NullPointerException          {@inheritDoc}
 852 //      * @throws IllegalArgumentException      {@inheritDoc}
 853 //      * @throws IllegalStateException         {@inheritDoc}
 854 //      * @see #add(Object)
 855 //      */
 856 //     public boolean addAll(Collection<? extends E> c) {
 857 //         if (c == null)
 858 //             throw new NullPointerException();
 859 //         if (c == this)
 860 //             throw new IllegalArgumentException();
 861 //         final ReentrantLock lock = this.lock;
 862 //         lock.lock();
 863 //         try {
 864 //             boolean modified = false;
 865 //             for (E e : c)
 866 //                 if (linkLast(e))
 867 //                     modified = true;
 868 //             return modified;
 869 //         } finally {
 870 //             lock.unlock();
 871 //         }
 872 //     }
 873 
 874     /**
 875      * Returns an array containing all of the elements in this deque, in
 876      * proper sequence (from first to last element).
 877      *
 878      * <p>The returned array will be "safe" in that no references to it are
 879      * maintained by this deque.  (In other words, this method must allocate
 880      * a new array).  The caller is thus free to modify the returned array.
 881      *
 882      * <p>This method acts as bridge between array-based and collection-based
 883      * APIs.
 884      *
 885      * @return an array containing all of the elements in this deque
 886      */
 887     @SuppressWarnings("unchecked")
 888     public Object[] toArray() {
 889         final ReentrantLock lock = this.lock;
 890         lock.lock();
 891         try {
 892             Object[] a = new Object[count];
 893             int k = 0;
 894             for (Node<E> p = first; p != null; p = p.next)
 895                 a[k++] = p.item;
 896             return a;
 897         } finally {
 898             lock.unlock();
 899         }
 900     }
 901 
 902     /**
 903      * Returns an array containing all of the elements in this deque, in
 904      * proper sequence; the runtime type of the returned array is that of
 905      * the specified array.  If the deque fits in the specified array, it
 906      * is returned therein.  Otherwise, a new array is allocated with the
 907      * runtime type of the specified array and the size of this deque.
 908      *
 909      * <p>If this deque fits in the specified array with room to spare
 910      * (i.e., the array has more elements than this deque), the element in
 911      * the array immediately following the end of the deque is set to
 912      * {@code null}.
 913      *
 914      * <p>Like the {@link #toArray()} method, this method acts as bridge between
 915      * array-based and collection-based APIs.  Further, this method allows
 916      * precise control over the runtime type of the output array, and may,
 917      * under certain circumstances, be used to save allocation costs.
 918      *
 919      * <p>Suppose {@code x} is a deque known to contain only strings.
 920      * The following code can be used to dump the deque into a newly
 921      * allocated array of {@code String}:
 922      *
 923      * <pre>
 924      *     String[] y = x.toArray(new String[0]);</pre>
 925      *
 926      * Note that {@code toArray(new Object[0])} is identical in function to
 927      * {@code toArray()}.
 928      *
 929      * @param a the array into which the elements of the deque are to
 930      *          be stored, if it is big enough; otherwise, a new array of the
 931      *          same runtime type is allocated for this purpose
 932      * @return an array containing all of the elements in this deque
 933      * @throws ArrayStoreException if the runtime type of the specified array
 934      *         is not a supertype of the runtime type of every element in
 935      *         this deque
 936      * @throws NullPointerException if the specified array is null
 937      */
 938     @SuppressWarnings("unchecked")
 939     public <T> T[] toArray(T[] a) {
 940         final ReentrantLock lock = this.lock;
 941         lock.lock();
 942         try {
 943             if (a.length < count)
 944                 a = (T[])java.lang.reflect.Array.newInstance
 945                     (a.getClass().getComponentType(), count);
 946 
 947             int k = 0;
 948             for (Node<E> p = first; p != null; p = p.next)
 949                 a[k++] = (T)p.item;
 950             if (a.length > k)
 951                 a[k] = null;
 952             return a;
 953         } finally {
 954             lock.unlock();
 955         }
 956     }
 957 
 958     public String toString() {
 959         final ReentrantLock lock = this.lock;
 960         lock.lock();
 961         try {
 962             Node<E> p = first;
 963             if (p == null)
 964                 return "[]";
 965 
 966             StringBuilder sb = new StringBuilder();
 967             sb.append(‘[‘);
 968             for (;;) {
 969                 E e = p.item;
 970                 sb.append(e == this ? "(this Collection)" : e);
 971                 p = p.next;
 972                 if (p == null)
 973                     return sb.append(‘]‘).toString();
 974                 sb.append(‘,‘).append(‘ ‘);
 975             }
 976         } finally {
 977             lock.unlock();
 978         }
 979     }
 980 
 981     /**
 982      * Atomically removes all of the elements from this deque.
 983      * The deque will be empty after this call returns.
 984      */
 985     public void clear() {
 986         final ReentrantLock lock = this.lock;
 987         lock.lock();
 988         try {
 989             for (Node<E> f = first; f != null; ) {
 990                 f.item = null;
 991                 Node<E> n = f.next;
 992                 f.prev = null;
 993                 f.next = null;
 994                 f = n;
 995             }
 996             first = last = null;
 997             count = 0;
 998             notFull.signalAll();
 999         } finally {
1000             lock.unlock();
1001         }
1002     }
1003 
1004     /**
1005      * Returns an iterator over the elements in this deque in proper sequence.
1006      * The elements will be returned in order from first (head) to last (tail).
1007      *
1008      * <p>The returned iterator is a "weakly consistent" iterator that
1009      * will never throw {@link java.util.ConcurrentModificationException
1010      * ConcurrentModificationException}, and guarantees to traverse
1011      * elements as they existed upon construction of the iterator, and
1012      * may (but is not guaranteed to) reflect any modifications
1013      * subsequent to construction.
1014      *
1015      * @return an iterator over the elements in this deque in proper sequence
1016      */
1017     public Iterator<E> iterator() {
1018         return new Itr();
1019     }
1020 
1021     /**
1022      * Returns an iterator over the elements in this deque in reverse
1023      * sequential order.  The elements will be returned in order from
1024      * last (tail) to first (head).
1025      *
1026      * <p>The returned iterator is a "weakly consistent" iterator that
1027      * will never throw {@link java.util.ConcurrentModificationException
1028      * ConcurrentModificationException}, and guarantees to traverse
1029      * elements as they existed upon construction of the iterator, and
1030      * may (but is not guaranteed to) reflect any modifications
1031      * subsequent to construction.
1032      *
1033      * @return an iterator over the elements in this deque in reverse order
1034      */
1035     public Iterator<E> descendingIterator() {
1036         return new DescendingItr();
1037     }
1038 
1039     /**
1040      * Base class for Iterators for LinkedBlockingDeque
1041      */
1042     private abstract class AbstractItr implements Iterator<E> {
1043         /**
1044          * The next node to return in next()
1045          */
1046          Node<E> next;
1047 
1048         /**
1049          * nextItem holds on to item fields because once we claim that
1050          * an element exists in hasNext(), we must return item read
1051          * under lock (in advance()) even if it was in the process of
1052          * being removed when hasNext() was called.
1053          */
1054         E nextItem;
1055 
1056         /**
1057          * Node returned by most recent call to next. Needed by remove.
1058          * Reset to null if this element is deleted by a call to remove.
1059          */
1060         private Node<E> lastRet;
1061 
1062         abstract Node<E> firstNode();
1063         abstract Node<E> nextNode(Node<E> n);
1064 
1065         AbstractItr() {
1066             // set to initial position
1067             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1068             lock.lock();
1069             try {
1070                 next = firstNode();
1071                 nextItem = (next == null) ? null : next.item;
1072             } finally {
1073                 lock.unlock();
1074             }
1075         }
1076 
1077         /**
1078          * Returns the successor node of the given non-null, but
1079          * possibly previously deleted, node.
1080          */
1081         private Node<E> succ(Node<E> n) {
1082             // Chains of deleted nodes ending in null or self-links
1083             // are possible if multiple interior nodes are removed.
1084             for (;;) {
1085                 Node<E> s = nextNode(n);
1086                 if (s == null)
1087                     return null;
1088                 else if (s.item != null)
1089                     return s;
1090                 else if (s == n)
1091                     return firstNode();
1092                 else
1093                     n = s;
1094             }
1095         }
1096 
1097         /**
1098          * Advances next.
1099          */
1100         void advance() {
1101             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1102             lock.lock();
1103             try {
1104                 // assert next != null;
1105                 next = succ(next);
1106                 nextItem = (next == null) ? null : next.item;
1107             } finally {
1108                 lock.unlock();
1109             }
1110         }
1111 
1112         public boolean hasNext() {
1113             return next != null;
1114         }
1115 
1116         public E next() {
1117             if (next == null)
1118                 throw new NoSuchElementException();
1119             lastRet = next;
1120             E x = nextItem;
1121             advance();
1122             return x;
1123         }
1124 
1125         public void remove() {
1126             Node<E> n = lastRet;
1127             if (n == null)
1128                 throw new IllegalStateException();
1129             lastRet = null;
1130             final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1131             lock.lock();
1132             try {
1133                 if (n.item != null)
1134                     unlink(n);
1135             } finally {
1136                 lock.unlock();
1137             }
1138         }
1139     }
1140 
1141     /** Forward iterator */
1142     private class Itr extends AbstractItr {
1143         Node<E> firstNode() { return first; }
1144         Node<E> nextNode(Node<E> n) { return n.next; }
1145     }
1146 
1147     /** Descending iterator */
1148     private class DescendingItr extends AbstractItr {
1149         Node<E> firstNode() { return last; }
1150         Node<E> nextNode(Node<E> n) { return n.prev; }
1151     }
1152 
1153     /**
1154      * Save the state of this deque to a stream (that is, serialize it).
1155      *
1156      * @serialData The capacity (int), followed by elements (each an
1157      * {@code Object}) in the proper order, followed by a null
1158      * @param s the stream
1159      */
1160     private void writeObject(java.io.ObjectOutputStream s)
1161         throws java.io.IOException {
1162         final ReentrantLock lock = this.lock;
1163         lock.lock();
1164         try {
1165             // Write out capacity and any hidden stuff
1166             s.defaultWriteObject();
1167             // Write out all elements in the proper order.
1168             for (Node<E> p = first; p != null; p = p.next)
1169                 s.writeObject(p.item);
1170             // Use trailing null as sentinel
1171             s.writeObject(null);
1172         } finally {
1173             lock.unlock();
1174         }
1175     }
1176 
1177     /**
1178      * Reconstitute this deque from a stream (that is,
1179      * deserialize it).
1180      * @param s the stream
1181      */
1182     private void readObject(java.io.ObjectInputStream s)
1183         throws java.io.IOException, ClassNotFoundException {
1184         s.defaultReadObject();
1185         count = 0;
1186         first = null;
1187         last = null;
1188         // Read in all elements and place in queue
1189         for (;;) {
1190             @SuppressWarnings("unchecked")
1191             E item = (E)s.readObject();
1192             if (item == null)
1193                 break;
1194             add(item);
1195         }
1196     }
1197 
1198 }
View Code

 

下面从ArrayBlockingQueue的创建,添加,取出,遍历这几个方面对LinkedBlockingDeque进行分析

1. 创建

下面以LinkedBlockingDeque(int capacity)来进行说明。

public LinkedBlockingDeque(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
}

说明:capacity是“链式阻塞队列”的容量。


LinkedBlockingDeque中相关的数据结果定义如下:

Java多线程系列--“JUC集合”09之 LinkedBlockingDeque
// “双向队列”的表头
transient Node<E> first;
// “双向队列”的表尾
transient Node<E> last;
// 节点数量
private transient int count;
// 容量
private final int capacity;
// 互斥锁 , 互斥锁对应的“非空条件notEmpty”, 互斥锁对应的“未满条件notFull”
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
Java多线程系列--“JUC集合”09之 LinkedBlockingDeque

说明:lock是互斥锁,用于控制多线程对LinkedBlockingDeque中元素的互斥访问;而notEmpty和notFull是与lock绑定的条件,它们用于实现对多线程更精确的控制。

双向链表的节点Node的定义如下:

Java多线程系列--“JUC集合”09之 LinkedBlockingDeque
static final class Node<E> {
    E item;       // 数据
    Node<E> prev; // 前一节点
    Node<E> next; // 后一节点

    Node(E x) { item = x; }
}
Java多线程系列--“JUC集合”09之 LinkedBlockingDeque

 

2. 添加

下面以offer(E e)为例,对LinkedBlockingDeque的添加方法进行说明。

public boolean offer(E e) {
    return offerLast(e);
}

offer()实际上是调用offerLast()将元素添加到队列的末尾。

offerLast()的源码如下:

Java多线程系列--“JUC集合”09之 LinkedBlockingDeque
public boolean offerLast(E e) {
    if (e == null) throw new NullPointerException();
    // 新建节点
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        // 将“新节点”添加到双向链表的末尾
        return linkLast(node);
    } finally {
        // 释放锁
        lock.unlock();
    }
}
Java多线程系列--“JUC集合”09之 LinkedBlockingDeque

说明:offerLast()的作用,是新建节点并将该节点插入到双向链表的末尾。它在插入节点前,会获取锁;操作完毕,再释放锁。

linkLast()的源码如下:

Java多线程系列--“JUC集合”09之 LinkedBlockingDeque
private boolean linkLast(Node<E> node) {
    // 如果“双向链表的节点数量” > “容量”,则返回false,表示插入失败。
    if (count >= capacity)
        return false;
    // 将“node添加到链表末尾”,并设置node为新的尾节点
    Node<E> l = last;
    node.prev = l;
    last = node;
    if (first == null)
        first = node;
    else
        l.next = node;
    // 将“节点数量”+1
    ++count;
    // 插入节点之后,唤醒notEmpty上的等待线程。
    notEmpty.signal();
    return true;
}
Java多线程系列--“JUC集合”09之 LinkedBlockingDeque

说明:linkLast()的作用,是将节点插入到双向队列的末尾;插入节点之后,唤醒notEmpty上的等待线程。


3. 删除

下面以take()为例,对LinkedBlockingDeque的取出方法进行说明。

public E take() throws InterruptedException {
    return takeFirst();
}

take()实际上是调用takeFirst()队列的第一个元素。

takeFirst()的源码如下:

Java多线程系列--“JUC集合”09之 LinkedBlockingDeque
public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        E x;
        // 若“队列为空”,则一直等待。否则,通过unlinkFirst()删除第一个节点。
        while ( (x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        // 释放锁
        lock.unlock();
    }
}
Java多线程系列--“JUC集合”09之 LinkedBlockingDeque

说明:takeFirst()的作用,是删除双向链表的第一个节点,并返回节点对应的值。它在插入节点前,会获取锁;操作完毕,再释放锁。

unlinkFirst()的源码如下:

Java多线程系列--“JUC集合”09之 LinkedBlockingDeque
private E unlinkFirst() {
    // assert lock.isHeldByCurrentThread();
    Node<E> f = first;
    if (f == null)
        return null;
    // 删除并更新“第一个节点”
    Node<E> n = f.next;
    E item = f.item;
    f.item = null;
    f.next = f; // help GC
    first = n;
    if (n == null)
        last = null;
    else
        n.prev = null;
    // 将“节点数量”-1
    --count;
    // 删除节点之后,唤醒notFull上的等待线程。
    notFull.signal();
    return item;
}
Java多线程系列--“JUC集合”09之 LinkedBlockingDeque

说明:unlinkFirst()的作用,是将双向队列的第一个节点删除;删除节点之后,唤醒notFull上的等待线程。

 

4. 遍历

下面对LinkedBlockingDeque的遍历方法进行说明。

public Iterator<E> iterator() {
    return new Itr();
}

iterator()实际上是返回一个Iter对象。

Itr类的定义如下:

private class Itr extends AbstractItr {
    // “双向队列”的表头
    Node<E> firstNode() { return first; }
    // 获取“节点n的下一个节点”
    Node<E> nextNode(Node<E> n) { return n.next; }
}

Itr继承于AbstractItr,而AbstractItr的定义如下:

 

Java多线程系列--“JUC集合”09之 LinkedBlockingDeque
private abstract class AbstractItr implements Iterator<E> {
    // next是下一次调用next()会返回的节点。
    Node<E> next;
    // nextItem是next()返回节点对应的数据。
    E nextItem;
    // 上一次next()返回的节点。
    private Node<E> lastRet;
    // 返回第一个节点
    abstract Node<E> firstNode();
    // 返回下一个节点
    abstract Node<E> nextNode(Node<E> n);

    AbstractItr() {
        final ReentrantLock lock = LinkedBlockingDeque.this.lock;
        // 获取“LinkedBlockingDeque的互斥锁”
        lock.lock();
        try {
            // 获取“双向队列”的表头
            next = firstNode();
            // 获取表头对应的数据
            nextItem = (next == null) ? null : next.item;
        } finally {
            // 释放“LinkedBlockingDeque的互斥锁”
            lock.unlock();
        }
    }

    // 获取n的后继节点
    private Node<E> succ(Node<E> n) {
        // Chains of deleted nodes ending in null or self-links
        // are possible if multiple interior nodes are removed.
        for (;;) {
            Node<E> s = nextNode(n);
            if (s == null)
                return null;
            else if (s.item != null)
                return s;
            else if (s == n)
                return firstNode();
            else
                n = s;
        }
    }

    // 更新next和nextItem。
    void advance() {
        final ReentrantLock lock = LinkedBlockingDeque.this.lock;
        lock.lock();
        try {
            // assert next != null;
            next = succ(next);
            nextItem = (next == null) ? null : next.item;
        } finally {
            lock.unlock();
        }
    }

    // 返回“下一个节点是否为null”
    public boolean hasNext() {
        return next != null;
    }

    // 返回下一个节点
    public E next() {
        if (next == null)
            throw new NoSuchElementException();
        lastRet = next;
        E x = nextItem;
        advance();
        return x;
    }

    // 删除下一个节点
    public void remove() {
        Node<E> n = lastRet;
        if (n == null)
            throw new IllegalStateException();
        lastRet = null;
        final ReentrantLock lock = LinkedBlockingDeque.this.lock;
        lock.lock();
        try {
            if (n.item != null)
                unlink(n);
        } finally {
            lock.unlock();
        }
    }
}
Java多线程系列--“JUC集合”09之 LinkedBlockingDeque

 

LinkedBlockingDeque示例

Java多线程系列--“JUC集合”09之 LinkedBlockingDeque
 1 import java.util.*;
 2 import java.util.concurrent.*;
 3 
 4 /*
 5  *   LinkedBlockingDeque是“线程安全”的队列,而LinkedList是非线程安全的。
 6  *
 7  *   下面是“多个线程同时操作并且遍历queue”的示例
 8  *   (01) 当queue是LinkedBlockingDeque对象时,程序能正常运行。
 9  *   (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。
10  *
11  * @author skywang
12  */
13 public class LinkedBlockingDequeDemo1 {
14 
15     // TODO: queue是LinkedList对象时,程序会出错。
16     //private static Queue<String> queue = new LinkedList<String>();
17     private static Queue<String> queue = new LinkedBlockingDeque<String>();
18     public static void main(String[] args) {
19     
20         // 同时启动两个线程对queue进行操作!
21         new MyThread("ta").start();
22         new MyThread("tb").start();
23     }
24 
25     private static void printAll() {
26         String value;
27         Iterator iter = queue.iterator();
28         while(iter.hasNext()) {
29             value = (String)iter.next();
30             System.out.print(value+", ");
31         }
32         System.out.println();
33     }
34 
35     private static class MyThread extends Thread {
36         MyThread(String name) {
37             super(name);
38         }
39         @Override
40         public void run() {
41                 int i = 0;
42             while (i++ < 6) {
43                 // “线程名” + "-" + "序号"
44                 String val = Thread.currentThread().getName()+i;
45                 queue.add(val);
46                 // 通过“Iterator”遍历queue。
47                 printAll();
48             }
49         }
50     }
51 }
Java多线程系列--“JUC集合”09之 LinkedBlockingDeque

(某一次)运行结果

Java多线程系列--“JUC集合”09之 LinkedBlockingDeque
ta1, ta1, tb1, tb1,

ta1, ta1, tb1, tb1, tb2, tb2, ta2, 
ta2, 
ta1, ta1, tb1, tb1, tb2, tb2, ta2, ta2, tb3, tb3, ta3, 
ta3, ta1, 
tb1, ta1, tb2, tb1, ta2, tb2, tb3, ta2, ta3, tb3, tb4, ta3, ta4, 
tb4, ta1, ta4, tb1, tb5, 
tb2, ta1, ta2, tb1, tb3, tb2, ta3, ta2, tb4, tb3, ta4, ta3, tb5, tb4, ta5, 
ta4, ta1, tb5, tb1, ta5, tb2, tb6, 
ta2, ta1, tb3, tb1, ta3, tb2, tb4, ta2, ta4, tb3, tb5, ta3, ta5, tb4, tb6, ta4, ta6, 
tb5, ta5, tb6, ta6,
Java多线程系列--“JUC集合”09之 LinkedBlockingDeque

结果说明示例程序中,启动两个线程(线程ta和线程tb)分别对LinkedBlockingDeque进行操作。以线程ta而言,它会先获取“线程名”+“序号”,然后将该字符串添加到LinkedBlockingDeque中;接着,遍历并输出LinkedBlockingDeque中的全部元素。 线程tb的操作和线程ta一样,只不过线程tb的名字和线程ta的名字不同。
当queue是LinkedBlockingDeque对象时,程序能正常运行。如果将queue改为LinkedList时,程序会产生ConcurrentModificationException异常。

 


更多内容

1. Java多线程系列--“JUC集合”01之 框架

2. Java多线程系列目录(共xx篇)

 

Java多线程系列--“JUC集合”09之 LinkedBlockingDeque

上一篇:Database: index


下一篇:Illustrator(AI)使用渐变工具制作梦幻的十字星光效果图实力教程