客户端使用Java的阻塞IO
服务端使用Java的非阻塞NIO
package com.nio.echo; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Scanner; /**
* @author 作者 E-mail:
* @version 创建时间:2015-10-29 下午02:49:47 类说明
*/
// Blocking-IO echo client: forwards console lines to the echo server and
// prints whatever the server sends back on a separate reader thread.
public class EchoClient
{
    public static final String REMOT_IP = "127.0.0.1";

    public static final int REMOTE_PORT = 8080;

    /**
     * Connects to {@link #REMOT_IP}:{@link #REMOTE_PORT}, starts the reader
     * thread and then sends every line typed on standard input to the server
     * (UTF-8 encoded, without a line terminator).
     * <p>
     * The loop ends when the user types {@code quit} or when standard input
     * reaches end-of-file. The socket is always closed on the way out, also
     * when connecting or writing fails.
     *
     * @throws IOException if connecting or writing to the server fails
     */
    public void connectServer() throws IOException
    {
        Socket socket = new Socket();
        try
        {
            socket.connect(new InetSocketAddress(REMOT_IP, REMOTE_PORT));
            if (socket.isConnected())
            {
                System.out.println("connect remote address success");
            }

            // Start the thread that listens for messages from the server.
            new Thread(new client2server(socket)).start();

            Scanner scanner = new Scanner(System.in);
            OutputStream output = socket.getOutputStream();
            // hasNextLine() guards against end-of-input (Ctrl-D / closed pipe),
            // where nextLine() alone would throw NoSuchElementException.
            while (scanner.hasNextLine())
            {
                String str = scanner.nextLine();
                if ("quit".equals(str))
                {
                    break;
                }
                output.write(str.getBytes("UTF-8"));
            }
        }
        finally
        {
            // Closing the socket also unblocks the reader thread.
            socket.close();
        }
    }

    public static void main(String[] args) throws IOException
    {
        new EchoClient().connectServer();
    }
}

/**
 * Reader task: prints every chunk received from the server until the server
 * closes the connection or the socket is closed locally.
 */
class client2server implements Runnable
{
    private final Socket socket;

    public client2server(Socket socket)
    {
        this.socket = socket;
    }

    @Override
    public void run()
    {
        try
        {
            InputStream inputStream = socket.getInputStream();
            byte[] bytes = new byte[1024];
            int num;
            // read() returns -1 once the server has closed its side.
            while ((num = inputStream.read(bytes)) != -1)
            {
                System.out.print(num + " ");
                String str = new String(bytes, 0, num, "UTF-8");
                System.out.println("get data: " + str);
            }
            System.out.println("server is shutup");
        }
        catch(IOException e)
        {
            // A failure on a socket that was closed locally is the normal
            // shutdown path (the user typed "quit"); only report real errors.
            if (!socket.isClosed())
            {
                e.printStackTrace();
            }
        }
    }
}
package com.nio.echo; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set; /**
* @author 作者 E-mail:
* @version 创建时间:2015-10-29 下午02:49:12 类说明
*/
// Single-threaded non-blocking echo server: one Selector dispatches accept,
// read and write events to the Handle attached to each SelectionKey.
public class NIOEchoServer
{
    private static ServerSocketChannel ssc = null;

    private static Selector selector = null;

    private static final int PORT = 8080;

    /**
     * Opens the listening channel on {@link #PORT} and runs the select loop
     * forever on the calling thread.
     *
     * @throws IOException if the listening channel or the selector cannot be
     *         set up, or if the selector itself fails
     */
    public static void startServer() throws IOException
    {
        ssc = ServerSocketChannel.open();
        selector = Selector.open();
        ssc.configureBlocking(false);

        final ServerSocket serverSocket = ssc.socket();
        // SO_REUSEADDR must be configured before bind(); changing it on an
        // already bound socket has no defined effect.
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(PORT));

