二十四、多线程生产消费模式

实际生活中,需要操作共享的某个资源(水池),但是对这个共享资源的操作方式不同(部分是注水[生产]、部分是抽水[消费])。把这种现象我们可以称为生产和消费模型。

生产:它可以采用部分线程进行模拟。多个线程同时给水池中注水。

消费:它可以采用部分线程进行模拟。多个线程同时从水池中抽水。

对资源的不同的操作方式,每种方式都可以让部分的线程去负责。多个不同的线程,他们对相同的资源(超市、水池等)操作方式不一致。

这个时候我们不能使用一个run方法对线程的任务进行封装。所以这里就需要定义不同的线程任务类,描述不同的线程的任务。

通过不同的线程操作,来控制同一个资源,这种现象就属于生产消费模型。

简单的实现生产消费模式

第一种

1.创建公共资源类

/**
 * 被多个线程操作的共享数据的共享资源类
 * @author Administrator
 *
 */
public class Resource {
		// 注水的方法
		public void add() {

		}
		// 抽水的方法
		public void delete() {

		}
}

2.生产共享资源目标类

/**
  * 生产共享资源的目标类
 * @author Administrator
 *
 */
public class ShengChan implements Runnable{
	//定义共享资源对象
	private  Resource  resource;
	//通过构造方法传入共享资源对象
	public ShengChan(Resource  resource) {
		this.resource=resource;
	}
	
	@Override
	public void run() {
		//访问共享资源的生产方法
		resource.add();
	}
}


消费共享资源的目标类
package com.wangxing.test1;
/**
 * 消费共享资源的目标类
 * @author Administrator
 *
 */
public class XiaoFei implements Runnable{
	//定义共享资源对象
	private  Resource  resource;
	//通过构造方法传入共享资源对象
	public XiaoFei(Resource  resource) {
		this.resource=resource;
	}
	@Override
	public void run() {
		//访问共享资源的消费方法
		resource.delete();
	}
}

3.测试主类

package com.wangxing.test1;
public class TestMain {
	public static void main(String[] args) {
		//创建共享资源类对象
		Resource  resource=new Resource();
		//创建生产共享资源的目标类对象
		ShengChan  sc=new ShengChan(resource);
		//创建消费共享资源的目标类对象
		XiaoFei  xf=new XiaoFei(resource);
		//创建生产者线程对象
		Thread  scthread=new Thread(sc);
		//创建消费者线程对象
		Thread  xfthread=new Thread(xf);
		//启动生产和消费线程
		scthread.start();
		xfthread.start();
	}
}

5.秀海Resource为add()/delete() 添加具体实现的动作

/**
 * 被多个线程操作的共享数据的共享资源类
 * @author Administrator
 *
 */
public class Resource {
		//保存共享资源的数组【水池】
		private Object objs[] = new Object[1];
		//记录生产和消费的次数
		private int num = 1;
		
		// 注水的方法
		public void add() {
			objs[0] = "水" + num;
			System.out.println(Thread.currentThread().getName() + "正要注进入的水是:" + objs[0]);
			num++;
		}
		// 抽水的方法
		public void delete() {
			System.out.println(Thread.currentThread().getName() + "抽出的水是:" + objs[0]);
			objs[0] = null;
		}
}

6.修改生产者的目标类为注水方法添加死循环,以达到持续注水的目标。

/**
  * 生产共享资源的目标类
 * @author Administrator
 *
 */
public class ShengChan implements Runnable{
	//定义共享资源对象
	private  Resource  resource;
	//通过构造方法传入共享资源对象
	public ShengChan(Resource  resource) {
		this.resource=resource;
	}
	@Override
	public void run() {
		//持续注水
		while(true){
		//访问共享资源的生产方法
		resource.add();
		}
	}
}

7.修改消费者的目标类为抽水方法添加死循环,以达到持续抽水的目标。

