1.何为生产者与消费者
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @ClassName:Restraurant
* @Description:何为生产者与消费者
* @author:
* @date:2018年5月3日
*/
public class Restraurant {
Meal m=null;
Chef chef=new Chef(this);
WaitPerson wait=new WaitPerson(this);
ExecutorService service=Executors.newCachedThreadPool();
public Restraurant() {
service.execute(chef);
service.execute(wait);
}
public static void main(String[] args) {
new Restraurant();
}
}
/**
* @ClassName:Meal
* @Description:生产者生成的数据
* @author:
* @date:2018年5月3日
*/
class Meal{
private final int orderNum;//食物订单编号
public Meal(int num){
orderNum=num;
}
public String toString(){
return "Meal"+orderNum;
}
}
/**
* @ClassName:Chef
* @Description:厨师类,及生产者
* @author:
* @date:2018年5月3日
*/
class Chef implements Runnable{
Restraurant r;
int count=0;
public Chef(Restraurant r) {
this.r=r;
}
@Override
public void run() {
try{
while(!Thread.interrupted()){
synchronized (this) {
while(r.m!=null){
System.out.println("厨师等待中");
wait();//等待服务员取餐
}
}
if(count++==10){
System.out.println("今日已售完");
r.service.shutdownNow();
}
System.out.println("订单完成,服务员取餐");
synchronized (r.wait) {
r.m=new Meal(count);
r.wait.notifyAll();
}
TimeUnit.SECONDS.sleep(1);
}
}catch (InterruptedException e) {
System.out.println("生产者线程强制中断");
}
}
}
/**
* @ClassName:WaitPerson
* @Description:服务员类,即消费者
* @author:
* @date:2018年5月3日
*/
class WaitPerson implements Runnable{
Restraurant r;
public WaitPerson(Restraurant r) {
this.r=r;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (r.m == null) {
System.out.println("服务员等待中");
wait();// 等待厨师生成食物
}
}
System.out.println("服务员以取餐" + r.m);
synchronized (r.chef) {
r.m = null;
r.chef.notifyAll();
}
}
} catch (InterruptedException e) {
System.out.println("消费者线程强制中断");
}
}
}
2.生产者与消费者模式
1)产生原因:在多线程开发 中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理 完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须 等待生产者。wait与notify方法以一种非常低级的方式解决了任务互相通知的问题,即每次交互都要进行一次握手,极大影响的效率以及性能,为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。
2)原理:生产者和消费者模式是通过一个容器(比如同步阻塞队列)来解决生产者和消费者的强耦合问题。生产者和消 费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用 等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取, 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。 这个阻塞队列就是用来给生产者和消费者解耦的。java.util.concurrent.BlockingQueue接口提供了这个队列,通常使用其实现子类ArrayBlockingQueue,LinkedBlockingQueue。当消费者任务试图从同步队列中获取对象,如果队列为空时,那么队列则会挂起消费者任务,并且当拥有足够多的元素可用时才会恢复消费者任务。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class UseBlockingQueue {
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<Toast> dry=new LinkedBlockingQueue<Toast>(),
butter=new LinkedBlockingQueue<Toast>(),
jam=new LinkedBlockingQueue<Toast>(),
con=new LinkedBlockingQueue<Toast>();
ExecutorService exec=Executors.newCachedThreadPool();
exec.execute(new MakeToast(dry));//制作初始吐司任务
exec.execute(new Butter(dry,butter));//吐司抹黄油任务
exec.execute(new Jam(butter,jam));//吐司抹果酱任务
exec.execute(new Consumer(jam));//消费者任务,食用吐司
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
class Toast{
private int status;//吐司状态:0代表制作吐司,1代表抹黄油,2代表向抹了黄油的吐司抹果酱
private final int id;
public Toast(int id1) {
id=id1;
}
public void butter(){
status=1;
};
public void jam(){
status=2;
}
public int getStatus(){
return status;
}
public int getId(){
return id;
}
public String toString(){
return "toast "+id+":"+status;
}
}
/**
* @Description:制作初始吐司
*/
class MakeToast implements Runnable{
private LinkedBlockingQueue<Toast> queue=new LinkedBlockingQueue<Toast>();
private int count=0;
public MakeToast(LinkedBlockingQueue<Toast> q) {
queue=q;
}
@Override
public void run() {
try{
while(!Thread.interrupted()){
Thread.sleep(1000);//制作时间
Toast t=new Toast(count);
System.out.println(t);
queue.put(t);//添加到同步队列
count++;
}
}catch (InterruptedException e) {
System.out.println("make process interrupted");
}
System.out.println("make process off");
}
}
/**
* @Description:涂抹黄油
*/
class Butter implements Runnable{
private LinkedBlockingQueue<Toast> queue1,queue2;//未加料吐司队列,抹黄油后吐司队列
public Butter(LinkedBlockingQueue<Toast> q1,LinkedBlockingQueue<Toast>q2) {
queue1=q1;
queue2=q2;
}
@Override
public void run() {
try{
while(!Thread.interrupted()){
Toast t=queue1.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加
t.butter();
System.out.println(t);
queue2.put(t);
}
}catch (InterruptedException e) {
System.out.println("butter process interrupted");
}
System.out.println("butter process off");
}
}
/**
* @Description:涂抹果酱
*/
class Jam implements Runnable{
private LinkedBlockingQueue<Toast> queue1,queue2;//抹黄油后吐司队列,抹果酱吐司队列
public Jam(LinkedBlockingQueue<Toast> q1,LinkedBlockingQueue<Toast>q2) {
queue1=q1;
queue2=q2;
}
@Override
public void run() {
try{
while(!Thread.interrupted()){
Toast t=queue1.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加
t.jam();
System.out.println(t);
queue2.put(t);
}
}catch (InterruptedException e) {
System.out.println("jam process interrupted");
}
System.out.println("jam process off");
}
}
/**
* @Description:被食用
*/
class Consumer implements Runnable{
private LinkedBlockingQueue<Toast> finished;//抹黄油后吐司队列,抹果酱吐司队列
int count=0;
public Consumer(LinkedBlockingQueue<Toast> q) {
finished=q;
}
@Override
public void run() {
try{
while(!Thread.interrupted()){
Toast t=finished.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加
if(t.getId()!=count++||t.getStatus()!=2){
System.out.println("过程出现错误");
return;
}else{
System.out.println("所有过程正确实现"+"toast "+t.getId()+"被食用");
}
}
}catch (InterruptedException e) {
System.out.println("eat process interrupted");
}
System.out.println("eat process off");
}
}