Netty学习笔记(1) NIO基础-2

文章目录


1. 前言

笔记基于黑马的Netty教学,视频地址:黑马Netty


2. 网络编程(单线程)

1、阻塞

  • 在没有数据可读是,包括数据复制过程中,线程必须阻塞等待,不会占用cpu,但线程相当于闲置
  • 32位jvm一个线程320k,64位jvm一个线程1024k,为了减少线程数,需要采用线程池技术
  • 但即使使用了线程池,如果有很多连接建立,但长时间inactive,会阻塞线程池中所有的线程。

下面的例子:使用客户端和服务端演示阻塞

  1. 1个客户端的情况,但是下面的代码无法处理一个客户端发送多条请求的情况,因为在发送完成的时候 SocketChannel sc = ssc.accept()再次堵塞,这时候需要一个新的客户端才可以继续运行下去。阻塞模式下单线程不能很好的工作。
@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        //使用NIO来理解阻塞模式
        ByteBuffer buffer = ByteBuffer.allocate(16);
        //1、创建一个服务器对象
        ServerSocketChannel ssc = ServerSocketChannel.open();

        //2、绑定端口
        ssc.bind(new InetSocketAddress(8080));

        //3、建立连接的集合
        List<SocketChannel> channels = new LinkedList<>();

        while (true) {
            //4、建立与客户端的一个连接accept
            //SocketChannel 用来和客户端之间进行通信
            log.debug("connecting...");
            /**
             accept默认阻塞,单线程情况下线程停止运行,连接建立之后才可以继续指向
             */
            SocketChannel sc = ssc.accept();
            log.debug("connected... {}", sc);
            channels.add(sc);
            for (SocketChannel channel : channels) {
                //connected local=/127.0.0.1:8080 remote=/127.0.0.1:61314
                log.debug("before read... {}", channel);
                //5、接受客户端发送的数据
                /**
                 read也是阻塞方法,线程停止运行,客户端没有发送数据那么这里就不会继续走下去
                 */
                channel.read(buffer);
                //读模式
                buffer.flip();
                debugAll(buffer);
                //写模式
                buffer.clear();
                log.debug("after read... {}", channel);
            }
        }
    }
}
public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        System.out.println("waiting");
    }
}
  1. 当然解决方法可以是一个客户端用一个连接去处理


2、非阻塞

  • 在某个Channel没有可读事件的时候,线程不必阻塞,它可以去处理其他有可读事件的Channel
  • 数据复制过程中,线b程实际还是阻塞的(AIO改进的地方)
  • 写数据的时候,线程只是等待数据写入Channel即可,无需等待Channel通过网络把数据发送出去

下面的例子:使用客户端和服务端演示非阻塞

  1. 非阻塞模式下使用configureBlocking(false)来指定,此时的accept方法和read方法都是非阻塞的
  2. 但是也有一个弊端,就是cpu占用率太高了,无论什么情况都在死循环,效率很低,所以我们需要改进,能不能等到有连接的时候才调用。
@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        //使用NIO来理解阻塞模式
        ByteBuffer buffer = ByteBuffer.allocate(16);
        //1、创建一个服务器对象
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);   //切换成非阻塞模式,accept方法变成非阻塞的
        //2、绑定端口
        ssc.bind(new InetSocketAddress(8080));

        //3、建立连接的集合
        List<SocketChannel> channels = new LinkedList<>();

        while (true) {
            //4、建立与客户端的一个连接accept
            //SocketChannel 用来和客户端之间进行通信
            log.debug("connecting...");
            /**
             accept默认阻塞,单线程情况下线程停止运行,连接建立之后才可以继续指向
             非阻塞模式下,线程还会继续运行,如果没有连接建立,那么sc返回的是null
             */
            SocketChannel sc = ssc.accept();
            if(sc != null){
                log.debug("connected... {}", sc);
                channels.add(sc);
                /**
                    channel设置成非阻塞模式,那么下面的read就是非阻塞了
                 */
                sc.configureBlocking(false);
            }
            for (SocketChannel channel : channels) {
                log.debug("before read... {}", channel);
                //5、接受客户端发送的数据
                /**
                 此时的read是非阻塞的,会继续运行,没有读到数据会返回0
                 */
                int read = channel.read(buffer);
                if(read > 0){
                    //读到数据了
                    //读模式
                    buffer.flip();
                    debugAll(buffer);
                    //写模式
                    buffer.clear();
                    log.debug("after read... {}", channel);
                }
            }
        }
    }
}