/**
 * 消费共享资源的目标类
 * @author Administrator
 *
 */
public class XiaoFei implements Runnable{
	//定义共享资源对象
	private  Resource  resource;
	//通过构造方法传入共享资源对象
	public XiaoFei(Resource  resource) {
		this.resource=resource;
	}
	@Override
	public void run() {
		//持续抽水
		while(true){
		//访问共享资源的消费方法
		resource.delete();
		}
	}
}

注意:

有时候会出现生产者注水为null的情况:

有两个线程分别是生产者负责注水的线程和负责抽水的线程。

假设CPU在消费者线程上,那么喜爱丰字格正在打印了抽水为null的情况,还没有        将数组空间赋值为null之前,CPU切换到生产者,生产者将水注入到数组空间中 之后,打印出正要注入进的水是:水2,CPU又切回到消费者线程上,消费者线程就会将数组空间立即赋值为null。CPU如果在切回到生产者线程上,执行了注水次数加1之后。CPU如果在切回到消费者线程上,这是消费者线程就会输出抽水为null的情况。

上面的这两个问题就是因为当前线程正在访问公共的资源的时候,其他的线程也可以访问公共资源产生的。所以线程操作需要共享数据,需要线程同步。

线程同步能够保证注水的时候不能抽水,或者抽水的时候不能给当前这个空间注水。

修改Resource为注水和抽水方法添加同步代码块保证注水的时候不能抽水,或者抽水的时后不能给当前这个空间注水

修改后


/**
 * 被多个线程操作的共享数据的共享资源类
 * @author Administrator
 *
 */
public class Resource {
		//保存共享资源的数组【水池】
		private Object objs[] = new Object[1];
		//记录生产和消费的次数
		private int num = 1;
		//创建一个同步对象
		private static final Object loc = new Object();
		// 注水的方法
		public void add() {
			synchronized (loc) {
				objs[0] = "水" + num;
				System.out.println(Thread.currentThread().getName() + "正要注进入的水是:" + objs[0]);
				num++;
			}
		}
		// 抽水的方法
		public void delete() {
			synchronized (loc) {
			System.out.println(Thread.currentThread().getName() + "抽出的水是:" + objs[0]);
			objs[0] = null;
			}
		}
}

注意:

上面执行完成以后会出现多次注水没有抽水,或者多次抽水,没有注水的问题?

要解决上面这个多次操作的问题,首先需要先判断是否满足抽水或者注水的条件。

什么时候抽水:当数组空间中不是null的时候可以进行抽水。

什么时候注水:数组空间为null的时候才能注水。

如果不满足注水的时候,但是当前正好CPU在注水的线程上,这时就必须让这个注水的线程等待,等到可以注水的时候将本次注水的动作做完。

如果不满足抽水的时候,但是当前正好CPU在抽水的线程上,必须让抽水的线程等待,等到数组有水的时候将本次的抽水的动作做完。

8.需要使用Java中线程的等待和唤醒机制(线程间的通信)

等待:如果判断发现不满足,这个线程就要等待。等待到满足操作的时候,才能继续进行执行。

注水线程注水结束之后,应该告诉抽水线程可以抽水。同样道理,抽水线程抽完水之后,应该告诉注水线程可以注水了。

唤醒:当某个一方操作完成之后,需要将处于另外一方操作的等待的线程等待的状态恢复到可以操作的状态(把一方通知另外一方的这个操作称为线程的唤醒)。

在Java提供两个不同的方法分别代表等待和唤醒:

等待和唤醒的方法没有定义在Thread类中,而是定义在Object类中(因为只有同步的锁才能让线程等待或者将等待的线程唤醒,而同步的锁是任意对象,等待和唤醒的方法只能定义在Object类中)

void

wait() 在其他线程调用此对象的 notify() 方法或 notifyAll() 方法前,导致当前线程等

void

notify() 唤醒在此对象监视器上等待的单个线程。

