使用Java NIO完成服务端代码的编写。代码写得并不完善,本文主要想体现多路复用的几种编程模型和思想。
一、单线程版本
使用单线程+NIO完成服务端代码的编写,并且使用一个Selector注册器。在一个线程中处理ServerSocketChannel的accept、SocketChannel的read、write。
- Server
创建ServerSocketChannel,并将其注册到Selector中。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
 * Single-threaded NIO server.
 *
 * <p>One {@link Selector} multiplexes the listening channel and every client
 * channel; accept, read and write are all handled on the thread that calls
 * {@link #start()}.
 */
public class Server {
    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;

    /**
     * Opens the selector and a non-blocking listening channel bound to {@code port},
     * and registers the channel for accept events.
     *
     * @param port TCP port to listen on
     * @throws Exception if the selector or the channel cannot be opened, bound or registered
     */
    public Server(int port) throws Exception {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Runs the event loop until the calling thread is interrupted or the selector fails.
     *
     * <p>When {@code select()} fails with an {@link IOException} the selector is closed
     * and the loop ends. Continuing to loop would only call {@code select()} on a closed
     * selector, which throws an unchecked {@code ClosedSelectorException}.
     */
    public void start() {
        while (!Thread.interrupted()) {
            try {
                if (selector.select() == 0) {
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    // Selected keys must be removed by hand, otherwise they are reported again.
                    iterator.remove();
                    dispatch(selectionKey);
                }
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    selector.close();
                } catch (Exception ignored) {
                    // Best effort: the selector is already unusable.
                }
                return;
            }
        }
    }

    /**
     * Routes a ready key to the handler matching its ready operation.
     *
     * @param selectionKey key taken from the selector's selected-key set
     */
    private void dispatch(SelectionKey selectionKey) {
        if (!selectionKey.isValid()) {
            // The readiness accessors throw CancelledKeyException on a cancelled key.
            return;
        }
        if (selectionKey.isAcceptable()) {
            new Acceptor(serverSocketChannel, selector).accept();
        } else if (selectionKey.isReadable()) {
            new ReadHandler((SocketChannel) selectionKey.channel(), selectionKey).read();
        } else if (selectionKey.isWritable()) {
            new WriteHandler((SocketChannel) selectionKey.channel(), selectionKey).write();
        }
    }
}
- Acceptor
接收客户端的连接。
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
 * Accepts a pending client connection and registers it for read events.
 */
public class Acceptor {
    private final ServerSocketChannel serverSocketChannel;
    private final Selector selector;

    /**
     * @param serverSocketChannel listening channel to accept from
     * @param selector selector the accepted client channel is registered with
     */
    public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
        this.serverSocketChannel = serverSocketChannel;
        this.selector = selector;
    }

    /**
     * Accepts one connection, switches it to non-blocking mode and registers it
     * for {@link SelectionKey#OP_READ}.
     *
     * <p>Does nothing when no connection is pending. Failures are printed; the
     * accepted channel, if any, is closed so it does not leak.
     */
    public void accept() {
        SocketChannel socketChannel = null;
        try {
            socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // A non-blocking accept returns null when nothing is waiting.
                return;
            }
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (Exception e) {
            e.printStackTrace();
            closeQuietly(socketChannel);
        }
    }

    /** Closes a half-initialised client channel, ignoring secondary failures. */
    private static void closeQuietly(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (Exception ignored) {
            // Nothing more can be done for a channel that fails to close.
        }
    }
}
- ReadHandler
接收客户端发送的内容。
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
/**
 * Reads data sent by a client and then switches the key to write interest
 * so that a reply can be sent.
 */
public class ReadHandler {
    private final SocketChannel socketChannel;
    private final SelectionKey selectionKey;

    /**
     * @param socketChannel client channel to read from
     * @param selectionKey key of that channel in its selector
     */
    public ReadHandler(SocketChannel socketChannel, SelectionKey selectionKey) {
        this.socketChannel = socketChannel;
        this.selectionKey = selectionKey;
    }

