ArrayBlockingQueue:
ArrayBlockingQueue是一个基于数组的有界阻塞队列(类名中带有Blocking的队列都支持阻塞操作)。它的容量是固定的,必须在构造方法中指定;元素按先进先出(FIFO)的顺序出队。
如果已经Full,那么put操作会一直等待有空位置出来。
如果已经Empty,那么take会一直等有新元素进来.
示例代码
package com.abc.test;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates how a bounded {@link ArrayBlockingQueue} behaves when it is full or empty:
 * {@code add} throws, {@code offer} returns {@code false}, {@code put} blocks until space is
 * available, {@code poll} returns the head (FIFO) and {@code take} blocks until an element arrives.
 *
 * <p>The queue and the thread pool are instance fields, so each test works on its own copies
 * (JUnit 4 creates a fresh test-class instance per test method). The blocking tests use real
 * sleeps of a few seconds and are therefore slow and timing-sensitive.
 *
 * <p>NOTE(review): the original comment stated that these test cases can only be run one at a
 * time; with the pool now shut down by every test that uses it this should no longer be
 * required - confirm in your runner.
 */
public class ArrayBlockingQueueTest {

    /** Immutable payload stored in the queue; {@code order} is what the tests assert on. */
    public static class MyElement {
        final int order;
        final String name;

        public MyElement(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    /** Queue under test, bounded to five elements. */
    ArrayBlockingQueue<MyElement> arrayBlockingQueue = new ArrayBlockingQueue<>(5);

    // The pool must have more than one thread: a task blocked in put()/take() occupies a
    // worker, and the task that unblocks it has to run concurrently on another one.
    ExecutorService pools = Executors.newFixedThreadPool(10);

    /**
     * Fills the queue to capacity with five elements (orders 1, 3, 5, 10, 15) before each test.
     */
    @Before
    public void init() {
        arrayBlockingQueue.add(new MyElement("1", 1));
        arrayBlockingQueue.add(new MyElement("3", 3));
        arrayBlockingQueue.add(new MyElement("5", 5));
        arrayBlockingQueue.add(new MyElement("10", 10));
        arrayBlockingQueue.add(new MyElement("15", 15));
    }

    /**
     * The queue holds at most five elements; {@code add} on a full queue throws
     * {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testFullAndAdd() {
        arrayBlockingQueue.add(new MyElement("16", 16));
    }

    /**
     * The queue holds at most five elements; {@code offer} on a full queue returns {@code false}
     * instead of throwing.
     */
    @Test
    public void testFullAndOffer() {
        Assert.assertFalse(arrayBlockingQueue.offer(new MyElement("16", 16)));
    }

    /**
     * {@code poll} removes and returns the head of the queue, i.e. the element that was inserted
     * first (FIFO), so the elements come out as 1, 3, ...
     */
    @Test
    public void testPoll() throws InterruptedException {
        MyElement first = arrayBlockingQueue.poll();
        Assert.assertEquals(1, first.order);
        Assert.assertEquals(4, arrayBlockingQueue.size());

        MyElement second = arrayBlockingQueue.poll();
        Assert.assertEquals(3, second.order);
        Assert.assertEquals(3, arrayBlockingQueue.size());
    }

    /**
     * {@code put} on a full queue blocks until another thread frees a slot.
     *
     * <p>One worker tries to put element 16 immediately, a second worker polls the head after
     * five seconds. After two seconds the put must still be pending; once both workers are done
     * element 16 must be the tail and element 3 the new head.
     */
    @Test
    public void testFullAndPut() throws InterruptedException {
        try {
            pools.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        arrayBlockingQueue.put(new MyElement("16", 16));
                        System.out.println("after put");
                    } catch (InterruptedException e) {
                        // Restore the flag so the pool can see the interruption.
                        Thread.currentThread().interrupt();
                    }
                }
            });
            pools.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000 * 5);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    arrayBlockingQueue.poll();
                    System.out.println("after poll");
                }
            });

            // The poller has not run yet, so the put is still blocked and 15 is still the tail.
            Thread.sleep(2000);
            Assert.assertEquals(5, arrayBlockingQueue.size());
            Assert.assertEquals(15, lastElement().order);

            // awaitTermination only returns true after shutdown() has been requested.
            pools.shutdown();
            Assert.assertTrue("workers did not finish in time",
                    pools.awaitTermination(8, TimeUnit.SECONDS));

            // The poll removed the head (1), which let the pending put append 16.
            Assert.assertEquals(16, lastElement().order);
            MyElement head = arrayBlockingQueue.poll();
            Assert.assertEquals(3, head.order);
            Assert.assertEquals(4, arrayBlockingQueue.size());
        } finally {
            // Interrupt workers that are still blocked if an assertion failed early.
            pools.shutdownNow();
        }
    }

    /**
     * {@code take} on an empty queue blocks until another thread inserts an element.
     *
     * <p>The taken element is handed back to the test thread through a second queue so that the
     * assertions run on the test thread; an assertion failing inside a pool thread would not
     * fail the test.
     */
    @Test
    public void testTake() throws InterruptedException {
        arrayBlockingQueue.clear();
        final ArrayBlockingQueue<MyElement> taken = new ArrayBlockingQueue<>(1);
        try {
            pools.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        MyElement element = arrayBlockingQueue.take();
                        System.out.println("after take");
                        taken.add(element);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            pools.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(5000);
                        arrayBlockingQueue.put(new MyElement("100", 100));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });

            // Nothing has been put yet, so take() must still be blocked after two seconds.
            Assert.assertNull("take() returned although the queue was empty",
                    taken.poll(2, TimeUnit.SECONDS));

            MyElement received = taken.poll(8, TimeUnit.SECONDS);
            Assert.assertNotNull("take() did not return after the put", received);
            Assert.assertEquals(100, received.order);

            pools.shutdown();
            Assert.assertTrue("workers did not finish in time",
                    pools.awaitTermination(8, TimeUnit.SECONDS));
        } finally {
            pools.shutdownNow();
        }
    }

    /** Returns the current tail of the queue, failing the test if the queue is empty. */
    private MyElement lastElement() {
        MyElement[] snapshot = arrayBlockingQueue.toArray(new MyElement[0]);
        Assert.assertTrue("queue is unexpectedly empty", snapshot.length > 0);
        return snapshot[snapshot.length - 1];
    }
}