BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。使用场景。
首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:
通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;在生产者消费者模式中,通过队列的方式可以很方便的实现两者之间的数据共享。强大的BlockingQueue使我们不用关心什么时候需要阻塞线程,什么时候需要唤醒线程。
BlockingQueue的核心方法:
放入数据:
offer(anObject) 如果BlockingQueue可以容纳,返回为true,否则返回false.
offer(E o,long timeout,TimeUnit unit),设置等待时间,如果指定时间内,还不能往队列中加入BlockingQueue,则返回失败。
put(anObject)把anObject加到BlockingQueue中,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续。
获取数据:
poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
取不到时返回null;
poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
BlockingQueue有新的数据被加入;
drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
测试代码:
package BlockingQueue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class BlockingQueueTest { public static void main(String args[]) throws InterruptedException{ BlockingQueue<String> queue = new ArrayBlockingQueue(10); Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer = new Consumer(queue); ExecutorService service = Executors.newCachedThreadPool(); service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer); Thread.sleep(10 * 1000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(2000); // 退出Executor service.shutdown(); } }
生产者:
package BlockingQueue; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable{ private volatile boolean isRunning = true; private BlockingQueue<String> queue; private static AtomicInteger count = new AtomicInteger(); private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; public Producer(BlockingQueue queue){ this.queue = queue; } public void run(){ String data = null; Random r = new Random(); System.out.println("启动生产者线程"); try{ while(isRunning){ System.out.println("正在生产数据....."); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); data = "data:" + count.incrementAndGet(); System.out.println("将数据:" + data + "放入队列..."); if (!queue.offer(data, 2, TimeUnit.SECONDS)) { System.out.println("放入数据失败:" + data); } } }catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } finally{ System.out.println("退出生产者线程!"); } } public void stop(){ isRunning = false; } }
消费者:
package BlockingQueue; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class Consumer implements Runnable{ private BlockingQueue<String> queue; private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; public Consumer(BlockingQueue<String> queue){ this.queue = queue; } public void run(){ System.out.println("启动消费者线程:"); Random r = new Random(); boolean isRunning = true; try{ while(isRunning){ System.out.println("正从队列获取数据..."); String data = queue.poll(2,TimeUnit.SECONDS); if(null != data){ System.out.println("拿到数据:" + data); System.out.println("正在消费数据:" + data); Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); }else{ isRunning = false; } } }catch(InterruptedException e){ e.printStackTrace(); Thread.currentThread().interrupt(); }finally{ System.out.println("退出消费者线程!"); } } }
参考:http://wsmajunfeng.iteye.com/blog/1629354