    /**
     * Performs one read of up to 1024 bytes and prints what was received.
     *
     * <p>End of stream or any failure cancels the key and closes the channel.
     * A read that yields no bytes keeps the key interested in reads.
     */
    public void read() {
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int bytesRead = socketChannel.read(byteBuffer);
            if (bytesRead < 0) {
                // The peer closed the connection; there is nothing left to answer.
                close();
                return;
            }
            if (bytesRead == 0) {
                selectionKey.interestOps(SelectionKey.OP_READ);
                return;
            }
            byteBuffer.flip();
            // Decode only the bytes actually received, not the whole backing array.
            String message = new String(byteBuffer.array(), 0, byteBuffer.limit());
            System.out.println("客户端【"+socketChannel.getRemoteAddress()+"】发来信息:"+message);
            // Switch to write interest so the client gets a reply.
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        } catch (Exception e) {
            // Without cancelling the key, a disconnected client keeps triggering
            // the same error on every pass of the selector loop.
            e.printStackTrace();
            close();
        }
    }

    /** Cancels the key and releases the client socket. */
    private void close() {
        selectionKey.cancel();
        try {
            socketChannel.close();
        } catch (Exception ignored) {
            // The connection is being discarded anyway.
        }
    }
}
- WriteHandler
向客户端发送内容。
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
/**
 * Sends the greeting to a client and then switches the key back to read interest.
 */
public class WriteHandler {
    private final SocketChannel socketChannel;
    private final SelectionKey selectionKey;

    /**
     * @param socketChannel client channel to write to
     * @param selectionKey key of that channel in its selector
     */
    public WriteHandler(SocketChannel socketChannel, SelectionKey selectionKey) {
        this.socketChannel = socketChannel;
        this.selectionKey = selectionKey;
    }

