手写MiniBlockingQueue阻塞队列
BlockingQueue接口
package com.xiaozhou;
public interface BlockingQueue<T> {
void put(T element) throws InterruptedException;
T take() throws InterruptedException;
}
MiniBlockingQueue
package com.xiaozhou;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class MiniBlockingQueue<T> implements BlockingQueue<T>{
//底层是一个数组
private Object[] elementData;
private int size;
public MiniBlockingQueue(int size) {
this.elementData = new Object[size];
this.size = size;
}
//当前队列中的元素个数, put时的指针索引, take时的指针索引
private int count , putIndex, takeIndex;
private Lock lock = new ReentrantLock();
private Condition fullCondition = lock.newCondition();
private Condition emptyCondition = lock.newCondition();
@Override
public void put(T element) throws InterruptedException {
lock.lock();
try {
//当阻塞队列满时 该put的线程需要进入条件队列中等待
if (count == size){
fullCondition.await();
}
elementData[putIndex] = element;
putIndex++;
count++;
if (putIndex == size) putIndex = 0;
//通知take阻塞的线程来消费
emptyCondition.signal();
} finally {
lock.unlock();
}
}
@Override
public T take() throws InterruptedException {
lock.lock();
try {
//当队列为空时,需要进入条件队列中等待
if (count == 0){
emptyCondition.await();
}
Object element = elementData[takeIndex];
takeIndex++;
count--;
if (takeIndex == size) takeIndex = 0;
fullCondition.signal();
return (T) element;
} finally {
lock.unlock();
}
}
//测试
public static void main(String[] args) throws InterruptedException {
MiniBlockingQueue<String> queue = new MiniBlockingQueue<String>(3);
// 当阻塞队列中为空时take 会阻塞,直到有生产者放进去值才会take到
// new Thread(()->{
// try {
// String element = queue.take();
// System.out.println(Thread.currentThread().getName() + " 从队列中获取值为: " + element);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// },"A").start();
//
//
// TimeUnit.SECONDS.sleep(2);
//
// queue.put("zzp");
// 当阻塞队列中值满时,再去put则会阻塞,直到有消费者消费后才会put进去
new Thread(()->{
for (int i = 0; i < 4; i++) {
try {
queue.put(String.valueOf(i));
System.out.println(Thread.currentThread().getName() + "放入:" + String.valueOf(i));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
TimeUnit.SECONDS.sleep(2);
for (int i = 0; i < 4; i++) {
String res = queue.take();
System.out.println(Thread.currentThread().getName() + "获取到:" + res);
}
}
}
手写迷你版BlockingQueue阻塞队列