非阻塞队列,需要考虑到:
1.并发中的同步
2.线程间通信
public class Quene_Pro_Con
{
//定义队列大小
private static int size = 10;
//非阻塞队列
private static PriorityQueue<Integer> quene = new PriorityQueue<Integer>(); public static void main(String[] args)
{
Quene_Pro_Con test = new Quene_Pro_Con();
Producer pro = test.new Producer();
Consumer con = test.new Consumer();
pro.start();
con.start();
}
//内部类生产者
class Producer extends Thread{
public void run(){
while(true){
try
{
Thread.sleep(300);
}
catch (InterruptedException e1)
{
e1.printStackTrace();
}
synchronized (quene)//获取队列对象的锁
{
//判断:队列满了不生产,持续(用while)等待直到被消费
while (quene.size() == size)
{
System.out.println("【生产者】队列满了,等待消费...");
try
{
quene.wait();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
//队列未满:生产一个产品
quene.offer(1);
//如果消费者处于等待,唤醒
quene.notify();
System.out.println("【生产者】生产一个产品,当前队列中产品数量:" + quene.size());
}
}
}
}
//内部类消费者
class Consumer extends Thread{
public void run(){
while(true){
try
{
Thread.sleep(500);
}
catch (InterruptedException e1)
{
e1.printStackTrace();
}
synchronized (quene)//获取队列对象的锁
{
//判断:队列满了不生产,持续(用while)等待直到被消费
while (quene.size() == 0)
{
System.out.println("【消费者】:队列空了,等待生产...");
try
{
quene.wait();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
//队列不空:消费一个产品 每次移走队首元素
quene.poll();
//如果消生产者等待,唤醒
quene.notify();
System.out.println("【消费者】消费了1个产品,当前队列中产品数量:" + quene.size());
}
}
}
}
}