    /**
     * Writes the greeting, or resumes a greeting that an earlier call could not
     * finish.
     *
     * <p>A non-blocking write may accept only part of the buffer. The unsent
     * remainder is kept as the key's attachment and the key stays interested in
     * writes; once everything is sent the attachment is cleared and the key goes
     * back to read interest. Any failure cancels the key and closes the channel.
     */
    public void write() {
        try {
            ByteBuffer buffer = pendingBuffer();
            if (buffer == null) {
                String msg = "你好,欢迎"+socketChannel.getRemoteAddress();
                buffer = ByteBuffer.wrap(msg.getBytes());
            }
            socketChannel.write(buffer);
            if (buffer.hasRemaining()) {
                // Socket buffer is full: finish on the next writable event.
                selectionKey.attach(buffer);
                selectionKey.interestOps(SelectionKey.OP_WRITE);
                return;
            }
            selectionKey.attach(null);
            // Reply sent; wait for the client's next message.
            selectionKey.interestOps(SelectionKey.OP_READ);
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }

    /** Returns the unsent remainder left by a previous call, or null if there is none. */
    private ByteBuffer pendingBuffer() {
        Object attachment = selectionKey.attachment();
        if (attachment instanceof ByteBuffer && ((ByteBuffer) attachment).hasRemaining()) {
            return (ByteBuffer) attachment;
        }
        return null;
    }

    /** Cancels the key and releases the client socket. */
    private void close() {
        selectionKey.cancel();
        try {
            socketChannel.close();
        } catch (Exception ignored) {
            // The connection is being discarded anyway.
        }
    }
}
运行服务端:
/**
*
* 启动server服务
*/
public class Main {
public static void main(String[] args) throws Exception {
Server server = new Server(8800);
server.start();
}
}
二、多线程线程池版本
使用线程池在不同的线程中处理ServerSocketChannel的accept、SocketChannel的read、write。仍然使用一个Selector。
- Server
Server的start方法在调用dispatch之后执行Thread.sleep(500),让当前线程暂停500毫秒,目的是让dispatch方法的逻辑执行完成之后再执行iterator.remove(),将当前的SelectionKey从集合中移除。因为dispatch中使用了线程池异步处理,可能会出现先执行iterator.remove()、后执行dispatch逻辑的情况,这样会导致错误。(这种sleep方式处理是存在问题的:它既不能保证处理逻辑一定在500毫秒内完成,又会让Selector线程在每个事件上白白等待。)
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * NIO server with one {@link Selector} and a thread pool for read/write handling.
 *
 * <p>The selector loop runs on the thread that calls {@link #start()}; read and
 * write handlers run on pool threads.
 */
public class Server {
    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;
    private final ExecutorService executorService = Executors.newFixedThreadPool(1024);

    /**
     * Opens the selector and a non-blocking listening channel bound to {@code port},
     * and registers the channel for accept events.
     *
     * @param port TCP port to listen on
     * @throws Exception if the selector or the channel cannot be opened, bound or registered
     */
    public Server(int port) throws Exception {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Runs the selector loop until the calling thread is interrupted or the selector fails.
     *
     * <p>The loop never sleeps to wait for handlers. A key that is handed to the pool
     * has its interest set cleared first, so it cannot be selected again while its
     * handler is still running.
     */
    public void start() {
        while (!Thread.interrupted()) {
            try {
                if (selector.select() == 0) {
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    dispatch(selectionKey);
                }
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    selector.close();
                } catch (Exception ignored) {
                    // Best effort: the selector is already unusable.
                }
                // Looping on would only call select() on a closed selector.
                return;
            }
        }
    }

    /**
     * Routes a ready key: accepts inline, hands reads and writes to the pool.
     *
     * @param selectionKey key taken from the selector's selected-key set
     */
    private void dispatch(SelectionKey selectionKey) {
        if (!selectionKey.isValid()) {
            // The readiness accessors throw CancelledKeyException on a cancelled key.
            return;
        }
        if (selectionKey.isAcceptable()) {
            // Accept on the selector thread: the listening key stays ready until the
            // connection is taken, so an asynchronous accept would be dispatched
            // repeatedly for the same connection.
            new Acceptor(serverSocketChannel, selector).accept();
        } else if (selectionKey.isReadable()) {
            handOff(selectionKey, new ReadHandler((SocketChannel) selectionKey.channel(), selectionKey));
        } else if (selectionKey.isWritable()) {
            handOff(selectionKey, new WriteHandler((SocketChannel) selectionKey.channel(), selectionKey));
        }
    }

    /**
     * Submits a handler to the pool after muting its key.
     *
     * <p>The handler sets the next interest set itself (or cancels the key) when it
     * finishes; the selector is then woken so that change is seen by the next select.
     */
    private void handOff(SelectionKey selectionKey, Runnable handler) {
        selectionKey.interestOps(0);
        executorService.execute(() -> {
            try {
                handler.run();
            } finally {
                selector.wakeup();
            }
        });
    }
}
- Acceptor
实现Runnable接口,重写run方法。
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
 * Accepts a pending client connection and registers it for read events.
 * Can be run directly or submitted to an executor.
 */
public class Acceptor implements Runnable {
    private final ServerSocketChannel serverSocketChannel;
    private final Selector selector;

    /**
     * @param serverSocketChannel listening channel to accept from
     * @param selector selector the accepted client channel is registered with
     */
    public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
        this.serverSocketChannel = serverSocketChannel;
        this.selector = selector;
    }

    /**
     * Accepts one connection, switches it to non-blocking mode and registers it
     * for {@link SelectionKey#OP_READ}.
     *
     * <p>Does nothing when no connection is pending. Failures are printed; the
     * accepted channel, if any, is closed so it does not leak.
     */
    public void accept() {
        SocketChannel socketChannel = null;
        try {
            socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // A non-blocking accept returns null when nothing is waiting,
                // e.g. when another task already took the connection.
                return;
            }
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (Exception e) {
            e.printStackTrace();
            closeQuietly(socketChannel);
        }
    }

    /** Delegates to {@link #accept()}. */
    @Override
    public void run() {
        accept();
    }

    /** Closes a half-initialised client channel, ignoring secondary failures. */
    private static void closeQuietly(SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (Exception ignored) {
            // Nothing more can be done for a channel that fails to close.
        }
    }
}
- ReadHandler
实现Runnable接口,重写run方法。
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
/**
 * Reads data sent by a client and then switches the key to write interest
 * so that a reply can be sent. Can be run directly or submitted to an executor.
 */
public class ReadHandler implements Runnable {
    private final SocketChannel socketChannel;
    private final SelectionKey selectionKey;

