Java NIO demo

package com.network;

import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Demo NIO server listening on port 8899.
 *
 * <p>The main thread accepts connections. Two background threads each drive one selector:
 * {@link #readSelector} dispatches read-ready keys and {@link #writeSelector} dispatches
 * write-ready keys. Every key carries a {@link Handler} attachment whose {@code handle()} method
 * is invoked when the key is selected.
 */
public class NioNetwork {

    public static Selector readSelector;
    public static Selector writeSelector;
    public static final int RECEIVE_BUFFER_SIZE = 128;
    public static final int SEND_BUFFER_SIZE = 128;

    /** How long a selector thread sleeps when a poll finds no ready keys. */
    private static final long IDLE_BACKOFF_MILLIS = 1L;

    /**
     * Starts the two selector threads and then accepts clients forever.
     *
     * @param args unused
     * @throws IOException if the server socket or a selector cannot be opened, or accepting fails
     */
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(8899));
        // The server channel stays in blocking mode so accept() waits for a connection
        // instead of returning null in a tight loop.
        readSelector = Selector.open();
        writeSelector = Selector.open();

        new Thread(() -> runSelectorLoop(readSelector, false), "nio-read").start();
        new Thread(() -> runSelectorLoop(writeSelector, true), "nio-write").start();

        while (true) {
            SocketChannel clientChannel = serverSocket.accept();
            System.out.println("accept ....");
            try {
                registerClient(clientChannel);
            } catch (IOException e) {
                // A single broken client must not stop the server from accepting others.
                e.printStackTrace();
                closeQuietly(clientChannel, e);
            }
        }
    }

    /**
     * Configures a freshly accepted channel and registers it with both selectors.
     *
     * @param clientChannel the accepted client connection
     * @throws IOException if the channel cannot be configured or registered
     */
    private static void registerClient(SocketChannel clientChannel) throws IOException {
        clientChannel.configureBlocking(false);
        clientChannel.socket().setReceiveBufferSize(RECEIVE_BUFFER_SIZE);
        clientChannel.socket().setSendBufferSize(SEND_BUFFER_SIZE);

        NioChannel nioChannel = new NioChannel(clientChannel);
        Reader reader = new Reader(nioChannel);
        Writer writer = new Writer(nioChannel);
        nioChannel.setReader(reader);
        nioChannel.setWriter(writer);

        // Register with an empty interest set first: the selector threads poll continuously,
        // so a key registered with live interest could be dispatched before setSk() has run.
        SelectionKey writeKey = clientChannel.register(writeSelector, 0, writer);
        writer.setSk(writeKey);
        SelectionKey readKey = clientChannel.register(readSelector, 0, reader);
        reader.setSk(readKey);

        writeKey.interestOps(SelectionKey.OP_WRITE);
        readKey.interestOps(SelectionKey.OP_READ);
    }

    /**
     * Polls {@code selector} until the thread is interrupted, dispatching every selected key to
     * its attached {@link Handler}.
     *
     * <p>The loop uses {@code selectNow()} plus a short sleep when idle rather than a blocking
     * {@code select()}: other threads register channels and change interest sets on these
     * selectors, and the sleep happens outside any selector operation so it cannot hold them up.
     *
     * @param selector          the selector to drive
     * @param reportInvalidKeys whether to print {@code "invalid sk"} for keys that are no longer valid
     */
    private static void runSelectorLoop(Selector selector, boolean reportInvalidKeys) {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                if (selector.selectNow() > 0) {
                    dispatchSelectedKeys(selector, reportInvalidKeys);
                } else {
                    Thread.sleep(IDLE_BACKOFF_MILLIS);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Drains the selector's selected-key set, invoking each valid key's handler.
     * A handler that throws {@link IOException} is notified through {@code onFailure}.
     */
    private static void dispatchSelectedKeys(Selector selector, boolean reportInvalidKeys) {
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            if (!key.isValid()) {
                if (reportInvalidKeys) {
                    System.out.println("invalid sk");
                }
                continue;
            }
            Handler handler = (Handler) key.attachment();
            try {
                handler.handle();
            } catch (IOException e) {
                handler.onFailure(e);
            }
        }
    }

    /** Closes {@code channel}, recording any close failure as suppressed on {@code cause}. */
    private static void closeQuietly(SocketChannel channel, IOException cause) {
        try {
            channel.close();
        } catch (IOException closeFailure) {
            cause.addSuppressed(closeFailure);
        }
    }

}

/**
 * Groups one client {@link SocketChannel} with the {@link Reader} and {@link Writer}
 * handlers assigned to it.
 */
