生产者消费者引入
在解决进程同步问题时,最著名的问题是生产消费者问题。
大意:一群生产者生产产品并放于缓冲区,消费者从缓冲区拿东西进行使用。尽管生产者和消费者之间的行为异步运行但是他们之间必须要同步,不能缓冲池满了还有生产者往里边存,也不能缓冲池空了还有消费者从里边取。
下面我将用代码大概描述这件问题:
首先我们用一个盒子box来表示上述有n个缓冲区的缓冲池。然后定义一个生产者类和一个消费者类,一个不断往缓冲区生产商品,一个不断往缓冲区拿走商品。从而重现这个过程。
package text_1;
public class pcproblem {
int in=0,out=0;
private static int boxsize=5;
public static void main(String[] args) {
pcproblem
pc = new pcproblem();
Box[]
boxes = new Box[boxsize];//为盒子申请空间(缓冲区)
for(int i=0;i<boxsize;i++) {
boxes[i]= pc.new Box();//为盒子申请一个空间
boxes[i].setnum(i+1);//为盒子起一个名字
}
Producer p = pc.new Producer(boxes);//创建生产者线程
p.start();//启动它
Customer c = pc.new Customer(boxes);//创建消费者线程
c.start();//启动它
}
class Box{//盒子(缓冲区)
private int num;//盒子的名字
int getnum() {
return num;
}
void setnum(int num) {
this.num=num;//起名字
}
void in() {
System.out.println("向第"+num+"个盒子中放入东西");//生产者调用
}
void out() {
System.out.println("从第"+num+"个盒子中拿出东西");//消费者调用
}
}
class Producer extends Thread{//生产者
Box boxes[];
public Producer(Box boxes[]) {
this.boxes=boxes;
}
public void run() {
while(true) {
boxes[in].in();//写入
in=(in+1)%boxsize;//写入后in
}
}
}
class Customer extends Thread{//消费者
Box boxes[];
public Customer(Box boxes[]) {
this.boxes=boxes;
}
public void run() {
while(true) {
boxes[out].out();//取出
out=(out+1)%boxsize;//取出后out
}
}
}
}
运行结果:
但是在这种情况下,大家会发现一个问题,也就是当对临界资源的使用放弃了锁的保护会出现的结果:会导致资源的恶性竞争,并且共享数据也不会安全。因为资源会被重复占用,而不加入同步锁会导致抢占特定资源的进程也会出现不确定的结果。
解决办法有很多:
模拟记录型信号量解决生产者-消费者问题
用记录型信号量解决生产者-消费者问题
假定在生产者和消费者之间的共用缓冲池中有n个缓冲区;
此时,可利用互斥信号量来互斥的访问缓冲区,同时利用信号量empty和full来分别表示空的和满的缓冲区数量。
假如生产者和消费者相互等效,只要缓冲区未满便可以往里边存数据,只要缓冲区未空便可以从里边取数据。
代码如下:
package text_1;
import java.util.concurrent.Semaphore;
public class pcproblem {
int in=0,out=0;
private static int boxsize=5;
private Semaphore mutex = new Semaphore(1);
private Semaphore empty = new Semaphore(boxsize);
private Semaphore full = new Semaphore(0);
public static void main(String[] args) {
pcproblem
pc = new pcproblem();
Box[] boxes = new Box[boxsize];//为盒子申请空间(缓冲区)
for(int i=0;i<boxsize;i++) {
boxes[i]= pc.new Box();//为盒子申请一个空间
boxes[i].setnum(i+1);//为盒子起一个名字
}
Producer p = pc.new Producer(boxes);//创建生产者线程
p.start();//启动它
Customer c = pc.new Customer(boxes);//创建消费者线程
c.start();//启动它
}
//定义wait方法,尝试获取信号量
public int Wait(Semaphore semaphore) {
try {
semaphore.acquire();//减少一个许可
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return semaphore.availablePermits();//返回许可数(还可以走几个线程)
}
//定义signal方法:解锁
public int Signal(Semaphore semaphore) {
semaphore.release();//添加一个许可
return semaphore.availablePermits();
}
class Box{//盒子(缓冲区)
private int num;//盒子的名字
int getnum() {
return num;
}
void setnum(int num) {
this.num=num;//起名字
}
void in() {
System.out.println("向第"+num+"个盒子中放入东西");//生产者调用
}
void out() {
System.out.println("从第"+num+"个盒子中取出东西");//消费者调用
}
}
class Producer extends Thread{//生产者
Box boxes[];
public Producer(Box boxes[]) {
this.boxes=boxes;
}
public void run() {
while(true) {
Wait(empty);
Wait(mutex);
boxes[in].in();//写入
in=(in+1)%boxsize;//写入后in
Signal(mutex);
Signal(full);
}
}
}
class Customer extends Thread{//消费者
Box boxes[];
public Customer(Box boxes[]) {
this.boxes=boxes;
}
public void run() {
while(true) {
Wait(full);
Wait(mutex);
boxes[out].out();//取出
out=(out+1)%boxsize;//取出后out
Signal(mutex);
Signal(empty);
}
}
}
}
模拟AND信号量解决生产者-消费者问题
用Swait(empty,mutex)来代替Wait(empty)和Wait(mutex);
用Ssignal(mutex,full)来代替Signal(empty)和Signal (mutex);
用Swait(full,mutex)来代替Wait(full)和Wait(mutex);
用Ssignal(mutex,empty)来代替Signal(mutex)和Signal (empty);
代码如下
package text_1;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class pcproblem {
int in=0,out=0;
private static int boxsize=5;
private Semaphore mutex = new Semaphore(1);
private Semaphore empty = new Semaphore(boxsize);
private Semaphore full = new Semaphore(0);
static Lock lock = new ReentrantLock();
static Condition condition = lock.newCondition();
public static void main(String[] args) {
pcproblem pc = new pcproblem();
Box[]
boxes = new Box[boxsize];//为盒子申请空间(缓冲区)
for(int i=0;i<boxsize;i++) {
boxes[i]= pc.new Box();//为盒子申请一个空间
boxes[i].setnum(i+1);//为盒子起一个名字
}
Producer p = pc.new Producer(boxes);//创建生产者线程
p.start();//启动它
Customer c = pc.new Customer(boxes);//创建消费者线程
c.start();//启动它
}
//定义Swait方法加锁
public void Swait(Semaphore...semaphores) {
lock.lock();
while(true) {
int count=0;
for(Semaphore semaphore:semaphores) {
if(semaphore.availablePermits()>0) {
count++;
}
}
if(count == semaphores.length) {
break;
}
try {
condition.await();
}catch(InterruptedException e) {
e.printStackTrace();
}
}
for(Semaphore semaphore:semaphores) {
try {
semaphore.acquire();
}catch(InterruptedException e){
e.printStackTrace();
}
}
lock.unlock();
}
//定义signal方法:解锁
public void Ssignal(Semaphore...semaphores) {
try {
lock.tryLock(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
for (Semaphore semaphore:semaphores){
semaphore.release();
}
//唤醒等待队列中的一个线程
condition.signal();
lock.unlock();
}
class Box{//盒子(缓冲区)
private int num;//盒子的名字
int getnum() {
return num;
}
void setnum(int num) {
this.num=num;//起名字
}
void in() {
System.out.println("向第"+num+"个盒子中放入东西");//生产者调用
}
void out() {
System.out.println("从第"+num+"个盒子中取出东西");//消费者调用
}
}
class Producer extends Thread{//生产者
Box boxes[];
public Producer(Box boxes[]) {
this.boxes=boxes;
}
public void run() {
while(true) {
Swait(empty,mutex);
boxes[in].in();//写入
in=(in+1)%boxsize;//写入后in
Ssignal(mutex,full);
}
}
}
class Customer extends Thread{//消费者
Box boxes[];
public Customer(Box boxes[]) {
this.boxes=boxes;
}
public void run() {
while(true) {
Swait(full,mutex);
boxes[out].out();//取出
out=(out+1)%boxsize;//取出后out
Ssignal(mutex,empty);
}
}
}
}
模拟管程解决解决生产者-消费者问题
在利用管程问题解决生产者-消费者问题时,首先要为他们建立一个管程,命名为pc;
其中包括两个过程:
1) put(x);过程
2) get(x)过程
代码如下:
package text_1;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class pcproblem {
public static void main(String[] args) {
pcproblem pc = new pcproblem();
Pcp pcp = pc.new Pcp();
Producer p = pc.new Producer(pcp);
Consumer c = pc.new Consumer(pcp);
p.start();
c.start();
}
class Pcp{
public static final int boxsize = 5;
private int count;
private Lock lock;
private Condition notFull;
private Condition notEmpty;
public Pcp() {
this.count=0;
this.lock=new ReentrantLock();
this.notEmpty=lock.newCondition();
this.notFull=lock.newCondition();
}
//put过程
public void put() {
lock.lock();
while(count==boxsize) {
try {
notFull.await();
}catch(InterruptedException e) {
e.printStackTrace();
}
}
count++;
System.out.println("向第"+count+"个盒子中放入东西");
notEmpty.signal();
lock.unlock();
}
public void get() {
lock.lock();
while(count == 0){
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("从第"+count+"个盒子中取出东西");
count--;
notFull.signal();
lock.unlock();
}
}
class Producer extends Thread{//生产者
private Pcp pcp;
public Producer(Pcp pcp) {
this.pcp=pcp;
}
public void run() {
while(true) {
pcp.put();
}
}
}
class Consumer extends Thread{//消费者
private Pcp pcp;
public Consumer(Pcp pcp) {
this.pcp=pcp;
}
public void run() {
while(true) {
pcp.get();
}
}
}
}