    /**
     * @param socketChannel client channel to read from
     * @param selectionKey key of that channel in its selector
     */
    public ReadHandler(SocketChannel socketChannel, SelectionKey selectionKey) {
        this.socketChannel = socketChannel;
        this.selectionKey = selectionKey;
    }

    /**
     * Performs one read of up to 1024 bytes and prints what was received.
     *
     * <p>End of stream or any failure cancels the key and closes the channel.
     * A read that yields no bytes keeps the key interested in reads.
     */
    public void read() {
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int bytesRead = socketChannel.read(byteBuffer);
            if (bytesRead < 0) {
                // The peer closed the connection; there is nothing left to answer.
                close();
                return;
            }
            if (bytesRead == 0) {
                selectionKey.interestOps(SelectionKey.OP_READ);
                return;
            }
            byteBuffer.flip();
            // Decode only the bytes actually received, not the whole backing array.
            String message = new String(byteBuffer.array(), 0, byteBuffer.limit());
            System.out.println("客户端【"+socketChannel.getRemoteAddress()+"】发来信息:"+message);
            // Switch to write interest so the client gets a reply.
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        } catch (Exception e) {
            // Without cancelling the key, a disconnected client keeps triggering
            // the same error on every pass of the selector loop.
            e.printStackTrace();
            close();
        }
    }

    /** Delegates to {@link #read()}. */
    @Override
    public void run() {
        read();
    }

    /** Cancels the key and releases the client socket. */
    private void close() {
        selectionKey.cancel();
        try {
            socketChannel.close();
        } catch (Exception ignored) {
            // The connection is being discarded anyway.
        }
    }
}
- WriteHandler
实现Runnable接口,重写run方法。
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
/**
 * Sends the greeting to a client and then switches the key back to read interest.
 * Can be run directly or submitted to an executor.
 */
public class WriteHandler implements Runnable {
    private final SocketChannel socketChannel;
    private final SelectionKey selectionKey;

    /**
     * @param socketChannel client channel to write to
     * @param selectionKey key of that channel in its selector
     */
    public WriteHandler(SocketChannel socketChannel, SelectionKey selectionKey) {
        this.socketChannel = socketChannel;
        this.selectionKey = selectionKey;
    }

    /**
     * Writes the greeting, or resumes a greeting that an earlier call could not
     * finish.
     *
     * <p>A non-blocking write may accept only part of the buffer. The unsent
     * remainder is kept as the key's attachment and the key stays interested in
     * writes; once everything is sent the attachment is cleared and the key goes
     * back to read interest. Any failure cancels the key and closes the channel.
     */
    public void write() {
        try {
            ByteBuffer buffer = pendingBuffer();
            if (buffer == null) {
                String msg = "你好,欢迎"+socketChannel.getRemoteAddress();
                buffer = ByteBuffer.wrap(msg.getBytes());
            }
            socketChannel.write(buffer);
            if (buffer.hasRemaining()) {
                // Socket buffer is full: finish on the next writable event.
                selectionKey.attach(buffer);
                selectionKey.interestOps(SelectionKey.OP_WRITE);
                return;
            }
            selectionKey.attach(null);
            // Reply sent; wait for the client's next message.
            selectionKey.interestOps(SelectionKey.OP_READ);
        } catch (Exception e) {
            e.printStackTrace();
            close();
        }
    }

    /** Delegates to {@link #write()}. */
    @Override
    public void run() {
        write();
    }

