package com.lilei.pack09;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * A fixed-capacity, blocking container guarded by its own monitor.
 *
 * <p>{@link #push(Object)} blocks while the container is full and
 * {@link #pop()} blocks while it is empty. Note that, despite the class name,
 * elements are handed out in LIFO order: {@code pop()} returns the most
 * recently pushed element.
 *
 * <p>Both operations are {@code synchronized} on this instance and use
 * {@code wait()}/{@code notifyAll()} to coordinate producers and consumers.
 *
 * @param <T> type of the stored elements
 */
public class MySyncQueue<T> {
    /** Backing storage; slots {@code 0..pos} hold the live elements. */
    private final Object[] ts;

    /**
     * Index of the most recently pushed element, or {@code -1} when empty.
     * Every access in this class happens inside a synchronized method.
     */
    int pos = -1;

    /**
     * Creates a container able to hold at most {@code size} elements.
     *
     * <p>A {@code size} of zero yields a container on which every
     * {@code push} blocks indefinitely, because there is never a free slot.
     *
     * @param size maximum number of elements held at once
     * @throws NegativeArraySizeException if {@code size} is negative
     */
    public MySyncQueue(int size) {
        ts = new Object[size];
    }

    /**
     * Stores {@code t}, waiting for a free slot if the container is full.
     *
     * @param t element to store
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void push(T t) throws InterruptedException {
        // Guarded wait: re-check the condition after every wake-up, since
        // notifyAll() wakes producers and consumers alike.
        while (pos + 1 >= ts.length) {
            wait();
        }
        ts[++pos] = t;
        notifyAll();
        System.out.println(Thread.currentThread().getName() + " push,currentSize=" + (pos + 1));
    }

    /**
     * Removes and returns the most recently pushed element, waiting for one
     * if the container is empty.
     *
     * @return the element that was on top
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized T pop() throws InterruptedException {
        while (pos < 0) {
            wait();
        }
        // Safe: only push(T) ever writes into the array.
        @SuppressWarnings("unchecked")
        T t = (T) ts[pos];
        // Clear the vacated slot so the container does not keep the popped
        // object reachable after it has been handed to the caller.
        ts[pos--] = null;
        notifyAll();
        System.out.println(Thread.currentThread().getName() + " pop,currentSize=" + (pos + 1));
        return t;
    }

    /** Empty marker type used as the payload in the {@link #main} demo. */
    public static class Inner {
    }

    /**
     * Demo driver: repeatedly submits batches of consumers followed by an
     * equal number of producers to a fixed thread pool sharing one container.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final int capacity = 15;
        final int poolSize = 30;
        final int rounds = 1000;

        ExecutorService es = Executors.newFixedThreadPool(poolSize);
        final MySyncQueue<Inner> queue = new MySyncQueue<Inner>(capacity);

        // Both tasks are stateless, so one instance of each can be submitted repeatedly.
        final Runnable consumer = new Runnable() {
            @Override
            public void run() {
                try {
                    queue.pop();
                } catch (InterruptedException e) {
                    // Restore the flag so the pool thread can observe the interruption.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        };
        final Runnable producer = new Runnable() {
            @Override
            public void run() {
                try {
                    queue.push(new Inner());
                } catch (InterruptedException e) {
                    // Restore the flag so the pool thread can observe the interruption.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        };

        for (int round = 0; round < rounds; round++) {
            for (int i = 0; i < capacity; i++) {
                es.execute(consumer);
            }
            for (int i = 0; i < capacity; i++) {
                es.execute(producer);
            }
        }
        es.shutdown();
    }
}