void

notifyAll() 唤醒在此对象监视器上等待的所有线程。

注意:等待和唤醒(线程通信)必须位于同步中。因为等待和唤醒必须使用当前的锁才完成。

修改Resource为注水和抽水方法添加线程等待和唤醒操作

/**
 * 被多个线程操作的共享数据的共享资源类
 * @author Administrator
 *
 */
public class Resource {
		//保存共享资源的数组【水池】
		private Object objs[] = new Object[1];
		//记录生产和消费的次数
		private int num = 1;
		//创建一个同步对象
		private static final Object loc = new Object();
		// 注水的方法
		public void add() throws InterruptedException {
			synchronized (loc) {
				//注水时判断数组是否有水
				//有水,就无需注水,如果此时正好切换到注水线程,
				//那么注水线程就应该等待
				if(objs[0]!=null) {
					//注水线程等待
					loc.wait();
				}
				objs[0] = "水" + num;
				System.out.println(Thread.currentThread().getName() + "正要注进入的水是:" + objs[0]);
				num++;
				//唤醒抽水线程运行
				loc.notify();
			}
		}
		// 抽水的方法
		public void delete()throws InterruptedException{
			synchronized (loc) {
				//抽水时判断数组是否有水
				//没水,就无需抽水,如果此时正好切换到抽水线程,
				//那么抽水线程就应该等待
				if(objs[0]==null) {
					//抽水线程等待
					loc.wait();
				}
				System.out.println(Thread.currentThread().getName() + "抽出的水是:" + objs[0]);
				objs[0] = null;
				//唤醒注水线程运行
				loc.notify();
			}
		}
}

二十四、多线程生产消费模式

上面的程序处理好了单线程的注水和抽水动作。

下面我们将程序修改成多注水和所抽水的情况。

修改主类多创建几个注水和抽水线程对象,并启动运行。

将单注水和单抽水修改为两个注水和两个抽水,结果程序中又出现了多次注水,或者多次抽水的现象。

发生这个现象原因:是因为在唤醒的时候,抽水的线程将另外一个抽水的线程唤醒了。或者注水的线程将另外一个注水的线程唤醒了。只要自己同伴线程将自己唤醒之后,这时被唤醒的线程就可以继续操作。导致出现了多次注水,或者多次抽水的现象。

解决上面的问题:将判断有没有水的if修改为while即可。唤醒之后可以继续判断。

/**
 * 被多个线程操作的共享数据的共享资源类
 * @author Administrator
 *
 */
public class Resource {
		//保存共享资源的数组【水池】
		private Object objs[] = new Object[1];
		//记录生产和消费的次数
		private int num = 1;
		//创建一个同步对象
		private static final Object loc = new Object();
		// 注水的方法
		public void add() throws InterruptedException {
			synchronized (loc) {
				//注水时判断数组是否有水
				//有水,就无需注水,如果此时正好切换到注水线程,
				//那么注水线程就应该等待
				while(objs[0]!=null) {
					//注水线程等待
					loc.wait();
				}
				objs[0] = "水" + num;
				System.out.println(Thread.currentThread().getName() + "正要注进入的水是:" + objs[0]);
				num++;
				//唤醒抽水线程运行
				loc.notify();
			}
		}
		// 抽水的方法
		public void delete()throws InterruptedException{
			synchronized (loc) {
				//抽水时判断数组是否有水
				//没水,就无需抽水,如果此时正好切换到抽水线程,
				//那么抽水线程就应该等待
				while(objs[0]==null) {
					//抽水线程等待
					loc.wait();
				}
				System.out.println(Thread.currentThread().getName() + "抽出的水是:" + objs[0]);
				objs[0] = null;
				//唤醒注水线程运行
				loc.notify();
			}
		}
}

修改为while之后,程序又出现了新的问题:死锁(所有的线程都处于等待状态了。外面没有可以执行的线程了)。