        final AcceptHandler acceptHandler = new AcceptHandler();
        ssc.register(selector, SelectionKey.OP_ACCEPT, acceptHandler);
        while (true)
        {
            int n = selector.select();
            if (n == 0)
            {
                continue;
            }
            final Set<SelectionKey> readyKeys = selector.selectedKeys();
            final Iterator<SelectionKey> it = readyKeys.iterator();
            while (it.hasNext())
            {
                final SelectionKey key = it.next();
                // Remove first so a failing handler cannot leave a stale key behind.
                it.remove();
                dispatch(key);
            }
        }
    }

    /**
     * Runs the handler attached to {@code key}. An I/O failure on a client
     * connection closes only that connection instead of ending the server.
     */
    private static void dispatch(SelectionKey key)
    {
        if (!key.isValid())
        {
            return;
        }
        try
        {
            final Handle handler = (Handle) key.attachment();
            handler.doHandle(key);
        }
        catch(IOException e)
        {
            e.printStackTrace();
            // Keep the listening channel open; drop only the broken client.
            if (key.channel() != ssc)
            {
                try
                {
                    key.channel().close();
                }
                catch(IOException closeFailure)
                {
                    closeFailure.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws IOException
    {
        NIOEchoServer.startServer();
    }
}

/** Callback attached to a SelectionKey and invoked when the key is ready. */
interface Handle
{
    void doHandle(SelectionKey key) throws IOException;
}

/** Accepts a pending connection and registers an {@link IOHandler} for it. */
class AcceptHandler implements Handle
{
    @Override
    public void doHandle(SelectionKey key) throws IOException
    {
        final ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        final SocketChannel sc = ssc.accept();
        if (sc == null)
        {
            // Non-blocking accept(): the connection is no longer pending.
            return;
        }
        try
        {
            // The handler registers itself with the selector in its constructor.
            new IOHandler(key.selector(), sc);
        }
        catch(IOException e)
        {
            sc.close();
            throw e;
        }
        System.out.println("server: connect success");
    }
}

/**
 * Echoes everything read from one client connection back to it.
 * <p>
 * Both buffers are kept in "write mode" (position = number of buffered bytes)
 * between events; they are flipped only for the duration of a channel
 * operation and compacted right after it.
 */
class IOHandler implements Handle
{
    private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);

    private final OutputBuffer outputBuffer = new OutputBuffer();

    private final SocketChannel socketChannel;

    private final SelectionKey key;

    public IOHandler(Selector selector, SocketChannel sc) throws IOException
    {
        this.socketChannel = sc;
        socketChannel.configureBlocking(false);
        key = socketChannel.register(selector, SelectionKey.OP_READ, this);
    }

    /**
     * Appends as many bytes of {@code bytebuffer} as fit to the output buffer
     * and enables the write event when output is pending.
     *
     * @param bytebuffer source buffer in read mode; its position is advanced
     *        by the number of bytes taken, the rest stays in the buffer
     * @param num number of bytes the caller just read; kept for signature
     *        compatibility, the amount moved is derived from the buffers
     * @return the number of bytes moved to the output buffer
     * @throws IOException declared for compatibility with existing callers
     */
    private int addWriteBuffer(ByteBuffer bytebuffer, int num) throws IOException
    {
        final ByteBuffer pending = outputBuffer.writeBuffer;
        final int moved = Math.min(bytebuffer.remaining(), pending.remaining());
        if (moved > 0)
        {
            // Copy through a bounded view so put() can never overflow 'pending'.
            final ByteBuffer chunk = bytebuffer.duplicate();
            chunk.limit(chunk.position() + moved);
            pending.put(chunk);
            bytebuffer.position(bytebuffer.position() + moved);
            outputBuffer.size += moved;
        }
        if (outputBuffer.size > 0)
        {
            this.interestOps(0, SelectionKey.OP_WRITE);
        }
        return moved;
    }

    /**
     * Removes and adds interest operations on this connection's key.
     *
     * @param remove operations to clear
     * @param add operations to set
     */
    private void interestOps(int remove, int add)
    {
        int cur = key.interestOps();
        int ops = (cur & ~remove) | add;
        if (cur != ops)
        {
            key.interestOps(ops);
            key.selector().wakeup();
        }
    }

    /**
     * Reads from the client only while the read buffer has room; a full read
     * buffer means the output side is congested.
     */
    private void updateReadInterest()
    {
        if (readBuffer.hasRemaining())
        {
            interestOps(0, SelectionKey.OP_READ);
        }
        else
        {
            interestOps(SelectionKey.OP_READ, 0);
        }
    }

    /**
     * Decodes the remaining bytes of {@code buffer} as UTF-8 without changing
     * the buffer's position or limit.
     *
     * @param buffer buffer in read mode
     * @return the decoded text, or an empty string if decoding fails
     */
    public static String getString(ByteBuffer buffer)
    {
        try
        {
            final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
            // Decode a read-only view: decoding the buffer itself would consume
            // it and leave nothing to echo.
            final CharBuffer charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
            return charBuffer.toString();
        }
        catch(Exception ex)
        {
            ex.printStackTrace();
            return "";
        }
    }

    @Override
    public void doHandle(SelectionKey key) throws IOException
    {
        if (!key.isValid())
        {
            return;
        }
        if (key.isReadable() && !handleRead(key))
        {
            return;
        }
        if (key.isValid() && key.isWritable())
        {
            handleWrite();
        }
    }

    /**
     * Reads available bytes and queues them for echoing.
     *
     * @return {@code false} if the client closed the connection
     */
    private boolean handleRead(SelectionKey key) throws IOException
    {
        System.out.print("server: meet read event ,before read position = " + readBuffer.position());
        int num = socketChannel.read(readBuffer);
        if (num == -1)
        {
            System.out.println("close the channel ");
            // Closing the channel also cancels its selection key.
            key.channel().close();
            return false;
        }
        // Switch to read mode for decoding and copying.
        readBuffer.flip();
        System.out.print(" reveive data " + getString(readBuffer));
        int dealsize = addWriteBuffer(readBuffer, num);
        System.out.println(" write to writeBuffer size = " + dealsize + " nowPostion = " + readBuffer.position());
        // Drop the bytes that were queued; anything that did not fit is kept.
        readBuffer.compact();
        updateReadInterest();
        return true;
    }

    /** Writes pending output and re-queues input held back by a full buffer. */
    private void handleWrite() throws IOException
    {
        System.out.print("meet write event");
        final ByteBuffer pending = outputBuffer.writeBuffer;
        pending.flip();
        long num;
        try
        {
            num = socketChannel.write(pending);
        }
        finally
        {
            // Back to write mode, keeping whatever the socket did not take.
            pending.compact();
        }
        outputBuffer.size -= num;
        System.out.print("deal size = " + num + "left buffer size = " + outputBuffer.size);

        if (readBuffer.position() > 0)
        {
            // Input that did not fit earlier can use the space just freed.
            readBuffer.flip();
            addWriteBuffer(readBuffer, readBuffer.remaining());
            readBuffer.compact();
        }
        updateReadInterest();

        if (outputBuffer.size == 0)
        {
            System.out.println(" deal over,cancel write event");
            interestOps(SelectionKey.OP_WRITE, 0);
        }
    }
}

/**
 * Pending output of one connection. {@code writeBuffer} is kept in write mode
 * and {@code size} is the number of bytes waiting to be sent.
 */
class OutputBuffer
{
    public int size;

    public final ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
}
ByteBuffer没有直接提供“当前待写出的有效数据量”这类方法,只能自己写一个OutputBuffer来辅助处理
之前的OutputBuffer只是封装了一个ByteBuffer以及一个size变量,用于标示待写出的数据量
下面对OutputBuffer进行了重构,将size变量的修改以及数据的写入和写出操作都封装到方法中,其中output(SocketChannel socketChannel)
方法利用回调的思想,将socketChannel对象传入,在OutputBuffer当中实现数据的write输出
package com.nio.echo; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set; /**
* @author 作者 E-mail:
* @version 创建时间:2015-10-29 下午02:49:12 类说明
*/
// Single-threaded non-blocking echo server (revision with an encapsulated
// OutputBuffer): one Selector dispatches accept, read and write events to the
// Handle attached to each SelectionKey.
public class NIOEchoServer
{
    private static ServerSocketChannel ssc = null;

    private static Selector selector = null;

    private static final int PORT = 8080;

    /**
     * Opens the listening channel on {@link #PORT} and runs the select loop
     * forever on the calling thread.
     *
     * @throws IOException if the listening channel or the selector cannot be
     *         set up, or if the selector itself fails
     */
    public static void startServer() throws IOException
    {
        ssc = ServerSocketChannel.open();
        selector = Selector.open();
        ssc.configureBlocking(false);

        final ServerSocket serverSocket = ssc.socket();
        // SO_REUSEADDR must be configured before bind(); changing it on an
        // already bound socket has no defined effect.
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(PORT));

        final AcceptHandler acceptHandler = new AcceptHandler();
        ssc.register(selector, SelectionKey.OP_ACCEPT, acceptHandler);
        while (true)
        {
            int n = selector.select();
            if (n == 0)
            {
                continue;
            }
            final Set<SelectionKey> readyKeys = selector.selectedKeys();
            final Iterator<SelectionKey> it = readyKeys.iterator();
            while (it.hasNext())
            {
                final SelectionKey key = it.next();
                // Remove first so a failing handler cannot leave a stale key behind.
                it.remove();
                dispatch(key);
            }
        }
    }

    /**
     * Runs the handler attached to {@code key}. An I/O failure on a client
     * connection closes only that connection instead of ending the server.
     */
    private static void dispatch(SelectionKey key)
    {
        if (!key.isValid())
        {
            return;
        }
        try
        {
            final Handle handler = (Handle) key.attachment();
            handler.doHandle(key);
        }
        catch(IOException e)
        {
            e.printStackTrace();
            // Keep the listening channel open; drop only the broken client.
            if (key.channel() != ssc)
            {
                try
                {
                    key.channel().close();
                }
                catch(IOException closeFailure)
                {
                    closeFailure.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws IOException
    {
        NIOEchoServer.startServer();
    }
}

/** Callback attached to a SelectionKey and invoked when the key is ready. */
interface Handle
{
    void doHandle(SelectionKey key) throws IOException;
}

/** Accepts a pending connection and registers an {@link IOHandler} for it. */
class AcceptHandler implements Handle
{
    @Override
    public void doHandle(SelectionKey key) throws IOException
    {
        final ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        final SocketChannel sc = ssc.accept();
        if (sc == null)
        {
            // Non-blocking accept(): the connection is no longer pending.
            return;
        }
        try
        {
            // The handler registers itself with the selector in its constructor.
            new IOHandler(key.selector(), sc);
        }
        catch(IOException e)
        {
            sc.close();
            throw e;
        }
        System.out.println("server: connect success");
    }
}

/**
 * Echoes everything read from one client connection back to it. The read
 * buffer is kept in "write mode" between events and flipped only while its
 * content is decoded and handed to the {@link OutputBuffer}.
 */
class IOHandler implements Handle
{
    private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);

    private final OutputBuffer outputBuffer = new OutputBuffer();

    private final SocketChannel socketChannel;

    private final SelectionKey key;

    public IOHandler(Selector selector, SocketChannel sc) throws IOException
    {
        this.socketChannel = sc;
        socketChannel.configureBlocking(false);
        key = socketChannel.register(selector, SelectionKey.OP_READ, this);
    }

    /**
     * Hands the content of {@code bytebuffer} to the output buffer and
     * enables the write event when output is pending.
     *
     * @param bytebuffer source buffer in read mode; its position is advanced
     *        by the number of bytes taken, the rest stays in the buffer
     * @param num number of bytes the caller just read, passed through to
     *        {@link OutputBuffer#put(ByteBuffer, int)}
     * @return the number of bytes moved to the output buffer
     * @throws IOException declared for compatibility with existing callers
     */
    private int addWriteBuffer(ByteBuffer bytebuffer, int num) throws IOException
    {
        int prevPositon = bytebuffer.position();
        outputBuffer.put(bytebuffer, num);
        int nowPosition = bytebuffer.position();
        if (outputBuffer.size() > 0)
        {
            this.interestOps(0, SelectionKey.OP_WRITE);
        }
        return nowPosition - prevPositon;
    }

    /**
     * Removes and adds interest operations on this connection's key.
     *
     * @param remove operations to clear
     * @param add operations to set
     */
    private void interestOps(int remove, int add)
    {
        int cur = key.interestOps();
        int ops = (cur & ~remove) | add;
        if (cur != ops)
        {
            key.interestOps(ops);
            key.selector().wakeup();
        }
    }

    /**
     * Reads from the client only while the read buffer has room; a full read
     * buffer means the output side is congested.
     */
    private void updateReadInterest()
    {
        if (readBuffer.hasRemaining())
        {
            interestOps(0, SelectionKey.OP_READ);
        }
        else
        {
            interestOps(SelectionKey.OP_READ, 0);
        }
    }

    /**
     * Decodes the remaining bytes of {@code buffer} as UTF-8 without changing
     * the buffer's position or limit.
     *
     * @param buffer buffer in read mode
     * @return the decoded text, or an empty string if decoding fails
     */
    public static String getString(ByteBuffer buffer)
    {
        try
        {
            final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
            // Decode a read-only view: decoding the buffer itself would consume
            // it and leave nothing to echo.
            final CharBuffer charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
            return charBuffer.toString();
        }
        catch(Exception ex)
        {
            ex.printStackTrace();
            return "";
        }
    }

    @Override
    public void doHandle(SelectionKey key) throws IOException
    {
        if (!key.isValid())
        {
            return;
        }
        if (key.isReadable() && !handleRead(key))
        {
            return;
        }
        if (key.isValid() && key.isWritable())
        {
            handleWrite();
        }
    }

    /**
     * Reads available bytes and queues them for echoing.
     *
     * @return {@code false} if the client closed the connection
     */
    private boolean handleRead(SelectionKey key) throws IOException
    {
        System.out.print("server: meet read event ,before read position = " + readBuffer.position());
        int num = socketChannel.read(readBuffer);
        if (num == -1)
        {
            System.out.println("close the channel ");
            // Closing the channel also cancels its selection key.
            key.channel().close();
            return false;
        }
        // Switch to read mode for decoding and copying.
        readBuffer.flip();
        System.out.println(" reveive data " + getString(readBuffer));
        addWriteBuffer(readBuffer, num);
        // Drop the bytes that were queued; anything that did not fit is kept.
        readBuffer.compact();
        updateReadInterest();
        return true;
    }

    /** Writes pending output and re-queues input held back by a full buffer. */
    private void handleWrite() throws IOException
    {
        System.out.print("meet write event");
        outputBuffer.output(socketChannel);

        if (readBuffer.position() > 0)
        {
            // Input that did not fit earlier can use the space just freed.
            readBuffer.flip();
            addWriteBuffer(readBuffer, readBuffer.remaining());
            readBuffer.compact();
        }
        updateReadInterest();

        if (outputBuffer.size() == 0)
        {
            System.out.println(" deal over,cancel write event");
            interestOps(SelectionKey.OP_WRITE, 0);
        }
    }
}

/**
 * Pending output of one connection.
 * <p>
 * The internal buffer is always kept in write mode, so its position is the
 * number of bytes waiting to be sent. It is flipped only for the duration of
 * a channel write and compacted right after it.
 */
class OutputBuffer
{
    private final ByteBuffer writeBuffer = ByteBuffer.allocate(1024);

    /**
     * Writes as much pending data as the channel accepts; unsent bytes stay
     * queued for the next call.
     *
     * @param socketChannel channel to write to
     * @throws IOException if the channel write fails
     */
    public void output(SocketChannel socketChannel) throws IOException
    {
        writeBuffer.flip();
        try
        {
            socketChannel.write(writeBuffer);
        }
        finally
        {
            // Back to write mode, keeping whatever the socket did not take.
            writeBuffer.compact();
        }
    }

    /**
     * Appends as many bytes of {@code b} as fit after the data already
     * queued. The position of {@code b} is advanced by the number of bytes
     * taken; bytes that do not fit remain in {@code b}.
     *
     * @param b source buffer in read mode
     * @param num number of bytes the caller intends to add; kept for
     *        signature compatibility, the amount moved is derived from the
     *        buffers themselves
     */
    public void put(ByteBuffer b, int num)
    {
        final int moved = Math.min(b.remaining(), writeBuffer.remaining());
        if (moved == 0)
        {
            return;
        }
        // Copy through a bounded view so put() can never overflow the buffer.
        final ByteBuffer chunk = b.duplicate();
        chunk.limit(chunk.position() + moved);
        writeBuffer.put(chunk);
        b.position(b.position() + moved);
    }

    /** @return the number of bytes waiting to be written */
    public int size()
    {
        return writeBuffer.position();
    }
}
事实上,在NIO网络编程中,写出数据的操作需要加入缓存才能保证效率,目的是让写操作发生时不影响业务线程继续send消息:首先将send过来的数据缓存到A中,在写事件发生时再将A中的数据写出(此时仅短暂锁住A,将A中的引用取出,并给A重新赋一个新的缓存引用),这样写事件的处理过程和业务消息的send就可以并发地进行。