完成了对NIO三大组件(Buffer、Channel、Selector)的介绍,现在可以介绍一个完整的NIO EchoServer的例子了。
这里再次重新介绍一下channel,因为无论服务端还是客户端, 在读数据的时候,channel都是从SelectionKey反向拿到的,可能第一次看不明白,其实这里反向拿到的channel,就是register时注册的channel
但是这段代码存在粘包和拆包的问题。TCP是面向字节流的协议,本身不保留消息边界;例如在Nagle算法下,为了优化传输效率,可能将多个间隔时间短、且规模较小的包合并成一个包进行发送(粘包),而较大的数据也可能被拆成多次到达(拆包)。因此接收端一次read读到的内容,不一定恰好对应发送端的一条完整消息。
Server
package org.scaventz.nio.mine; import io.netty.util.CharsetUtil; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.time.LocalDateTime; import java.util.Iterator; import java.util.Set; public class NioEchoServer { public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(8080)); // 设置为非阻塞 serverSocketChannel.configureBlocking(false); // 获得一个 Selector 对象 Selector selector = Selector.open(); // 将我们的 serverSocketChannel 注册到 selector上,注册的事件为 ACCEPT事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 阻塞等待,也可以选择不阻塞 比如select(1000),等待1ms selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> keysIterator = keys.iterator(); while (keysIterator.hasNext()) { SelectionKey key = keysIterator.next(); if (key.isAcceptable()) { System.out.println("连接事件到来"); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(4096)); keysIterator.remove(); continue; } if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); channel.read(buffer); System.out.println("收到客户端信息[" + LocalDateTime.now() + "]: " + new String(buffer.array(), CharsetUtil.UTF_8)); // 回复客户端 buffer.clear(); channel.write(buffer); buffer.clear(); keysIterator.remove(); } } } } }
Client
package org.scaventz.nio.mine;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.Set;

/**
 * Echo client that sends a fixed number of greetings on one thread and prints
 * whatever the server sends back on another.
 *
 * <p>No message framing is applied: a single read may return several echoed
 * messages concatenated, or only part of one.
 */
public class NioEchoClient {

    private static final String HOST = "localhost";
    private static final int PORT = 8080;

    /** Number of greeting messages the sender thread writes. */
    private static final int MESSAGE_COUNT = 50;

    /** Receive buffer size; a single read returns at most this many bytes. */
    private static final int BUFFER_SIZE = 4096;

    /** Upper bound in milliseconds for one select call in the receiver loop. */
    private static final long SELECT_TIMEOUT_MILLIS = 2000;

    SocketChannel socketChannel = null;
    Selector selector = null;

    public static void main(String[] args) throws IOException, InterruptedException {
        NioEchoClient client = new NioEchoClient();
        client.go();
    }

    /**
     * Connects to the server and starts the sender and receiver threads.
     *
     * @throws IOException if the selector cannot be opened or the connection fails
     * @throws InterruptedException declared for compatibility with existing callers
     */
    public void go() throws IOException, InterruptedException {
        selector = Selector.open();
        try {
            // Connect and register before any thread starts: both threads then see fully
            // initialised fields, and registration cannot race with a select in progress.
            socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            closeAndReport(socketChannel);
            closeAndReport(selector);
            throw e;
        }

        // One thread is dedicated to sending messages to the server.
        new Thread(this::sendMessages).start();
        // One thread is dedicated to handling the data echoed back by the server.
        new Thread(this::receiveEchoes).start();
    }

    /** Writes the greeting messages, retrying until each one is fully sent. */
    private void sendMessages() {
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello world" + i;
                ByteBuffer outgoing = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
                // A non-blocking write may accept only part of the buffer (or nothing).
                while (outgoing.hasRemaining()) {
                    if (socketChannel.write(outgoing) == 0) {
                        Thread.yield();
                    }
                }
                System.out.println("[" + LocalDateTime.now() + "]sent: " + msg);
            }
        } catch (IOException e) {
            e.printStackTrace();
            // Closing the channel also makes the receiver loop terminate.
            closeAndReport(socketChannel);
        }
    }

    /** Prints data echoed by the server until the connection closes or fails. */
    private void receiveEchoes() {
        // This assumes that a single exchange never carries more than 4K of data.
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        try {
            while (socketChannel.isOpen()) {
                if (selector.select(SELECT_TIMEOUT_MILLIS) == 0) {
                    continue;
                }
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // Always remove the key, otherwise it is reported again next time.
                    iterator.remove();
                    if (!key.isValid() || !key.isReadable()) {
                        continue;
                    }
                    SocketChannel channel = (SocketChannel) key.channel();
                    buffer.clear();
                    int bytesRead = channel.read(buffer);
                    if (bytesRead < 0) {
                        // The server closed the connection; nothing more will arrive.
                        return;
                    }
                    if (bytesRead > 0) {
                        // Decode only the bytes received, not the whole backing array.
                        String text = new String(
                                buffer.array(), buffer.arrayOffset(), bytesRead, StandardCharsets.UTF_8);
                        System.out.println("[" + LocalDateTime.now() + "]recv: " + text);
                    }
                }
            }
        } catch (IOException | CancelledKeyException e) {
            // CancelledKeyException can occur if the sender closes the channel while a
            // selected key is being inspected.
            e.printStackTrace();
        } finally {
            closeAndReport(socketChannel);
            closeAndReport(selector);
        }
    }

    /** Closes the resource if it exists, printing rather than propagating a failure. */
    private static void closeAndReport(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }
}