生产者消费者并发编程:
假设仓库有10个仓位,分别有10个生产者和10个消费者,生产者不断生产产品,放入仓库的仓位中,而消费者则不断从仓库中获取产品,
如果仓库已满,则生产者要等待,等消费者消费后,空出仓位后,再继续放入产品。
反之如果仓库已空,则消费者要等待,等待生产者生产出产品后,再继续消费产品。
关于生产者、消费者有四种实现方式
1,wait,nofity方式
2,ReentrantLock锁的await()和signal()
3,阻塞队列的方式
4,Semaphore 信号量方式
下面分别以这四种方式实现
1,wait,nofity方式
package producer; public class WareHouse { private Object[] space = new Object[10]; private int currentPutIdx = 0; private int currentGetIdx = 0; private volatile int count = 0; public synchronized void putProduct(Object o) throws InterruptedException{ while (count >= 10) { // 库位是否已满,如果满就等待 wait(); } if (currentPutIdx > (space.length - 1)) { currentPutIdx = 0; } space[currentPutIdx++] = o; synchronized (this) { count++; } System.out.println("当前线程:" + Thread.currentThread().getName()+ " 放入一个资源,当前资源数:" + count); notifyAll(); } public synchronized void getProduct() throws InterruptedException{ while (count <= 0) { // 已空,等待 wait(); } if (currentGetIdx > (space.length - 1)) { currentGetIdx = 0; } if (space[currentGetIdx] == null) { System.out.println("当前线程:" + Thread.currentThread().getName()+ " is null"); } else { space[currentGetIdx++] = null; synchronized (this) { count--; } System.out.println("当前线程:" + Thread.currentThread().getName()+ " 得到一个资源,当前资源数:" + count); notifyAll(); } } }
package producer; import java.util.Random; public class Test { public static void main(String[] args) { WareHouse warehouse = new WareHouse(); for(int i =0;i<1000;i++){ new Thread(new Producer(warehouse), "Producer "+i).start(); } for(int i =0;i<1000;i++){ new Thread(new Consumer(warehouse), "Consumer "+i).start(); } } static class Producer implements Runnable{ private WareHouse factory; public Producer(WareHouse factory) { this.factory = factory; } public void run() { while(true){ try { Thread.sleep(new Random().nextInt(1000)); factory.putProduct(new Object()); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Consumer implements Runnable{ private WareHouse factory; public Consumer(WareHouse factory){ this.factory = factory; } public void run() { while(true){ try { Thread.sleep(new Random().nextInt(1000)); factory.getProduct(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
2,ReentrantLock锁的await()和signal()
package sort; import java.util.LinkedList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ProducerConsumer { private LinkedList<Object> myList = new LinkedList<Object>(); private int MAX = 10; private final Lock lock = new ReentrantLock(); private final Condition full = lock.newCondition(); private final Condition empty = lock.newCondition(); public ProducerConsumer() { } public void start() { new Producer().start(); new Consumer().start(); } public static void main(String[] args) throws Exception { ProducerConsumer s2 = new ProducerConsumer(); s2.start(); } class Producer extends Thread { public void run() { while (true) { lock.lock(); try { while (myList.size() == MAX) { System.out.println("warning: it‘s full!"); full.await(); } Object o = new Object(); if (myList.add(o)) { System.out.println("Producer: " + o); empty.signal(); } } catch (InterruptedException ie) { System.out.println("producer is interrupted!"); } finally { lock.unlock(); } } } } class Consumer extends Thread { public void run() { while (true) { lock.lock(); try { while (myList.size() == 0) { System.out.println("warning: it‘s empty!"); empty.await(); } Object o = myList.removeLast(); System.out.println("Consumer: " + o); full.signal(); } catch (InterruptedException ie) { System.out.println("consumer is interrupted!"); } finally { lock.unlock(); } } } } }
3,阻塞队列的方式
import java.util.concurrent.*; public class ProducerConsumer { // 建立一个阻塞队列 private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(10); public ProducerConsumer() { } public void start() { new Producer().start(); new Consumer().start(); } public static void main(String[] args) throws Exception { ProducerConsumer s3 = new ProducerConsumer(); s3.start(); } class Producer extends Thread { public void run() { while (true) { try { Object o = new Object(); // 取出一个对象 queue.put(o); System.out.println("Producer: " + o); } catch (InterruptedException e) { System.out.println("producer is interrupted!"); } // } } } } class Consumer extends Thread { public void run() { while (true) { try { // 取出一个对象 Object o = queue.take(); System.out.println("Consumer: " + o); } catch (InterruptedException e) { System.out.println("producer is interrupted!"); } // } } } } }
4,Semaphore 信号量方式
package com.test.current; import java.util.concurrent.Semaphore; public class TestSemaphore { public static void main(String[] args) { for (int i = 0; i <= 3; i++) { new Thread(new Producer()).start(); new Thread(new Consumer()).start(); } } //仓库 static WareHouse buffer = new WareHouse(); //生产者 static class Producer implements Runnable{ static int num = 1; public void run() { int n = num++; while(true){ try{ buffer.put(n); System.out.println(">"+n); Thread.sleep(10); }catch (InterruptedException e) { e.printStackTrace(); } } } } static class Consumer implements Runnable{ public void run() { while (true) { try { System.out.println("<"+buffer.take()); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class WareHouse{ //非满锁 final Semaphore notFull = new Semaphore(10); //非空锁 final Semaphore notEmpty = new Semaphore(0); //互斥锁 final Semaphore mutex = new Semaphore(1); //库存容量 final Object[] items = new Object[10]; int putPosi, takePosi, count; public void put(Object x)throws InterruptedException{ try{ notFull.acquire(); mutex.acquire(); items[putPosi] = x; if (++putPosi == items.length) { putPosi = 0; } count++; }finally{ notEmpty.release(); mutex.release(); } } public Object take()throws InterruptedException{ notEmpty.acquire(); mutex.acquire(); try{ Object x = items[takePosi]; if(++takePosi == items.length){ takePosi = 0; } --count; return x; }finally{ notFull.release(); mutex.release(); } } } }