3、多路复用和事件处理

1. 事件处理

线程必须配合Selector才可以完成对多个Channel可读写事件的监控,这称之为多路复用,注意有事件才有Selector

  • 多路复用仅仅针对网络IO,普通文件IO没法利用多路复用
  • 如果不用Selector的非阻塞模式。那么Channel读取到的字节很多时候都是0,而Selector保证了可读事件才去读取
  • Channel输入的数据一旦准备好,会触发Selector的可读事件

下面的例子:使用客户端和服务端演示阻塞

  1. 第一步,首先演示监听客户端连接事件,步骤如下:
  • 创建selector,管理多个channel
  • 建立channel和selector之间的联系(注册)
  • selector.select()方法,没有事件就阻塞,有了事件发送了就恢复运行继续向下处理
  • 处理事件,selectionKeys拿到所有发生的可读可写的事件

事件有4种类型:
1、accept-会在用连接请求的时候触发
2、connect-客户端连接建立后触发的事件
3、read-客户端发送数据了就会触发
4、write-可写事件
注意:select在事件未处理的时候,它是不会阻塞的。

具体流程就是首先创建一个Selector,然后创建一个通道ServerSocketChannel,设置为非阻塞的。把这个通道和Selector绑定起来,然后为这个通道设置感兴趣的事件为accept事件,接着绑定端口。调用Selector.select方法进行阻塞,当有事件发生的时候,从Selector获取到所有的事件,然后通过事件找出通道,比如连接的时候,就会找出对accept感兴趣的通道,然后再调用accept接受客户端。

@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        //1. 创建selector,管理多个channel
        Selector selector = Selector.open();

        ByteBuffer buffer = ByteBuffer.allocate(16);
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        //2. 建立channel和selector之间的联系(注册)
        //SelectionKey:事件发生后通过这个可以获取到什么事件,还可以知道是那个channel发生的事件
        /**
            事件有4种类型:
                1.accept-会在用连接请求的时候触发
                2.connect-客户端连接建立后触发的事件
                3.read-客户端发送数据了就会触发
                4.write-可写事件
         */
        SelectionKey sscKey = ssc.register(selector, 0, null);
        //表名我们这个key只关注accept事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("register key:{}", sscKey);
        ssc.bind(new InetSocketAddress(8080));

        while (true) {
            //3. selector.select()方法,没有事件就阻塞,有了事件发送了就恢复运行继续向下处理
            //解决了白白循环浪费CPU的问题
            selector.select();

            //4. 处理事件,selectionKeys拿到所有发生的可读可写的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

            while(iterator.hasNext()){
            	//注意,如果事件不调用accept进行处理,那么不会阻塞,因为事件没被处理,就不能阻塞
            	//也就是说事件要么处理要么取消,不能不管
                SelectionKey key = iterator.next();
				//key.cancle():取消事件
                log.debug("key:{}", key);
                //拿到触发事件的channel
                ServerSocketChannel channel = (ServerSocketChannel)key.channel();
                SocketChannel sc = channel.accept();
                log.debug("{}", sc);
            }
        }
    }
}

