文章目录
使用nio包下的类简单实现一个服务端代码熟悉前面介绍的API
- 采用阻塞式
- 采用非阻塞
- 阻塞和非阻塞混用
1 阻塞式服务端
1.1 服务端
采用阻塞模式,用线程池中的工作线程处理每个客户连接。
- 当ServerSocketChannel与SocketChannel采用默认的阻塞模式时,为了同时处理多个客户的连接,必须使用多个线程。在EchoServer类中,利用java.util.concurrent包中提供的线程池ExecutorService来处理与客户的连接。
- EchoServer类的构造方法负责创建线程池,启动服务器,把它绑定到一个本地端口。EchoServer类的service()方法负责接收客户的连接。每接收到一个客户连接,就把它交给线程池来处理,线程池取出一个空闲的线程,来执行Handler对象的run()方法。Handler类的handle()方法负责与客户通信。该方法先获得与SocketChannel关联的Socket对象,然后从Socket对象中得到输入流与输出流,再接收和发送数据。
package study.wyy.net.nio.server;
import lombok.extern.slf4j.Slf4j;
import study.wyy.net.nio.thread.RequestHandler;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author wyaoyao
* @date 2021/3/17 17:12
* 阻塞式服务端示例
*/
@Slf4j
public class BlockEchoServer {
/**
* 服务端口号
*/
private final int port;
private final ServerSocketChannel serverSocketChannel;
private final ServerSocket serverSocket;
private final ExecutorService executorService;
/**
* 线程池中工作的线程数目
*/
private final int POOL_MULTIPLE = 4;
public BlockEchoServer(int port) throws IOException {
this.port = port;
executorService = Executors.newFixedThreadPool(
// ava.lang.Runtime.availableProcessors() 方法: 返回可用处理器的Java虚拟机的数量。
Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE
);
// 打开通道
this.serverSocketChannel = ServerSocketChannel.open();
// 返回与ServerSocketChannel关联的ServerSocket对象,每个ServerSocketChannel对象都与一个ServerSocket对象关联
serverSocket = serverSocketChannel.socket();
// 使得在同一个主机上关闭了服务器,紧接着再启动服务器程序时,可以顺利绑定相同的端口
serverSocket.setReuseAddress(true);
// 与本地的端口绑定
serverSocket.bind(new InetSocketAddress(port));
log.info("the server has bind address is {}:{}", this.serverSocket.getInetAddress().getHostAddress(), this.port);
}
public void service() {
while (true) {
SocketChannel socketChannel;
try {
// 等待接收客户端连接,一旦有客户端连接,就会返会与当前客户端连接的SocketChannel的对象
socketChannel = serverSocketChannel.accept();
// 开启一个线程去处理当前客户端连接
executorService.submit(new RequestHandler(socketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package study.wyy.net.nio.thread;
import lombok.extern.slf4j.Slf4j;
import java.io.*;
import java.net.Socket;
import java.nio.channels.SocketChannel;
/**
* @author wyaoyao
* @date 2021/3/17 17:35
*/
@Slf4j
public class RequestHandler implements Runnable {
private final SocketChannel socketChannel;
private Socket socket;
public RequestHandler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
try {
// 获得与socketChannel关联的Socket对象
socket = socketChannel.socket();
log.info("new client connection from {}:{} accept", socket.getInetAddress(), socket.getPort());
// 获取输入流
BufferedReader reader = getReader(socket);
// 获取输出流
PrintWriter writer = getWriter(socket);
String msg = null;
while ((msg = reader.readLine()) != null) {
log.info("accept message from client is {}", msg);
// 返回响应给客户端
writer.println("echo: " + msg);
if (msg.contains("bye")) {
// 如果客户端发来的是bye,则退出当前会话
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (socketChannel != null){
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public BufferedReader getReader(Socket socket) throws IOException {
InputStream inputStream = socket.getInputStream();
return new BufferedReader(new InputStreamReader(inputStream));
}
public PrintWriter getWriter(Socket socket) throws IOException {
return new PrintWriter(socket.getOutputStream(), true);
}
}
1.2 测试
- 为了测试先写一个客户端: 采用阻塞式
package study.wyy.net.nio.client;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
/**
* @author wyaoyao
* @date 2021/3/17 17:59
*/
@Slf4j
public class BlockEchoClient {
private final SocketChannel socketChannel;
private final String serverHost;
private final int serverPort;
public BlockEchoClient(String serverHost, int serverPort) throws IOException {
this.serverHost = serverHost;
this.serverPort = serverPort;
this.socketChannel = SocketChannel.open();
// 连接服务器
SocketAddress remote = new InetSocketAddress(serverHost, serverPort);
socketChannel.connect(remote);
log.info("connect echo server success");
}
public void send(String message) {
try {
BufferedReader reader = getReader(socketChannel.socket());
PrintWriter writer = getWriter(socketChannel.socket());
// 发送数据
writer.println(message);
log.info("send request success; content is {}", message);
// 读取服务端的响应
String s1 = reader.readLine();
log.info("get response success; response is {}", s1);
} catch (Exception e) {
e.printStackTrace();
}
}
public void close() throws IOException {
if(socketChannel != null){
socketChannel.close();
}
}
public BufferedReader getReader(Socket socket) throws IOException {
InputStream inputStream = socket.getInputStream();
return new BufferedReader(new InputStreamReader(inputStream));
}
public PrintWriter getWriter(Socket socket) throws IOException {
return new PrintWriter(socket.getOutputStream(), true);
}
}
- 启动服务端
public class BlockEchoServerTest {
public static void main(String[] args) throws IOException {
BlockEchoServer server = new BlockEchoServer(10010);
// 等待客户端连接
server.service();
}
}
- 启动客户端,模拟3个客户端同时访问
public static void main(String[] args) throws IOException {
Arrays.asList(1,2,3).stream().forEach(i->{
new Thread(()->{
BlockEchoClient client = null;
try {
client = new BlockEchoClient("localhost", 10010);
client.send("hello! from " + Thread.currentThread().getName());
client.send("你好! from " + Thread.currentThread().getName());
client.send("bye! from " + Thread.currentThread().getName());
} catch (IOException e) {
e.printStackTrace();
} finally {
if(client != null){
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
},"client" +i).start();
});
}
测试客户端输出结果:
07:56:29.945 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - connect echo server success
07:56:29.945 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - connect echo server success
07:56:29.945 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - connect echo server success
07:56:29.949 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is hello! from client1
07:56:29.949 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is hello! from client2
07:56:29.949 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is hello! from client3
07:56:29.981 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: hello! from client1
07:56:29.981 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is 你好! from client1
07:56:30.993 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: 你好! from client1
07:56:30.993 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is bye! from client1
07:56:31.981 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: hello! from client3
07:56:31.981 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is 你好! from client3
07:56:32.002 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: 你好! from client3
07:56:32.003 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is bye! from client3
07:56:32.016 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: bye! from client1
07:56:32.017 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: bye! from client3
07:56:32.982 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: hello! from client2
07:56:32.983 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is 你好! from client2
07:56:33.004 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: 你好! from client2
07:56:33.005 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is bye! from client2
07:56:37.027 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: bye! from client2
由于采用多线程的方式,使得可以同时处理多个客户的连接。
如果服务端修改为不采用多线程:
public void service() {
while (true) {
SocketChannel socketChannel;
try {
// 等待接收客户端连接,一旦有客户端连接,就会返会与当前客户端连接的SocketChannel的对象
socketChannel = serverSocketChannel.accept();
// 开启一个线程去处理当前客户端连接
// executorService.submit(new RequestHandler(socketChannel));
// 这里不开启线程
new RequestHandler(socketChannel).run();
} catch (IOException e) {
e.printStackTrace();
}
}
}
在测试:
08:12:59.029 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - connect echo server success
08:12:59.029 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - connect echo server success
08:12:59.029 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - connect echo server success
08:12:59.036 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is hello! from client3
08:12:59.036 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is hello! from client2
08:12:59.036 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is hello! from client1
08:12:59.049 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: hello! from client2
08:12:59.049 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is 你好! from client2
08:12:59.051 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: 你好! from client2
08:12:59.051 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is bye! from client2
08:12:59.053 [client2] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: bye! from client2
08:12:59.065 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: hello! from client1
08:12:59.065 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is 你好! from client1
08:12:59.096 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: 你好! from client1
08:12:59.096 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is bye! from client1
08:12:59.107 [client1] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: bye! from client1
08:12:59.119 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: hello! from client3
08:12:59.119 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is 你好! from client3
08:12:59.140 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: 你好! from client3
08:12:59.140 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - send request success; content is bye! from client3
08:12:59.151 [client3] INFO study.wyy.net.nio.client.BlockEchoClient - get response success; response is echo: bye! from client3
三个客户端同时发送了是第一个请求,但是服务端只能依次一个客户端处理,比如这里先处理client2,直到把client2的三个请求(hello,你好,bye)才会去处理下一个客户端。
2 非阻塞式服务端
在非阻塞模式下,EchoServer只需要启动一个主线程,就能同时处理三件事:
- 接收客户的连接。
- 接收客户发送的数据。
- 向客户发回响应数据。
EchoServer委托Selector来负责监控接收连接就绪事件、读就绪事件和写就绪事件,如果有特定事件发生,就处理该事件。
EchoServer类的构造方法负责启动服务器,把它绑定到一个本地端口,代码如下:
- 构造方法
@Slf4j
public class NoBlockEchoServer {
/**
* 委托给Selector来负责接收连接就绪事件,读就绪事件,写就绪事件
*/
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
private final ServerSocket serverSocket;
private final int port;
public NoBlockEchoServer(int port) throws IOException {
// 创建一个Selector对象
this.selector = Selector.open();
// 创建一个ServerSocketChannel对象
serverSocketChannel = ServerSocketChannel.open();
// 返回与ServerSocketChannel关联的ServerSocket对象,每个ServerSocketChannel对象都与一个ServerSocket对象关联
serverSocket = serverSocketChannel.socket();
// 使得在同一个主机上关闭了服务器,紧接着再启动服务器程序时,可以顺利绑定相同的端口
serverSocket.setReuseAddress(true);
// 设置serverSocketChannel为非阻塞工作模式
serverSocketChannel.configureBlocking(false);
// 绑定本地端口
this.port = port;
serverSocketChannel.bind(new InetSocketAddress(port));
log.info("the server has bind address is {}:{}", this.serverSocket.getInetAddress().getHostAddress(), this.port);
}
}
- service()方法负责处理本节开头所说的三件事,体现其主要流程的代码如下:
public void service() throws IOException {
// 注册一个连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// select()返回已经发生的SelectionKey对象的数量,该方法是阻塞的,如果一个也没有就进入阻塞,
while (selector.select() > 0) {
// 进入循环就说明事件发生
// 获取相关事件已经被Selector捕获的SelectionKey的集合
Set<SelectionKey> readyKes = selector.selectedKeys();
// 遍历处理这些已经捕获到的事件
Iterator<SelectionKey> iterator = readyKes.iterator();
while (iterator.hasNext()) {
SelectionKey key = null;
try {
// 取出一个SelectionKey,进行处理
key = iterator.next();
// 既然取出来,就可以从集合中删除了
iterator.remove();
// 判断事件类型
if (key.isAcceptable()) {
// 处理连接就绪事件
}
if (key.isReadable()) {
// 处理读就绪事件
}
if (key.isWritable()) {
// 处理写就绪事件
}
} catch (Exception e) {
e.printStackTrace();
if (null != null) {
// 失效掉这个key, selector不再感兴趣这个SelectionKey感兴趣的事件
key.cancel();
// 关闭与这个key关联的socketChannel
key.channel().close();
}
}
}
}
}
service方法中,首先由serverSocketChannel向Selector 注册连接就绪事件。如果Selector监控到该事件发生,就会把相应的SelectionKey对象加入到selected-keys集合中(相关事件已经被Selector捕获的SelectionKey的集合)。接下来第一层while循环,会不断的询问Selector已经发生的事件,然后依次处理这些事件。
其中获取已经发生的事件的SelectionKey个数,如果当前没有任何事件发生,这个方法就会阻塞下去,直到至少一件事情发生。selector.selectedKeys()这个方法返回已经被Selector捕获的SelectionKey的集合(selected-keys集合),selected-keys集合存放了相关事件已经发生的SelectionKey对象。
接下来就是遍历selected-keys集合,处理这些已经捕获到的事件。如果出现异常,就会失效这个SelectionKey,并且关闭与之关联的channel
2.1 处理连接事件
刚刚service方法已经通过if留出了每个事件类型处理的地方,现在就先处理连接事件,代码如下:
private void handleAcceptable(SelectionKey selectionKey) throws IOException {
// 获取与SelectionKey关联的serverSocketChannel,就是通过serverSocketChannel来传输数据的
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
// 获取与客户端连接的SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
log.info("accept client connection from {}:{} accept", socketChannel.socket().getInetAddress(), socketChannel.socket().getPort());
// 设置socketChannel为非阻塞
socketChannel.configureBlocking(false);
// 创建一个缓冲区,用于存放客户端发来的数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// SocketChannel向selector注册读就绪事件和写就绪事件
// 并把byteBuffer作为附件注册进去,在读写事件发生的时候获取byteBuffer,进行数据读写
socketChannel.register(selector,SelectionKey.OP_READ | SelectionKey.OP_WRITE,byteBuffer);
}
如果isAcceptable方法返回true,就表示这个SelectionKey所有感兴趣的接收连接就绪事件已经发生了
首先通过SelectionKey的channel()方法获的与之关联的ServerSocketChannel,然后调用ServerSocketChannel的accpet方法获取与客户端连接的SocketChannel对象。这个SocketChannel对象默认是阻塞模式的,所以首先调用configureBlocking(fasle)方法将其设置为非阻塞模式。
SocketChannel调用register方法向selector注册读就绪事件和写就绪事件,并把byteBuffer作为附件与新建的这个SelectionKey关联。
2.2 处理读就绪事件
如果isReadable方法返回true,就表示这个SelectionKey所有感兴趣的读就绪事件已经发生了
private void handleReadable(SelectionKey selectionKey) throws IOException {
// 获取关联的的附件
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
// 获取与当前SelectionKey关联的SocketChannel
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// 创建ByteBuffer字节缓冲区,用于存放读取到的数据
ByteBuffer readBuffer = ByteBuffer.allocate(32);
socketChannel.read(readBuffer);
// flip():把极限设为位置,再把位置设为0
readBuffer.flip();
// 把buffer的极限设置为容量
buffer.limit(buffer.capacity());
// 把readBuffer中的数据拷贝到buffer中
// 假定buffer的容量足够大,不会出现缓冲区溢出的情况
buffer.put(readBuffer);
}