基于AIO的聊天室 (全future版)

Java网络通信中AIO使用,既可以在并发情况下减少cpu对内存的占用(异步非阻塞的AIO某种程度可以替代多线程),又相对于传统NIO编程更容易上手。官方提供了两种方式实现AIO的异步通信代码风格,一种是回调方式,另种是future方式。本文针对后者提供一个聊天室的案例。因为全future模式相对于handler更加简单灵活。具体代码如下:

 服务端:

package com.itheima.SocketAioBroad;
/*
* aio聊天服务端代码,请先启动服务端的控制台,再启动客户端控制台,
* Client,Client2,Client3均是客户端,模拟多人聊天室
*/
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/*读任务节点类*/
class TaskNode
{
/*异步结果变量*/
private Future<Integer> taskResult;
/*缓存变量*/
private ByteBuffer buffer;
/*异步通道*/
private AsynchronousSocketChannel channel;
/*任务节点的构造方法*/
public TaskNode(Future<Integer> result,ByteBuffer buffer,AsynchronousSocketChannel channel)
{
this.taskResult = result;
this.buffer = buffer;
this.channel = channel;
}
/*释放任务节点*/
public void freeData()
{
this.buffer.clear();
this.buffer = null;
this.taskResult = null;
this.channel = null;
}

/*其他get,set方法不再赘述*/
public Future<Integer> getTaskResult() {
return taskResult;
}

public void setTaskResult(Future<Integer> taskResult) {
this.taskResult = taskResult;
}

public ByteBuffer getBuffer() {
return buffer;
}

public void setBuffer(ByteBuffer buffer) {
this.buffer = buffer;
}

public AsynchronousSocketChannel getChannel() {
return channel;
}

public void setChannel(AsynchronousSocketChannel channel) {
this.channel = channel;
}

}


public class Server2 {
/*端口*/
private final static int PORT = 9888;
/*编解码*/
private Charset charset = Charset.forName("UTF-8");
/*异步通道的服务器通道,接受异步通道的连接*/
private volatile AsynchronousServerSocketChannel server;
/*写缓存*/
private ByteBuffer buffer = ByteBuffer.allocate(1024);
/*废弃的变量*/
private ByteBuffer buffer2 = ByteBuffer.allocate(1024);
/*网上搜索的黑科技,请百度下这个CopyOnWriteArrayList,
* 是读写线程安全的并发列表,这里存储的是异步通道
*/
private volatile CopyOnWriteArrayList<AsynchronousSocketChannel> list;
/*废弃的变量*/
private volatile AtomicInteger nReadTimes;
/*同上安全的并发列表,存储的是读任务的节点*/
private volatile CopyOnWriteArrayList<TaskNode> readlist;

/*构造方法*/
public Server2() throws Exception
{
this.nReadTimes = new AtomicInteger(0);
this.setList(new CopyOnWriteArrayList<AsynchronousSocketChannel>());
this.setReadlist(new CopyOnWriteArrayList<TaskNode>());
this.setServer(AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(PORT)));
}

