使用BlockingQueue的生产者消费者模式

BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。使用场景。

首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:

使用BlockingQueue的生产者消费者模式

 

通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;在生产者消费者模式中,通过队列的方式可以很方便的实现两者之间的数据共享。强大的BlockingQueue使我们不用关心什么时候需要阻塞线程,什么时候需要唤醒线程。

BlockingQueue的核心方法:

放入数据:

  offer(anObject) 如果BlockingQueue可以容纳,返回为true,否则返回false.

  offer(E o,long timeout,TimeUnit unit),设置等待时间,如果指定时间内,还不能往队列中加入BlockingQueue,则返回失败。

  put(anObject)把anObject加到BlockingQueue中,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续。

获取数据:
  poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
    取不到时返回null;
  poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
    队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
  take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
    BlockingQueue有新的数据被加入; 
  drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数), 
    通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

测试代码:

package BlockingQueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class BlockingQueueTest {
    public static void main(String args[]) throws InterruptedException{
        BlockingQueue<String> queue = new ArrayBlockingQueue(10);
        
        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        
        ExecutorService service = Executors.newCachedThreadPool();
        
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer);
        
        Thread.sleep(10 * 1000);
        producer1.stop();
        producer2.stop();
        producer3.stop();
        
        Thread.sleep(2000);
        // 退出Executor
        service.shutdown();
    }
}

生产者:

package BlockingQueue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;


public class Producer implements Runnable{
    
    private volatile boolean      isRunning               = true;
    private BlockingQueue<String> queue;
    private static AtomicInteger  count                   = new AtomicInteger();
    private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;
        
    public Producer(BlockingQueue queue){
        this.queue = queue;
    }
    
    public void run(){
        String data = null;
        Random r = new Random();
        System.out.println("启动生产者线程");
        try{
            while(isRunning){
                System.out.println("正在生产数据.....");
                Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                
                data = "data:" + count.incrementAndGet();
                System.out.println("将数据:" + data + "放入队列...");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println("放入数据失败:" + data);
                }
            }
        }catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        finally{
            System.out.println("退出生产者线程!");
        }
    }
    
    public void stop(){
        isRunning = false;    
    }
    
    
}

消费者:

package BlockingQueue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Consumer implements Runnable{
     private BlockingQueue<String> queue;
     private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;
    
    public Consumer(BlockingQueue<String> queue){
        this.queue = queue;
    }
    
    public void run(){
        System.out.println("启动消费者线程:");
        Random r = new Random();
        boolean isRunning = true;
        try{
            while(isRunning){
                System.out.println("正从队列获取数据...");
                String data = queue.poll(2,TimeUnit.SECONDS);
                if(null != data){
                     System.out.println("拿到数据:" + data);
                     System.out.println("正在消费数据:" + data);
                     Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
                }else{
                    isRunning = false;
                }
            }
        }catch(InterruptedException e){
             e.printStackTrace();
             Thread.currentThread().interrupt();
        }finally{
            System.out.println("退出消费者线程!");
        }
    }
}

 

 使用BlockingQueue的生产者消费者模式

参考:http://wsmajunfeng.iteye.com/blog/1629354

上一篇:修改eclipse3.7默认字体


下一篇:ArcEngine中COM对象与其基础RCW分开后就不能再使用