    /** Returns the unsent remainder left by a previous call, or null if there is none. */
    private ByteBuffer pendingBuffer() {
        Object attachment = selectionKey.attachment();
        if (attachment instanceof ByteBuffer && ((ByteBuffer) attachment).hasRemaining()) {
            return (ByteBuffer) attachment;
        }
        return null;
    }

    /** Cancels the key and releases the client socket. */
    private void close() {
        selectionKey.cancel();
        try {
            socketChannel.close();
        } catch (Exception ignored) {
            // The connection is being discarded anyway.
        }
    }
}
三、多Selector(主从)
一个Selector负责接收客户端的连接(ServerSocketChannel#accept),多个Selector负责客户端的读写数据(SocketChannel#read、SocketChannel#write)。
- Server
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * NIO server with one master selector for accepts and several worker selectors
 * for client reads and writes. Each selector runs its own loop on a pool thread.
 */
public class Server {
    /**
     * Master selector: watches accept events on the listening channel.
     */
    private final Selector masterSelector;
    /**
     * Worker selectors that accepted client channels are registered with.
     */
    private final Selector[] selectors;
    private final ServerSocketChannel serverSocketChannel;
    private final ExecutorService executorService = Executors.newFixedThreadPool(1024);
    /**
     * Picks a worker selector for each new connection; created once instead of per accept.
     */
    private final Random random = new Random();

    /**
     * Opens the master selector, a non-blocking listening channel bound to
     * {@code port}, and two worker selectors.
     *
     * @param port TCP port to listen on
     * @throws Exception if a selector or the channel cannot be opened, bound or registered
     */
    public Server(int port) throws Exception {
        masterSelector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(masterSelector, SelectionKey.OP_ACCEPT);
        // Two worker selectors.
        Selector subSelector1 = Selector.open();
        Selector subSelector2 = Selector.open();
        selectors = new Selector[]{subSelector1, subSelector2};
    }

    /**
     * Starts one loop per selector on the pool, so that a blocking select on one
     * selector does not delay the others. Returns immediately.
     */
    public void start() {
        executorService.execute(() -> loop(masterSelector));
        for (Selector selector : selectors) {
            executorService.execute(() -> loop(selector));
        }
    }

    /**
     * Event loop for a single selector; runs until the thread is interrupted or
     * the selector fails.
     *
     * @param selector the selector this loop polls
     */
    private void loop(Selector selector) {
        while (!Thread.interrupted()) {
            try {
                // Block for at most one second per pass.
                if (selector.select(1000) == 0) {
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    dispatch(selectionKey);
                }
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    selector.close();
                } catch (Exception ignored) {
                    // Best effort: the selector is already unusable.
                }
                // Looping on would only call select() on a closed selector.
                return;
            }
        }
    }

    /**
     * Routes a ready key to the handler matching its ready operation.
     *
     * @param selectionKey key taken from a selector's selected-key set
     */
    private void dispatch(SelectionKey selectionKey) {
        if (!selectionKey.isValid()) {
            // The readiness accessors throw CancelledKeyException on a cancelled key.
            return;
        }
        if (selectionKey.isAcceptable()) {
            // Hand the new connection to a randomly chosen worker selector.
            Selector subSelector = selectors[random.nextInt(selectors.length)];
            // The worker is normally blocked in select(1000) on another thread.
            // Wake it before registering so an in-progress select lets go of the
            // selector (NOTE(review): older JDKs appear to block register() during
            // a select; confirm against the target JDK), and again afterwards so
            // the new key is picked up without waiting for the timeout.
            subSelector.wakeup();
            new Acceptor(serverSocketChannel, subSelector).accept();
            subSelector.wakeup();
        } else if (selectionKey.isReadable()) {
            new ReadHandler((SocketChannel) selectionKey.channel(), selectionKey).read();
        } else if (selectionKey.isWritable()) {
            new WriteHandler((SocketChannel) selectionKey.channel(), selectionKey).write();
        }
    }
}
一主二从,一个主Selector负责accept客户端连接。两个从Selector负责与客户端read、write。
其他类的代码同单线程版本。