生产者-消费者模型的3种Java实现:synchronized,signal/notifyAll及BlockingQueue

我的技术博客经常被流氓网站恶意爬取转载。请移步原文:http://www.cnblogs.com/hamhog/p/3555111.html,享受整齐的排版、有效的链接、正确的代码缩进、更好的阅读体验。

【实现1:synchronized】

含测试函数main。

public class ProductManagerUsingSync {

    static final int MAX_AMOUNT = 1000;
int currentAmount; /**
* @param args
*/
public static void main(String[] args) {
ProductManagerUsingSync manager = new ProductManagerUsingSync(); for (int i = 0; i < 5; i++){
int consume = (int) Math.round(Math.random()*50) + 10;
Thread consumerThread = new Thread(new ConsumerWithSync(consume, manager));
consumerThread.start();
} for (int i = 0; i < 10; i++){
int produce = (int) Math.round(Math.random()*50) + 10;
Thread producerThread = new Thread(new ProducerWithSync(produce, manager));
producerThread.start();
}
} public ProductManagerUsingSync() {
currentAmount = 0;
} /**
* Add product. If can't, return.
* @param addAmount
* @return if succeeded.
*/
public boolean addProduct(int addAmount){
if (currentAmount + addAmount > MAX_AMOUNT)
return false; currentAmount += addAmount;
System.out.println("produced: " + addAmount + " current: " + currentAmount);
return true;
} /**
* Take product. If can't, return.
* @param takeAmount The amount of product to take.
* @return if succeeded.
*/
public boolean takeProduct(int takeAmount){
if (takeAmount > currentAmount)
return false; currentAmount -= takeAmount;
System.out.println("consumed: " + takeAmount + " current: " + currentAmount);
return true;
} } class ProducerWithSync implements Runnable {
private int amount;
private ProductManagerUsingSync manager; ProducerWithSync(int amount, ProductManagerUsingSync manager) {
this.amount = amount;
this.manager = manager;
} @Override
public void run() {
while (true) {
synchronized (manager) {
if (manager.addProduct(amount))
return;
}
}
}
} class ConsumerWithSync implements Runnable {
private int amount;
private ProductManagerUsingSync manager; ConsumerWithSync(int amount, ProductManagerUsingSync manager) {
this.amount = amount;
this.manager = manager;
} @Override
public void run() {
while (true) {
synchronized (manager) {
if (manager.takeProduct(amount))
return;
}
}
}
}

解释:Consumer类和Producer类在run方法中进行产品的生产和消费。重点在于:1. 在尝试生产、消费前会获取manager上的锁。由于所有的生产者、消费者中的manager都是同一个实例,因此消费、生产过程是保证线程安全(单线程串行)的。2. 在生产、消费失败的情况下,会进入死循环,反复再次尝试,直到成功为止。

这种实现方法下,暂时不能生产、消费时需要一直死循环,太占资源了;如果在每次循环之间sleep,则不一定能及时生产、消费。

【实现2:signal/notifyAll】

含测试函数main。

public class ProductManagerUsingSignal {

    static final int MAX_AMOUNT = 1000;
int currentAmount; /**
* @param args useless
*/
public static void main(String[] args) {
ProductManagerUsingSignal manager = new ProductManagerUsingSignal(); for (int i = 0; i < 5; i++){
int consume = (int) Math.round(Math.random()*50);
Thread consumerThread = new Thread(new Consumer(consume, manager));
consumerThread.start();
} for (int i = 0; i < 10; i++){
int produce = (int) Math.round(Math.random()*50);
Thread producerThread = new Thread(new Producer(produce, manager));
producerThread.start();
}
} public ProductManagerUsingSignal(){
currentAmount = 0;
} /**
* Add product. If can't, wait. NotifyAll when finished.
* @param addAmount The amount of product to add.
*/
public synchronized void addProduct(int addAmount){
while (currentAmount + addAmount > MAX_AMOUNT) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
currentAmount += addAmount;
System.out.println("produced: " + addAmount + " current: " + currentAmount);
notifyAll();
} /**
* Take product. If can't, wait. NotifyAll when finished.
* @param takeAmount The amount of product to take.
*/
public synchronized void takeProduct(int takeAmount){
while (takeAmount > currentAmount) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
currentAmount -= takeAmount;
System.out.println("consumed: " + takeAmount + " current: " + currentAmount);
notifyAll();
} } class Producer implements Runnable {
private int amount;
private ProductManagerUsingSignal manager; Producer(int amount, ProductManagerUsingSignal manager) {
this.amount = amount;
this.manager = manager;
} @Override
public void run() {
manager.addProduct(amount);
}
} class Consumer implements Runnable {
private int amount;
private ProductManagerUsingSignal manager; Consumer(int amount, ProductManagerUsingSignal manager) {
this.amount = amount;
this.manager = manager;
} @Override
public void run() {
manager.takeProduct(amount);
}
}

解释:这种实现同样用synchronized保证线程安全;它的重点在于,当生产、消费失败时,会进入wait状态,让位给其他线程;而完成一次成功的生产或消费后,会调用notifyAll方法,唤醒之前等待状态的进程。这种实现在效率上要好于第一种。

【实现3:BlockingQueue】

含测试函数main。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; public class ProductManagerUsingBlockingQueue { BlockingQueue<Integer> sharedQueue;
/**
* @param args
*/
public static void main(String[] args) {
sharedQueue = new LinkedBlockingQueue<Integer>(); for (int i = 0; i < 10; i++){
Thread consumerThread = new Thread(new ConsumerWithBlockingQueue(sharedQueue));
consumerThread.start();
} for (int i = 0; i < 10; i++){
Thread producerThread = new Thread(new ProducerWithBlockingQueue(i, sharedQueue));
producerThread.start();
}
} } class ProducerWithBlockingQueue implements Runnable { private int amount;
private final BlockingQueue<Integer> sharedQueue; public ProducerWithBlockingQueue (int amount, BlockingQueue<Integer> sharedQueue) {
this.amount = amount;
this.sharedQueue = sharedQueue;
} @Override
public void run() { try {
sharedQueue.put(amount);
System.out.println("produced: " + amount);
} catch (InterruptedException e) {
e.printStackTrace();
}
} } class ConsumerWithBlockingQueue implements Runnable{ private final BlockingQueue<Integer> sharedQueue; public ConsumerWithBlockingQueue (BlockingQueue<Integer> sharedQueue) {
this.sharedQueue = sharedQueue;
} @Override
public void run() {
try {
System.out.println("consumed: " + sharedQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
} }

解释:这种方法借助数据结构BlockingQueue(初始化好像应该放在构造函数里,暂时来不及改了),底层原理与signal/notifyAll类似,但代码实现就简洁了许多。

【总结】

在需要实现生产者-消费者模式的场景下,我们可以优先考虑用BlockingQueue来实现。

上一篇:Agilent RF fundamentals (5)


下一篇:20165210 Java第四周学习总结