生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。下面是生产者/消费者问题的Java实现与详解:
import java.util.Stack; //import java.sql.Timestamp; public class ProducerConsumer { public static void main(String[] args) { //建立一个存储区 ProductStack ss = new ProductStack(); //添加生产者 Producer p1 = new Producer(ss); p1.setName("NO.1P"); Producer p2 = new Producer(ss); p2.setName("NO.2P"); Producer p3 = new Producer(ss); p3.setName("NO.3P"); Producer p4 = new Producer(ss); p4.setName("NO.4P"); Producer p5 = new Producer(ss); p5.setName("NO.5P"); //添加消费者 Consumer c1 = new Consumer(ss); c1.setName("NO.1C"); Consumer c2 = new Consumer(ss); c2.setName("NO.3C"); Consumer c3 = new Consumer(ss); c3.setName("NO.3C"); Consumer c4 = new Consumer(ss); c4.setName("NO.4C"); Consumer c5 = new Consumer(ss); c5.setName("NO.5C"); //打开存储区 ss.openStack(); //各角色开始进行生产或消费活动 p1.start(); p2.start(); p3.start(); c1.start(); c2.start(); //活动100ms后停止 try { Thread.sleep(100); ss.closeStack(); } catch (InterruptedException e) { e.printStackTrace(); } //1秒后重新开始活动 try { Thread.sleep(1000); ss.openStack(); } catch (InterruptedException e) { e.printStackTrace(); } p4.start(); p5.start(); c3.start(); c4.start(); c5.start(); //活动100ms后再次停止 try { Thread.sleep(100); ss.closeStack(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 产品 * @author Administrator * */ class Product { //已生产的产品总数 private static Integer totalProduct = 0; //产品id(生产日期+生产总数) private String id = null; public Product() { this.id = generateId(); } private String generateId(){ //类锁 由于所有线程的Product对象不同,故只能用类锁使任何使用该类对象的线程在此处进行同步 synchronized(Product.class){ ++totalProduct; // String genId = new Timestamp(System.currentTimeMillis()).toString().replaceAll( // "[-,:, ,.]", "") // + "-" + totalProduct; return String.valueOf(totalProduct); // return genId; } } public String getId() { return id; } } /** * 产品存储区 所有生产者和消费者共享 * @author Administrator * */ class ProductStack { //标志存储区是否打开 private boolean StackOpen = false; //存储区能容纳的最大产品数 private int capacity = 10; //当前的产品数 private int Current = 0; //存放产品的容器 private Product[] ProductArray = new Product[capacity]; //存储区关闭后使用的外部(备用)存储区 private Stack<Product> externalStack = new Stack<Product>(); /** * 默认构造方法 */ public ProductStack(){ } /** * 构造方法 * @param capacity 存储区容量 */ public ProductStack(int capacity){ ProductArray = new Product[capacity]; } /** * 存储产品 * @param pt 传入生产出的产品 */ /* * 对象锁,相当于方法中加synchronized(this){方法体} * 所有继承object的类都有一个锁标记(lock flag),当多个线程对同一个对象进行访问时, * 如果遇到同步代码块,会先检查锁标记是否已经打开:如果已打开,线程就被放到锁池中等待, * 等其他同步代码块释放了锁标记后才继续执行;如果未打开,则为对象添加一个锁标记,然后再执行。 */ public synchronized void push(Product pt) { while (StackOpen && Current == ProductArray.length) { try { /* * 线程释放锁标记,被放入等待池, 当同一对象中的其他同步代码块调用 * notify/notifyAll时,线程被放到锁池中,等其他同步代码块释放锁标记后执行。 */ this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } if(!StackOpen){ externalStack.push(pt); System.out.println("由于存储区已关闭 ,第" + pt.getId() + "号导出至外部存储区!"); return; } ProductArray[Current] = pt; String threadName = Thread.currentThread().getName(); if(threadName.equals("externalStackImportThread")){ System.out.println("第" + pt.getId() + "号已从外部存储区导入!"); }else{ System.out.println(threadName + "生产的:第" + pt.getId() + "号已入库!"); } ++Current; /* 释放本对象等待池中的所有线程,进入锁池,等push释放锁标记后,共同竞争以进入running状态 * 此时,存储区至少有一个产品,所以通知在pop中等待的线程,等push结束后,可以相互竞争以继续执行 */ this.notifyAll(); } /** * 取出产品 * @return 返回从库中取出的产品 */ public synchronized Product pop() { while (StackOpen && Current == 0) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } if(!StackOpen){ return null; } --Current; //此时,存储区至少还有一个产品可存储,所以通知在push中等待的线程,等pop结束后,可以相互竞争以继续执行 this.notifyAll(); Product pt = ProductArray[Current]; System.out.println("----------" + Thread.currentThread().getName() + "消费的:第" + pt.getId() + "号已出库!"); return pt; } /** * 打开存储区 */ public void openStack(){ System.out.println("-----------------------存储区已打开!现有产品数: " + getCurrent() + "-----------------------"); StackOpen = true; //导入外部存储区中的产品 if(!externalStack.isEmpty()){ ImportExternalStack(); } } /** * 导入外部存储区 */ private void ImportExternalStack(){ //使用Runnable匿名类建立一个导入线程 Thread thread = new Thread(new Runnable(){ @Override public void run() { // TODO Auto-generated method stub while(!externalStack.isEmpty()){ Product pt = externalStack.pop(); push(pt); } } }); thread.setName("externalStackImportThread"); thread.start(); } /** * 关闭存储区 */ public synchronized void closeStack(){ StackOpen = false; //通知所有正在等待的线程“查看”当前库状态 this.notifyAll(); System.out.println("-----------------------存储区已关闭!现有产品数: " + getCurrent() + "-----------------------"); } /** * 查询存储区是否打开 * @return */ public boolean isStackOpen() { return StackOpen; } /** * 查询存储区产品的最大存储数量 * @return */ public int getMaxProduct() { return capacity; } /** * 获得当前产品数量 * @return */ public int getCurrent() { return Current; } /** * 获得外部存储区产品数量 * @return */ public int getExternalStackCount() { return externalStack.size(); } } /** * 生产者 * @author Administrator * */ class Producer extends Thread{ ProductStack ss; public Producer(ProductStack ss) { this.ss = ss; } /** * 生产产品 */ public void run() { while (ss.isStackOpen()) { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } Product pt = new Product(); System.out.println(Thread.currentThread().getName() + "生产了:第" + pt.getId() + "号"); ss.push(pt); } System.out.println(Thread.currentThread().getName() +"已停止生产!"); } } /** * 消费者 * @author Administrator * */ class Consumer extends Thread{ ProductStack ss; public Consumer(ProductStack ss) { this.ss = ss; } /** * 消费产品 */ public void run() { while (ss.isStackOpen()) { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } Product pt = ss.pop(); if(pt != null){ System.out.println("----------" + Thread.currentThread().getName() + "消费了:第" + pt.getId() + "号");; } } System.out.println(Thread.currentThread().getName() + "已停止消费"); } }
输出(结果不唯一):
-----------------------存储区已打开!现有产品数: 0----------------------- NO.2P生产了:第1号 NO.3P生产了:第3号 NO.1P生产了:第2号 NO.2P生产的:第1号已入库! ----------NO.3C消费的:第1号已出库! NO.1P生产的:第2号已入库! ----------NO.3C消费了:第1号 NO.3P生产的:第3号已入库! ----------NO.1C消费的:第3号已出库! ----------NO.1C消费了:第3号 NO.2P生产了:第4号 NO.2P生产的:第4号已入库! ----------NO.1C消费的:第4号已出库! ----------NO.1C消费了:第4号 NO.1P生产了:第5号 NO.1P生产的:第5号已入库! NO.3P生产了:第6号 NO.3P生产的:第6号已入库! ----------NO.3C消费的:第6号已出库! ----------NO.3C消费了:第6号 ----------NO.1C消费的:第5号已出库! ----------NO.1C消费了:第5号 NO.3P生产了:第8号 NO.2P生产了:第9号 NO.1P生产了:第7号 NO.3P生产的:第8号已入库! NO.1P生产的:第7号已入库! NO.2P生产的:第9号已入库! ----------NO.3C消费的:第9号已出库! ----------NO.3C消费了:第9号 ----------NO.1C消费的:第7号已出库! ----------NO.1C消费了:第7号 NO.2P生产了:第10号 NO.1P生产了:第12号 NO.3P生产了:第11号 NO.2P生产的:第10号已入库! NO.3P生产的:第11号已入库! NO.1P生产的:第12号已入库! ----------NO.3C消费的:第12号已出库! ----------NO.3C消费了:第12号 ----------NO.1C消费的:第11号已出库! NO.3P生产了:第14号 NO.3P生产的:第14号已入库! NO.1P生产了:第13号 ----------NO.3C消费的:第14号已出库! NO.2P生产了:第15号 ----------NO.1C消费了:第11号 NO.1P生产的:第13号已入库! ----------NO.3C消费了:第14号 NO.2P生产的:第15号已入库! ----------NO.1C消费的:第15号已出库! ----------NO.1C消费了:第15号 NO.1P生产了:第17号 ----------NO.3C消费的:第13号已出库! NO.3P生产了:第16号 NO.1P生产的:第17号已入库! ----------NO.3C消费了:第13号 NO.2P生产了:第18号 NO.3P生产的:第16号已入库! NO.2P生产的:第18号已入库! NO.3P生产了:第19号 NO.3P生产的:第19号已入库! ----------NO.3C消费的:第19号已出库! ----------NO.3C消费了:第19号 NO.1P生产了:第20号 NO.1P生产的:第20号已入库! NO.2P生产了:第21号 NO.2P生产的:第21号已入库! ----------NO.1C消费的:第21号已出库! ----------NO.1C消费了:第21号 -----------------------存储区已关闭!现有产品数: 7----------------------- NO.3P生产了:第22号 由于存储区已关闭 ,第22号导出至外部存储区! NO.3P已停止生产! NO.3C已停止消费 NO.1P生产了:第23号 由于存储区已关闭 ,第23号导出至外部存储区! NO.1P已停止生产! NO.2P生产了:第24号 由于存储区已关闭 ,第24号导出至外部存储区! NO.2P已停止生产! NO.1C已停止消费 -----------------------存储区已打开!现有产品数: 7----------------------- 第24号已从外部存储区导入! 第23号已从外部存储区导入! 第22号已从外部存储区导入! ----------NO.5C消费的:第22号已出库! ----------NO.5C消费了:第22号 NO.4P生产了:第25号 NO.4P生产的:第25号已入库! ----------NO.3C消费的:第25号已出库! ----------NO.3C消费了:第25号 ----------NO.4C消费的:第23号已出库! ----------NO.4C消费了:第23号 NO.5P生产了:第26号 NO.5P生产的:第26号已入库! ----------NO.5C消费的:第26号已出库! ----------NO.5C消费了:第26号 NO.4P生产了:第27号 NO.4P生产的:第27号已入库! ----------NO.4C消费的:第27号已出库! ----------NO.4C消费了:第27号 ----------NO.3C消费的:第24号已出库! NO.5P生产了:第28号 ----------NO.3C消费了:第24号 NO.5P生产的:第28号已入库! ----------NO.5C消费的:第28号已出库! ----------NO.5C消费了:第28号 ----------NO.3C消费的:第20号已出库! ----------NO.3C消费了:第20号 NO.4P生产了:第29号 NO.4P生产的:第29号已入库! NO.5P生产了:第30号 NO.5P生产的:第30号已入库! ----------NO.4C消费的:第30号已出库! ----------NO.4C消费了:第30号 ----------NO.5C消费的:第29号已出库! NO.5P生产了:第31号 ----------NO.3C消费的:第18号已出库! ----------NO.3C消费了:第18号 ----------NO.4C消费的:第16号已出库! ----------NO.4C消费了:第16号 NO.5P生产的:第31号已入库! ----------NO.5C消费了:第29号 NO.4P生产了:第32号 NO.4P生产的:第32号已入库! ----------NO.3C消费的:第32号已出库! NO.5P生产了:第33号 ----------NO.5C消费的:第31号已出库! ----------NO.5C消费了:第31号 ----------NO.3C消费了:第32号 NO.4P生产了:第34号 ----------NO.4C消费的:第17号已出库! ----------NO.4C消费了:第17号 NO.4P生产的:第34号已入库! NO.5P生产的:第33号已入库! ----------NO.5C消费的:第33号已出库! ----------NO.5C消费了:第33号 ----------NO.3C消费的:第34号已出库! ----------NO.3C消费了:第34号 ----------NO.4C消费的:第10号已出库! NO.4P生产了:第35号 NO.5P生产了:第36号 ----------NO.4C消费了:第10号 NO.4P生产的:第35号已入库! NO.5P生产的:第36号已入库! ----------NO.5C消费的:第36号已出库! ----------NO.5C消费了:第36号 ----------NO.3C消费的:第35号已出库! ----------NO.3C消费了:第35号 NO.4P生产了:第37号 NO.4P生产的:第37号已入库! ----------NO.4C消费的:第37号已出库! ----------NO.4C消费了:第37号 NO.5P生产了:第38号 NO.5P生产的:第38号已入库! -----------------------存储区已关闭!现有产品数: 3----------------------- NO.5C已停止消费 NO.3C已停止消费 NO.4P生产了:第39号 由于存储区已关闭 ,第39号导出至外部存储区! NO.4P已停止生产! NO.4C已停止消费 NO.5P生产了:第40号 由于存储区已关闭 ,第40号导出至外部存储区! NO.5P已停止生产!