class NioChannel {
    private final SocketChannel channel;
    private Reader readHandler;
    private Writer writeHandler;

    /**
     * @param channel the client connection this object wraps
     */
    public NioChannel(SocketChannel channel) {
        this.channel = channel;
    }

    /** @return the wrapped client connection */
    public SocketChannel getChannel() {
        return this.channel;
    }

    /** @return the reader assigned via {@link #setReader(Reader)}, or {@code null} if none was set */
    public Reader getReader() {
        return this.readHandler;
    }

    /** @param reader the handler to store as this channel's reader */
    public void setReader(Reader reader) {
        this.readHandler = reader;
    }

    /** @return the writer assigned via {@link #setWriter(Writer)}, or {@code null} if none was set */
    public Writer getWriter() {
        return this.writeHandler;
    }

    /** @param writer the handler to store as this channel's writer */
    public void setWriter(Writer writer) {
        this.writeHandler = writer;
    }
}

/**
 * Read-side handler: pulls bytes from the channel, decodes them into {@link Packet}s and
 * queues a {@code RESPONSE} packet on the channel's writer for every packet completed.
 */
class Reader extends Handler {

    /** Receive buffer, allocated on first use. */
    private ByteBuffer readBuffer;
    /** Packet currently being decoded; {@code null} between packets. */
    private Packet partialPacket;

    public Reader(NioChannel channel) {
        super(channel);
        interestOps = SelectionKey.OP_READ;
    }

    @Override
    public String getName() {
        return "Reader";
    }

    @Override
    protected String getMsg() {
        return "From Read;";
    }

    /**
     * Performs one read from the channel and decodes as many packets as the data allows.
     *
     * @throws IOException  if reading fails or a response cannot be scheduled
     * @throws EOFException if the peer has closed the connection
     */
    @Override
    public void handle() throws IOException {
        if (readBuffer == null) {
            readBuffer = ByteBuffer.allocate(NioNetwork.RECEIVE_BUFFER_SIZE);
        }

        int count = getChannel().read(readBuffer);
        if (count == -1) {
            throw new EOFException("remote socket closed");
        }
        if (count <= 0) {
            // Nothing arrived; leave the buffer untouched for the next read.
            return;
        }

        readBuffer.flip();
        while (readBuffer.hasRemaining()) {
            if (partialPacket == null) {
                partialPacket = new Packet();
            }
            if (!partialPacket.readFrom(readBuffer)) {
                // Packet still incomplete: keep it and wait for more bytes.
                break;
            }
            doHandle(partialPacket);
            partialPacket = null;
        }
        // Preserve any undecoded bytes at the front of the buffer for the next read.
        compactOrClear(readBuffer);
    }

    /**
     * Answers a completed packet by queueing a new {@code RESPONSE} packet on the writer.
     * The content of the incoming packet is not inspected.
     */
    private void doHandle(Packet packet) throws ClosedChannelException {
        Packet response = new Packet();
        response.setType(Packet.Type.RESPONSE);
        Writer writer = getNioChannel().getWriter();
        writer.write(response);
    }
}

/**
 * Write-side handler: queues outgoing {@link Packet}s and flushes them to the channel whenever
 * the write selector reports the channel as writable.
 *
 * <p>{@link #write(Packet)} may be called from a thread other than the one running
 * {@link #handle()}; the {@code scheduled} flag tracks whether write interest is currently armed.
 */
class Writer extends Handler {
    private final ConcurrentLinkedDeque<Packet> writeQueue = new ConcurrentLinkedDeque<>();
    /** True while OP_WRITE interest is armed on behalf of queued data. */
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    /** Staging buffer, kept in "write mode" (position = number of pending bytes) between calls. */
    private ByteBuffer outputBuffer;
    /** Packet taken from the queue that has not been fully copied into the buffer yet. */
    private Packet currentPacket;

    @Override
    public String getName() {
        return "Writer";
    }

    public Writer(NioChannel channel) {
        super(channel);
        interestOps = SelectionKey.OP_WRITE;
    }

    @Override
    protected String getMsg() {
        return "From Write;";
    }

    /**
     * Queues a packet for sending and makes sure write interest is armed.
     *
     * @param packet the packet to send
     * @throws ClosedChannelException if the channel has been closed
     */
    public void write(Packet packet) throws ClosedChannelException {
        writeQueue.offer(packet);
        schedule();
    }

    /** Arms write-readiness interest on the write selector unless it is already armed. */
    private void schedule() throws ClosedChannelException {
        if (!scheduled.compareAndSet(false, true)) {
            return;
        }
        // Registering an already-registered channel updates the existing key's interest set.
        getNioChannel().getChannel().register(NioNetwork.writeSelector, getInterestOps(), this);
    }

