@TOC[目录]
BlockingQueue用法
BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。下图是对这个原理的阐述:
一个线程往里边放,另外一个线程从里边取的一个 BlockingQueue。
一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,负责生产的线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。
负责消费的线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。
BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
抛异常 | 特定值 | 阻塞 | 超时 |
---|---|---|---|
插入 | add(o) | offer(o) | put(o) |
移除 | remove(o) | poll(o) | take(o) |
检查 | element(o) | peek(o) |
四组不同的行为方式解释:
- 抛异常:如果试图的操作无法立即执行,抛一个异常。
- 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
- 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
- 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。
无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。
可以访问到 BlockingQueue 中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉。那么你可以调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。但是这么干效率并不高(译者注:基于队列的数据结构,获取除开始或结束位置的其他对象的效率不会太高),因此你尽量不要用这一类的方法,除非你确实不得不那么做。
BlockingQueue 是个接口,你需要使用它的实现之一来使用 BlockingQueue。java.util.concurrent 具有以下 BlockingQueue 接口的实现(Java 6):
- ArrayBlockingQueue
- DelayQueue
- LinkedBlockingQueue
- PriorityBlockingQueue
- SynchronousQueue
ArrayBlockingQueue
数组阻塞队列 ArrayBlockingQueue
ArrayBlockingQueue 类实现了 BlockingQueue 接口。 ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。
ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。
UML
Ex.1
public class ArrayBlockingQueueEx {
/**
* 主线程
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new ArrayBlockingQueue(1024);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
/**
* 消费者
*/
static class Consumer implements Runnable {
protected BlockingQueue queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
} catch (Throwable e) {
e.printStackTrace();
}
}
}
/**
* 生产者
*/
static class Producer implements Runnable {
protected BlockingQueue queue;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
} catch (Throwable e) {
e.printStackTrace();
}
}
}
}
Ex.1.out
1
2
3
DelayQueue
延迟队列 DelayQueue
DelayQueue 实现了 BlockingQueue 接口。 DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现
java.util.concurrent.Delayed 接口。
DelayQueue 将会在每个元素的 getDelay() 方法返回的值的时间段之后才释放掉该元素。如果返回的是 0 或者负值,延迟将被认为过期,该元素将会在 DelayQueue 的下一次 take 被调用的时候被释放掉。
传递给 getDelay 方法的 getDelay 实例是一个枚举类型,它表明了将要延迟的时间段。
TimeUnit 枚举将会取以下值:
枚举值 | 含义 |
---|---|
DAYS | 天 |
HOURS | 小时 |
MINUTES | 分钟 |
SECONDS | 秒 |
MILLISECONDS | 毫秒 |
MICROSECONDS | 微秒 |
NANOSECONDS | 纳秒 |
正如你所看到的,Delayed 接口也继承了 java.lang.Comparable 接口,这也就意味着 Delayed 对象之间可以进行对比。这个可能在对 DelayQueue 队列中的元素进行排序时有用,因此它们可以根据过期时间进行有序释放。
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
UML
Ex.2
public class DelayQueueEx {
/**
* 测试Delay属性
*
* @param args
*/
public static void main(String[] args) {
DelayQueue<DelayElement> queue = new DelayQueue<>();
for (int i = 0; i < 10; i++) {
long delay = Double.valueOf(Math.random() * 10000).longValue();
queue.put(new DelayElement(delay, "task:" + i));
}
DelayElement ele = null;
while (true) {
try {
if ((ele = queue.take()) != null) {
System.out.println(ele.toString());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 延迟元素
*/
@Getter
@ToString
static class DelayElement implements Delayed {
/**
* 延迟时间
*/
long delay;
/**
* 到期时间
*/
long expire;
/**
* 任务名称
*/
String name;
public DelayElement(long delay, String name) {
this.delay = delay;
this.name = name;
this.expire = System.currentTimeMillis() + delay;
}
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
* 剩余时间 = 到期时间 - 当前时间
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* Compares this object with the specified object for order. Returns a
* negative integer, zero, or a positive integer as this object is less
* than, equal to, or greater than the specified object.
*
* <p>The implementor must ensure <tt>sgn(x.compareTo(y)) ==
* -sgn(y.compareTo(x))</tt> for all <tt>x</tt> and <tt>y</tt>. (This
* implies that <tt>x.compareTo(y)</tt> must throw an exception iff
* <tt>y.compareTo(x)</tt> throws an exception.)
*
* <p>The implementor must also ensure that the relation is transitive:
* <tt>(x.compareTo(y)>0 && y.compareTo(z)>0)</tt> implies
* <tt>x.compareTo(z)>0</tt>.
*
* <p>Finally, the implementor must ensure that <tt>x.compareTo(y)==0</tt>
* implies that <tt>sgn(x.compareTo(z)) == sgn(y.compareTo(z))</tt>, for
* all <tt>z</tt>.
*
* <p>It is strongly recommended, but <i>not</i> strictly required that
* <tt>(x.compareTo(y)==0) == (x.equals(y))</tt>. Generally speaking, any
* class that implements the <tt>Comparable</tt> interface and violates
* this condition should clearly indicate this fact. The recommended
* language is "Note: this class has a natural ordering that is
* inconsistent with equals."
*
* <p>In the foregoing description, the notation
* <tt>sgn(</tt><i>expression</i><tt>)</tt> designates the mathematical
* <i>signum</i> function, which is defined to return one of <tt>-1</tt>,
* <tt>0</tt>, or <tt>1</tt> according to whether the value of
* <i>expression</i> is negative, zero or positive.
*
* @param o the object to be compared.
* @return a negative integer, zero, or a positive integer as this object
* is less than, equal to, or greater than the specified object.
* @throws NullPointerException if the specified object is null
* @throws ClassCastException if the specified object's type prevents it
* from being compared to this object.
*/
@Override
public int compareTo(Delayed o) {
return Long.valueOf(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)).intValue();
}
}
}
Ex.2.out
DelayQueueEx.DelayElement(delay=1167, expire=1566299775344, name=task:6)
DelayQueueEx.DelayElement(delay=1701, expire=1566299775877, name=task:1)
DelayQueueEx.DelayElement(delay=3548, expire=1566299777725, name=task:7)
DelayQueueEx.DelayElement(delay=6072, expire=1566299780249, name=task:9)
DelayQueueEx.DelayElement(delay=6701, expire=1566299780878, name=task:5)
DelayQueueEx.DelayElement(delay=7510, expire=1566299781686, name=task:0)
DelayQueueEx.DelayElement(delay=7652, expire=1566299781829, name=task:8)
DelayQueueEx.DelayElement(delay=8476, expire=1566299782653, name=task:3)
DelayQueueEx.DelayElement(delay=8771, expire=1566299782948, name=task:2)
DelayQueueEx.DelayElement(delay=8976, expire=1566299783153, name=task:4)
LinkedBlockingQueue
链阻塞队列 LinkedBlockingQueue
LinkedBlockingQueue 类实现了 BlockingQueue 接口。
LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。
LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。
UML
Ex.3
public class LinkedBlockingQueueEx {
/**
* 链阻塞队列
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new LinkedBlockingQueue();
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
}
Ex.3.out
1
2
3
PriorityBlockingQueue
具有优先级的阻塞队列 PriorityBlockingQueue
PriorityBlockingQueue 类实现了 BlockingQueue 接口。
PriorityBlockingQueue 是一个***的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入
null 值。
所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的
Comparable 实现。
注意
:
- PriorityBlockingQueue 对于具有相等优先级(compare() == 0)的元素并不强制任何特定行为。
- 如果你从一个 PriorityBlockingQueue 获得一个 Iterator 的话,该 Iterator 并不能保证它对元素的遍历是以优先级为序的。
public class PriorityBlockingQueueEx {
/**
* 测试优先级队列 PriorityBlockingQueue
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Ele> queue = new PriorityBlockingQueue<>();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
int value = Double.valueOf(Math.random() * 10).intValue();
try {
queue.put(new Ele(value));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(3000);
new Thread(() -> {
while (true) {
try {
System.out.println(queue.take().toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
@Getter
@AllArgsConstructor
@ToString
static class Ele implements Comparable<Ele> {
int value;
@Override
public int compareTo(Ele o) {
return this.getValue() - o.getValue();
}
}
}
PriorityBlockingQueueEx.Ele(value=1)
PriorityBlockingQueueEx.Ele(value=2)
PriorityBlockingQueueEx.Ele(value=2)
PriorityBlockingQueueEx.Ele(value=3)
PriorityBlockingQueueEx.Ele(value=3)
PriorityBlockingQueueEx.Ele(value=6)
PriorityBlockingQueueEx.Ele(value=6)
PriorityBlockingQueueEx.Ele(value=7)
PriorityBlockingQueueEx.Ele(value=8)
PriorityBlockingQueueEx.Ele(value=9)
SynchronousQueue
同步队列 SynchronousQueue
SynchronousQueue 类实现了 BlockingQueue 接口。
SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。
据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。
public class SynchronousQueueEx {
/**
* 同步队列测试
*
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Ele> queue = new SynchronousQueue<>();
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
int value = Double.valueOf(Math.random() * 10).intValue();
try {
Ele ele = new Ele(value);
queue.put(ele);
System.out.println(String.format("[%s] try put: %s", Thread.currentThread().getName(), ele.toString()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread1.start();
Thread.sleep(3000);
Thread thread2 = new Thread(() -> {
while (true) {
try {
Ele take = queue.take();
System.out.println(String.format("[%s] try take: %s", Thread.currentThread().getName(), take.toString()));
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread2.start();
}
@Getter
@AllArgsConstructor
@ToString
static class Ele {
int value;
}
}
[Thread-0] try put: SynchronousQueueEx.Ele(value=9)
[Thread-1] try take: SynchronousQueueEx.Ele(value=9)
[Thread-1] try take: SynchronousQueueEx.Ele(value=5)
[Thread-0] try put: SynchronousQueueEx.Ele(value=5)
[Thread-0] try put: SynchronousQueueEx.Ele(value=2)
[Thread-1] try take: SynchronousQueueEx.Ele(value=2)
[Thread-0] try put: SynchronousQueueEx.Ele(value=4)
[Thread-1] try take: SynchronousQueueEx.Ele(value=4)
[Thread-0] try put: SynchronousQueueEx.Ele(value=6)
[Thread-1] try take: SynchronousQueueEx.Ele(value=6)
[Thread-1] try take: SynchronousQueueEx.Ele(value=9)
[Thread-0] try put: SynchronousQueueEx.Ele(value=9)
[Thread-1] try take: SynchronousQueueEx.Ele(value=3)
[Thread-0] try put: SynchronousQueueEx.Ele(value=3)
[Thread-1] try take: SynchronousQueueEx.Ele(value=8)
[Thread-0] try put: SynchronousQueueEx.Ele(value=8)
[Thread-1] try take: SynchronousQueueEx.Ele(value=7)
[Thread-0] try put: SynchronousQueueEx.Ele(value=7)
[Thread-0] try put: SynchronousQueueEx.Ele(value=7)
[Thread-1] try take: SynchronousQueueEx.Ele(value=7)
从日志输出可以看出同步队列中的元素每次取完后才会再次put进去,如果没被取走的情况下,put会被阻塞。