处理不同的事件
Netty学习笔记(1) NIO基础-2
按上面的图片,一开始的sscKey 全部存放在selector中,后面当有事件发生的时候,会从selector中复制一份到selectedKeys这个集合中去进行遍历。同时要注意几个问题:

  • selectKeys中的事件key在处理完成之后一定要关闭,否则会导致空指针异常,因为这时候的key拿到的channel 是null,在调用 sc.configureBlocking(false)的时候就会报错
  • 注意客户端断开的问题,客户端断开一定要手动cancle(),需要将key取消(从selector的key集合中真正删除)。其次要判断是异常断开还是正常的断开。否则关闭的时候发送的读请求很有可能会造成异常(客户端断开异常)
  • 处理的时候要将key分为多种类型,accept类型,事件读取类型等等,在客户端关闭的时候注意会自动发送一个读取的事件。
@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        //1. 创建selector,管理多个channel
        Selector selector = Selector.open();

        ByteBuffer buffer = ByteBuffer.allocate(16);
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        //2. 建立channel和selector之间的联系(注册)
        //SelectionKey:事件发生后通过这个可以获取到什么事件,还可以知道是那个channel发生的事件
        /**
            事件有4种类型:
                1.accept-会在用连接请求的时候触发
                2.connect-客户端连接建立后触发的事件
                3.read-客户端发送数据了就会触发
                4.write-可写事件
         */
        SelectionKey sscKey = ssc.register(selector, 0, null);
        //表名我们这个key只关注accept事件,所有客户端连接的时候消息都会进入这个通道
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("register key:{}", sscKey);
        ssc.bind(new InetSocketAddress(8080));

        while (true) {
            //3. selector.select()方法,没有事件就阻塞,有了事件发送了就恢复运行继续向下处理
            //解决了白白循环浪费CPU的问题
            selector.select();

            //4. 处理事件,selectionKeys拿到所有发生的可读可写的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

            //多个key的时候,accept和read方法都会触发事件,所以要区分事件类型
            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                //处理key的时候要从绿色的selectkeys中删除,否则就报错
                iterator.remove();
                log.debug("key:{}", key);

                //5.区分事件类型
                if (key.isAcceptable()) {
                    //拿到触发事件的channel
                    ServerSocketChannel channel = (ServerSocketChannel)key.channel();
                    SocketChannel sc = channel.accept();
                    //设置为非阻塞
                    sc.configureBlocking(false);
                    //scKey管sc的channel
                    SelectionKey scKey = sc.register(selector, 0, null);
                    //scKey关注读事件,也就是说客户端的通道关注可读事件
                    scKey.interestOps(SelectionKey.OP_READ);
                    log.debug("{}", sc);
                }else if(key.isReadable()){
                    //客户端关闭之后也会引发read事件,这时需要从key中remove掉,否则拿不到channel,报错
                    try {
                        //可读事件
                        SocketChannel channel = (SocketChannel)key.channel();//触发事件的channel
                        ByteBuffer buffer1 = ByteBuffer.allocate(16);
                        //客户端正常断开,read返回值是-1
                        int read = channel.read(buffer1);
                        if(read == -1){
                            //正常断开
                            key.cancel();
                        }
                        buffer1.flip();
                        debugAll(buffer);
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();   //客户端断开,需要将key取消(从selector的key集合中真正删除)
                    }
                }

            }
        }
    }
}



2. 处理消息的边界

对于上面的代码,将buffer的容量减低,比如下面的例子减低为4个字节,当发送了2个汉字的时候,此时由于字节超出边界,会导致读取不完整,有些汉字会被分开读取,也就导致了不能完整读出的问题。

ByteBuffer buffer = ByteBuffer.allocate(4);
// 解码并打印
System.out.println(StandardCharsets.UTF_8.decode(buffer));

//你�
//��

Netty学习笔记(1) NIO基础-2
在文本传输的时候也会出现半包和粘包的情况,那么如何解决呢?

  • 一种是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽。数据的长度内容不够要进行补齐,也就是和设置的最大长度一样
    Netty学习笔记(1) NIO基础-2
  • 另一种思路是按分隔符拆分,缺点是效率低,ByteBuffer需要一个一个字符地去匹配,根据分隔符去拆分
  • TLV 格式,即 Type 类型、Length 长度、Value 数据,将数据的长度一起进行发送。类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量。TLV文章
    1、Http 1.1 是TLV格式
    2、Http 2.0 是LTV格式
    3、Http中的请求头里面有一个content-type
    Netty学习笔记(1) NIO基础-2