    /**
     * Drops write interest once there is nothing left to send.
     *
     * <p>The flag is cleared so that a later {@link #write(Packet)} re-arms interest; the queue
     * is then re-checked because a packet may have been offered while the flag was still set.
     */
    private void unschedule() throws ClosedChannelException {
        SelectionKey key = getSk();
        int ops = key.interestOps();
        if ((ops & getInterestOps()) != 0) {
            key.interestOps(ops & ~getInterestOps());
        }
        scheduled.set(false);
        if (!writeQueue.isEmpty()) {
            schedule();
        }
    }

    /**
     * Copies queued packets into the staging buffer, writes the buffer to the channel, and
     * disarms write interest when both the queue and the buffer are empty.
     *
     * @throws IOException if writing to the channel fails
     */
    @Override
    public void handle() throws IOException {
        if (outputBuffer == null) {
            outputBuffer = ByteBuffer.allocate(NioNetwork.SEND_BUFFER_SIZE);
        }

        fillOutputBuffer();

        if (outputBuffer.position() > 0) {
            outputBuffer.flip();
            getChannel().write(outputBuffer);
            // Keep any bytes the socket did not accept for the next writable event.
            compactOrClear(outputBuffer);
        }

        if (currentPacket == null && outputBuffer.position() == 0) {
            unschedule();
        }
    }

    /**
     * Moves packets from the queue into the staging buffer until the queue is empty or a packet
     * reports that it could not be written completely. In the latter case the packet stays in
     * {@code currentPacket} so it is retried after the buffer has been flushed.
     */
    private void fillOutputBuffer() {
        for (;;) {
            if (currentPacket == null) {
                currentPacket = writeQueue.poll();
                if (currentPacket == null) {
                    return;
                }
            }
            // Presumably writeTo returns false when dst has no room left; verify against Packet.
            if (!onWrite(currentPacket, outputBuffer)) {
                return;
            }
            currentPacket = null;
        }
    }

    private boolean onWrite(Packet packet, ByteBuffer dst) {
        return packet.writeTo(dst);
    }

}

/**
 * Base class for objects attached to a {@link SelectionKey} and invoked when that key is
 * selected. Subclasses set {@link #interestOps} and normally override {@link #handle()}.
 */
class Handler {
    private SocketChannel channel;
    private NioChannel nioChannel;
    /** Buffer used only by the default {@link #handle()} implementation; allocated lazily. */
    private ByteBuffer buffer;
    private SelectionKey sk;
    /** Selection operation(s) this handler is interested in; assigned by subclasses. */
    protected int interestOps;

    protected String getName() {
        return "";
    }

    /**
     * @param channel wrapper whose socket channel this handler operates on
     */
    public Handler(NioChannel channel) {
        this.channel = channel.getChannel();
        this.nioChannel = channel;
    }

    public NioChannel getNioChannel() {
        return nioChannel;
    }

    protected String getMsg() {
        return "";
    }

    /**
     * Default behaviour: read once from the channel, print the first char of what was read and
     * write the received bytes back to the channel.
     *
     * @throws IOException  if reading or writing fails
     * @throws EOFException if the peer has closed the connection
     */
    public void handle() throws IOException {
        if (buffer == null) {
            buffer = ByteBuffer.allocate(128);
        }
        int readBytes = channel.read(buffer);
        if (readBytes <= 0) {
            if (readBytes == -1) {
                throw new EOFException("Remote socket closed!");
            }
            return;
        }
        buffer.flip();
        // Peek with an absolute get so the logged bytes are still echoed, and skip the log
        // when fewer than two bytes arrived (a relative getChar() would underflow).
        if (buffer.remaining() >= Character.BYTES) {
            System.out.println(getName() + "buf:" + buffer.getChar(0));
        }

        channel.write(buffer);
        // NOTE(review): bytes the channel did not accept in this write are discarded here.
        buffer.clear();
    }

    public SocketChannel getChannel() {
        return channel;
    }

