阻塞队列的原理

一、什么是阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。
这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。

常见的有以下几种
(1)ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列(数组结构可配合指针实现一个环形队列)。
(2)LinkedBlockingQueue: 一个由链表结构组成的有界阻塞队列,而在未指明容量时,容量默认为Integer.MAX_VALUE。
(3)LinkedBlockingDeque: 使用双向队列实现的双端阻塞队列,双端意味着可以像普通队列一样FIFO(先进先出),可以以像栈一样FILO(先进后出)
(4)PriorityBlockingQueue: 一个支持优先级排序的*阻塞队列,对元素没有要求,可以实现Comparable接口也可以提供Comparator来对队列中的元素进行比较,跟时间没有任何关系,仅仅是按照优先级取任务。
(5)DelayQueue:同PriorityBlockingQueue,也是二叉堆实现的优先级阻塞队列。要求元素都实现Delayed接口,通过执行时延从队列中提取任务,时间没到任务取不出来。
(6)SynchronousQueue: 一个不存储元素的阻塞队列,消费者线程调用take()方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回;生产者线程调用put()方法的时候就会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。

二、Condition的原理

阻塞队列实际上是使用了Condition来模拟线程间协作。
Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition。
调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用

注意事项

Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。

三、put和take的原理

这里用ArrayBlockingQueue来举例
它带有的属性如下


// 存储队列元素的数组,是个循环数组
final Object[] items;

// 拿数据的索引,用于take,poll,peek,remove方法
int takeIndex;

// 放数据的索引,用于put,offer,add方法
int putIndex;

// 元素个数
int count;

// 可重入锁
final ReentrantLock lock;

// notEmpty条件对象,由lock创建
private final Condition notEmpty;

// notFull条件对象,由lock创建
private final Condition notFull;

1.数据的添加

put:阻塞添加

public void put(E e) throws InterruptedException {
    checkNotNull(e); // 不允许元素为空
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 加锁,保证调用put方法的时候只有1个线程
    try {
        while (count == items.length) // 如果队列满了,阻塞当前线程,并加入到条件对象notFull的等待队列里
            notFull.await(); // 线程阻塞并被挂起,同时释放锁
        insert(e); // 调用insert方法
    } finally {
        lock.unlock(); // 释放锁,让其他线程可以调用put方法
    }
}

insert

private void insert(E x) {
    items[putIndex] = x; // 元素添加到数组里
    putIndex = inc(putIndex); // 放数据索引+1,当索引满了变成0
    ++count; // 元素个数+1
    notEmpty.signal(); // 使用条件对象notEmpty通知,比如使用take方法的时候队列里没有数据,被阻塞。这个时候队列insert了一条数据,需要调用signal进行通知
}

2.数据的获取

take:通知模式
Condition notEmpty:队列为空,notEmpty会await
插入元素会signal

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 加锁,保证调用take方法的时候只有1个线程
    try {
        while (count == 0) // 如果队列空,阻塞当前线程,并加入到条件对象notEmpty的等待队列里
            notEmpty.await(); // 线程阻塞并被挂起,同时释放锁
        return extract(); // 调用extract方法
    } finally {
        lock.unlock(); // 释放锁,让其他线程可以调用take方法
    }
}

extract

private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeIndex]); // 得到取索引位置上的元素
    items[takeIndex] = null; // 对应取索引上的数据清空
    takeIndex = inc(takeIndex); // 取数据索引+1,当索引满了变成0
    --count; // 元素个数-1
    notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知
    return x; // 返回元素
}
上一篇:MySql数据库列表数据分页查询、全文检索API零代码实现


下一篇:Android驱动入门-LED--测试APP编写③