高并发编程-队列-BlockingQueue-ArrayBlockingQueue

高并发编程-队列-BlockingQueue-ArrayBlockingQueue

在介绍ArrayBlockingQueue之前,为了让大家更好的理解这个知识点,先把Queue和BlockingQueue的相关知识做个简单的介绍

一、Queue队列接口

  Queue继承于Collection数据集合,Queue内部主要方法有六个,下面依次对着六个方法做简单介绍

public interface Queue<E> extends Collection<E>{

  //本方法为向队列中添加元素,如果队列中空间够用,元素插入队列返回true,如果空间不够用将会抛出 IllegalStateException异常
  boolean add(E e);

  //向队列中添加元素,添加成功返回true,如果队列已满,返回false
  boolean offer(E e);

  //删除队首元素并删除,如果队列为空将会抛NoSuchElementException 异常
   E remove();
  
     // 返回并删除队首元素,如果队列为空则返回null     
    E poll();

     //返回首部元素但不删除,如果队列为空则抛出异常NoSuchElementException
    E element();  

    // 返回首部元素但不删除,如果队列为空则返回null
    E peek();   
}

二、BlockingQueue接口 

  BlockingQueue叫做阻塞队列,这个队列继承自Queue,所以也就继承了Queue中的方法,同时又扩展出了自己的两个附加操作队列

  • 支出阻塞的插入方法put,当队列满员时,就会阻塞插入队列,直到队列元素被消费,才插入
  • 支出阻塞的移除方法take,当队列为空时,移除线程将会等待,直到队列不为空时候,才激活消费队列中的元素    
public interface BlockingQueue<E> extends Queue<E> {
    /**
     *同上*/
    boolean add(E e);
    /**
     *同上*/
    boolean offer(E e);
    /**
     *队列不满时候正常插入,当队列数据满时,阻塞。直到队列不满时候再次插入数据,支持中断*/
    void put(E e) throws InterruptedException;
    /**
     * 向队列中插入数据,设置阻塞时间,如果超过阻塞时间还没有插入数据则返回false*/
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 队列中有数据则获取数据并删除,如果队列中没有数据则阻塞等待,直到有数据再次获取数据*/
    E take() throws InterruptedException;
    /**
     *获取队列中的数据,如果有数据则获取数据,如果没有数据则阻塞,超过阻塞时间则返回null*/
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 同上*/
    int poll();
    /**
     * 同上*/
    boolean remove(Object o);
}

BlockingQueue 方法总结

  如果队列数据满了或者空时,各方法返回的结果总结

  1. 抛出异常 add、remove、element
  2. 返回结果但不抛出异常 offer poll peek
  3. 阻塞 put take 

三、ArrayBlockingQueue

  ArrayBlockingQueue是有限空间的队列,又叫做有界队列,所以在初始化时需要制定容量大小。其底层所采用的是数组方式的数据存储,利用ReentrantLock所来保证它的线程安全。

  使用场景:用于生产者和消费者的模式,当生产者和消费者的速度基本匹配时,选用ArrayBlockingQueue是比较合适的,如果消费者速度大于生产者,队列为空,消费者线程就会被阻塞,反之生产者就会被阻塞。

  并发的处理:ArrayBlockingQueue采用的是ReentrantLock锁,入队出队时用的同一个所,这样产生的问题就是入队和出队时不能并发执行,特高并发下性能不是很好

ArrayBlockingQueue部分源码介绍

内部主要是一把ReentrantLock锁,产生的两个条件队列 notFull  notFull

    //数据元素
    final Object[] items;

    //消费下一个元素
    int takeIndex;

   //插入下一个元素
    int putIndex;

   //元素的总数
    int count;   

    //定义全局的锁
    final ReentrantLock lock;

   //当队列为空时,消费者需要等待
    private final Condition notEmpty;

    //当队列满时,生产者需要等待
    private final Condition notFull;
//构造方法,fair如不传数据,默认是false
 public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
       //定义数据大写      
        this.items = new Object[capacity];
        //创建全局锁,false默认为非公平锁, true为公平锁
        lock = new ReentrantLock(fair);
        //获取锁的条件队列
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }    

put 数据时的源码介绍

//阻塞队列插入
public void put(E e) throws InterruptedException {
//检查元素是否为空,如果为空直接抛出异常    
        checkNotNull(e);
    //获取锁
        final ReentrantLock lock = this.lock;
    //加入中断锁
        lock.lockInterruptibly();
        try {
        //元素个数和数组长度相等,说明队列已满,生产者阻塞
            while (count == items.length)
                notFull.await();
          //如果没有满,则放入队列中  
            enqueue(e);
        } finally {
        //释放锁
            lock.unlock();
        }
    }
//入队操作
 private void enqueue(E x) {
        // 获取当前队列
        final Object[] items = this.items;
        //将元素放入数组的下一格中    
        items[putIndex] = x;
       //当数据满了,指向数组头部,环形数组    
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
       //唤醒阻塞的消费者
        notEmpty.signal();
    }

出队take方法

//消费数据 
public E take() throws InterruptedException {
        //获取锁
        final ReentrantLock lock = this.lock;
       //加入中断锁 
       lock.lockInterruptibly();
        try {
           //如果没有数据,消费者阻塞
            while (count == 0)
                notEmpty.await();
            //否则返回消费数据并删除
            return dequeue();
        } finally {
            lock.unlock();
        }
    }



private E dequeue() {
   //获取数据队列
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    //获取takeindex位置的元素
    E x = (E) items[takeIndex];
    //把对应位置的数组制空
    items[takeIndex] = null;
    //如果环形数组到了最后 以为,在次指向首位
    if (++takeIndex == items.length)
        takeIndex = 0;
    //总数减去
    count--;
    if (itrs != null)
        itrs.elementDequeued();
   //唤醒生产线程
    notFull.signal();
    return x;
}

 

上一篇:Synchronized 看一篇就够了


下一篇:Java中的重量级锁