1 生产者消费者模式概述
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,
直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,
才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式
2 实现生产者消费者模式
产品类
package com.thread.pc.blockingqueue; import java.util.UUID; /**
* 产品类
*
* @author yyx 2018年12月22日
*/
public class Product {
private UUID proCode; // 产品唯一编码 public Product(UUID proCode) {
super();
this.proCode = proCode;
} public UUID getProCode() {
return proCode;
} public void setProCode(UUID proCode) {
this.proCode = proCode;
} }
生产者
package com.thread.pc.lockcondition; /**
* 生产者
* @author yyx 2019年1月5日
*/
public class Producer implements Runnable {
private Warehouse warehouse; public Producer(Warehouse warehouse) {
super();
this.warehouse = warehouse;
} @Override
public void run() {
for (int i = 1; i <= 10; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
warehouse.addProduct();
}
} }
消费者
package com.thread.pc.lockcondition; /**
* 消费者
*
* @author yyx 2019年1月5日
*/
public class Consumer implements Runnable {
private Warehouse warehouse; public Consumer(Warehouse warehouse) {
super();
this.warehouse = warehouse;
} @Override
public void run() {
for (int i = 1; i <= 10; i++) {
try {
Thread.sleep(800);
} catch (InterruptedException e) {
e.printStackTrace();
}
warehouse.removeProduct();
}
}
}
2.1 使用lock、condition和await、singalAll
仓库类
package com.thread.pc.lockcondition; import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; /**
* 仓库类
*
* @author yyx 2019年1月5日
*/
public class Warehouse {
private final int MAX_SIZE = 5;
private List<Product> listProduct;
private Lock lock;
private Condition conditionProducer;
private Condition conditionConsumer; public Warehouse(List<Product> listProduct, Lock lock, Condition conditionProducer, Condition conditionConsumer) {
super();
this.listProduct = listProduct;
this.lock = lock;
this.conditionProducer = conditionProducer;
this.conditionConsumer = conditionConsumer;
} public void addProduct() {
lock.lock();
try {
String currentName = Thread.currentThread().getName();
while (listProduct.size() >= MAX_SIZE) { // 为了避免虚假唤醒,应该总是使用在循环中
try {
System.out.println("产品列表已满,不再生产!" + currentName + "进入等待");
conditionProducer.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Product product = new Product(UUID.randomUUID());
System.out.println(currentName + "生产了一个产品,它的编号是:" + product.getProCode().toString());
listProduct.add(product);
conditionConsumer.signalAll();
} finally {
lock.unlock();
}
} public void removeProduct() {
lock.lock();
try {
String currentName = Thread.currentThread().getName();
while (listProduct.size() <= 0) {
try {
System.out.println("产品列表不足,不再消费!" + currentName + "进入等待");
conditionConsumer.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Product product = listProduct.get(0);
System.out.println(currentName + "消费了一个产品,它的编号是:" + product.getProCode().toString());
listProduct.remove(0);
conditionProducer.signalAll();
} finally {
lock.unlock();
}
}
}
测试类
package com.thread.pc.lockcondition; import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 测试类
* @author 2018年12月22日
*/
public class TestModel {
public static void main(String[] args) {
List<Product> listProduct=new ArrayList<Product>();
Lock lock = new ReentrantLock();
Condition conditionProducer = lock.newCondition();
Condition conditionConsumer = lock.newCondition(); Warehouse warehouse = new Warehouse(listProduct,lock, conditionProducer,conditionConsumer);
Producer producer = new Producer(warehouse);
Consumer consumer = new Consumer(warehouse); new Thread(producer,"生产者A").start();
new Thread(producer,"生产者B").start();
new Thread(consumer,"消费者C").start();
new Thread(consumer,"消费者D").start();
}
}
2.2 使用synchronized修饰代码块
仓库类
package com.thread.pc.synchronizedcodeblock; import java.util.List;
import java.util.UUID; /**
* 仓库类
*
* @author yyx 2019年1月5日
*/
public class Warehouse {
private final int MAX_SIZE = 10; // 最大数量
private List<Product> listProduct; public Warehouse(List<Product> listProduct) {
super();
this.listProduct = listProduct;
} /**
* 生产产品
*/
public void addProduct() {
synchronized (listProduct) {
String currentName = Thread.currentThread().getName();
while (listProduct.size() >= MAX_SIZE) { // 为了避免虚假唤醒,应该总是使用在循环中
try {
System.out.println("产品列表已满,不再生产!" + currentName + "进入等待");
listProduct.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Product product = new Product(UUID.randomUUID());
System.out.println(currentName + "生产了一个产品,它的编号是:" + product.getProCode().toString());
listProduct.add(product);
listProduct.notifyAll();
}
} /**
* 消费产品
*/
public void removeProduct() {
synchronized (listProduct) {
String currentName = Thread.currentThread().getName();
while (listProduct.size() <= 0) {
try {
System.out.println("产品列表不足,不再消费!" + currentName + "进入等待");
listProduct.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Product product = listProduct.get(0);
System.out.println(currentName + "消费了一个产品,它的编号是:" + product.getProCode().toString());
listProduct.remove(0);
listProduct.notifyAll();
}
}
}
测试类
package com.thread.pc.synchronizedcodeblock; import java.util.ArrayList;
import java.util.List; /**
* 测试类
* @author yyx
* 2019年1月5日
*/
public class TestModel {
public static void main(String[] args) {
List<Product> listProduct = new ArrayList<Product>(); Warehouse warehouse = new Warehouse(listProduct);
Producer producer = new Producer(warehouse);
Consumer consumer = new Consumer(warehouse); new Thread(producer,"生产者A").start();
new Thread(producer,"生产者B").start();
new Thread(consumer,"消费者C").start();
new Thread(consumer,"消费者D").start();
}
}
2.3 使用synchronized修饰方法
仓库类
package com.thread.pc.synchronizedmethod; import java.util.List;
import java.util.UUID; /**
* 仓库类
*
* @author yyx 2019年1月5日
*/
public class Warehouse {
private final int MAX_SIZE = 10;
private List<Product> listProduct; public Warehouse(List<Product> listProduct) {
super();
this.listProduct = listProduct;
} /**
* 生产产品
*/
public synchronized void addProduct() {
String currentName = Thread.currentThread().getName();
while (listProduct.size() >= MAX_SIZE) {
try {
System.out.println("产品列表已满,不再生产!" + currentName + "进入等待");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Product product = new Product(UUID.randomUUID());
System.out.println(currentName + "生产了一个产品,它的编号是:" + product.getProCode().toString());
listProduct.add(product);
notifyAll();
} /**
* 消费产品
*/
public synchronized void removeProduct() {
String currentName = Thread.currentThread().getName();
while (listProduct.size() <= 0) {
try {
System.out.println("产品列表不足,不再消费!" + currentName + "进入等待");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Product product = listProduct.get(0);
System.out.println(currentName + "消费了一个产品,它的编号是:" + product.getProCode().toString());
listProduct.remove(0);
notifyAll();
}
}
测试类
package com.thread.pc.synchronizedmethod; import java.util.ArrayList;
import java.util.List; /**
* 测试类
*
* @author yyx 2019年1月5日
*/
public class TestModel {
public static void main(String[] args) {
List<Product> listProduct = new ArrayList<Product>(); Warehouse warehouse = new Warehouse(listProduct);
Producer producer = new Producer(warehouse);
Consumer consumer = new Consumer(warehouse); new Thread(producer, "生产者A").start();
new Thread(producer, "生产者B").start();
new Thread(consumer, "消费者C").start();
new Thread(consumer, "消费者D").start();
}
}
2.4 使用BlockingQueue
仓库类
package com.thread.pc.blockingqueue; import java.util.UUID;
import java.util.concurrent.BlockingQueue;
/**
* 仓库
* @author yyx 2018年12月22日
*/
public class Warehouse {
private final int MAX_SIZE = 10;
private BlockingQueue<Product> blockingQueue; public Warehouse(BlockingQueue<Product> blockingQueue) {
super();
this.blockingQueue = blockingQueue;
} public void addProduct() {
String currentName = Thread.currentThread().getName();
if (blockingQueue.size() >= MAX_SIZE) {
System.out.println("产品列表已满,不再生产!" + currentName + "进入等待");
} else {
Product product = new Product(UUID.randomUUID());
System.out.println(currentName + "生产了一个产品,它的编号是:" + product.getProCode().toString());
try {
blockingQueue.put(product);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} public void removeProduct() {
String currentName = Thread.currentThread().getName();
if (blockingQueue.size() <= 0) {
System.out.println("产品列表不足,不再消费!" + currentName + "进入等待");
} else {
try {
Product product = blockingQueue.take();
System.out.println(currentName + "消费了一个产品,它的编号是:" + product.getProCode().toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
测试类
package com.thread.pc.blockingqueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 测试类
* @author yyx 2018年12月22日
*/
public class TestModel {
public static void main(String[] args) {
BlockingQueue<Product> blockingQueue = new LinkedBlockingQueue<>(10); Warehouse warehouse = new Warehouse(blockingQueue);
Producer producer = new Producer(warehouse);
Consumer consumer = new Consumer(warehouse); new Thread(producer).start();
new Thread(producer).start();
new Thread(consumer).start();
}
}