在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。
代码示例:
public class Message {
private final String data;
public Message(String data) {
this.data = data;
}
public String getData(){
return data;
}
}
public class MessageQueue {
private static final int MAX_LIMIT = 100;
private final LinkedList<Message> queue;
private int limit ;
public MessageQueue() {
this(MAX_LIMIT);
}
public MessageQueue(int limit) {
this.limit = limit;
this.queue = new LinkedList<>();
}
public void put(Message message) throws InterruptedException {
synchronized (queue){
while (queue.size() > MAX_LIMIT){
queue.wait();
}
this.queue.addLast(message);
queue.notifyAll();
}
}
public Message get() throws InterruptedException {
synchronized (queue){
while (queue.isEmpty()){
queue.wait();
}
Message message = queue.removeFirst();
queue.notifyAll();
return message;
}
}
public int getMaxLimit(){
return limit;
}
public int getSize(){
return queue.size();
}
}
public class ConsumerThread extends Thread {
private final MessageQueue queue;
public ConsumerThread(int index,MessageQueue queue) {
super("consumer - " + index);
this.queue = queue;
}
@Override
public void run() {
while (true){
try {
Message message = queue.get();
System.out.println(getName() + " get message is " + message.getData());
sleep(1000);
} catch (InterruptedException e) {
break;
}
}
}
}
public class ProducerThread extends Thread {
private final MessageQueue queue ;
private static final AtomicInteger integer = new AtomicInteger();
public ProducerThread(int i,MessageQueue queue) {
super("PRODUCER - " + i);
this.queue = queue;
}
@Override
public void run() {
while (true){
try {
Message message = new Message("message " + integer.incrementAndGet());
queue.put(message);
System.out.println(getName() + " put message is " + message.getData());
sleep(1000);
} catch (InterruptedException e) {
break;
}
}
}
}
public class ClientTest {
public static void main(String[] args){
final MessageQueue queue = new MessageQueue();
IntStream.range(1,5).forEach((i)-> new ProducerThread(i,queue).start());
IntStream.range(1,3).forEach((i)->
new ConsumerThread(i,queue).start()
);
}
}