Java并发之:生产者消费者问题

生产者消费者问题是Java并发中的常见问题之一,在实现时,一般可以考虑使用juc包下的BlockingQueue接口,至于具体使用哪个类,则就需要根据具体的使用场景具体分析了。本文主要实现一个生产者消费者的原型,以及实现一个生产者消费者的典型使用场景。

第一个问题:实现一个生产者消费者的原型。

 import java.util.concurrent.*;

 class Consumer implements Runnable {
BlockingQueue q = null; public Consumer(BlockingQueue q) {
this.q = q;
} @Override
public void run() {
while(true) {
try {
q.take();
System.out.println("Consumer has taken a product.");
}catch(InterruptedException e) { }
}
}
} class Producer implements Runnable {
BlockingQueue q = null; public Producer(BlockingQueue q) {
this.q = q;
} @Override
public void run() {
while(true) {
try { // note that if there is any chance that block, usually we need a InterruptedException
q.put(new Object());
System.out.println("Producer has puted a product.");
}catch(InterruptedException e) { }
}
} } public class JC_ProducerConsumerPrototype {
static int queueCapacity = 1024;
//static BlockingQueue<Object> q = new ArrayBlockingQueue<Object>(queueCapacity); // Can also compile
static BlockingQueue q = new ArrayBlockingQueue(queueCapacity); // ABQ must has a capacity
public static void main(String[] args) {
Thread t1 = new Thread(new Producer(q));
Thread t2 = new Thread(new Consumer(q));
t1.start();
t2.start();
} }

第二个问题,现在假设生产者是在读取磁盘上的多个log文件,对于每一个文件,依次读取文件中的每一行,也就是一条log记录;消费者需要读取并分析这些记录,假设消费者是计算密集型的。如何在生产者消费者原型的基础上实现这些功能?

这个场景在server端开发中是经常碰到的,因为在Server端,不可避免地会产生大量的日志文件。

 import java.util.concurrent.*;
import java.io.*;
import java.nio.*;
import java.nio.file.*;
import java.util.*;
import java.nio.charset.*; class Producer implements Runnable {
BlockingQueue q = null;
String fileName = null;
CountDownLatch latch = null; public Producer(BlockingQueue q,String fileName,CountDownLatch latch) {
this.q = q;
this.fileName = fileName;
this.latch = latch;
} @Override
public void run() {
Path path = Paths.get(".",fileName);
try{
List<String> lines = Files.readAllLines(path,StandardCharsets.UTF_8);
for(int i=lines.size();i>0;i--){
try{
q.put(lines.get(i));
}catch(InterruptedException e) { }
}
}catch(IOException e){ }
latch.countDown();
}
} class Consumer implements Runnable {
BlockingQueue<String> q = null;
Boolean done = false; public Consumer(BlockingQueue q,Boolean done){
this.q = q;
this.done = done;
} @Override
public void run(){
while(!done||q.size()!=0){
try{
q.take();
}catch(InterruptedException e){ }
}
}
} public class JC_ProducerConsumerHandlingLog{
public static int fileCount = 1024;
public static String[] fileNames = new String[fileCount];
public static int cpuCount = 8;
public static CountDownLatch latch = new CountDownLatch(fileCount);
public static volatile boolean done = false;
public static BlockingQueue<String> q = new LinkedBlockingQueue<String>(fileCount);//one thread for one file public static void main(String[] args){
for(int i=0;i<fileCount;i++){
Thread t = new Thread(new Producer(q,fileNames[i],latch));
t.start();
}
for(int i=0;i<cpuCount;i++){//for computing tasks, we don't need too many threads.
Thread t = new Thread(new Consumer(q,done));
t.start();
}
try{
latch.await();
done = true;
}catch(InterruptedException e){ } }
}

需要稍微注意一下线程数的选择,对于计算密集型的任务,我认为线程数达到cpu的核数比较合理(在不考虑超线程的情况下,也就是说一个核只有一个线程)。有不同意见欢迎跟我交流!

上一篇:复选框全选、全不选和反选的效果实现VIEW:1592


下一篇:PostgreSQL 8.1 中文文档