下面是给出第二种处理边界的思路:

else if(key.isReadable()){
//客户端关闭之后也会引发read事件,这时需要从key中remove掉,否则拿不到channel,报错
  try {
      //可读事件
      SocketChannel channel = (SocketChannel)key.channel();//触发事件的channel
      ByteBuffer buffer1 = ByteBuffer.allocate(16);
      //客户端正常断开,read返回值是-1
      int read = channel.read(buffer1);
      if(read == -1){
          //正常断开
          key.cancel();
      }else{
      //split
          split(buffer1);
      }
  } 
  
 //按\n拆分
    private static void split(ByteBuffer source) {
        source.flip();
        for (int i = 0; i < source.limit(); i++) {
            //找到一条完整的信息
            if (source.get(i) == '\n') {
                //一条消息完整的长度
                int length = i + 1 - source.position();
                //把这条完整消息存入一个新的byteBuffer
                ByteBuffer target = ByteBuffer.allocate(length);
                for (int j = 0; j < length; j++) {
                    target.put(source.get());
                }
                debugAll(target);
            }
        }
    }

但是上面的处理是有问题的,因为发送的数据>16的时候由于split检测不到\n,所以没法输出,会不断读,直到读到\n才可以输出。
解决方法:

  • byteBuffer不可以是局部变量,因为如果是局部变量,那么在第二次进入的时候第一次的数据已经丢失了,扩容已经来不及了
  • ByteBuffer在字节不够的时候就要开始扩容了,可以使用HashMap中的处理,空间翻倍。
    Netty学习笔记(1) NIO基础-2

1、使用buffer作为附件和scKey连接起来。

//一个Buffer关联到SelectionKey中,防止多个channel同时使用一个buffer
ByteBuffer buffer1 = ByteBuffer.allocate(16);   //attachment:附件
SelectionKey scKey = sc.register(selector, 0, buffer1);

2、容量不足的时候进行扩容,同时别忘了重新复制attar

 split(buffer1);
 if(buffer1.position() == buffer1.limit()){
        //扩容
        ByteBuffer newBuffer = ByteBuffer.allocate(buffer1.capacity() * 2);
        buffer1.flip();
        newBuffer.put(buffer1);
        //替换附件
        key.attach(newBuffer);
    }

3、全部代码

@Slf4j
public class Server {

    //按\n拆分
    private static void split(ByteBuffer source) {
        source.flip();
        for (int i = 0; i < source.limit(); i++) {
            //找到一条完整的信息
            if (source.get(i) == '\n') {
                //一条消息完整的长度
                int length = i + 1 - source.position();
                //把这条完整消息存入一个新的byteBuffer
                ByteBuffer target = ByteBuffer.allocate(length);
                for (int j = 0; j < length; j++) {
                    target.put(source.get());
                }
                debugAll(target);
            }
        }
        //compact切换写模式了
        //compact底层是把buffer中的字节变成未读的字节,但是由于我们没有读取,所以这里剩余还是16
        source.compact();
    }

    public static void main(String[] args) throws IOException {
        //1. 创建selector,管理多个channel
        Selector selector = Selector.open();

        ByteBuffer buffer = ByteBuffer.allocate(16);
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        //2. 建立channel和selector之间的联系(注册)
        //SelectionKey:事件发生后通过这个可以获取到什么事件,还可以知道是那个channel发生的事件
        /**
            事件有4种类型:
                1.accept-会在用连接请求的时候触发
                2.connect-客户端连接建立后触发的事件
                3.read-客户端发送数据了就会触发
                4.write-可写事件
         */
        SelectionKey sscKey = ssc.register(selector, 0, null);
        //表名我们这个key只关注accept事件,所有客户端连接的时候消息都会进入这个通道
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("register key:{}", sscKey);
        ssc.bind(new InetSocketAddress(8080));

        while (true) {
            //3. selector.select()方法,没有事件就阻塞,有了事件发送了就恢复运行继续向下处理
            //解决了白白循环浪费CPU的问题
            selector.select();

            //4. 处理事件,selectionKeys拿到所有发生的可读可写的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

            //多个key的时候,accept和read方法都会触发事件,所以要区分事件类型
            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                //处理key的时候要从绿色的selectkeys中删除,否则就报错
                iterator.remove();
                log.debug("key:{}", key);

                //5.区分事件类型
                if (key.isAcceptable()) {
                    //拿到触发事件的channel
                    ServerSocketChannel channel = (ServerSocketChannel)key.channel();
                    SocketChannel sc = channel.accept();
                    //设置为非阻塞
                    sc.configureBlocking(false);
                    //scKey管sc的channel
                    //一个Buffer关联到SelectionKey中,防止多个channel同时使用一个buffer
                    ByteBuffer buffer1 = ByteBuffer.allocate(16);   //attachment:附件
                    SelectionKey scKey = sc.register(selector, 0, buffer1);
                    //scKey关注读事件,也就是说客户端的通道关注可读事件
                    scKey.interestOps(SelectionKey.OP_READ);
                    log.debug("{}", sc);
                }else if(key.isReadable()){
                    //客户端关闭之后也会引发read事件,这时需要从key中remove掉,否则拿不到channel,报错
                    try {
                        //可读事件
                        SocketChannel channel = (SocketChannel)key.channel();//触发事件的channel
                        //从SelectionKey中获取到独有的ByteBuffer附件
                        ByteBuffer buffer1 = (ByteBuffer)key.attachment();
                        //客户端正常断开,read返回值是-1
                        int read = channel.read(buffer1);
                        if(read == -1){
                            //正常断开
                            key.cancel();
                        }else{
                            split(buffer1);
                            if(buffer1.position() == buffer1.limit()){
                                //扩容
                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer1.capacity() * 2);
                                buffer1.flip();
                                newBuffer.put(buffer1);
                                //替换附件
                                key.attach(newBuffer);
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();   //客户端断开,需要将key取消(从selector的key集合中真正删除)
                    }
                }

            }
        }
    }
}



3. ByteBuffer大小分配

  • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer
  • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer

分配大小的思路如下:

  1. 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现:分配地址
  2. 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗



4、write事件

1、初始的代码,使用while循环不断写入

public class WriteServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        ssc.bind(new InetSocketAddress("localhost", 8080));

        while(true){
            selector.select();

            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();
                if(key.isAcceptable()){
                    //这里其实是ssc,因为只有一个OP_ACCEPT
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    //向客户端发送大量数据
                    StringBuilder sb = new StringBuilder();
                    for(int i = 0; i < 3000000; i++){
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    //返回值是实际写入的字节数
                    while(buffer.hasRemaining()){
                        //2. 写到客户端
                        int write = sc.write(buffer);
                        System.out.println("字节数:" + write);
                    }
                }
            }
        }
    }
}

