http://www.cnblogs.com/archy_yu/archive/2013/04/19/3018479.html
随笔- 92 文章- 0 评论- 825
谈谈java的BlockingQueue
最近在维护一个java工程,在群里面也就聊起来java的优劣!无奈一些Java的终极粉丝,总是号称性能已经不必C++差,并且很多标准类库都是大师级的人写的,如何如何稳定等等。索性就认真研究一番,他们给我的一项说明就是,在线程之间投递消息,用java已经封装好的BlockingQueue,就足够用了。
既然足够用那就写代码测试喽,简简单单写一个小程序做了一番测试:
//默认包 import java.util.concurrent.*;
import base.MyRunnable;
public class Test
{ public static void main(String[] args)
{
BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
java.lang.Runnable r = new MyRunnable(queue);
Thread t = new Thread(r);
t.start();
while ( true )
{
try
{
while ( true )
{
for ( int i = 0 ;i < 10000 ;i++)
{
queue.offer(i);
}
}
}
catch ( Exception e)
{
e.printStackTrace();
}
}
}
} //需要添加的包 package base;
import java.lang.Runnable;
import java.util.concurrent.*;
import java.util.*;
public class MyRunnable implements Runnable
{ public MyRunnable(BlockingQueue<Integer> queue)
{
this .queue = queue;
}
public void run()
{
Date d = new Date();
long starttime = d.getTime();
System.err.println(starttime);
int count = 0 ;
while ( true )
{
try
{
Integer i = this .queue.poll();
if (i != null )
{
count ++;
}
if (count == 100000 )
{
Date e = new Date();
long endtime = e.getTime();
System.err.println(count);
System.err.println(endtime);
System.err.print(endtime - starttime);
break ;
}
}
catch (Exception e)
{
}
}
}
private BlockingQueue<Integer> queue;
} |
传递十万条数据,在我的测试机上面,大概需要50ms左右,倒是还可以!索性就看了一下BlockingQueue的底层实现
我在上面的测试代码中使用的offer 和 poll,就看看这两个实现函数吧,首先是offer
public E poll() {
final AtomicInteger count = this .count;
if (count.get() == 0 )
return null ;
E x = null ;
int c = - 1 ;
final ReentrantLock takeLock = this .takeLock;
takeLock.lock();
try {
if (count.get() > 0 ) {
x = extract();
c = count.getAndDecrement();
if (c > 1 )
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
|
和一般的同步线程类似,只是多加了一个signal,在学习unix环境高级编程时候,看到条件变量用于线程之间的同步,可以实现线程以竞争的方式实现同步!
poll函数的实现也是类似!
public boolean offer(E e) {
if (e == null ) throw new NullPointerException();
final AtomicInteger count = this .count;
if (count.get() == capacity)
return false ;
int c = - 1 ;
final ReentrantLock putLock = this .putLock;
putLock.lock();
try {
if (count.get() < capacity) {
insert(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0 )
signalNotEmpty();
return c >= 0 ;
}
|