并发队列常见于生产者消费者的场景,例如log4j2,logback的异步日志Appender,例如类似于zipkin-reporter分布式链路日志的收集上送,以上二者之所以要使用并发队列的很大原因都是因为日志异步化处理,避免影响业务逻辑接口的吞吐量,这二者其实比较相近,都是典型的MPSC场景:multiple producers and single consumer.
当你的程序引入了异步队列这个机制,你就需要考虑到一些问题,比如如何控制队列的长度,是否会带来额外的内存负担,队列满了的策略:是阻塞业务线程还是丢弃,机器突然宕机了,队列里的数据怎么办?有好处,必要也有坏处,需要找到一个平衡点来抉择是否引入异步队列,不论是log4j2或者是logback,异步Appender并不是默认选项,大多数应用不需要考虑异步化日志,除非你的应用真正到了需要异步化日志来提高吞吐量的地步了,本文不做过多的讨论,有兴趣的可以看看相关文章,比如log42官网关于异步appender的介绍:https://logging.apache.org/log4j/2.x/manual/async.html,或者直接查看相关实现,比如可以看一下logback的AsyncAppender,代码比较简单,是一个典型的异步队列使用场景,使用了JUC下的ArrayBlockingQueue,log4j2则引入了第三方框架Disruptor。
回到队列本身,并发队列,最大的要解决的问题控制并发下的正确性,通常有两种机制,1.加锁(阻塞的) 2.CAS(lock-free),前者通常就是常见的BlockingQueue,JUC下的实现为ArrayBlockingQueue以及LinkedBlockingQueue,后者JUC下提供了ConcurrentLinkedQueue这个类,以及用的比较多的第三方框架:JCTools以及Disruptor。
本文第一个要探讨的点:BlockingQueue 以及 JUC包下ArrayBlockingQueue以及LinkedBlockingQueue其异同,以及使用场景。
BlockingQueue,阻塞队列,基于锁实现,生产者,或者是消费者线程在并发竞争时可能会拿不到锁,被挂起进入BLOCK状态(不一定会挂起,可能会有几次自旋,JVM有相关的优化),这个代价是比较昂贵的,会比较大程度影响队列的吞吐量,如何最大程度减少锁竞争是必要的。
LinkedBlockingQueue使用了哪些机制来减少锁竞争?
1.使用了"two lock queue" algorithm,该算法的主要作用是生产者和消费者之间不会相互阻塞,竞争的是不同的锁,两把锁,两个条件变量,借助AtomicInteger维护count变量;
2.cascading notifies:级联通知。
以上两点查看源代码就可以有一个比较清晰的认识,生产者进行enqueue,入队操作,消费者进行dequeue,出队操作,前者操作尾节点,后者操作头节点,所以可以用两把锁去控制,这样生产者消费者之间不会有竞争,唯一一个生产者消费者都要更新的变量:count,使用原子类去维护,确保其线程安全性,而对于级联通知,即:When a put notices that it has enabled at least one take, it signals taker. That taker in turn signals others if more items have been entered since the signal.(摘自注释),这样子最大程度避免线程的唤醒再竞争,也就是避免了锁的竞争,看一下put的代码,加了相关注释:
Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { // 队列满,wait while (count.get() == capacity) { notFull.await(); } // 被唤醒了,入队 enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) // 如果发现生产了之后,队列还是没满,那么继续通知别的生产者来消费 notFull.signal(); } finally { putLock.unlock(); } if (c == 0) // 如果发现,生产之前,队列是空的,那么通知消费者来消费 // 就好比说:我现在已经生产了一个了,你们可以来消费了。(此时生产者肯定是wait状态) signalNotEmpty();
ArrayBlockingQueue呢?ArrayBlockingQueue底层使用的是数组,这意味着它必定是有界的,并且是一个会循环利用的数组,维护了两个Index,类似于LBQ的head和tail节点,但是,它没有使用LBQ的双锁算法,这个是令人比较困惑的,全局使用了一把锁,也就是说生产者和消费者之间是会互相阻塞的,所以可以看到ABQ的count变量就是一个int变量,因为对于count的更新都是在同步代码块中。
为什么不使用两把锁?很显然,两把锁的实现的LBQ的吞吐量是高于ABQ的。网上有一些相关的讨论(主要在*上,google关键字即可),有人认为LBQ头尾节点是两个单独的节点,所以可以分开锁,而ABQ底层是一个数组,所以必须是一把锁,这显然是站不住脚的,数组的头尾依旧可以使用两把锁去控制,可以做一个简单的实现(参考网上,因为也有一些人有同样的疑问),并且做一个简单的吞吐量测试,测试代码使用的是Java并发编程实践上使用的代码,在我的电脑上测试,双锁的ABQ确实有更好的吞吐量,而且双锁实现的ABQ应该没有线程安全问题(我个人认为):
public class ArrayBlockingQueueTwoLocks implements BlockingQueue { final Object[] items; int takeIndex; int putIndex; // count使用原子类 private final AtomicInteger count = new AtomicInteger(); // 类似于LBQ,双锁,双条件变量 private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition(); public void put(Object e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == items.length) { notFull.await(); } enqueue(e); c = count.getAndIncrement(); if (c + 1 < items.length) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } public Object take() throws InterruptedException { Object x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == items.length) signalNotFull(); return x; } private void enqueue(Object x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; } private Object dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") Object x = (Object) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; return x; } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } }
完整代码,包括吞吐量测试的代码,我贴在这里了:https://pastebin.com/QpW9dsVc,
不论怎么样,JUC下提供出来了这两个实现,ABQ不使用双锁可能只有问Doug lea本人才知道了,事实上有相关的邮件,已经有人咨询了这个问题,Doug lea并没有回答。
接下来探讨一下这两个阻塞队列的异同以及各自的使用场景。
1.ABQ是有界的,LBQ可以有界也可以没有,所以如果需要一个*的队列,那只能选择LBQ。
2.ABQ底层基于数组,且是预分配的,这意味着实现他就会占据一部分内存,而之后的入队出队则是数组引用的赋值,LBQ则是动态创建节点,这点上看,ABQ显然占优
3.LBQ双锁,ABQ单锁,吞吐量前者大于后者,这是毋庸置疑的。
还值得一提的是,cache伪共享,ABQ和LBQ是没有考虑到的,所谓伪共享就是两个变量被放到同一个缓存行上,改变了其中一个,导致这一行都Invalid了,具体的可以google。
具体怎么用,选什么,我也不知道。。这只有你自己结合自己的使用场景经过一系列基准测试,得出答案,下面的观点摘自公开邮件(https://concurrency.markmail.org/search/?q=ArrayBlockingQueue+two+lock#query:ArrayBlockingQueue%20two%20lock+page:1+mid:ow7jisexn67xjv5r+state:results),首先是Brian Goetz,百度了一下是Oracle的架构师,他的观点是:
In most cases, allocation is dirt cheap -- certainly cheaper than contention -- so in most cases LBQ is preferable. In RT environments, where memory is more constrained and GC pauses are less acceptable, ABQ may be more appropriate.Doug lea的观点:
Usually, when you are putting something into a queue, you will have just allocated that new something. And similarly, when you take something out you usually use it and then let it become garbage. In which case the extra allocation for a queue node is not going to make much difference in overall GC, so you might as well go for the better scalability of LinkedBlockingQueue. I think this is the most common use case. But, if you aren't allocating the things put into queues, and don't expect lots of threads to be contending when the queue is neither empty nor full, then ArrayBlockingQueue is likely to work better.简而言之,前者认为,动态分配节点不是什么事,吞吐量比较重要,而在RT(响应时间)环境中,ABQ可能更加可以接受,因为正如上面的第二点所说,预分配内存,入队出队只是引用的赋值,肯定会快于LBQ,且更加GC-friendly,而Doug lea大佬的回答就比较抽象了,说实话我也不是很能理解,这是我的个人理解:他认为如果你是实时创建一些对象,扔到队列里,那肯定是LBQ好,因为LBQ的缺点就是动态创建NODE,既然你自己都会创建,那么直接用它也没差别了,他又拥有更好的吞吐,而如果你扔到队列里的是一些以前就存在的对象,那ABQ更好。所以我真的不能理解logback为什么使用了ABQ而不是LBQ,他追求的不应该是吞吐吗? 以上是关于阻塞队列的讨论,接下来讨论一下lock-free的队列,抛去队列本身,使用锁,还是使用CAS,是一个亘古不变的话题,锁的劣势是线程的挂起和恢复是存在比较大的开销的,甚至大于程序本身,JDK之前版本的synchronized性能是比较差的,而且现在的JVM会有一些优化,比如当线程发现锁被占用时,不一定立马将自己挂起来,可能会自旋几次(这取决于之前这个线程对于锁的持有长短),一旦自旋之后都获取不到,那么就会到了操作系统这一层,将自己挂起来。 而现如今硬件是支持CAS这种乐观的机制的,JUC下也提供了一系列原子类,就是基于CAS实现的,通常来说,竞争很激烈的情况下,锁的吞吐可能会更好,因为大部分CAS可能都是在空转,竞争中等的情况下,CAS的优势就比较明显了,可以基于JHM做一个简单的测试,在我的windows上,使用10个线程进行某个数的++操作,CAS的吞吐是高于锁的,当我大幅度提高线程数,比如500个,那么锁的吞吐一般就高于CAS。回到队列本身,无锁的队列,JUC下是ConcurrentLinkedQueue,但是一般大家如果选择无锁队列的话,都倾向于选择Disruptor或者JCTools,这两者都避免了cache伪共享,使用了无锁算法,JCTools提供了一系列面向不同场景的队列,例如MPMC(多生产者多消费者),MPSC等,netty的NioEventLoop用于存储任务的队列使用的就是JCTools的MPSC,可以看下源码,但是说实话,我找不到在经典的生产者消费者使用场景中如何使用无锁队列,如果队列没数据,其实就是应该block住,netty NIOeventLoop中并不是那种经典的生产者消费者,如果熟悉源码的人可以知道,轮询线程很多时候是hang在selector上等待IO事件的,并不是一直在轮询任务队列,除非就是消费者自己基于一定策略轮询,比如sleep 50毫秒,再去看一下,但是极端情况下肯定会存在50ms的延迟,例如Disruptor就提供了一系列wait策略,block或者sleep等等。如果后续我找到lock-free队列很经典的使用场景的话,我再更新这篇文章吧,应该是我接触的太少了:)