客户端接受

public class WriteClinent {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        ByteBuffer buffer = ByteBuffer.allocate(1024*1024);
        int count = 0;
        //3. 接受数据
        while(true){
            count += sc.read(buffer);
            System.out.println(count);
            buffer.clear();
        }
    }
}

2、改进
上面的方法可以写入,但是我们有一个想法就是等缓冲区写入完全的时候再向客户端发送数据过去,而不是缓冲区写一点发送一点,这样CPU资源耗费就很大。
步骤思想就是:首先把50000000个字节的数据存入buffer中,然后一开始先写一次,把关注事件添加可写事件(可写是只要不阻塞都可写),第二次进入的时候key.isWritable判断成功,进入其中开始写buffer,后续就不断循环判断写buffer即可,当写完了之后,我们要把buffer从key中清除掉,attach设置为null,因为buffer内存太大的话会影响我们的效率和内存损耗。

public class WriteServer {
    public static void main(String[] args) throws IOException {
    	//创建一个通道
        ServerSocketChannel ssc = ServerSocketChannel.open();
        //设置为非阻塞的
        ssc.configureBlocking(false);
		//选择器
        Selector selector = Selector.open();
        //进行注册绑定selector
        ssc.register(selector, SelectionKey.OP_ACCEPT);
		//绑定端口号
        ssc.bind(new InetSocketAddress("localhost", 8080));
		
        while(true){
        	//等待事件
            selector.select();
			//迭代
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while(iterator.hasNext()){
            	//获取SelectionKey 
                SelectionKey key = iterator.next();
                iterator.remove();
                //连接事件
                if(key.isAcceptable()){
                    //这里其实是ssc,因为只有一个OP_ACCEPT
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    SelectionKey sckey = sc.register(selector, 0, null);

                    //向客户端发送大量数据
                    StringBuilder sb = new StringBuilder();
                    for(int i = 0; i < 5000000; i++){
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    //2. 写到客户端
                    int write = sc.write(buffer);
                    System.out.println("字节数:" + write);
                    //3. 判断是不是还有剩余内容,因为channel一次能写入的数据是有限制的,buffer里面的内容可能写不完
                    //缓冲区还有数据没写完,就不要写了,直接保留等下一次凑够一波再写
                    if(buffer.hasRemaining()){
                        //4. 关注可写事件同时保留原来的事件
                        sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE);
                        //5. 把未写完的数据挂到selectionkey上
                        sckey.attach(buffer);
                    }
                    //可写事件,只要不阻塞其实就是可写事件
                }else if(key.isWritable()){
                //获取buffer ,buffer还有没写出去的一些数据
                    ByteBuffer buffer = (ByteBuffer)key.attachment();
                    //获取通道
                    SocketChannel sc = (SocketChannel)key.channel();
					//写入buffer
                    int write = sc.write(buffer);
                    System.out.println(write);
                    //6. buffer写完了,就清除掉,防止占用内存
                    if(!buffer.hasRemaining()){
                        key.attach(null);   //清除buffer
                        key.interestOps(SelectionKey.OP_READ);//不再关注可写事件
                    }
                }
            }
        }
        //字节数:3276775
        //1179639
        //543586
    }
}
public class WriteClinent {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));
        ByteBuffer buffer = ByteBuffer.allocate(1024*1024);
        int count = 0;
        //3. 接受数据
        while(true){
            count += sc.read(buffer);
            System.out.println(count);
            buffer.clear();
        }
    }
}