解决方案:只能使用notifyAll唤醒所有线程。每次在唤醒的时候都是唤醒所有线程,即使唤醒了自己的同伴,也无所谓,因为还要继续判断,这样一定还会等待,但是唤醒唤醒中一定有另外一方的线程,它们肯定不会等待。它们不等待,就会去操作,它们操作完成也唤醒所有。

上面的问题的解决方案:将notify换成notifyAll方法。

/**
 * 被多个线程操作的共享数据的共享资源类
 * @author Administrator
 *
 */
public class Resource {
		//保存共享资源的数组【水池】
		private Object objs[] = new Object[1];
		//记录生产和消费的次数
		private int num = 1;
		//创建一个同步对象
		private static final Object loc = new Object();
		// 注水的方法
		public void add() throws InterruptedException {
			synchronized (loc) {
				//注水时判断数组是否有水
				//有水,就无需注水,如果此时正好切换到注水线程,
				//那么注水线程就应该等待
				while(objs[0]!=null) {
					//注水线程等待
					loc.wait();
				}
				objs[0] = "水" + num;
				System.out.println(Thread.currentThread().getName() + "正要注进入的水是:" + objs[0]);
				num++;
				//唤醒抽水线程运行
				loc.notifyAll();
			}
		}
		// 抽水的方法
		public void delete()throws InterruptedException{
			synchronized (loc) {
				//抽水时判断数组是否有水
				//没水,就无需抽水,如果此时正好切换到抽水线程,
				//那么抽水线程就应该等待
				while(objs[0]==null) {
					//抽水线程等待
					loc.wait();
				}
				System.out.println(Thread.currentThread().getName() + "抽出的水是:" + objs[0]);
				objs[0] = null;
				//唤醒注水线程运行
				loc.notifyAll();
			}
		}
}

多生产多消费的程序中,为了保证不出现全部线程被wait的情况,只能在唤醒的时候使用notifyAll将所有处于等待的线程唤醒。这样每次都可以保证一定会有存活的线程。但是这种唤醒效率太低了,经常会发生生产方唤醒自己的同伴线程,或者是消费方唤醒自己的同伴线程。

在JDK5中提供Condition接口。它用来代替等待和唤醒机制。

第二种

java.util.concurrent.locks接口 Condition

public interface Condition

在JDK5之前,一个同步的锁下面的等待和唤醒无法辨别当前让等待或唤醒的线程到底属于生产还是属于消费。而Condition接口,它可以创建出不同的等待和唤醒的对象,然后可以用在不同的场景下:

可以创建一个Condition对象,专门负责生产。

可以创建一个Condition对象,专门负责消费。

可以通过负责生产的Condition对象专门监视负责生产的线程。通过负责消费的Condition监视消费的线程。等待和唤醒的时候,可以使用各自的Condition对象。

void

wait() 在其他线程调用此对象的 notify() 方法或 notifyAll() 方法前,导致当前线程等

void

notify() 唤醒在此对象监视器上等待的单个线程。

void

notifyAll() 唤醒在此对象监视器上等待的所有线程。

注意:如果要想使用Condition接口,同步必须使用Lock接口。

如果程序中同步使用的同步代码块,等待和唤醒只能使用Object中的wait、notify、notifyAll方法。

只有同步使用的Lock接口,等待和唤醒才能使用Condition接口。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * 被多个线程操作的共享数据的共享资源类
 * @author Administrator
 *
 */
