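Reposted from: http://blog.csdn.net/clarkkentyang/article/details/52529785
Chapter 1 (omitted)
Chapter 2: Getting Started with NIO
2.1 Traditional BIO programming (synchronous blocking I/O server model: one thread per client)
The basic model of network programming is the Client/Server model, in which two processes communicate with each other. The server publishes its location (the IP address it binds to and the port it listens on). The client issues a connection request to that address, and the connection is established through the TCP three-way handshake. Once the connection succeeds, both sides communicate through network sockets (Socket).
In the traditional synchronous blocking model, ServerSocket binds the IP address and starts listening on the port, while Socket initiates the connection. After the connection is established, both sides communicate in a synchronous, blocking manner through input and output streams.
Drawback: no elastic scalability. As concurrent client access grows, the number of server threads grows 1:1 with the number of concurrent clients. Threads are a precious system resource for the Java virtual machine, so once the thread count balloons, performance drops sharply. As concurrency keeps rising, the system runs into thread stack overflows and failures to create new threads, and eventually the process crashes or hangs and can no longer serve requests.
Server code: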
- public static void main(String[] args) throws IOException {
- int port = 8080;
- if (args != null && args.length > 0) {
- try {
- port = Integer.valueOf(args[0]);
- } catch (NumberFormatException e) {
- // fall back to the default port
- }
- }
- ServerSocket server = null;
- try {
- server = new ServerSocket(port);
- System.out.println("The time server is start in port : " + port);
- Socket socket = null;
- while (true) {
- socket = server.accept();
- new Thread(new TimeServerHandler(socket)).start();
- }
- } finally {
- if (server != null) {
- System.out.println("The time server close");
- server.close();
- server = null;
- }
- }
- }
TimeServerHandler code:
- public class TimeServerHandler implements Runnable {
- private Socket socket;
- public TimeServerHandler(Socket socket) {
- this.socket = socket;
- }
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Runnable#run()
- */
- @Override
- public void run() {
- BufferedReader in = null;
- PrintWriter out = null;
- try {
- in = new BufferedReader(new InputStreamReader(
- this.socket.getInputStream()));
- out = new PrintWriter(this.socket.getOutputStream(), true);
- String currentTime = null;
- String body = null;
- while (true) {
- body = in.readLine();
- if (body == null)
- break;
- System.out.println("The time server receive order : " + body);
- currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
- System.currentTimeMillis()).toString() : "BAD ORDER";
- out.println(currentTime);
- }
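- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- // release the streams and the socket on every exit path, including a normal client disconnect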
- if (in != null) {
- try {
- in.close();
- } catch (IOException e1) {
- e1.printStackTrace();
- }
- }
- if (out != null) {
- out.close();
- out = null;
- }
- if (this.socket != null) {
- try {
- this.socket.close();
- } catch (IOException e1) {
- e1.printStackTrace();
- }
- this.socket = null;
- }
- }
- }
- }
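As a variation on the handler above, the cleanup can be delegated to try-with-resources, which closes the socket and its streams on every exit path. The following is only a sketch under assumptions: the class name TimeServerHandlerSketch and the helper methods respond and serve are hypothetical names introduced here for illustration, not part of the original listing.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.util.Date;

// Hypothetical variant of TimeServerHandler; names are illustrative only.
public class TimeServerHandlerSketch implements Runnable {
    private final Socket socket;

    public TimeServerHandlerSketch(Socket socket) {
        this.socket = socket;
    }

    // Pure protocol logic: maps one request line to one response line.
    public static String respond(String order, Date now) {
        return "QUERY TIME ORDER".equalsIgnoreCase(order) ? now.toString() : "BAD ORDER";
    }

    // Answers every line read from 'in' until the peer closes the stream.
    public static void serve(BufferedReader in, PrintWriter out) {
        try {
            String body;
            while ((body = in.readLine()) != null) {
                out.println(respond(body, new Date()));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void run() {
        // try-with-resources closes out, in and the socket (in that order),
        // whether serve() returns normally or throws.
        try (Socket s = socket;
             BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
             PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
            serve(in, out);
        } catch (IOException | UncheckedIOException e) {
            e.printStackTrace();
        }
    }
}
```

Keeping the protocol logic in static methods that take a reader and a writer also makes it possible to exercise it without opening a real socket.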
Client code:
- public static void main(String[] args) {
- int port = 8080;
- if (args != null && args.length > 0) {
- try {
- port = Integer.valueOf(args[0]);
- } catch (NumberFormatException e) {
- // fall back to the default port
- }
- }
- Socket socket = null;
- BufferedReader in = null;
- PrintWriter out = null;
- try {
- socket = new Socket("127.0.0.1", port);
- in = new BufferedReader(new InputStreamReader(
- socket.getInputStream()));
- out = new PrintWriter(socket.getOutputStream(), true);
- out.println("QUERY TIME ORDER");
- System.out.println("Send order 2 server succeed.");
- String resp = in.readLine();
- System.out.println("Now is : " + resp);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (out != null) {
- out.close();
- out = null;
- }
- if (in != null) {
- try {
- in.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- in = null;
- }
- if (socket != null) {
- try {
- socket.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- socket = null;
- }
- }
- }
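2.2 Pseudo-asynchronous I/O programming
This is an optimization on top of BIO: the back end uses a thread pool to handle connections from multiple clients. The pool makes it possible to allocate thread resources flexibly and to cap the maximum number of threads, which prevents thread exhaustion under massive concurrent access.
Server code: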
- public static void main(String[] args) throws IOException {
- int port = 8080;
- if (args != null && args.length > 0) {
- try {
- port = Integer.valueOf(args[0]);
- } catch (NumberFormatException e) {
- // fall back to the default port
- }
- }
- ServerSocket server = null;
- try {
- server = new ServerSocket(port);
- System.out.println("The time server is start in port : " + port);
- Socket socket = null;
- TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(
- 50, 10000);// create the I/O task thread pool
- while (true) {
- socket = server.accept();
- singleExecutor.execute(new TimeServerHandler(socket));
- }
- } finally {
- if (server != null) {
- System.out.println("The time server close");
- server.close();
- server = null;
- }
- }
- }
Thread pool code:
- public class TimeServerHandlerExecutePool {
- private ExecutorService executor;
- public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
- executor = new ThreadPoolExecutor(Runtime.getRuntime()
- .availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS,
- new ArrayBlockingQueue<java.lang.Runnable>(queueSize));
- }
- public void execute(java.lang.Runnable task) {
- executor.execute(task);
- }
- }
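Advantage: it avoids the thread exhaustion caused by creating a separate thread for every request.
Drawback: the underlying communication still uses the synchronous blocking model, so it cannot solve the problem at its root.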
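The drawback can be illustrated with a small sketch. It assumes a pool built like the one above (bounded thread count plus ArrayBlockingQueue) and simulates a blocking socket read with a CountDownLatch. The class BoundedPoolDemo and the method countRejected are hypothetical names used only for this illustration. Once every worker is stuck in a blocking call and the queue is full, the default rejection policy of ThreadPoolExecutor turns further submissions away.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: shows how a bounded pool saturates when tasks block.
public class BoundedPoolDemo {

    // Submits 'tasks' blocking tasks and returns how many were rejected.
    public static int countRejected(int maxThreads, int queueSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads, 120L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize));
        CountDownLatch release = new CountDownLatch(1); // stands in for a slow peer
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // simulates a blocking read()
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // all workers busy and the queue is full
                }
            }
        } finally {
            release.countDown(); // unblock the workers so the pool can drain
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 workers + 3 queue slots can hold 5 tasks; the other 5 are rejected.
        System.out.println(countRejected(2, 3, 10));
    }
}
```

In a real server the blocked workers would be waiting on slow or stalled peers, so a handful of bad connections can tie up the whole pool even though the thread count itself stays bounded.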
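2.3 NIO programming
NIO is short for New I/O; it is also commonly read as Non-blocking I/O.
NIO provides two socket channel implementations, SocketChannel and ServerSocketChannel. Both channels support blocking and non-blocking modes, and developers can choose the mode that fits their needs. In general, low-load, low-concurrency applications can use synchronous blocking I/O to keep the programming simple, whereas high-load, high-concurrency network applications should be developed with the non-blocking mode of NIO.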
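The difference between the two modes is easiest to see on accept(). The sketch below assumes a loopback listener on an ephemeral port; the class BlockingModeDemo and its methods are hypothetical names for this illustration. A new channel starts in blocking mode, and after configureBlocking(false) an accept() with no pending client returns null immediately instead of waiting.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo of blocking vs non-blocking mode on ServerSocketChannel.
public class BlockingModeDemo {

    // A freshly opened channel is in blocking mode.
    public static boolean startsInBlockingMode() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            return server.isBlocking();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // In non-blocking mode, accept() returns null when no client is waiting.
    public static boolean nonBlockingAcceptReturnsNull() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 asks the OS for any free port (an assumption of this sketch).
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            SocketChannel client = server.accept(); // nobody has connected
            return client == null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(startsInBlockingMode());
        System.out.println(nonBlockingAcceptReturnsNull());
    }
}
```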
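2.3.1 The NIO class library
1. Buffer
The most commonly used buffer is ByteBuffer, which provides a set of operations for working with byte arrays. Besides ByteBuffer there are CharBuffer, ShortBuffer, IntBuffer, LongBuffer, FloatBuffer, and DoubleBuffer.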
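A minimal sketch of the usual ByteBuffer cycle (put, flip, get) may help when reading the listings that follow. The class ByteBufferDemo and the 64-byte capacity are assumptions made for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo of the write-then-read cycle on a ByteBuffer.
public class ByteBufferDemo {

    // Writes the text into a buffer, flips it, and reads the bytes back.
    // Assumes the encoded text fits into 64 bytes.
    public static String roundTrip(String text) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.put(text.getBytes(StandardCharsets.UTF_8)); // position advances past the data
        buffer.flip();                                     // limit = position, position = 0
        byte[] bytes = new byte[buffer.remaining()];       // remaining = limit - position
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Returns how many bytes are readable after writing n bytes and flipping.
    public static int readableAfterFlip(int n) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.put(new byte[n]);
        buffer.flip();
        return buffer.remaining();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("QUERY TIME ORDER"));
    }
}
```

Forgetting flip() is a common mistake: without it, get() would start at the current write position and read the unused tail of the buffer instead of the data.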
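2. Channel
A channel is the conduit through which data is read and written.
Channels fall into two main groups: SelectableChannel for network reads and writes, and FileChannel for file operations.
The ServerSocketChannel and SocketChannel that Netty mainly deals with are both subclasses of SelectableChannel.
3. Multiplexer (Selector)
The Selector keeps polling the Channels registered with it. When a Channel has a new TCP connection, a read event, or a write event, it becomes ready and is picked up by the polling. The set of ready Channels can then be obtained through their SelectionKeys for the subsequent I/O operations.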
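The interplay of the three pieces (Buffer, Channel, Selector) can be sketched without any network setup by using java.nio.channels.Pipe, whose source end is a SelectableChannel. This is an illustration under assumptions: the class SelectorPipeDemo and the method sendThroughPipe are hypothetical names, and a pipe stands in for the socket channels used in the listings.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Hypothetical demo: one Selector watching the readable end of a Pipe.
public class SelectorPipeDemo {

    // Writes the message into the pipe and reads it back via the Selector.
    public static String sendThroughPipe(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                // Only non-blocking channels may be registered with a Selector.
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                ByteBuffer out = ByteBuffer.wrap(payload);
                while (out.hasRemaining()) {
                    sink.write(out);
                }

                ByteBuffer in = ByteBuffer.allocate(payload.length);
                while (in.hasRemaining()) {
                    if (selector.select(1000) == 0) {
                        break; // timed out: nothing became ready
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove(); // selected keys must be removed by hand
                        if (key.isReadable()) {
                            ((Pipe.SourceChannel) key.channel()).read(in);
                        }
                    }
                }
                in.flip();
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendThroughPipe("QUERY TIME ORDER"));
    }
}
```

The select, iterate, remove, handle loop has the same shape as the run() methods of the server and client listings in 2.3.2.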
2.3.2 NIO source code
1. Server code
- public static void main(String[] args) throws IOException {
- int port = 8080;
- if (args != null && args.length > 0) {
- try {
- port = Integer.valueOf(args[0]);
- } catch (NumberFormatException e) {
- // fall back to the default port
- }
- }
- MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
- new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
- }
2. The MultiplexerTimeServer class
- public class MultiplexerTimeServer implements Runnable {
- private Selector selector;
- private ServerSocketChannel servChannel;
- private volatile boolean stop;
- /**
- * Initializes the multiplexer (Selector) and binds the listening port
- *
- * @param port
- */
- public MultiplexerTimeServer(int port) {
- try {
- selector = Selector.open();
- servChannel = ServerSocketChannel.open();
- servChannel.configureBlocking(false);
- servChannel.socket().bind(new InetSocketAddress(port), 1024);
- servChannel.register(selector, SelectionKey.OP_ACCEPT);
- System.out.println("The time server is start in port : " + port);
- } catch (IOException e) {
- e.printStackTrace();
- System.exit(1);
- }
- }
- public void stop() {
- this.stop = true;
- }
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Runnable#run()
- */
- @Override
- public void run() {
- while (!stop) {
- try {
- selector.select(1000);
- Set<SelectionKey> selectedKeys = selector.selectedKeys();
- Iterator<SelectionKey> it = selectedKeys.iterator();
- SelectionKey key = null;
- while (it.hasNext()) {
- key = it.next();
- it.remove();
- try {
- handleInput(key);
- } catch (Exception e) {
- if (key != null) {
- key.cancel();
- if (key.channel() != null)
- key.channel().close();
- }
- }
- }
- } catch (Throwable t) {
- t.printStackTrace();
- }
- }
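- // Closing the selector invalidates its keys and deregisters the channels registered on it.
- // Note: Selector.close() does not close the channels themselves, so they still need to be closed separately.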
- if (selector != null)
- try {
- selector.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private void handleInput(SelectionKey key) throws IOException {
- if (key.isValid()) {
- // handle a newly arrived connection request
- if (key.isAcceptable()) {
- // Accept the new connection
- ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
- SocketChannel sc = ssc.accept();
- sc.configureBlocking(false);
- // Add the new connection to the selector
- sc.register(selector, SelectionKey.OP_READ);
- }
- if (key.isReadable()) {
- // Read the data
- SocketChannel sc = (SocketChannel) key.channel();
- ByteBuffer readBuffer = ByteBuffer.allocate(1024);
- int readBytes = sc.read(readBuffer);
- if (readBytes > 0) {
- readBuffer.flip();
- byte[] bytes = new byte[readBuffer.remaining()];
- readBuffer.get(bytes);
- String body = new String(bytes, "UTF-8");
- System.out.println("The time server receive order : "
- + body);
- String currentTime = "QUERY TIME ORDER"
- .equalsIgnoreCase(body) ? new java.util.Date(
- System.currentTimeMillis()).toString()
- : "BAD ORDER";
- doWrite(sc, currentTime);
- } else if (readBytes < 0) {
- // the peer closed the connection
- key.cancel();
- sc.close();
- } else
- ; // read 0 bytes: ignore
- }
- }
- }
- private void doWrite(SocketChannel channel, String response)
- throws IOException {
- if (response != null && response.trim().length() > 0) {
- byte[] bytes = response.getBytes();
- ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
- writeBuffer.put(bytes);
- writeBuffer.flip();
- channel.write(writeBuffer);
- }
- }
- }
3. Client code
- public static void main(String[] args) {
- int port = 8080;
- if (args != null && args.length > 0) {
- try {
- port = Integer.valueOf(args[0]);
- } catch (NumberFormatException e) {
- // fall back to the default port
- }
- }
- new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001")
- .start();
- }
4. The TimeClientHandle class
- public class TimeClientHandle implements Runnable {
- private String host;
- private int port;
- private Selector selector;
- private SocketChannel socketChannel;
- private volatile boolean stop;
- public TimeClientHandle(String host, int port) {
- this.host = host == null ? "127.0.0.1" : host;
- this.port = port;
- try {
- selector = Selector.open();
- socketChannel = SocketChannel.open();
- socketChannel.configureBlocking(false);
- } catch (IOException e) {
- e.printStackTrace();
- System.exit(1);
- }
- }
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Runnable#run()
- */
- @Override
- public void run() {
- try {
- doConnect();
- } catch (IOException e) {
- e.printStackTrace();
- System.exit(1);
- }
- while (!stop) {
- try {
- selector.select(1000);
- Set<SelectionKey> selectedKeys = selector.selectedKeys();
- Iterator<SelectionKey> it = selectedKeys.iterator();
- SelectionKey key = null;
- while (it.hasNext()) {
- key = it.next();
- it.remove();
- try {
- handleInput(key);
- } catch (Exception e) {
- if (key != null) {
- key.cancel();
- if (key.channel() != null)
- key.channel().close();
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- System.exit(1);
- }
- }
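- // Closing the selector invalidates its keys and deregisters the channels registered on it.
- // Note: Selector.close() does not close the channels themselves, so they still need to be closed separately.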
- if (selector != null)
- try {
- selector.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private void handleInput(SelectionKey key) throws IOException {
- if (key.isValid()) {
- // check whether the connection succeeded
- SocketChannel sc = (SocketChannel) key.channel();
- if (key.isConnectable()) {
- if (sc.finishConnect()) {
- sc.register(selector, SelectionKey.OP_READ);
- doWrite(sc);
- } else
- System.exit(1);// connection failed: exit the process
- }
- if (key.isReadable()) {
- ByteBuffer readBuffer = ByteBuffer.allocate(1024);
- int readBytes = sc.read(readBuffer);
- if (readBytes > 0) {
- readBuffer.flip();
- byte[] bytes = new byte[readBuffer.remaining()];
- readBuffer.get(bytes);
- String body = new String(bytes, "UTF-8");
- System.out.println("Now is : " + body);
- this.stop = true;
- } else if (readBytes < 0) {
- // the peer closed the connection
- key.cancel();
- sc.close();
- } else
- ; // read 0 bytes: ignore
- }
- }
- }
- private void doConnect() throws IOException {
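- // If the connection succeeds immediately, register for reads on the selector and send the request;
- // otherwise register for OP_CONNECT and wait for the connection to complete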
- if (socketChannel.connect(new InetSocketAddress(host, port))) {
- socketChannel.register(selector, SelectionKey.OP_READ);
- doWrite(socketChannel);
- } else
- socketChannel.register(selector, SelectionKey.OP_CONNECT);
- }
- private void doWrite(SocketChannel sc) throws IOException {
- byte[] req = "QUERY TIME ORDER".getBytes();
- ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
- writeBuffer.put(req);
- writeBuffer.flip();
- sc.write(writeBuffer);
- if (!writeBuffer.hasRemaining())
- System.out.println("Send order 2 server succeed.");
- }
- }
2.4 AIO programming
AIO was introduced in JDK 7 and is also known as NIO 2.0.
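Besides the CompletionHandler callbacks used in the listings below, the asynchronous channels of NIO 2.0 also offer a Future-based style, which is compact enough for a quick experiment. The sketch below assumes a loopback connection on an ephemeral port; the class AioFutureDemo and the method sendOverLoopback are hypothetical names for this illustration.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical demo of the Future-based API of the asynchronous channels.
public class AioFutureDemo {

    // Sends the message from a client channel to an accepted channel and returns what arrived.
    public static String sendOverLoopback(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
             AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
            // Port 0 asks the OS for any free port (an assumption of this sketch).
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // accept() returns at once; the Future completes when a client connects.
            Future<AsynchronousSocketChannel> pendingAccept = server.accept();
            client.connect(server.getLocalAddress()).get();

            try (AsynchronousSocketChannel accepted = pendingAccept.get()) {
                ByteBuffer out = ByteBuffer.wrap(payload);
                while (out.hasRemaining()) {
                    client.write(out).get(); // a single write may be partial
                }
                ByteBuffer in = ByteBuffer.allocate(payload.length);
                while (in.hasRemaining()) {
                    if (accepted.read(in).get() < 0) {
                        break; // the peer closed the connection
                    }
                }
                in.flip();
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendOverLoopback("QUERY TIME ORDER"));
    }
}
```

Calling get() on each Future makes this sketch effectively synchronous. The callback style in the following listings avoids that blocking, at the cost of nested handlers.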
1. Server code
- public class TimeServer {
- /**
- * @param args
- * @throws IOException
- */
- public static void main(String[] args) throws IOException {
- int port = 8080;
- if (args != null && args.length > 0) {
- try {
- port = Integer.valueOf(args[0]);
- } catch (NumberFormatException e) {
- // fall back to the default port
- }
- }
- AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);
- new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start();
- }
- }
2. The AsyncTimeServerHandler class
- public class AsyncTimeServerHandler implements Runnable {
- private int port;
- CountDownLatch latch;
- AsynchronousServerSocketChannel asynchronousServerSocketChannel;
- public AsyncTimeServerHandler(int port) {
- this.port = port;
- try {
- asynchronousServerSocketChannel = AsynchronousServerSocketChannel
- .open();
- asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
- System.out.println("The time server is start in port : " + port);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Runnable#run()
- */
- @Override
- public void run() {
- latch = new CountDownLatch(1);
- doAccept();
- try {
- latch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- public void doAccept() {
- asynchronousServerSocketChannel.accept(this,
- new AcceptCompletionHandler());
- }
- }
3. The AcceptCompletionHandler class
- public class AcceptCompletionHandler implements
- CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> {
- @Override
- public void completed(AsynchronousSocketChannel result,
- AsyncTimeServerHandler attachment) {
- attachment.asynchronousServerSocketChannel.accept(attachment, this);
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- result.read(buffer, buffer, new ReadCompletionHandler(result));
- }
- @Override
- public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
- exc.printStackTrace();
- attachment.latch.countDown();
- }
- }
4. The ReadCompletionHandler class
- public class ReadCompletionHandler implements
- CompletionHandler<Integer, ByteBuffer> {
- private AsynchronousSocketChannel channel;
- public ReadCompletionHandler(AsynchronousSocketChannel channel) {
- if (this.channel == null)
- this.channel = channel;
- }
- @Override
- public void completed(Integer result, ByteBuffer attachment) {
- attachment.flip();
- byte[] body = new byte[attachment.remaining()];
- attachment.get(body);
- try {
- String req = new String(body, "UTF-8");
- System.out.println("The time server receive order : " + req);
- String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date(
- System.currentTimeMillis()).toString() : "BAD ORDER";
- doWrite(currentTime);
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- }
- private void doWrite(String currentTime) {
- if (currentTime != null && currentTime.trim().length() > 0) {
- byte[] bytes = (currentTime).getBytes();
- ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
- writeBuffer.put(bytes);
- writeBuffer.flip();
- channel.write(writeBuffer, writeBuffer,
- new CompletionHandler<Integer, ByteBuffer>() {
- @Override
- public void completed(Integer result, ByteBuffer buffer) {
- // if the write is incomplete, keep sending
- if (buffer.hasRemaining())
- channel.write(buffer, buffer, this);
- }
- @Override
- public void failed(Throwable exc, ByteBuffer attachment) {
- try {
- channel.close();
- } catch (IOException e) {
- // ignore on close
- }
- }
- });
- }
- }
- @Override
- public void failed(Throwable exc, ByteBuffer attachment) {
- try {
- this.channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
5. Client code
- public class TimeClient {
- /**
- * @param args
- */
- public static void main(String[] args) {
- int port = 8080;
- if (args != null && args.length > 0) {
- try {
- port = Integer.valueOf(args[0]);
- } catch (NumberFormatException e) {
- // fall back to the default port
- }
- }
- new Thread(new AsyncTimeClientHandler("127.0.0.1", port),
- "AIO-AsyncTimeClientHandler-001").start();
- }
- }
6. The AsyncTimeClientHandler class
- public class AsyncTimeClientHandler implements
- CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {
- private AsynchronousSocketChannel client;
- private String host;
- private int port;
- private CountDownLatch latch;
- public AsyncTimeClientHandler(String host, int port) {
- this.host = host;
- this.port = port;
- try {
- client = AsynchronousSocketChannel.open();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- @Override
- public void run() {
- latch = new CountDownLatch(1);
- client.connect(new InetSocketAddress(host, port), this, this);
- try {
- latch.await();
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- try {
- client.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- @Override
- public void completed(Void result, AsyncTimeClientHandler attachment) {
- byte[] req = "QUERY TIME ORDER".getBytes();
- ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
- writeBuffer.put(req);
- writeBuffer.flip();
- client.write(writeBuffer, writeBuffer,
- new CompletionHandler<Integer, ByteBuffer>() {
- @Override
- public void completed(Integer result, ByteBuffer buffer) {
- if (buffer.hasRemaining()) {
- client.write(buffer, buffer, this);
- } else {
- ByteBuffer readBuffer = ByteBuffer.allocate(1024);
- client.read(
- readBuffer,
- readBuffer,
- new CompletionHandler<Integer, ByteBuffer>() {
- @Override
- public void completed(Integer result,
- ByteBuffer buffer) {
- buffer.flip();
- byte[] bytes = new byte[buffer
- .remaining()];
- buffer.get(bytes);
- String body;
- try {
- body = new String(bytes,
- "UTF-8");
- System.out.println("Now is : "
- + body);
- latch.countDown();
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- }
- @Override
- public void failed(Throwable exc,
- ByteBuffer attachment) {
- try {
- client.close();
- latch.countDown();
- } catch (IOException e) {
- // ignore on close
- }
- }
- });
- }
- }
- @Override
- public void failed(Throwable exc, ByteBuffer attachment) {
- try {
- client.close();
- latch.countDown();
- } catch (IOException e) {
- // ignore on close
- }
- }
- });
- }
- @Override
- public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
- exc.printStackTrace();
- try {
- client.close();
- latch.countDown();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
2.5 Comparison of the four I/O models