5、小结

1. 绑定Channel

也称之为注册事件,绑定的事件selector才会关心

//创建一个通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//设置为非阻塞的
ssc.configureBlocking(false);
//选择器
Selector selector = Selector.open();
//进行注册绑定selector
ssc.register(selector, SelectionKey.OP_ACCEPT);
  • channel必须工作在非阻塞模式
  • FileChannel没有非阻塞模式,因此不能配合selector一起使用
  • 绑定的事件类型可以有
    1、connect:客户端连接成功的时候触发
    2、accept:服务端成功接受连接的时候触发
    3、read:数据可读入时触发,有因为接受能力弱,数据暂不能读入的情况
    4、write:数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况



2. 监听Channel事件

可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少channel发生了事件
方法1、阻塞知道绑定事件发送:

int count = selector.select();

方法2、阻塞直到绑定事件发生,或是超时(单位ms):

int count = selector.select(long timeout);

方法3、不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值判断有没有事件:

int count = selector.selectNow();

select何时不阻塞 ?

  • 事件发生的时候
    1、客户端发起连接请求,会触发accept事件
    2、客户端发送数据过来的时候,客户端正常,异常关闭的时候都会触发read事件,另外如果发送的数据大于buffer缓冲区,会触发多此read事件
    3、channel可写,会触发write事件,只要不阻塞都可写。
    4、在linux下 nio bug发生的时候
  • 调用selector.wakeup()
  • 调用selector.close()
  • selector所在线程interrupt
上一篇:08. C Pro File Operation


下一篇:elasticsearch(四) index 跨集群迁移