一、序言
在Java的软件设计开发中,通信架构是不可避免的,我们在进行不同系统或者不同进程之间的数据交互,或者在高并发下的通信场景都需要用到网络通信相关的技术。
1、通信技术整体解决的问题:
1)局域网内的通信要求;
2)多系统间的底层消息传递机制;
3)高并发下,大数据量的通信场景需要;
4)游戏行业。
2、在Java中,主要有三种IO模型,分别是:
1)同步阻塞IO(BIO);
2)同步非阻塞IO(NIO);
3)异步IO(AIO)。
二、同步阻塞IO(BIO)
Java BIO(blocking I/O):就是传统的java io编程,其相关的类和接口在java.io包下。同步并阻塞,服务器实现模式为一个连接一个线程,即每当客户端有连接请求时,服务端都需要启动一个线程进行处理,如下图。
在高并发的情况下,服务端会产生大量线程,线程间会发生竞争和上下文切换,同时要占用栈空间和CPU资源,而且其中有些线程可能什么事情都不会做,一直阻塞着,这些情况都会造成服务端性能下降。
所以BIO方式适合用于连接数目固定,而且比较小的架构,这种方式对服务器资源要求比较高,并发局限于应用中,但是程序简单易懂。
1、Java中的BIO分布式分为两种:
1)传统BIO:即上图中的一请求一应答;
2)伪异步IO:通过线程池固定线程的最大数量,可以防止资源的浪费。
2、BIO编程简单流程:
1)服务器启动一个ServerSocket;
2)客户端启动Socket请求与服务器连接,默认情况下服务器端需要对每个客户建立一个线程与之通信;
3)客户端发出请求之后,先咨询服务器是否有线程响应,如果没有则会等待,或者被服务端拒绝;
4)如果有响应,客户端线程会等待请求结束后,再继续执行。
3、使用BIO进行通信的简单案例
1)服务端代码
public class Server { public static void main(String[] args) throws IOException { //服务器端开启一个ServerSocket,并绑定6666端口 ServerSocket ss = new ServerSocket(6666); System.out.println("服务器已开启!"); while (true){ Socket socket = ss.accept(); System.out.println("来自" + socket.getRemoteSocketAddress() + "的连接"); new Handler(socket).start(); } } } //为每个客户端连接开启的线程 class Handler extends Thread { Socket socket; public Handler(Socket socket) { this.socket = socket; } @Override public void run() { try (InputStream inputStream = socket.getInputStream()) { try (OutputStream outputStream = socket.getOutputStream()) { handle(inputStream, outputStream); } } catch (IOException e) { try{ //关闭socket socket.close(); }catch (IOException e1){ } } System.out.println("客户端" + socket.getRemoteSocketAddress()+ "断开连接"); } private void handle(InputStream inputStream, OutputStream outputStream) throws IOException{ //获得一个字符输入流 var reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); //获得一个字符输出流 var writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)); writer.write("连接成功!\n"); writer.flush(); while(true){ //每次从管道中读入一行 String str = reader.readLine(); //当客户端传来"Bye"代表断开连接 if("Bye".equals(str)){ writer.write("Bye\n"); writer.flush(); break; } writer.write("已经收到:" + str + "\n"); writer.flush(); } } }
2)客户端代码
public class Client { public static void main(String[] args) throws IOException { Socket socket = new Socket("127.0.0.1", 6666); try (InputStream inputStream = socket.getInputStream()){ try (OutputStream outputStream = socket.getOutputStream()){ handle(inputStream, outputStream); } }catch (IOException e){ try{ socket.close(); }catch (IOException e1){ } } } private static void handle(InputStream inputStream, OutputStream outputStream) throws IOException{ var reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); var writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)); Scanner in = new Scanner(System.in); System.out.println("<<<" + reader.readLine()); while (true){ System.out.print(">>>"); String str = in.nextLine(); writer.write(str); writer.newLine(); writer.flush(); String resp = reader.readLine(); System.out.println("<<<" + resp); if("Bye".equals(str)){ break; } } } }
3)启动服务器端,再启动客户端,使用客户端与服务端进行通信
Server的控制台:
服务器已开启! 来自/127.0.0.1:51871的连接 客户端/127.0.0.1:51871断开连接
Client的控制台:
<<<连接成功! >>>你好 <<<已经收到:你好 >>>在吗 <<<已经收到:在吗 >>>吃了吗 <<<已经收到:吃了吗 >>>Bye <<<Bye
三、同步非阻塞IO(NIO)
Java NIO(New IO):也称java non-blocking IO,是从java1.4版本开始引入的一个新IO API,可以代替标准的java IO API。NIO与原来的IO具有同样的作用和目的,但是使用的方式完全不同,NIO支持面向缓冲区的、基于通道的IO操作。NIO将以更加高效的方式进行文件的读写操作。NIO可以理解为非阻塞IO,传统IO的read和write只能阻塞执行,线程在读写IO期间不能干其他事。而NIO则可以配置socket为非阻塞模式。
Java NIO的阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用,则什么都不会获取,而不是保持线程阻塞,所以直至数据变得可用读取之前,该线程可以继续做其他事情。
工作示意图如下:
1、BIO 和 NIO 的区别?
1)BIO 以流的方式处理数据,而 NIO 以块的方式处理数据;
2)BIO 是阻塞的, NIO 是非阻塞的
3)BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区,或者从缓冲区写入到通道中。
2、NIO 的三大组件
1)Buffer 缓冲区:缓冲区本质是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成了 NIO Buffer 对象,并提供了一组方法,用来方便对该快的访问。
Buffer 就像一个数组,可以保存多个相同类型的数据。根据数据类型不同,有以下常用的 Buffer 子类:ByteBuffer, CharBuffer, ShortBuffer, IntBuffer, LongBuffer, FloatBuffer, DoubleBuffer。
创建方法如下:
static XxxBuffer allocate(int capacity) : 创建一个容量为capacity 的 XxxBuffer 对象
基本属性:
- 容量(capacity):作为一个内存块,Buffer具有一定的固定大小,也称为“容量”,缓冲区不能为负,并且创建后不能更改。
- 限制(limit):表示缓冲区中可以操作的数据的大小(位于limit后的数据不能进行读写)。缓冲区的Buffer也不能为负数,并且不能大于其容量。在写入模式下,limit大于capacity。在读取模式下,limit等于写入的数据量。
- 位置(position):下一个要读取或者写入的数据的索引。缓冲区的位置不能为负,并且不能大于limit。
- 标记(mark)与重置(reset):标记是一个索引,通过Buffer中得到mark()方法指定Buffer中的一个特定的position,之后可以调用reset()方法恢复到这个position。
常见方法:
Buffer clear() 清空缓冲区并返回对缓冲区的引用 Buffer flip() 为 将缓冲区的界限设置为当前位置,并将当前位置重置为 0 int capacity() 返回 Buffer 的 capacity 大小 boolean hasRemaining() 判断缓冲区中是否还有元素 int limit() 返回 Buffer 的界限(limit) 的位置 Buffer limit(int n) 将设置缓冲区界限为 n, 并返回一个具有新 limit 的缓冲区对象 Buffer mark() 对缓冲区设置标记 int position() 返回缓冲区的当前位置 position Buffer position(int n) 将设置缓冲区的当前位置为 n , 并返回修改后的 Buffer 对象 int remaining() 返回 position 和 limit 之间的元素个数 Buffer reset() 将位置 position 转到以前设置的 mark 所在的位置 Buffer rewind() 将位置设为为 0, 取消设置的 mark
读取操作:
Buffer 所有子类提供了两个用于数据操作的方法:get()put() 方法 取获取 Buffer中的数据 get() :读取单个字节 get(byte[] dst):批量读取多个字节到 dst 中 get(int index):读取指定索引位置的字节(不会移动 position) 放到 入数据到 Buffer 中 中 put(byte b):将给定单个字节写入缓冲区的当前位置 put(byte[] src):将 src 中的字节写入缓冲区的当前位置 put(int index, byte b):将指定字节写入缓冲区的索引位置(不会移动 position)
2)通道(channel)
通道(Channel):由java.nio.channels包下定义的,表示IO源与目标打开的连接。Channel类似于传统的“流”。只不过Channel本身不能直接访问数据,Channel只能与Buffer进行交互。
NIO通道与流的区别:
- 通道可以同时进行读写,而流只能读或者只能写。
- 通道可以实现异步读取数据。
- 通道可以从缓冲中读数据,也可以写数据到缓冲。
Channel在NIO中是一个接口,常用的Channel实现类有:
- FileChannel:用于读取、写入、映射和操作文件的通道。
- DatagramChannel:通过UDP读写网络中的数据通道。
- SocketChannel:通过TCP读取网络中的数据。
- ServerSocketChannel:可以监听新进来的TCP连接,对每个新进来的连接都会创建一个SocketChannel。
3)选择器(Selector)
选择器(Selector)是SelectableChannle对象的多路复用器,Selector可以同时监控多个SelectorableChannel的IO状况,利用Selector可以使一个单独的线程管理多个Channel。Selector是NIO非阻塞的核心,其能检测多个注册的通道上是否有事件发生,如果有事件发生,便获取事件,然后针对每个事件作出相应的处理。
通过Selector.open()方法创建一个Selector:
Selector selector = Selector.open();
4)入门案例
/** 客户端 */ public class Client { public static void main(String[] args) throws Exception { //1. 获取通道 SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999)); //2. 切换非阻塞模式 sChannel.configureBlocking(false); //3. 分配指定大小的缓冲区 ByteBuffer buf = ByteBuffer.allocate(1024); //4. 发送数据给服务端 Scanner scan = new Scanner(System.in); while(scan.hasNext()){ String str = scan.nextLine(); buf.put((new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(System.currentTimeMillis()) + "\n" + str).getBytes()); buf.flip(); sChannel.write(buf); buf.clear(); } //5. 关闭通道 sChannel.close(); } } /** 服务端 */ public class Server { public static void main(String[] args) throws IOException { //1. 获取通道 ServerSocketChannel ssChannel = ServerSocketChannel.open(); //2. 切换非阻塞模式 ssChannel.configureBlocking(false); //3. 绑定连接 ssChannel.bind(new InetSocketAddress(9999)); //4. 获取选择器 Selector selector = Selector.open(); //5. 将通道注册到选择器上, 并且指定“监听接收事件” ssChannel.register(selector, SelectionKey.OP_ACCEPT); //6. 轮询式的获取选择器上已经“准备就绪”的事件 while (selector.select() > 0) { System.out.println("轮一轮"); //7. 获取当前选择器中所有注册的“选择键(已就绪的监听事件)” Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { //8. 获取准备“就绪”的是事件 SelectionKey sk = it.next(); //9. 判断具体是什么事件准备就绪 if (sk.isAcceptable()) { //10. 若“接收就绪”,获取客户端连接 SocketChannel sChannel = ssChannel.accept(); //11. 切换非阻塞模式 sChannel.configureBlocking(false); //12. 将该通道注册到选择器上 sChannel.register(selector, SelectionKey.OP_READ); } else if (sk.isReadable()) { //13. 获取当前选择器上“读就绪”状态的通道 SocketChannel sChannel = (SocketChannel) sk.channel(); //14. 读取数据 ByteBuffer buf = ByteBuffer.allocate(1024); int len = 0; while ((len = sChannel.read(buf)) > 0) { buf.flip(); System.out.println(new String(buf.array(), 0, len)); buf.clear(); } } //15. 取消选择键 SelectionKey it.remove(); } } } }