在研究Smack的源码的时候,我对它的连接Connection以及派生类XMPPConnection的关注是最多的,由于一个即时通信程序,它的网络模块必是它的核心。
而我非常在乎它是怎样实现的。
在收发数据包的时候,我看到了队列的身影。BlockingQueue和ArrayBlockingQueue。所以,我认为用到什么然后去查阅。去记录,这样的方法是比較高效率的。
BlockingQueue是在Java的新的Concurrent包中的。
Reference:
http://www.cnblogs.com/jackyuj/archive/2010/11/24/1886553.html
在新增的Concurrent包中。BlockingQueue非常好的攻克了多线程中。怎样高效安全“传输”数据的问题。通过这些高效而且线程安全的队列类。为我们高速搭建高质量的多线程程序带来极大的便利。本文具体介绍了BlockingQueue家庭中的全部成员,包含他们各自的功能以及常见使用场景。
-
认识BlockingQueue
堵塞队列。顾名思义,首先它是一个队列。而一个队列在数据结构中所起的作用大致例如以下图所看到的:
从上图我们能够非常清楚看到。通过一个共享的队列,能够使得数据由队列的一端输入。从另外一端输出;
经常使用的队列主要有下面两种:(当然通过不同的实现方式,还能够延伸出非常多不同类型的队列,DelayQueue就是当中的一种)
先进先出(FIFO):先插入的队列的元素也最先出队列。类似于排队的功能。从某种程度上来说这样的队列也体现了一种公平性。后进先出(LIFO):后插入队列的元素最先出队列。这样的队列优先处理近期发生的事件。
多线程环境中,通过队列能够非常easy实现数据共享,比方经典的“生产者”和“消费者”模型中,通过队列能够非常便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。假设生产者线程须要把准备好的数据共享给消费者线程。利用队列的方式来传递数据,就能够非常方便地解决他们之间的数据共享问题。但假设生产者和消费者在某个时间段内。万一发生数据处理速度不匹配的情况呢?理想情况下,假设生产者产出数据的速度大于消费者消费的速度,而且当生产出来的数据累积到一定程度的时候。那么生产者必须暂停等待一下(堵塞生产者线程)。以便等待消费者线程把累积的数据处理完成,反之亦然。然而,在concurrent包公布曾经。在多线程环境下,我们每一个程序猿都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。好在此时。强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓堵塞。在某些情况下会挂起线程(即堵塞)。一旦条件满足。被挂起的线程又会自己主动被唤醒)
以下两幅图演示了BlockingQueue的两个常见堵塞场景:
如上图所看到的:当队列中没有数据的情况下。消费者端的全部线程都会被自己主动堵塞(挂起)。直到有数据放入队列。
如上图所看到的:当队列中填满数据的情况下,生产者端的全部线程都会被自己主动堵塞(挂起),直到队列中有空的位置,线程被自己主动唤醒。
这也是我们在多线程环境下,为什么须要BlockingQueue的原因。作为BlockingQueue的使用者。我们再也不须要关心什么时候须要堵塞线程,什么时候须要唤醒线程。由于这一切BlockingQueue都给你一手包办了。既然BlockingQueue如此神通广大,让我们一起来见识下它的经常用法:
BlockingQueue的核心方法:
放入数据:
offer(anObject):表示假设可能的话,将anObject加到BlockingQueue里,即假设BlockingQueue能够容纳,
则返回true,否则返回false.(本方法不堵塞当前运行方法的线程)
offer(E o, long timeout, TimeUnit unit),能够设定等待的时间。假设在指定的时间内,还不能往队列中
增加BlockingQueue。则返回失败。
put(anObject):把anObject加到BlockingQueue里,假设BlockQueue没有空间,则调用此方法的线程被阻断
直到BlockingQueue里面有空间再继续.
获取数据:
poll(time):取走BlockingQueue里排在首位的对象,若不能马上取出,则能够等time參数规定的时间,
取不到时返回null;
poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,假设在指定时间内,
队列一旦有数据可取,则马上返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
BlockingQueue有新的数据被增加;
drainTo():一次性从BlockingQueue获取全部可用的数据对象(还能够指定获取数据的个数),
通过该方法。能够提升获取数据效率。不须要多次分批加锁或释放锁。
-
常见BlockingQueue
在了解了BlockingQueue的基本功能后,让我们来看看BlockingQueue家庭大致有哪些成员?
- BlockingQueue成员具体介绍
1. ArrayBlockingQueue
基于数组的堵塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个经常使用的堵塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量。分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象。由此也意味着两者无法真正并行执行。这点尤其不同于LinkedBlockingQueue。依照实现原理来分析。ArrayBlockingQueue全然能够採用分离锁,从而实现生产者和消费者操作的全然并行执行。Doug Lea之所以没这样去做,或许是由于ArrayBlockingQueue的数据写入和获取操作已经足够轻巧。以至于引入独立的锁机制。除了给代码带来额外的复杂性外。其在性能上全然占不到不论什么廉价。
ArrayBlockingQueue和LinkedBlockingQueue间另一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁不论什么额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内须要高效并发地处理大批量数据的系统中。其对于GC的影响还是存在一定的差别。而在创建ArrayBlockingQueue时,我们还能够控制对象的内部锁是否採用公平锁。默认採用非公平锁。
2. LinkedBlockingQueue
基于链表的堵塞队列。同ArrayListBlockingQueue类似。其内部也维持着一个数据缓冲队列(该队列由一个链表构成)。当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部。而生产者马上返回。仅仅有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue能够通过构造函数指定该值)。才会堵塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于相同的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还由于其对于生产者端和消费者端分别採用了独立的锁来控制数据同步。这也意味着在高并发的情况下生产者和消费者能够并行地操作队列中的数据,以此来提高整个队列的并发性能。
作为开发人员,我们须要注意的是,假设构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这种话,假设生产者的速度一旦大于消费者的速度,或许还没有等到队列满堵塞产生,系统内存就有可能已被消耗殆尽了。ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最经常使用的堵塞队列。普通情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。
以下的代码演示了怎样使用BlockingQueue:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; /**
* @author jackyuj
*/
public class BlockingQueueTest { public static void main(String[] args) throws InterruptedException {
// 声明一个容量为10的缓存队列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer = new Consumer(queue); // 借助Executors
ExecutorService service = Executors.newCachedThreadPool();
// 启动线程
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer); // 运行10s
Thread.sleep(10 * 1000);
producer1.stop();
producer2.stop();
producer3.stop(); Thread.sleep(2000);
// 退出Executor
service.shutdown();
}
}import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit; /**
* 消费者线程
*
* @author jackyuj
*/
public class Consumer implements Runnable { public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
} public void run() {
System.out.println("启动消费者线程! ");
Random r = new Random();
boolean isRunning = true;
try {
while (isRunning) {
System.out.println("正从队列获取数据...");
String data = queue.poll(2, TimeUnit.SECONDS);
if (null != data) {
System.out.println("拿到数据:" + data);
System.out.println("正在消费数据:" + data);
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
} else {
// 超过2s还没数据。觉得全部生产线程都已经退出,自己主动退出消费线程。
isRunning = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出消费者线程! ");
}
} private BlockingQueue<String> queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
} import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; /**
* 生产者线程
*
* @author jackyuj
*/
public class Producer implements Runnable { public Producer(BlockingQueue queue) {
this.queue = queue;
} public void run() {
String data = null;
Random r = new Random(); System.out.println("启动生产者线程!");
try {
while (isRunning) {
System.out.println("正在生产数据...");
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); data = "data:" + count.incrementAndGet();
System.out.println("将数据:" + data + "放入队列...");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
System.out.println("放入数据失败:" + data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出生产者线程!");
}
} public void stop() {
isRunning = false;
} private volatile boolean isRunning = true;
private BlockingQueue queue;
private static AtomicInteger count = new AtomicInteger();
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; } - 3 DelayQueue
- DelayQueue中的元素仅仅有当其指定的延迟时间到了,才可以从队列中获取到该元素。DelayQueue是一个没有限制大小的队列,因此往队列中插入数据的操作(生产者)永远不会被堵塞。而仅仅有获取数据的操作(消费者)才会被堵塞。
- 使用场景:
DelayQueue使用场景较少。但都相当巧妙,常见的样例比方使用一个DelayQueue来管理一个超时未响应的连接队列。4. PriorityBlockingQueue
基于优先级的堵塞队列(优先级的推断通过构造函数传入的Compator对象来决定)。但须要注意的是PriorityBlockingQueue并不会堵塞数据生产者,而仅仅会在没有可消费的数据时。堵塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度。否则时间一长,会终于耗尽全部的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁採用的是公平锁。
5. SynchronousQueue
一种无缓冲的等待队列,类似于无中介的直接交易。有点像原始社会中的生产者和消费者。生产者拿着产品去集市销售给产品的终于消费者,而消费者必须亲自去集市找到所要商品的直接生产者,假设一方没有找到合适的目标。那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),假设有经销商。生产者直接把产品批发给经销商,而无需在意经销商终于会将这些产品卖给那些消费者。由于经销商能够库存一部分商品。因此相对于直接交易模式,整体来说採用中间经销商的模式会吞吐量高一些(能够批量买卖);但还有一方面,又由于经销商的引入,使得产品从生产者到消费者中间添加了额外的交易环节。单个产品的及时响应性能可能会减少。
声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的差别:
假设採用公平模式:SynchronousQueue会採用公平锁,并配合一个FIFO队列来堵塞多余的生产者和消费者,从而体系总体的公平策略。
但假设是非公平模式(SynchronousQueue默认):SynchronousQueue採用非公平锁,同一时候配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,假设生产者和消费者的处理速度有差距,则非常easy出现饥渴的情况,就可以能有某些生产者或者是消费者的数据永远都得不到处理。
- 小结
BlockingQueue不光实现了一个完整队列所具有的基本功能,同一时候在多线程环境下,他还自己主动管理了多线间的自己主动等待于唤醒功能,从而使得程序猿能够忽略这些细节,关注更高级的功能。 - 最后我们给出Smack中对ArrayBlockingQueue的使用的一个案例:
package org.jivesoftware.smack; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit; import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.packet.Packet; /**
* Provides a mechanism to collect packets into a result queue that pass a
* specified filter. The collector lets you perform blocking and polling
* operations on the result queue. So, a PacketCollector is more suitable to
* use than a {@link PacketListener} when you need to wait for a specific
* result.<p>
*
* Each packet collector will queue up a configured number of packets for processing before
* older packets are automatically dropped. The default number is retrieved by
* {@link SmackConfiguration#getPacketCollectorSize()}.
*
* @see Connection#createPacketCollector(PacketFilter)
* @author Matt Tucker
*/
public class PacketCollector { private PacketFilter packetFilter;
private ArrayBlockingQueue<Packet> resultQueue;
private Connection connection;
private boolean cancelled = false; /**
* Creates a new packet collector. If the packet filter is <tt>null</tt>, then
* all packets will match this collector.
*
* @param conection the connection the collector is tied to.
* @param packetFilter determines which packets will be returned by this collector.
*/
protected PacketCollector(Connection conection, PacketFilter packetFilter) {
this(conection, packetFilter, SmackConfiguration.getPacketCollectorSize());
} /**
* Creates a new packet collector. If the packet filter is <tt>null</tt>, then
* all packets will match this collector.
*
* @param conection the connection the collector is tied to.
* @param packetFilter determines which packets will be returned by this collector.
* @param maxSize the maximum number of packets that will be stored in the collector.
*/
protected PacketCollector(Connection conection, PacketFilter packetFilter, int maxSize) {
this.connection = conection;
this.packetFilter = packetFilter;
this.resultQueue = new ArrayBlockingQueue<Packet>(maxSize);
} /**
* Explicitly cancels the packet collector so that no more results are
* queued up. Once a packet collector has been cancelled, it cannot be
* re-enabled. Instead, a new packet collector must be created.
*/
public void cancel() {
// If the packet collector has already been cancelled, do nothing.
if (!cancelled) {
cancelled = true;
connection.removePacketCollector(this);
}
} /**
* Returns the packet filter associated with this packet collector. The packet
* filter is used to determine what packets are queued as results.
*
* @return the packet filter.
*/
public PacketFilter getPacketFilter() {
return packetFilter;
} /**
* Polls to see if a packet is currently available and returns it, or
* immediately returns <tt>null</tt> if no packets are currently in the
* result queue.
*
* @return the next packet result, or <tt>null</tt> if there are no more
* results.
*/
public Packet pollResult() {
return resultQueue.poll();
} /**
* Returns the next available packet. The method call will block (not return)
* until a packet is available.
*
* @return the next available packet.
*/
public Packet nextResult() {
try {
return resultQueue.take();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
} /**
* Returns the next available packet. The method call will block (not return)
* until a packet is available or the <tt>timeout</tt> has elapased. If the
* timeout elapses without a result, <tt>null</tt> will be returned.
*
* @param timeout the amount of time to wait for the next packet (in milleseconds).
* @return the next available packet.
*/
public Packet nextResult(long timeout) {
try {
return resultQueue.poll(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
} /**
* Processes a packet to see if it meets the criteria for this packet collector.
* If so, the packet is added to the result queue.
*
* @param packet the packet to process.
*/
protected void processPacket(Packet packet) {
if (packet == null) {
return;
} if (packetFilter == null || packetFilter.accept(packet)) {
while (!resultQueue.offer(packet)) {
// Since we know the queue is full, this poll should never actually block.
resultQueue.poll();
}
}
}
}