高并发编程-队列-BlockingQueue-ArrayBlockingQueue
在介绍ArrayBlockingQueue之前,为了让大家更好的理解这个知识点,先把Queue和BlockingQueue的相关知识做个简单的介绍
一、Queue队列接口
Queue继承于Collection数据集合,Queue内部主要方法有六个,下面依次对着六个方法做简单介绍
public interface Queue<E> extends Collection<E>{ //本方法为向队列中添加元素,如果队列中空间够用,元素插入队列返回true,如果空间不够用将会抛出 IllegalStateException异常 boolean add(E e); //向队列中添加元素,添加成功返回true,如果队列已满,返回false boolean offer(E e); //删除队首元素并删除,如果队列为空将会抛NoSuchElementException 异常 E remove(); // 返回并删除队首元素,如果队列为空则返回null E poll(); //返回首部元素但不删除,如果队列为空则抛出异常NoSuchElementException E element(); // 返回首部元素但不删除,如果队列为空则返回null E peek(); }
二、BlockingQueue接口
BlockingQueue叫做阻塞队列,这个队列继承自Queue,所以也就继承了Queue中的方法,同时又扩展出了自己的两个附加操作队列
- 支出阻塞的插入方法put,当队列满员时,就会阻塞插入队列,直到队列元素被消费,才插入
- 支出阻塞的移除方法take,当队列为空时,移除线程将会等待,直到队列不为空时候,才激活消费队列中的元素
public interface BlockingQueue<E> extends Queue<E> { /** *同上*/ boolean add(E e); /** *同上*/ boolean offer(E e); /** *队列不满时候正常插入,当队列数据满时,阻塞。直到队列不满时候再次插入数据,支持中断*/ void put(E e) throws InterruptedException; /** * 向队列中插入数据,设置阻塞时间,如果超过阻塞时间还没有插入数据则返回false*/ boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; /** * 队列中有数据则获取数据并删除,如果队列中没有数据则阻塞等待,直到有数据再次获取数据*/ E take() throws InterruptedException; /** *获取队列中的数据,如果有数据则获取数据,如果没有数据则阻塞,超过阻塞时间则返回null*/ E poll(long timeout, TimeUnit unit) throws InterruptedException; /** * 同上*/ int poll(); /** * 同上*/ boolean remove(Object o); }
BlockingQueue 方法总结
如果队列数据满了或者空时,各方法返回的结果总结
- 抛出异常 add、remove、element
- 返回结果但不抛出异常 offer poll peek
- 阻塞 put take
三、ArrayBlockingQueue
ArrayBlockingQueue是有限空间的队列,又叫做有界队列,所以在初始化时需要制定容量大小。其底层所采用的是数组方式的数据存储,利用ReentrantLock所来保证它的线程安全。
使用场景:用于生产者和消费者的模式,当生产者和消费者的速度基本匹配时,选用ArrayBlockingQueue是比较合适的,如果消费者速度大于生产者,队列为空,消费者线程就会被阻塞,反之生产者就会被阻塞。
并发的处理:ArrayBlockingQueue采用的是ReentrantLock锁,入队出队时用的同一个所,这样产生的问题就是入队和出队时不能并发执行,特高并发下性能不是很好
ArrayBlockingQueue部分源码介绍
内部主要是一把ReentrantLock锁,产生的两个条件队列 notFull notFull
//数据元素 final Object[] items; //消费下一个元素 int takeIndex; //插入下一个元素 int putIndex; //元素的总数 int count; //定义全局的锁 final ReentrantLock lock; //当队列为空时,消费者需要等待 private final Condition notEmpty; //当队列满时,生产者需要等待 private final Condition notFull; //构造方法,fair如不传数据,默认是false public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); //定义数据大写 this.items = new Object[capacity]; //创建全局锁,false默认为非公平锁, true为公平锁 lock = new ReentrantLock(fair); //获取锁的条件队列 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
put 数据时的源码介绍
//阻塞队列插入 public void put(E e) throws InterruptedException { //检查元素是否为空,如果为空直接抛出异常 checkNotNull(e); //获取锁 final ReentrantLock lock = this.lock; //加入中断锁 lock.lockInterruptibly(); try { //元素个数和数组长度相等,说明队列已满,生产者阻塞 while (count == items.length) notFull.await(); //如果没有满,则放入队列中 enqueue(e); } finally { //释放锁 lock.unlock(); } } //入队操作 private void enqueue(E x) { // 获取当前队列 final Object[] items = this.items; //将元素放入数组的下一格中 items[putIndex] = x; //当数据满了,指向数组头部,环形数组 if (++putIndex == items.length) putIndex = 0; count++; //唤醒阻塞的消费者 notEmpty.signal(); }
出队take方法
//消费数据 public E take() throws InterruptedException { //获取锁 final ReentrantLock lock = this.lock; //加入中断锁 lock.lockInterruptibly(); try { //如果没有数据,消费者阻塞 while (count == 0) notEmpty.await(); //否则返回消费数据并删除 return dequeue(); } finally { lock.unlock(); } } private E dequeue() { //获取数据队列 final Object[] items = this.items; @SuppressWarnings("unchecked") //获取takeindex位置的元素 E x = (E) items[takeIndex]; //把对应位置的数组制空 items[takeIndex] = null; //如果环形数组到了最后 以为,在次指向首位 if (++takeIndex == items.length) takeIndex = 0; //总数减去 count--; if (itrs != null) itrs.elementDequeued(); //唤醒生产线程 notFull.signal(); return x; }