/*核心方法一,服务器通道接受异步通道客户端的连接*/
public void accept() throws Exception
{
buffer.clear();
AsynchronousSocketChannel asc = null;
/*服务器循环接受客户端连接*/
while((asc = server.accept().get())!=null)
{
/*如果客户端通道列表没有包含,则添加连接到的客户端*/
if(!Server2.this.getList().contains(asc))
{
Server2.this.getList().add(asc);
/*给客户端的欢迎信息*/
String string = null;
try {
string = "欢迎"+asc.getRemoteAddress()+"来到聊天室,在线人数"+Server2.this.getList().size();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
/*循环发送给已经连接的客户端,
* 这里注意buffer的操作:
* clear,put,flip,write*/
for(AsynchronousSocketChannel temp:Server2.this.getList())
{
buffer.clear();
buffer.put((string).getBytes(charset));
buffer.flip();
try {
temp.write(buffer).get();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
buffer.clear();
}
/*发送完欢迎信息等于宣告客户端与服务端连接完成,然后服务端等待客户端发来消息,并读取*/
/*这里是将异步读任务先存储起来*/
ByteBuffer rb = ByteBuffer.allocate(1024);
Future<Integer> ft = asc.read(rb);
Server2.this.readlist.add(new TaskNode(ft, rb, asc));
}
}

/*客户端连接服务端的线程,详见核心方法一*/
public void startFuture() throws Exception
{
new Thread(new Runnable() {

@Override
public void run()


{
// TODO Auto-generated method stub
try {
Server2.this.accept();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();

}


/*核心方法二,读线程处理读任务节点*/
public void readFuture() throws Exception
{
new Thread(new Runnable() {

@Override
public void run() {
// TODO Auto-generated method stub
while(true)
{
TaskNode curNode = null;
/*如果读任务的列表不为空*/
if(!Server2.this.readlist.isEmpty())
{
/*循环开始检测异步读任务是否完成,
* 因为是异步的如果用 task.get()>0判断
* 读任务正常完成会阻塞,这里用buffer.remaining判断*/
for(TaskNode task:Server2.this.readlist)
{
if(task.getBuffer().remaining()!=1024)
{
/*如果一个通道的读任务完成了,则进行标记*/
curNode = task;
break;
}

}

if(curNode!=null)
{
/*如果标记不为空,则取出读到的内容*/
String receivMsg = new String(curNode.getBuffer().array(),charset).trim();
/*声明要广播的内容变量*/
String wMsg = null;
/*获取标记已读节点的地址内容*/
try {
wMsg = "[客户端:" + curNode.getChannel().getRemoteAddress() + "]:";
} catch (IOException e) {
e.printStackTrace();
}
/*将标记已读节点从列表中删除*/
Server2.this.readlist.remove(curNode);
AsynchronousSocketChannel asc = curNode.getChannel();
/*如果读到的节点内容不是退出标示*/
if(!receivMsg.equals("exit"))
{
/*再构造一个相同客户端通道的读任务节点,并加入读列表*/
ByteBuffer rb = ByteBuffer.allocate(1024);
Future<Integer> ft = asc.read(rb);
Server2.this.readlist.add(new TaskNode(ft, rb, asc));
/*封装广播信息*/
wMsg = (wMsg + receivMsg).trim();
/*进行广播,注意传入的参数有个已读节点的通道地址,这样不会给自己广播*/
Server2.this.writeMsg(wMsg,asc);
}
/*如果读到的节点内容是退出标示*/
else
{
/*则无需再构造一个读节点,封装下线信息广播*/
wMsg = (wMsg + "已经下线,在线人数" + (Server2.this.getList().size()-1)).trim();
/*这里会将广播信息发送给本人,因为客户端本地要判断下线*/
Server2.this.writeMsg(wMsg,null);
/*如果通道下线,则移除*/
Server2.this.getList().remove(asc);
try {
/*关闭通道*/
asc.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/*释放已读节点*/
curNode.freeData();
curNode = null;
}
}
}
}
}).start();
}

/*广播消息,如果传入客户端通道,则排除,
* 不广播给自己;如果传入null,则广播给自己*/
public void writeMsg(String msg,AsynchronousSocketChannel except)
{
System.out.println(msg);
for(AsynchronousSocketChannel asc:this.getList())
{
if(except==null||!asc.equals(except))
{
buffer.clear();
buffer.put((msg).getBytes(charset));
buffer.flip();
try {
Integer tInteger = asc.write(buffer).get();
if(tInteger>0)
{
System.out.println("发送成功!!");
buffer.clear();
}
else
{
System.out.println("发送失败!!");
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

/*主函数启动连接线程和读取异步节点的线程*/
public static void main(String[] args) throws Exception{
// TODO Auto-generated method stub
Server2 temp = new Server2();
temp.startFuture();
temp.readFuture();
}

/*其他set,get方法不再赘述*/
public Charset getCharset() {
return charset;
}

public void setCharset(Charset charset) {
this.charset = charset;
}

public AsynchronousServerSocketChannel getServer() {
return server;
}

public void setServer(AsynchronousServerSocketChannel server) {
this.server = server;
}

public AtomicInteger getnReadTimes() {
return nReadTimes;
}

public void setnReadTimes(AtomicInteger nReadTimes) {
this.nReadTimes = nReadTimes;
}

public static int getPort() {
return PORT;
}

public ByteBuffer getBuffer() {
return buffer;
}

public void setBuffer(ByteBuffer buffer) {
this.buffer = buffer;
}

public ByteBuffer getBuffer2() {
return buffer2;
}

public void setBuffer2(ByteBuffer buffer2) {
this.buffer2 = buffer2;
}

public void setList(CopyOnWriteArrayList<AsynchronousSocketChannel> list) {
this.list = list;
}

public void setReadlist(CopyOnWriteArrayList<TaskNode> readlist) {
this.readlist = readlist;
}

public CopyOnWriteArrayList<AsynchronousSocketChannel> getList() {
return list;
}

public CopyOnWriteArrayList<TaskNode> getReadlist() {
return readlist;
}
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 客户端:

package com.itheima.SocketAioBroad;

/*
* aio聊天客户端代码,请先启动服务端的控制台,再启动客户端控制台,
* Client,Client2,Client3均是客户端,模拟多人聊天室
*/
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.ExecutionException;


public class Client {
/*读缓存*/
private ByteBuffer readBuf;
/*写缓存*/
private ByteBuffer writeBuf;
/*异步通道*/
private AsynchronousSocketChannel sc;
/*编解码*/
private Charset charset = Charset.forName("UTF-8");
/*读线程*/
private Thread inputThread;
/*构造方法:
*分配读写缓存的空间,打开异步通道并绑定地址
*异步通道发起连接,注意get是同步阻塞,不加get是异步
*/
public Client() throws Exception
{
this.readBuf = ByteBuffer.allocate(1024);
this.writeBuf = ByteBuffer.allocate(1024);
this.sc = AsynchronousSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 6666));
this.sc.connect(new InetSocketAddress("127.0.0.1", 9888)).get();
this.inputThread = null;
}

/*核心启动方法*/
public void startWork() throws Exception
{
/*先构造一个写的线程*/
this.inputThread = new Thread(new Runnable() {
public void run() {
/*构造一个输入扫描器*/
Scanner scanner = new Scanner(System.in);
while(true)
{
String sendText = scanner.nextLine();
/*注意写缓存的过程:clear,put,flip,write*/
writeBuf.clear();
writeBuf.put(charset.encode(sendText));
writeBuf.flip();
try {
/*异步写存在返回值,正常情况下大于0*/
Integer tInteger = sc.write(writeBuf).get();
if(tInteger>0)
{
//clearPrePositionData(writeBuf);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
System.out.println("[客户端:" + sc.getLocalAddress() + "]:" + sendText);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
/*当输入exit时,表示将要退出,但是还没退出,先关闭写扫描器*/
if(sendText.equals("exit"))
{
scanner.close();
break;
}
}

}
});
/*写线程启动*/
this.inputThread.start();

this.readBuf.clear();

/*循环读*/
while(true)
{
/*同步读取数据,考量读的返回值是否大于0,大于0为正常,否则为异常*/
if(sc.read(this.readBuf).get()>0)
{
/*读取数据后放入变量*/
String receiveMsg = new String(this.readBuf.array(),charset).trim();
System.out.println(receiveMsg);
/*如果服务器端返回的数据包含本地地址,并且包含下线信息,
* 则确定应该是本地客户端下线,相对与输入exit关闭写扫描器,
* 这次是受到服务器的下线指令,真正下线*/
if(receiveMsg.contains(this.sc.getLocalAddress().toString().trim())&&receiveMsg.contains("已经下线"))
{
System.out.println("catch here");
break;
}
this.clearPrePositionData(this.readBuf);
}
}
/*关闭客户端的异步通道*/
this.sc.close();
}

/*网上抄袭的代码,据说可以只保存当前的数据而删除以前的数据,实测可用*/
public void clearPrePositionData(ByteBuffer byteBuffer) {
ByteBuffer buffer2 = byteBuffer.slice();
byteBuffer.clear();
if (buffer2.capacity() > 0) {

byteBuffer.put(buffer2);
} else {
byteBuffer.mark();
byteBuffer.reset();
}
byteBuffer.flip();
}

/*主方法调用*/
public static void main(String args[]) throws Exception {
new Client().startWork();
}


/*其他set,get不再赘述*/
public ByteBuffer getReadBuf() {
return readBuf;
}

public void setReadBuf(ByteBuffer readBuf) {
this.readBuf = readBuf;
}

public AsynchronousSocketChannel getSc() {
return sc;
}

public void setSc(AsynchronousSocketChannel sc) {
this.sc = sc;
}

public Thread getInputThread() {
return inputThread;
}

public void setInputThread(Thread inputThread) {
this.inputThread = inputThread;
}

public ByteBuffer getWriteBuf() {
return writeBuf;
}

public void setWriteBuf(ByteBuffer writeBuf) {
this.writeBuf = writeBuf;
}

public Charset getCharset() {
return charset;
}

public void setCharset(Charset charset) {
this.charset = charset;
}

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

—————————————————————————The        End—————————————————————————————

 

上一篇:为什么Netty使用NIO而不是AIO


下一篇:一站式学习Java网络编程 全面理解BIO/NIO/AIO完整版