1.简述
Java NIO是一种同步非阻塞IO(指的是网络编程中客户端与服务端连接交换数据的过程是非阻塞的,普通的文件读写依然是阻塞的。)。NIO是为了弥补IO操作的不足而诞生的,NIO的一些新特性有:非阻塞I/O,选择器,缓冲以及管道。通道(Channel),缓冲(Buffer) ,选择器(Selector)是其主要特征。提供基于缓冲区(buffer)的块写入/读取,而以前的I/O是基于流(Stream)的方式,NIO基于块的IO操作,将最耗时的缓存区读取和填充交由底层操作系统实现,因此速度上要快得多。
2.NIO三大核心
(1)Buffer缓冲
Buffer是一个可以读写数据的内存块,对象里提供了一组方法可以更轻松使用内存块,程序读写数据都必须经过Buffer。Buffer根据数据类型不同(boolean 除外),提供了相应类型的缓冲区。
Buffer类定义了所有的缓冲区都具有的四个属性:
- position:位置,下一个要被读或写的元素的索引,每次读写缓存区数据时都会改变这个值,为下次读写作准备。
- limit:缓冲区终点,不能对超过极限的位置进行读写操作,可以修改。
- capacity:可以容纳的最大数据量,缓冲区创建时就被设定且不可改变。
- mark:标记。
- 这4个属性总会满足的关系:mark <= position <= limit <= capacity。
- position和limit的意义依赖于当前Buffer是处于读模式还是写模式。capacity的含义无论读写模式都是相同的。
Buffer是一个顶层父类,它是一个抽象类,常用的Buffer的抽象子类有:
- ByteBuffer(常用):存储字节数据到缓冲区
- IntBuffer:存储整数数据到缓冲区
- CharBuffer:存储字符数据到缓冲区
- LongBuffer:存储长整型数据到缓冲区
- DoubleBuffer:存储高精度小数到缓冲区
- FloatBuffer:存储小数到缓冲区
- ShortBuffer:存储字符串数据到缓冲区
常用方法简介:
/**ByteBuffer类提供了4个静态工厂方法来获得ByteBuffer的实例 */ //从堆空间中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器,称为非直接缓冲区,缓冲区建立在JVM的内存中 static ByteBuffer allocate(int capacity); //从堆外内存中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器,称为直接缓冲区,缓冲区建立在操作系统的物理内存(系统磁盘)中 static ByteBuffer allocateDirect(int capacity); //通过包装的方法创建的缓冲区保留了被包装数组内保存的数据 static ByteBuffer wrap(byte[] array); //通过包装的方法创建的缓冲区保留了被包装数组内保存的数据,offset也就是包装后byteBuffer的position,而length呢就是limit-position的大小,从而我们可以得到limit的位置为length+position(offset) static ByteBuffer wrap(byte[] array, int offset, int length); /**常用的方法 */ //flip方法将Buffer从写入模式切换到读取模式,将 position设置回0,并将limit置为刚才的位置。 final Buffer flip(); //该方法做的很简单就是 position置为0,limit置为capacity,mark置为-1,buffer内容 并没有清空。 final Buffer clear(); //将所有未读的数据复制到Buffer的开头。 ByteBuffer compact(); //把position设置成mark的值,相当于之前做过一个标记,现在要退回到之前标记的地方。 final Buffer reset(); //把position设为0,mark设为-1,不改变limit的值 final Buffer rewind(); //limit和position之间相对位置差 final int remaining(); //是否还有未读内容 final boolean hasRemaining(); //相对读,从position位置读取一个byte,并将position+1,为下次读写作准备 byte get(); //绝对读,读取byteBuffer底层的bytes中下标为index的byte,不改变position byte get(int index); //从position位置开始相对读,读length个byte,并写入dst下标从offset到offset+length的区域 ByteBuffer get(byte[] dst, int offset, int length); //相对写,向position的位置写入一个byte,并将postion+1,为下次读写作准备 ByteBuffer put(byte b); //绝对写,向byteBuffer底层的bytes中下标为index的位置插入byte b,不改变position ByteBuffer put(int index, byte b); //相对写,把src中可读的部分(也就是position到limit)写入此byteBuffer ByteBuffer put(ByteBuffer src); //从src数组中的offset到offset+length区域读取数据并使用相对写写入此byteBuffer ByteBuffer put(byte[] src, int offset, int length); //获取缓冲区终点 final int limit(); //设置缓冲区终点 final Buffer limit(int newLimit);View Code
获得ByteBuffer的实例的示例如下:
/** * 测试类 */ public class Test{ public static void main(String[] args) throws IOException { //不止需要多大的内容才会产生变化 System.out.println("----------测试JVM、系统内存创建ByteBuffer--------"); //JVM System.out.println("使用alocate创建前JVM剩余容量:" + Runtime.getRuntime().freeMemory()); ByteBuffer buffer = ByteBuffer.allocate(204800); System.out.println("buffer内容:" + buffer); System.out.println("使用alocate创建后JVM剩余容量:" + Runtime.getRuntime().freeMemory()); //系统内存 File file = File.listRoots()[0]; System.out.println("使用allocateDirect创建前系统磁盘剩余容量:" + file.getFreeSpace()); ByteBuffer directBuffer = ByteBuffer.allocateDirect(204800); System.out.println("directBuffer内容:" + directBuffer); file = File.listRoots()[0]; System.out.println("使用allocateDirect创建后系统磁盘剩余容量:" + file.getFreeSpace()); System.out.println("----------使用wrap创建ByteBuffer--------"); byte[] bytes = new byte[32]; buffer = ByteBuffer.wrap(bytes); System.out.println("wrap(byte by)创建的内容:" + buffer); buffer = ByteBuffer.wrap(bytes, 5, 10); System.out.println("wrap(byte[] array, int offset, int length)创建的内容:" + buffer); } }View Code
具体方法简介可以到https://blog.csdn.net/mrliuzhao/article/details/89453082查看
(2)Channel通道
Channel通道表示IO源与目标打开的连接,类似于传统的“流”。通道(Channel)不能单独存在,它永远需要绑定一个缓存区(Buffer),所有的数据只会存在于缓存区(Buffer)中,无论你是写或是读,必然是缓存区(Buffer)通过通道(Channel)到达磁盘文件,或是磁盘文件通过通道(Channel)到达缓存区(Buffer)。
Channel通道和流非常相似,主要有以下几点区别:
- 通道可以读也可以写,流一般来说是单向的(只能读或者写)。
- 通道可以异步读写。
- 通道总是基于缓冲区Buffer来读写。
Channel通道之间的数据传输:
- transferFrom:transferFrom方法把数据从通道源传输到FileChannel
- transferTo:transferTo方法把FileChannel数据传输到另一个channel
Channel接口提供的最主要实现类如下:
- FileChannel:用于读取、写入、映射和操作文件。
- SocketChannel :通过TCP读写网络中的数据,客户端。
-
ServerSocketChannel:可以监听新进来的
TCP
连接,并对每个链接创建对应的SocketChannel,服务器端。
- DatagramChannel :通过UDP读写网络中的数据。
Scatter、Gather多个buffer读写:
- scattering read:是把数据从单个Channel写入到多个buffer。
- gathering write:把多个buffer的数据写入到同一个channel中。
FileChannel读写示例如下:
/** * 测试类 */ public class Data { public static void main(String[] args) throws IOException { //1.创建文件读写对象,这个类比较陈旧,正式项目中还是使用FileInputStream RandomAccessFile file = new RandomAccessFile("d:/Desktop/hello.txt", "rw"); //2.获取文件读写通道 FileChannel fileChannel = file.getChannel(); //3.创建缓冲区 ByteBuffer buffer = ByteBuffer.allocate(512); /*System.out.println("开始文件写入..."); buffer.put("FileChannel测试".getBytes("utf-8"));//将内容写入缓冲区 buffer.flip();//切换缓冲区至可读模式 fileChannel.write(buffer);//把内容写入文件 System.out.println("完成文件写入...");*/ //4.读取文件内容填充到缓冲区 System.out.println("开始文件读取..."); fileChannel.read(buffer); buffer.flip();//切换缓冲区至可写模式 byte data [] = new byte[buffer.limit()]; buffer.get(data);//将数据写至 byte数组 System.out.println(new String(data)); System.out.println("完成文件读取..."); //关闭文件读写流和通道 file.close(); fileChannel.close(); } }View Code
通过上面的示例可以看出,FileChannel必须被打开 ,但是你无法直接打开FileChannel(FileChannel是抽象类)。需要通过InputStream,OutputStream或RandomAccessFile获取FileChannel。
SocketChannel读写示例如下:
/** * 测试类 */ public class Test{ public static void main(String[] args) throws IOException { //1.通过SocketChannel的open()方法创建一个SocketChannel对象,客户端 SocketChannel socketChannel = SocketChannel.open(); //2.连接到远程服务器(连接此通道的socket) socketChannel.connect(new InetSocketAddress("127.0.0.1", 3333)); // 3.创建写数据缓存区对象,往服务器发送数据 ByteBuffer writeBuffer = ByteBuffer.allocate(128); writeBuffer.put("SocketChannel测试".getBytes()); writeBuffer.flip(); socketChannel.write(writeBuffer); //创建读数据缓存区对象,读取传输数据 ByteBuffer readBuffer = ByteBuffer.allocate(128); socketChannel.read(readBuffer); //String 字符串常量,不可变;StringBuffer 字符串变量(线程安全),可变;StringBuilder 字符串变量(非线程安全),可变 StringBuilder stringBuffer = new StringBuilder(); //4.将Buffer从写模式变为可读模式 readBuffer.flip(); while (readBuffer.hasRemaining()) { stringBuffer.append((char) readBuffer.get()); } System.out.println("从服务端接收到的数据:"+stringBuffer); socketChannel.close(); } }View Code
通过上面的示例可以看出,SocketChannel用于创建基于tcp协议的客户端对象,因为SocketChannel中不存在accept()方法,所以它不能成为一个服务端程序。通过 connect()方法 ,SocketChannel对象可以连接到其他tcp服务器程序。
ServerSocketChannel读写示例如下:
/** * 测试类 */ public class Test{ public static void main(String[] args) throws IOException { try { //1.通过ServerSocketChannel 的open()方法创建一个ServerSocketChannel对象,open方法的作用:打开套接字通道 ServerSocketChannel ssc = ServerSocketChannel.open(); //2.通过ServerSocketChannel绑定ip地址和port(端口号) ssc.socket().bind(new InetSocketAddress("127.0.0.1", 3333)); //通过ServerSocketChannelImpl的accept()方法创建一个SocketChannel对象用户从客户端读/写数据 SocketChannel socketChannel = ssc.accept(); //3.创建写数据的缓存区对象 ByteBuffer writeBuffer = ByteBuffer.allocate(128); writeBuffer.put("ServerSocketChannel测试".getBytes()); writeBuffer.flip(); socketChannel.write(writeBuffer); //创建读数据的缓存区对象 ByteBuffer readBuffer = ByteBuffer.allocate(128); //读取缓存区数据 socketChannel.read(readBuffer); StringBuilder stringBuffer = new StringBuilder(); //4.将Buffer从写模式变为可读模式 readBuffer.flip(); while (readBuffer.hasRemaining()) { stringBuffer.append((char) readBuffer.get()); } System.out.println("从客户端接收到的数据:"+stringBuffer); socketChannel.close(); ssc.close(); } catch (IOException e) { e.printStackTrace(); } } }View Code
通过上面的示例可以看出,ServerSocketChannel用于创建基于tcp协议的服务器端对象,ServerSocketChannel允许我们监听TCP链接请求,通过ServerSocketChannelImpl的 accept()方法 可以创建一个SocketChannel对象用户从客户端读/写数据。
DatagramChannel读写示例如下:
/** * 测试类 */ public class Test{ public static void main(String[] args) throws IOException { try { //1.通过DatagramChannel 的open()方法创建一个DatagramChannel对象 DatagramChannel channel = DatagramChannel.open(); channel.socket().bind(new InetSocketAddress(9999)); //2.接收服务器发送的数据 ByteBuffer buf = ByteBuffer.allocate(1024); buf.clear(); channel.receive(buf); //3.往指定服务器发送数据 String newData = "New String to wrte to file..."; buf.clear(); buf.put(newData.getBytes()); buf.flip(); channel.send(buf, new InetSocketAddress("127.0.0.1", 8888)); channel.close(); } catch (IOException e) { e.printStackTrace(); } } }View Code
通过上面的示例可以看出,类似于java 网络编程的DatagramSocket类。使用UDP进行网络传输, UDP是无连接,面向数据报文段的协议,对传输的数据不保证安全与完整。
(3)Selector选择器
Selector称为选择器,当然你也可以翻译为多路复用器。它是Java NIO核心组件中的一个,用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接。
为了实现Selector管理多个SocketChannel,必须将具体的SocketChannel对象注册到Selector,并声明需要监听的事件。
一共有4种事件:
- OP_CONNECT:客户端连接服务端事件,对应值8
- OP_ACCEPT:服务端接收客户端连接事件,对应值16
- OP_READ:读事件,对应值1
- OP_WRITE:写事件,对应值4
这4种事件很好理解,可以理解为每次请求到达服务器,都是从connect开始,connect成功后,服务端开始准备accept,准备就绪,开始读数据,并处理,最后写回数据。所以,当SocketChannel有对应的事件发生时,Selector都可以观察到,并进行相应的处理。
Selector、SelectableChannel、SelectionKey的含义:
- Selector(选择器):管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态。当这么做的时候,可以选择将被激发的线程挂起,直到有就绪的的通道。
- SelectableChannel(可选择通道):这个抽象类提供了实现通道的可选择性所需要的公共方法。它是所有支持就绪检查的通道类的父类。因为FileChannel类没有继承SelectableChannel因此不是可选通道,而所有socket通道都是可选择的,包括从管道(Pipe)对象的中获得的通道。SelectableChannel可以被注册到Selector对象上,同时可以指定对那个选择器而言,哪种操作是感兴趣的。一个通道可以被注册到多个选择器上,但对每个选择器而言只能被注册一次。
- SelectionKey(选择键):选择键封装了特定的通道与特定的选择器的注册关系。选择键对象被SelectableChannel.register()返回并提供一个表示这种注册关系的标记。选择键包含了两个比特集(以整数的形式进行编码),指示了该注册关系所关心的通道操作,以及通道已经准备好的操作。
常用方法:
/**Selector的常用方法 */ //创建一个Selector对象,一定要使用configureBlocking(false)把Channel设置成非阻塞模式,否则会出现异常 static Selector open(); //获取Selector对象是否为开启 abstract boolean isOpen(); //是个阻塞方法,有通道就绪才会返回。 abstract int select(); //最多阻塞timeout毫秒,即使没有通道就绪也会返回,若超时返回,则当前线程中断标志位被设置。若阻塞时间内有通道就绪,就提前返回。 abstract int select(long timeout); //是个非阻塞方法,即使没有通道就绪也是立即返回。 abstract int selectNow(); //获取已选择的键集合 abstract Set<SelectionKey> selectedKeys(); //该方法让处在阻塞状态的select()方法立刻返回。如果当前没有进行中的选择操作,那么下一次对select()方法的一次调用将立即返回。 abstract Selector wakeup(); //该方法使得任何一个在选择操作中阻塞的线程都被唤醒(类似wakeup()),同时使得注册到该Selector的所有Channel被注销,所有的键将被取消,但是Channel本身并不会关闭。 abstract void close(); /**SelectionKey的常用方法 */ //返回此选择键所关联的通道,即使此key已经被取消,仍然会返回。 abstract SelectableChannel channel(); //返回此选择键所关联的选择器,即使此键已经被取消,仍然会返回。 abstract Selector selector(); //检测此key是否有效,当key被取消,或者通道被关闭,或者selector被关闭,都将导致此key无效。 abstract boolean isValid(); //请求将此键取消注册,一旦返回成功,那么该键就是无效的,被添加到selector的cancelledKeys中, abstract void cancel(); //获得此键的interes集合数 abstract int interestOps(); //将此键的interst设置为指定值,此操作会对ops和channel、validOps进行校验。如果此ops不会当前channel支持,将抛出异常。 abstract SelectionKey interestOps(int ops); //获取此键上ready操作集合,即在当前通道上已经就绪的事件。 abstract int readyOps(); //测试此键的通道是否已完成其套接字连接操作。 final boolean isconnectable(); //测试此键的通道是否已准备好接受新的套接字连接。 final boolean isacceptable(); //检测此键是否为read读事件 final boolean isReadable(); //检测此键是否为write写事件 final boolean isWritable(); //将给定的对象作为附件添加到此key上,在key有效期间,附件可以在多个ops事件中传递。 final Object attach(Object ob); //获取一个channel的附件,可以再当前Channel(或者说是SelectionKey)生命周期*享,但是attachment数据不会作为socket数据在网络中传输。 final Object attachment();View Code
3.NIO的实现示例
先启动服务器程序,后启动客户端程序,可以看到结果。
(1)简单示例
服务端示例如下:
/** * 测试类 */ public class Server{ public static void main(String[] args) throws IOException { startServer(); } public static void startServer() { ServerSocketChannel serverChannel = null; Selector selector = null; try { // 1、获取Selector选择器 selector = Selector.open(); // 2、获取通道 serverChannel = ServerSocketChannel.open(); // 3.设置为非阻塞模式 serverChannel.configureBlocking(false); // 4、绑定连接端口 serverChannel.bind(new InetSocketAddress(8888)); // 5、将通道注册到选择器上,并注册的操作为:接收操作 serverChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { System.out.println("NIO服务器创建失败,失败原因为:"+e.getMessage()); return; } while (true) { try { //6.查询获取准备就绪的注册过的操作,如果等于0则不进行以下操作,因为调用该serverSocket的accept方法,会阻塞 if(selector.select() == 0) continue; //7、获取当前选择器中所有注册的选择键(已经准备就绪的操作) Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { // 8、获取准备就绪SelectionKey的相关数据,并移除选择键 SelectionKey key = iter.next(); iter.remove(); // 9、判断key是具体的什么事件 if (key.isAcceptable()) { // 10、若接受的事件是接收就绪操作,就获取客户端连接 SocketChannel socketChannel = serverChannel.accept(); // 11、切换为非阻塞模式 socketChannel.configureBlocking(false); // 12、将该通道注册到selector选择器上 socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { // 13、获取该选择器上的读就绪状态的通道 SocketChannel socketChannel = (SocketChannel) key.channel(); // 14、读取数据 ByteBuffer buffer = ByteBuffer.allocate(1024); String content = ""; while (socketChannel.read(buffer) > 0) { buffer.flip(); content += decode(buffer); buffer.clear(); socketChannel.write(buffer); } System.out.println(content); socketChannel.close(); } } } catch (IOException e) { e.printStackTrace(); } } //关闭连接,因为此示例即便是异常也不会退出,所以不需要关闭 /*try { serverChannel.close(); } catch (IOException e) { e.printStackTrace(); }*/ } /**数据字符编码处理 */ private static Charset charset = Charset.forName("UTF-8"); public static String decode(ByteBuffer buffer) { try { return String.valueOf(charset.newDecoder().decode(buffer)); } catch (Exception ex) { ex.printStackTrace(); return null; } } }View Code
客户端示例如下:
public class Client{ public static void main(String[] args) { // 创建 20 个线程, 跟服务器发起连接 for (int i = 0; i < 20; i++) { new Thread(new Runnable() { @Override public void run() { try { Socket socket = new Socket(); socket.connect(new InetSocketAddress("127.0.0.1", 8888)); BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); writer.write ("当前线程名称为:" + Thread.currentThread().getName()); writer.flush (); BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); for (String line = reader.readLine(); line != null; line = reader.readLine()) { System.out.println("接收服务器回复内容为: " + line); } socket.close(); } catch (IOException e) { e.printStackTrace (); } } }).start (); } } }View Code
(2)应用实例(简易版)
服务端示例如下:
/** * 测试类 */ public class Server{ public static void main(String[] args) throws IOException { NioTcpManager.getInstance().startServerPort(); } } /**SelectionKey附件实体,用于存储写入客户端的数据 */ class KeyAttach { private String sn;//序号,唯一编码 private int commandType;//命令类型 private List<byte[]> commands = new ArrayList<byte[]>();//等待写的数据集合 public String getSn() { return sn; } public void setSn(String sn) { this.sn = sn; } public int getCommandType() { return commandType; } public void setCommandType(int commandType) { this.commandType = commandType; } public List<byte[]> getCommands() { return commands; } public void setCommands(List<byte[]> commands) { this.commands = commands; } } /**NIO管理类 */ class NioTcpManager implements Runnable { private static ServerSocketChannel serverChannel = null;//服务器TCP通道 private static Selector selector = null;//选择器对象 private static int PORT = 4042;//tcp监听端口 public static NioTcpManager nioTcpManager = new NioTcpManager(); private static List<SelectionKey> rpool = new LinkedList<SelectionKey>(); //读取队列 private static List<SelectionKey> wpool = new LinkedList<SelectionKey>(); //写队列 private NioTcpManager() { //创建读线程 for (int i = 0; i < 10; i++) new Thread(new NioTcpReader()).start(); //创建写线程 for (int i = 0; i < 10; i++) new Thread(new NioTcpWriter()).start(); } public static NioTcpManager getInstance() { return nioTcpManager; } /**开启TCP端口监听 */ public void startServerPort() { if (serverChannel != null && serverChannel.isOpen())//检查TCP是否已经开启监听 return; try { System.out.println("启动绑定TCP端口..."); selector = Selector.open();//获取Selector选择器 serverChannel = ServerSocketChannel.open();//获取通道 serverChannel.socket().bind(new InetSocketAddress(PORT));//绑定连接端口 serverChannel.configureBlocking(false);//设置为非阻塞模式 serverChannel.register(selector, SelectionKey.OP_ACCEPT);//将通道注册到选择器上,注册的操作为:接收操作 new Thread(this).start();//监听线程启动 } catch (IOException e) { System.out.println("该TCP端口已被使用..."); } } public void run() { System.out.println("启动TCP监听线程..."); while (true) { try { int keyAdded = selector.select();//获取准备就绪且注册过的操作 if (keyAdded > 0) {//存在准备就绪操作 Iterator<SelectionKey> iter = selector.selectedKeys().iterator();//获取当前选择器中已经准备就绪的选择键 while (iter.hasNext()) {//循环已就绪的选择键 SelectionKey key = (SelectionKey)iter.next();//取出一个就绪SelectionKey的相关数据 iter.remove();//移除已经取出选择键 if (key.isAcceptable()) {//是否为接收就绪操作 ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();//获取客户端连接 SocketChannel socketChannel = serverChannel.accept();//获取连接通道 socketChannel.configureBlocking(false);//设置通道为非阻塞模式 socketChannel.register(selector, SelectionKey.OP_READ);//将通道注册到选择器上,注册的操作为:读操作 System.out.println("客户端发送请求连接,地址: " + socketChannel.getRemoteAddress()); selector.wakeup(); } else if (key.isReadable()) {//是否为读就绪操作 KeyAttach keyAttach = (KeyAttach)key.attachment();//SelectionKey附件实体对象 if (keyAttach != null && (!keyAttach.getCommands().isEmpty())) {//判断附件中是否存在需要写入客户端数据 NioTcpWriter.processRequest(key);//添加到TCP写入队列处理 key.cancel();//取消此键的注册 } else { NioTcpReader.processRequest(key);//添加到TCP读取队列处理 key.cancel();//取消此键的注册 } } else if (key.isWritable()) {//是否为写就绪操作 NioTcpWriter.processRequest(key);//添加到TCP写入队列处理 key.cancel();//取消此键的注册 } } } else {//不存在准备就绪操作,则注册新的通道 addRegister(); } } catch (Exception e) { e.printStackTrace(); } } } /**添加新的通道注册 */ private void addRegister() { synchronized (wpool) { while (!wpool.isEmpty()) { SelectionKey key = wpool.remove(0); SocketChannel schannel = (SocketChannel) key.channel(); try { schannel.register(selector, SelectionKey.OP_WRITE, key.attachment());//将通道注册到选择器上,注册的操作为:写操作 } catch (Exception e) { System.out.println("注册写入通道失败..."); } } } synchronized (rpool) { while (!rpool.isEmpty()) { SelectionKey key = rpool.remove(0); SocketChannel schannel = (SocketChannel) key.channel(); try { schannel.register(selector, SelectionKey.OP_READ, key.attachment());//将通道注册到选择器上,注册的操作为:读操作 } catch (Exception e) { System.out.println("注册读取通道失败..."); } } } } /**添加新的客户端读请求,并唤醒所有等待的读线程 */ public static void processReadRequest(SelectionKey key) { synchronized (rpool) { if (!rpool.contains(key)) { rpool.add(rpool.size(), key);//添加新的SelectionKey到队列中,等待处理 rpool.notifyAll();//唤醒所有等待(对象的)读线程 } } selector.wakeup();//解除selector的阻塞状态,以便注册新的通道 } /**添加新的客户端写请求,并唤醒所有等待的写线程 */ public static void processWriteRequest(SelectionKey key) { synchronized (wpool) { if (!wpool.contains(key)) { wpool.add(wpool.size(), key);//添加新的SelectionKey到队列中,等待处理 wpool.notifyAll();//唤醒所有等待(对象的)写线程 } } selector.wakeup(); //解除selector的阻塞状态,以便注册新的通道 } } /**NIO写入类 */ class NioTcpWriter implements Runnable { private static List<SelectionKey> pool = new LinkedList<SelectionKey>(); private boolean busy = true; //当前线程是否正在处理写事务 public void run() { while (true) { try { SelectionKey key; synchronized (pool) { while (pool.isEmpty()) { if (busy) busy = false; //释放当前的锁,进入等待状态。 pool.wait(); } if (!busy) busy = true; //从队列中取出一个SelectionKey来进行处理 key = pool.remove(0); } //写入数据到客户端 write(key); } catch (Exception e) { continue; } } } /**向客户端发送数据 */ public void write(SelectionKey key) { //获取SelectionKey中存储的自定义KeyAttach数据 KeyAttach keyAttach = (KeyAttach) key.attachment(); if (keyAttach != null && !keyAttach.getCommands().isEmpty()) {//发送命令的数据不为空 try { //获取通道 SocketChannel socketChannel = (SocketChannel) key.channel(); synchronized (keyAttach.getCommands()) { for (byte[] command : keyAttach.getCommands()) {//循环写 //创建缓冲区,并把需要写入客户端数据存入缓冲区 ByteBuffer byteBuffer = ByteBuffer.allocate(command.length); byteBuffer.put(command); byteBuffer.flip(); //发送命令 socketChannel.write(byteBuffer); if (byteBuffer.remaining() == 0)//判断缓存区数据是否写完,写完则清空 byteBuffer.clear(); } } } catch (IOException e) { // e.printStackTrace(); System.out.println("写入客户端数据失败..."); } finally { keyAttach.getCommands().clear(); } } NioTcpManager.processReadRequest(key); } /**处理需要往客户端写入的数据队列 */ public static void processRequest(SelectionKey key) { synchronized (pool) { pool.add(pool.size(), key);//添加新的SelectionKey到队列中,等待处理 pool.notify();//唤醒一个等待(对象的)写线程并使该线程开始执行 } } } /**NIO读取类 */ class NioTcpReader implements Runnable { public static NioTcpAnalyzer nioTcpAnalyzer = NioTcpAnalyzer.getInstance(); private static List<SelectionKey> pool = new LinkedList<SelectionKey>(); private boolean busy = true; //当前线程是否正在处理读事务 @Override public void run() { while (true) { try { SelectionKey key; synchronized (pool) { while (pool.isEmpty()) { if (busy) busy = false; //释放当前的锁,进入等待状态。 pool.wait(); } if (!busy) busy = true; //从队列中取出一个SelectionKey来进行处理 key = pool.remove(0); } //读取客户端数据 read(key); } catch (Exception e) { continue; } } } /**读取客户端发送数据 */ public void read(SelectionKey key) { ByteBuffer buffer = ByteBuffer.allocate(1024);//创建缓冲区 SocketChannel socketChannel = (SocketChannel) key.channel();//获取连接通道 KeyAttach keyAttach = (KeyAttach) key.attachment();//获取SelectionKey中存储的自定义KeyAttach数据 try { int size = socketChannel.read(buffer);//读取客户端发送内容 String content = ""; while (size == 1024) { buffer.flip(); content += decode(buffer); buffer.clear(); content = nioTcpAnalyzer.analyzerTcpData(key, socketChannel, content);//处理客户端数据 size = socketChannel.read(buffer); } buffer.flip(); if (size > 0) {//存在未读完数据,但是不满足1024 content += decode(buffer); buffer.clear(); nioTcpAnalyzer.analyzerTcpData(key, socketChannel, content); //如果不需要回复,则重新注册读事件 keyAttach = (KeyAttach) key.attachment();//获取SelectionKey中存储的自定义KeyAttach数据 if (keyAttach == null) { NioTcpManager.processReadRequest(key); } else { synchronized (keyAttach.getCommands()) { if (keyAttach.getCommands().isEmpty()) { NioTcpManager.processReadRequest(key); } } } } else if (size < 0)//客户端的数据发送完毕,并且主动的关闭连接,则服务器也移除连接 nioTcpAnalyzer.removeChannel(key);//移除客户端连接 } catch (Exception e) { nioTcpAnalyzer.removeChannel(key);//移除客户端连接 System.out.println(keyAttach.getSn() + "客户端强迫关闭了一个连接 ..."); //e.printStackTrace(); } } /**数据编码处理 */ private Charset charset = Charset.forName("UTF-8"); public String decode(ByteBuffer buffer) { try { return String.valueOf(charset.newDecoder().decode(buffer)); } catch (Exception ex) { ex.printStackTrace(); return null; } } /**处理需要读取客户端的数据队列 */ public static void processRequest(SelectionKey key) { synchronized (pool) { pool.add(pool.size(), key);//添加新的SelectionKey到队列中,等待处理 pool.notify();//唤醒一个等待(对象的)读线程并使该线程开始执行 } } } /**NIO数据解析类 */ class NioTcpAnalyzer { // 初始化本类 public static NioTcpAnalyzer nioTcpAnalyzer = new NioTcpAnalyzer(); public static NioTcpAnalyzer getInstance() {//获取本类实例 return nioTcpAnalyzer; } /**解析收到的数据 */ public synchronized String analyzerTcpData(SelectionKey key, SocketChannel socketChannel, String content) { try { //以下为随意弄的一个测试解析,可以根据需求,修改 System.out.println(content); String[] data = content.split(","); int commandType = Integer.valueOf(data[1].substring(data[1].length()-1, data[1].length()));//获取命令类型 if(commandType == 1){ // 未识别(未知DataLog序列号)的Socket,全新的socket连接 if (key.attachment() == null) { // 给当前通道添加附加对象 KeyAttach keyAttach = new KeyAttach(); keyAttach.setSn(data[0]); keyAttach.setCommandType(commandType); key.attach(keyAttach); } String info = "服务器回复:"+data[0].substring(3) + "," + data[1] + ",内容:连接正常"; writeByteData(info.getBytes("utf-8"), key);//回复客户端数据 } } catch (Exception e) { } return content; } /**回复数据到对应通道(客户端) */ private void writeByteData(byte[] data, SelectionKey key) { KeyAttach keyAttach = (KeyAttach) key.attachment(); synchronized (keyAttach.getCommands()) { keyAttach.getCommands().add(data); } NioTcpManager.processWriteRequest(key); } /**移除TCP通道连接 */ public boolean removeChannel(SelectionKey key) { try { SocketChannel socketChannel = (SocketChannel) key.channel(); socketChannel.finishConnect(); socketChannel.socket().close(); socketChannel.close(); key.cancel(); } catch (Exception e) { // e.printStackTrace(); System.out.println("移除远程连接失败 , 有可能并不存在此连接..."); return false; } return true; } }View Code
客户端示例如下:
public class Client{ public static void main(String[] args) { // 创建 20 个线程, 跟服务器发起连接 //for (int i = 0; i < 20; i++) { new Thread(new Runnable() { @Override public void run() { SocketChannel socketChannel = null; Selector selector = null; ByteBuffer writeBuffer = ByteBuffer.allocate(1024); ByteBuffer readBuffer = ByteBuffer.allocate(1024); try { socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress("127.0.0.1", 4042)); selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_CONNECT); if (socketChannel.finishConnect()) { String info = "客户端" + Thread.currentThread().getName() + ",命令类型:1,内容:连接是否正常"; while (true) { writeBuffer.clear(); writeBuffer.put(info.getBytes("utf-8")); writeBuffer.flip(); while (writeBuffer.hasRemaining()) { socketChannel.write(writeBuffer); } int bytesRead = socketChannel.read(readBuffer); if (bytesRead > 0) { readBuffer.flip(); byte[] bytes = new byte[bytesRead]; readBuffer.get(bytes, 0, bytesRead); String str = new String(bytes); System.out.println(str); readBuffer.clear(); } Thread.sleep(2000); } } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { try { if (socketChannel != null) { socketChannel.close(); System.out.println("服务器为启动..."); } } catch (IOException e) { e.printStackTrace(); } } } }).start (); // } } }View Code