并发编程学习笔记(13)----ConcurrentLinkedQueue(非阻塞队列)和BlockingQueue(阻塞队列)原理

·  在并发编程中,我们有时候会需要使用到线程安全的队列,而在Java中如果我们需要实现队列可以有两种方式,一种是阻塞式队列。另一种是非阻塞式的队列,阻塞式队列采用锁来实现,而非阻塞式队列则是采用cas算法来保证线程安全的,接下来就让我们来看一下jdk中两种队列的实现方式。

1. ConcurrentLinkedQueue的实现原理

  顾名思义,这是一个基于链表结构的队列,它是一个先进先出的队列,当我们添加元素时,添加的元素链接到队列的尾部,当获取元素时返回队列的头部元素。

  先看添加队列时ConcurrentLinkedQueue的实现方法offer():

 public boolean offer(E e) {
//判断元素是否为空,为空则抛出异常
checkNotNull(e);
//创建一个新的节点 Node中的结构为item(数据) next(下一个节点)
final Node<E> newNode = new Node<E>(e);
//自旋,将节点添加到队列尾部
for (Node<E> t = tail, p = t;;) {
//获取到p的下一个节点,即是当前tail节点的的下一个节点
Node<E> q = p.next;
//如果q为空,表示q的下一个节点为null,直接将当前节点添加到队列尾部
if (q == null) {
// p is last node
//添加节点到队列尾部
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
//第一次进来时p == t,所以这里不会将当前节点设置成tail
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
            //多线程操作时候,由于poll时候会把老的head变为自引用,然后head的next变为新head,所以这里需要
            //重新找新的head,因为新的head后面的节点才是激活的节点
                p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}

  首先检查当前进来的元素是否为null,为null则抛出空指针异常。将tail赋值给t和p,无限循环,将当前p.next()赋值给q,如果为空,则将当前节点添加到队列尾部,添加成功则继续看p 是否等于t,第一次进来时是相等的,所以不会调用casTail()方法,直接返回true即可,因为在多线程的环境下,可能会出现线程进入循环时,q不等于空,此时看p == q是否成立,开始必然是不成立的,执行最后一个else,将tal赋值给t,并将q赋值给p,继续循环,此时p的next为空,则执行将节点添加到队列尾部的操作,并且此时的p 不等于t,更新尾节点tail位置为当前新添加的节点。

  出队方法poll():

 public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item; if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

  先来个死循环,再来一个死循环将head赋值给h,h 赋值给p, 得到头节点的值保存到item中,当item不为空时,通过cas算法将p节点的item设置为null,设置成功后,判断p是否等于h,等于则更新头节点,并将p.next()赋值给q,当p.next不为空时,头节点设置为q.否则设置为p,如果p.next()为null,则将更新头节点,返回null,如果p==q,自引用了,则重新找新的头节点。

  ConcurrentLinkedQueue主要就是利用了cas算法来保证了多线程环境下线程的安全,这样的算法其实性能来说是比较优的,速度相比较阻塞式的算法会更好一些。

2. BlockingQueue

  BlockingQueue是一个阻塞式队列,当tack()时队列为空,则它不会返回空或者是抛异常,线程此时会一直等待着,知道队列中存在数据时才会去取出数据,同时put()当队列满了的情况下,也会等待,知道其他线程取出队列中的数据,腾出空间之后再执行入队操作,其实它也提供了add/remove这样的非阻塞式方法的,当队列full或队列为空时,直接抛出异常,这里我们主要说的是它阻塞的情况,主要有put/take()方法。

  这里以BlockingQueue的实现类ArrayBlockingQueue源码进行分析。

  put():

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}

  其实这里的实现原理就是生产者与消费者使用Condition实现原理一样,notFull和notEmpty两个Condition,使用可中断锁,count作为元素个数标记,以一个数组来保存元素,当count等于数组长度时,使notFull等待,否则调用insert()方法添加元素。

  insert()方法:

 private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}

  这里很简单,将元素保存到数组中,count++,唤醒等待读取元素的线程,告诉它已经有数据了,可以获取了。

  take

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}

  使用中断锁,当count为0时,表示当前队列中已经没有资源了,所以线程等待,否则就调用extract()返回数据。

  extract():

private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return x;
}

  得到takeIndex位置的元素,保存到x中,并且将下表位置的元素置为空,更改takeIndex,count--,唤醒可能由于队列满了的情况下被等待的添加元素的线程,返回x,这样就取到了当前的元素。

  这里的实现原理跟生产者/消费者模式一样,同时使用Condition来指定唤醒某些等待的线程,实现了多线程下队列的阻塞和线程安全。

原文 并发编程学习笔记(13)----ConcurrentLinkedQueue(非阻塞队列)和BlockingQueue(阻塞队列)原理

上一篇:机器学习/逻辑回归(logistic regression)/--附python代码


下一篇:HDU 5564 Clarke and digits 状压dp+矩阵加速