public class Resource {
		//保存共享资源的数组【水池】
		private Object objs[] = new Object[1];
		//记录生产和消费的次数
		private int num = 1;
		// 创建Lock接口,作为同步的锁
		private Lock lock = new ReentrantLock();
		// 负责监视注水的线程
		private Condition scCondition = lock.newCondition();
		// 负责监视抽水的线程
		private Condition xfCondition = lock.newCondition();
		// 注水的方法
		public void add() {
			 try {
				//线程同步,获取锁
				lock.lock();
				//注水时判断数组是否有水
				//有水,就无需注水,如果此时正好切换到注水线程,
				//那么注水线程就应该等待
				while(objs[0]!=null) {
					//注水线程等待
					scCondition.await();
				}
				objs[0] = "水" + num;
				System.out.println(Thread.currentThread().getName() + "正要注进入的水是:" + objs[0]);
				num++;
				//唤醒抽水线程运行
				xfCondition.signal();
			 }catch(Exception e) {
				 e.printStackTrace();
			 }finally {
				//手动释放锁
				lock.unlock();
			 }
		}
		// 抽水的方法
		public void delete(){
			 try {
				//线程同步,获取锁
				lock.lock();
				//抽水时判断数组是否有水
				//没水,就无需抽水,如果此时正好切换到抽水线程,
				//那么抽水线程就应该等待
				while(objs[0]==null) {
					//抽水线程等待
					xfCondition.await();
				}
				System.out.println(Thread.currentThread().getName() + "抽出的水是:" + objs[0]);
				objs[0] = null;
				//唤醒注水线程运行
				scCondition.signal();
			}catch(Exception e) {
				 e.printStackTrace();
			 }finally {
				//手动释放锁
				lock.unlock();
			 }
		}
}

为什么Lock接口替换同步代码块?

不使用Lock接口完成线程同步,那么我们就得使用同步代码【synchronized】,实现线程同步。如果我们使用同步代码【synchronized】,实现线程同步的话,这时我们就只能使用Object类提供的wait、notify、notifyAll方法。来实现线程的等待和唤醒操作。缺点就是通过Object类提供的notifyAll这个方法会唤醒所有的等待线程,这时就就可能会唤醒自己的同伴线程,如果唤醒的是自己的同伴线程的话,那么程序就会多执行一次是否注水/抽水的判断过程,这样程序的执行效率就会降低。为了提高程序的运行效率,我们就需要在唤醒等待的线程的时候,只唤醒注水线程/抽水线程,而不会唤醒自己的同伴线程,这时我们就需要使用Condition 接口提供的等待和唤醒方法【await(),signal(),signalAll()】,因为他可以只唤醒对方线程,而不会唤醒同伴线程。Condition 接口在使用的时候是需要Lock接口对象的newCondition 方法才能创建出Condition 接口对象。所以我们在此处就使用Lock接口对象实现线程同步,来代替同步代码【synchronized】,实现线程同步。

同步代码【synchronized】-----Object类提供的wait、notify、notifyAll方法

Lock接口对象实现线程同步----Condition 接口提供的await(),signal(),signalAll()方法

signalAll这个唤醒全部线程在什么情况下使用?

signalAll这个方法是Condition 接口提供的唤醒所有等待线程,出现死锁的情况的时候可以使用signalAll这个方法,唤醒同一类的等待线程。

等待与唤醒机制的方式有2种,区别

Object类提供的wait、notify、notifyAll方法

Condition 接口提供的await(),signal(),signalAll()方法

同步代码【synchronized】实现线程同步

Lock接口对象实现线程同步

效率低

效率高

notify 与notifyAll的区别

notify

notifyAll

只随机唤醒一个 wait 线程

唤醒所有 wait 线程

可能会导致死锁

不会导致死锁

唤醒等待的线程不分彼此

signalsignalAll的区别

signal

signalAll

只随机唤醒一个 wait 线程【同一类】

唤醒所有 wait 线程【同一类】

10.9.sleep 与wait的区别

sleep

wait

Thread

Object

依赖于系统时钟和CPU调度机制

线程调用notify()或者notifyAll()方法

不释放已获取的锁资源

释放已获取的锁资源

无奈源于不够强大二十四、多线程生产消费模式

 

上一篇:多线程中生产消费模型


下一篇:小程序跨端框架实践之Remax篇