    /**
     * Reports a failure and closes the underlying socket.
     *
     * @param cause the error that made the connection unusable
     */
    public void onFailure(Throwable cause) {
        Socket socket = channel.socket();
        try {
            cause.printStackTrace();
            socket.close();
            System.out.println("Error client Closed :" + cause.getMessage());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public SelectionKey getSk() {
        return sk;
    }

    public void setSk(SelectionKey sk) {
        this.sk = sk;
    }

    public int getInterestOps() {
        return interestOps;
    }

    /**
     * Prepares a buffer for further filling after it has been drained: unread bytes are moved
     * to the front if any remain, otherwise the buffer is reset.
     *
     * @param bb the buffer to compact or clear
     */
    public void compactOrClear(ByteBuffer bb) {
        if (bb.hasRemaining()) {
            bb.compact();
        } else {
            bb.clear();
        }
    }
}

Client: 

 

package com.network;


import sun.nio.ch.DirectBuffer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Demo client for the server on port 8899.
 *
 * <p>{@link #main(String[])} performs a single request/response exchange over a plain blocking
 * channel. The instance API ({@link #getInstance()}, {@link #send(Packet)}, {@link #read()}) is
 * a selector-based skeleton: it connects and registers with {@link #selector}, but
 * {@code read()} is a stub and queued packets are not transmitted.
 */
public class ClientNetwork {

    public static Selector selector;

    private static volatile ClientNetwork instance;

    static {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            // Without a selector every later use would fail with a NullPointerException.
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Returns the shared instance, creating and connecting it on first use.
     *
     * @return the shared client
     * @throws IOException if the connection cannot be established
     */
    public static ClientNetwork getInstance() throws IOException {
        if (instance == null) {
            synchronized (ClientNetwork.class) {
                if (instance == null) {
                    instance = new ClientNetwork();
                }
            }
        }
        return instance;
    }

    // Currently unused.
    private ByteBuffer readByte;
    private ByteBuffer writeByte;

    private ConcurrentLinkedQueue<Packet> sendQueue = new ConcurrentLinkedQueue<>();

    /**
     * Connects to port 8899, registers the channel with {@link #selector} for read and write
     * readiness, and starts a background thread that performs one selection pass.
     *
     * @throws IOException if connecting or registering fails
     */
    public ClientNetwork() throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        try {
            socketChannel.connect(new InetSocketAddress(8899));
            // A channel must be non-blocking before it can be registered with a selector.
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, this);
        } catch (IOException | RuntimeException e) {
            socketChannel.close();
            throw e;
        }

        // NOTE(review): this performs a single selection pass and then the thread ends. Turning
        // it into a loop would also require handler() to stop enqueueing a packet on every
        // write-ready event.
        new Thread(() -> {
            try {
                int select = selector.select(1000);
                if (select > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey next = iterator.next();
                        iterator.remove();
                        if (next.isValid()) {
                            handler(next);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }

    /**
     * Reacts to a selected key: calls {@link #read()} when the key is readable, otherwise
     * queues a new empty packet.
     *
     * @param sk the selected key
     */
    public void handler(SelectionKey sk) {
        if (sk.isReadable()) {
            read();
        } else {
            send(new Packet());
        }
    }

    /**
     * Adds a packet to the send queue.
     *
     * @param packet the packet to queue
     * @return always {@code true}
     */
    public boolean send(Packet packet) {
        sendQueue.offer(packet);
        return true;
    }

    /**
     * Stub; no reading is implemented yet.
     *
     * @return always {@code null}
     */
    public Packet read() {
        return null;
    }

    /**
     * Sends one packet to the server, waits for one response packet and prints its type.
     *
     * @param args unused
     * @throws IOException if the connection fails or closes before a full response arrives
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        // The channel stays in blocking mode: write() sends everything and read() waits for
        // data, so no polling loop is needed.
        try (SocketChannel socketChannel = SocketChannel.open()) {
            socketChannel.connect(new InetSocketAddress(8899));

            Packet packet = new Packet();
            ByteBuffer byteBuffer = ByteBuffer.allocate(128);
            boolean writeOk = packet.writeTo(byteBuffer);
            if (!writeOk) {
                System.out.println("writeFailed");
            }
            byteBuffer.flip();
            while (byteBuffer.hasRemaining()) {
                socketChannel.write(byteBuffer);
            }

            // Reset to the full capacity before receiving; the request bytes are no longer needed.
            byteBuffer.clear();
            Packet packet1 = new Packet();
            boolean complete = false;
            while (!complete) {
                if (!byteBuffer.hasRemaining()) {
                    throw new IOException("response does not fit in the "
                            + byteBuffer.capacity() + "-byte buffer");
                }
                if (socketChannel.read(byteBuffer) == -1) {
                    throw new IOException("server closed the connection before a complete response arrived");
                }
                byteBuffer.flip();
                complete = packet1.readFrom(byteBuffer);
                // Keep any bytes the packet did not consume and continue filling after them.
                byteBuffer.compact();
            }
            System.out.println(packet1.getType());
        }
    }
}

  

上一篇:NIO、BIO编程模型与零拷贝


下一篇:Echarts(一)