原文地址:
https://my.oschina.net/tangcoffee/blog/305656
参考文档:
http://my.oschina.net/u/862897/blog/164425
http://my.oschina.net/cshbbrain/blog/87076
http://my.oschina.net/bluesky0leon/blog/132361
http://blog.csdn.net/caiwenfeng_for_23/article/details/8458299
aio(或者叫nio2 ?) jdk1.7的新特性,代码上比nio写着舒服,但是性能貌似没比nio强。。。
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import org.apache.log4j.Logger; public class AioServer implements Runnable{
final static Logger logger = Logger.getLogger(AioServer.class);
Object lock = new Object();
InetSocketAddress serverAddress = null;
int backlog = 0;
int buff_size = 1024;
int threadPoolSize = 0; public AioServer(int port){
this.serverAddress = new InetSocketAddress(port);
initialization();
} public AioServer(String ip,int port){
this.serverAddress = new InetSocketAddress(ip,port);
initialization();
} void initialization(){
threadPoolSize = threadPoolSize>0? threadPoolSize: Runtime.getRuntime().availableProcessors();
} @Override
public void run() {
try {
logger.info("aioserver threadPoolSize:"+this.threadPoolSize);
ExecutorService threadPool = Executors.newFixedThreadPool(this.threadPoolSize);
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(threadPool);
final AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open(channelGroup);
if(this.backlog>0){ assc.bind(serverAddress,this.backlog); }
else { assc.bind(serverAddress); }
logger.info("aioserver listen:"+this.serverAddress);
assc.accept(null, new CompletionHandler<AsynchronousSocketChannel,Object>(){
@Override
public void completed(AsynchronousSocketChannel result,
Object attachment) {
assc.accept(null, this);
handler(result,attachment);
} @Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}); synchronized(lock){
lock.wait();
}
channelGroup.shutdownNow();
logger.info("aioserver shutdownC.");
} catch (Exception e) {
e.printStackTrace();
}
} static byte[] echo = "done.".getBytes();
static int connCount = 1;
void handler(AsynchronousSocketChannel conn,Object att){
try{
// logger.info("connect server :"+connCount++);
ByteBuffer buff = ByteBuffer.allocate(this.buff_size);
buff.clear(); int rl = conn.read(buff).get();
buff.flip();
logger.info("recv "+rl+": "+new String(buff.array(),0,rl)); buff.clear(); //清空buff数据
buff.put(echo);
buff.flip();
int wl = conn.write(buff).get();
logger.info("send "+wl);
conn.close();
}catch(Exception ex){
ex.printStackTrace();
}
} public void setThreadPoolSize(int threadPoolSize){
this.threadPoolSize = threadPoolSize;
} public void setBacklog(int backlog){
this.backlog = backlog;
} public void shutdown(){
//logger.info("call shutdown()");
synchronized(lock){
lock.notifyAll();
}
}
}
AioTest1.java
static void t3(){
AioServer aiose = new AioServer(9777);
//线程模式启动
new Thread(aiose).start();;
//非线程模式启动
// aiose.run();
try {
Thread.sleep(1000*60*5);
//3秒后关闭
aiose.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}