package com.network; import java.io.EOFException; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; public class NioNetwork { public static Selector readSelector; public static Selector writeSelector; public static final int RECEIVE_BUFFER_SIZE = 128; public static final int SEND_BUFFER_SIZE = 128; public static void main(String[] args) throws IOException { ServerSocketChannel serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(8899)); serverSocket.configureBlocking(false); readSelector = Selector.open(); writeSelector = Selector.open(); new Thread(() -> { for (;;){ try { int select = readSelector.selectNow(); if( select > 0 ){ Iterator<SelectionKey> it = readSelector.selectedKeys().iterator(); while (it.hasNext()){ SelectionKey sk = it.next(); it.remove(); if( sk.isValid() ){ Handler handler = (Handler)sk.attachment(); try { handler.handle(); } catch (IOException e) { handler.onFailure(e); } } } } } catch (IOException e) { e.printStackTrace(); } } }).start(); new Thread(() -> { for (;;){ try { int select = writeSelector.selectNow(); if( select > 0 ){ Iterator<SelectionKey> it = writeSelector.selectedKeys().iterator(); while (it.hasNext()){ SelectionKey sk = it.next(); it.remove(); if( sk.isValid() ){ Handler handler = (Handler)sk.attachment(); try { handler.handle(); } catch (IOException e) { handler.onFailure(e); } }else{ System.out.println("invalid sk"); } } } } catch (IOException e) { e.printStackTrace(); } } }).start(); while (true){ SocketChannel clientChannel = serverSocket.accept(); if(clientChannel != null){ System.out.println("accept ...."); clientChannel.configureBlocking(false); clientChannel.socket().setReceiveBufferSize(RECEIVE_BUFFER_SIZE); clientChannel.socket().setSendBufferSize(SEND_BUFFER_SIZE); NioChannel nioChannel = new NioChannel(clientChannel); Reader reader = new Reader(nioChannel); Writer writer = new Writer(nioChannel); nioChannel.setReader(reader); nioChannel.setWriter(writer); SelectionKey selectionWriteKey2 = clientChannel.register(writeSelector, SelectionKey.OP_WRITE, writer); writer.setSk(selectionWriteKey2); SelectionKey selectionReadKey1 = clientChannel.register(readSelector, SelectionKey.OP_READ, reader); reader.setSk(selectionReadKey1); readSelector.wakeup(); } } } } class NioChannel{ private SocketChannel socketChannel; private Reader reader; private Writer writer; public NioChannel( SocketChannel channel) { this.socketChannel = channel; } public SocketChannel getChannel() { return socketChannel; } public Reader getReader() { return reader; } public void setReader(Reader reader) { this.reader = reader; } public Writer getWriter() { return writer; } public void setWriter(Writer writer) { this.writer = writer; } } class Reader extends Handler{ private ByteBuffer buffer; private Packet packet ; public String getName(){ return "Reader"; } public Reader(NioChannel channel) { super(channel); interestOps = SelectionKey.OP_READ; } @Override protected String getMsg() { return "From Read;"; } @Override public void handle() throws IOException { if(buffer == null){ buffer = ByteBuffer.allocate(NioNetwork.RECEIVE_BUFFER_SIZE); } int readBytes = getChannel().read(buffer); if( readBytes <= 0 ){ if( readBytes == -1 ){ throw new EOFException("remote socket closed"); } return; } buffer.flip(); while( buffer.hasRemaining() ){ if( packet == null ){ packet = new Packet(); } Boolean complete = packet.readFrom(buffer); if( complete ){ doHandle(packet); packet = null; }else{ break; } } compactOrClear(buffer); } private void doHandle( Packet packet ) throws ClosedChannelException { Packet packet1 = new Packet(); packet1.setType(Packet.Type.RESPONSE); getNioChannel().getWriter().write(packet1); } } class Writer extends Handler{ private ConcurrentLinkedDeque<Packet> writeQueue = new ConcurrentLinkedDeque<>(); private ByteBuffer outputBuffer; private Packet currentPacket; private AtomicBoolean scheduled = new AtomicBoolean(false); public String getName(){ return "Writer"; } public Writer(NioChannel channel) { super(channel); interestOps = SelectionKey.OP_WRITE; } @Override protected String getMsg() { return "From Write;"; } public void write( Packet packet) throws ClosedChannelException { writeQueue.offer( packet ); schedule(); } private void schedule() throws ClosedChannelException { if( scheduled.get()){ return; } if( !scheduled.compareAndSet(false, true)){ return; } //绑定读就绪 getNioChannel().getChannel().register(NioNetwork.writeSelector, getInterestOps(),this ); } @Override public void handle() throws IOException { if( outputBuffer == null ){ outputBuffer = ByteBuffer.allocate(NioNetwork.SEND_BUFFER_SIZE); } //do something if( currentPacket == null ){ currentPacket = writeQueue.poll(); } while (currentPacket != null ){ if ( !onWrite(currentPacket, outputBuffer)){ return ; } currentPacket = writeQueue.poll(); } if( outputBuffer !=null && outputBuffer.position() > 0){ outputBuffer.flip(); getChannel().write(outputBuffer); compactOrClear(outputBuffer); }else{ int interestOps = getSk().interestOps(); if( (interestOps & getInterestOps()) != 0 ){ getSk().interestOps(interestOps & ~getInterestOps()); } } } private boolean onWrite( Packet packet , ByteBuffer dst ){ return packet.writeTo(dst); } } class Handler{ private SocketChannel channel; private NioChannel nioChannel; private ByteBuffer buffer; private SelectionKey sk; protected int interestOps; protected String getName(){ return ""; } public Handler(NioChannel channel) { this.channel = channel.getChannel(); this.nioChannel = channel; } public NioChannel getNioChannel() { return nioChannel; } protected String getMsg(){ return ""; }; public void handle() throws IOException { if( buffer == null ){ buffer = ByteBuffer.allocate(128); } int readBytes = channel.read(buffer); if( readBytes <=0 ){ if (readBytes == -1) { throw new EOFException("Remote socket closed!"); } return; } buffer.flip(); System.out.println(getName()+"buf:"+buffer.getChar()); channel.write(buffer); buffer.clear(); } public SocketChannel getChannel() { return channel; } public void onFailure( Throwable cause ){ Socket socket = channel.socket(); try { cause.printStackTrace(); socket.close(); System.out.println("Error client Closed :"+cause.getMessage()); } catch (IOException e) { e.printStackTrace(); } } public SelectionKey getSk() { return sk; } public void setSk(SelectionKey sk) { this.sk = sk; } public int getInterestOps() { return interestOps; } public void compactOrClear(ByteBuffer bb) { if (bb.hasRemaining()) { bb.compact(); } else { bb.clear(); } } }
Client:
package com.network; import sun.nio.ch.DirectBuffer; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedQueue; public class ClientNetwork { public static Selector selector; private static volatile ClientNetwork instance; static { try { selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } } private void ClientNetwork(){ } public static ClientNetwork getInstance() throws IOException { if( instance == null ){ synchronized (ClientNetwork.class){ if( instance == null ){ instance = new ClientNetwork(); } } } return instance; } private ByteBuffer readByte; private ByteBuffer writeByte; private ConcurrentLinkedQueue<Packet> sendQueue = new ConcurrentLinkedQueue<>(); public ClientNetwork() throws IOException { SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress(8899)); SelectionKey sk = socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); sk.attach( this ); new Thread(() -> { try { int select = selector.select(1000); if( select > 0 ){ Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey next = iterator.next(); iterator.remove(); if( next.isValid() ) { handler(next); } } } } catch (IOException e) { e.printStackTrace(); } }).run(); } public void handler(SelectionKey sk ){ if(sk.isReadable()){ read(); }else{ send( new Packet() ); } } public boolean send( Packet packet ){ sendQueue.offer(packet); return true; } public Packet read(){ return null; } public static void main(String[] args) throws IOException, InterruptedException { SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress(8899)); socketChannel.configureBlocking(false); Packet packet = new Packet(); ByteBuffer byteBuffer = ByteBuffer.allocate(128); boolean writeOk = packet.writeTo(byteBuffer); if(!writeOk){ System.out.println("writeFailed"); } byteBuffer.flip(); socketChannel.write(byteBuffer); byteBuffer.flip(); for(;;){ if(socketChannel.read(byteBuffer) > 0){ byteBuffer.rewind(); Packet packet1 = new Packet(); packet1.readFrom(byteBuffer); System.out.println(packet1.getType()); break; } } } }