这是单线程的目录结构:
服务端Server类代码如下:
/**
* 单线程的反应器模式的弊端主要是在业务逻辑处理上
* 如果业务逻辑处理事件过长会造成长时间无法去执行select()
* 方法获取已就绪的事件集,间接的意味着客户端被阻塞
*
* @date 2020-01-26
* @since
*/
public class Server implements Runnable {
private final Selector selector;
private Server() throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8888), 1024);
selector = Selector.open();
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selectionKey.attach(new Acceptor(selector, selectionKey));
System.out.println("========= 服务端启动成功 =========");
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
int count = selector.select();
System.out.println(Thread.currentThread().getName() + "监听就绪事件个数:" + count);
if (count == 0) {
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 遍历就绪事件,然后进行分发
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
Runnable attachment = (Runnable) key.attachment();
try {
attachment.run();
iterator.remove();
} catch (Exception e) {
System.out.println("客户端意外关闭");
iterator.remove();
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
Server server = new Server();
server.run();
}
}
连接处理器Acceptor类代码如下:
/**
* 接收连接就绪的事件
*
* @date 2020-01-26
* @since
*/
public class Acceptor implements Runnable {
private final Selector selector;
private final SelectionKey selectionKey;
public Acceptor(Selector selector, SelectionKey selectionKey) {
this.selector = selector;
this.selectionKey = selectionKey;
}
@Override
public void run() {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
try {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
// 注册读事件
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
// 分发读事件
key.attach(new Dispatcher(key));
} catch (IOException e) {
e.printStackTrace();
}
}
}
事件分发器Dispatcher类代码如下:
/**
* 事件分发器,主要用来处理读写事件和业务逻辑,业务逻辑可进行抽离,看具体场景
*
* @date 2020-01-26
* @since
*/
public class Dispatcher implements Runnable {
private final SelectionKey selectionKey;
private int state;
public Dispatcher(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
this.state = 0;
}
@Override
public void run() {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
try {
if (state == 0) {
read(socketChannel);
} else {
send(socketChannel);
}
} catch (Exception e) {
try {
System.out.println("客户端" + socketChannel.getRemoteAddress() + "关闭");
socketChannel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
private void read(SocketChannel socketChannel) throws Exception {
// 读取客户端的消息
ByteBuffer buffer = ByteBuffer.allocate(50);
socketChannel.read(buffer);
buffer.flip();
String msg = new String(buffer.array(), CharsetUtil.UTF_8);
System.out.println("接收到客户端的消息:" + msg);
// 业务逻辑处理
process();
// 设置状态为1
state = 1;
// 注册写事件,写事件一般不注册,因为可能出现服务器一支对写事件就绪
selectionKey.interestOps(SelectionKey.OP_WRITE);
}
private void send(SocketChannel socketChannel) throws IOException {
ByteBuffer wrap = ByteBuffer.wrap("server receive client msg, thank you".getBytes());
socketChannel.write(wrap);
state = 0;
// 注册写事件
selectionKey.interestOps(SelectionKey.OP_READ);
}
/**
* 业务逻辑处理
*
* @date 2020-01-26
* @updateDate 2020-01-26
* @param
* @return
*/
private void process() throws InterruptedException {
// handle business logic,单线程的反应器模式的弊端主要是如果业务逻辑处理事件过长会造成长时间不无法处理select()方法
// 进而导致客户端阻塞,所以一般将业务逻辑交由线程池来进行异步处理,防止反应器被阻塞
TimeUnit.SECONDS.sleep(5);
}
}
客户端Client类代码实现:
/**
* react客户端
*
* @date 2020-01-26
* @since
*/
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
Selector selector = Selector.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
while (!socketChannel.finishConnect()) {
// do nothing, 也可以将连接事件注册到socketChannel上,让复用器来监听连接是否就绪
}
socketChannel.register(selector, SelectionKey.OP_READ);
ByteBuffer wrap = ByteBuffer.wrap("hello server, i am client".getBytes());
socketChannel.write(wrap);
try {
while (!Thread.interrupted()) {
int count = selector.select();
System.out.println(Thread.currentThread().getName() + "客户端就绪事件个数为:" + count);
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(50);
channel.read(byteBuffer);
byteBuffer.flip();
System.out.println("client received server msg: " + new String(byteBuffer.array(), CharsetUtil.UTF_8));
iterator.remove();
System.out.print("please input your message:");
Scanner scanner = new Scanner(System.in);
String s = scanner.nextLine();
if (!StringUtils.isEmpty(s)) {
ByteBuffer buffer = ByteBuffer.wrap(s.getBytes());
socketChannel.write(buffer);
}
}
}
} catch (Exception e) {
socketChannel.close();
System.out.println("服务器意外关闭,客户端关闭连接");
}
}
}
直接运行可以发现,我设置代码逻辑process()方法睡眠5秒,然后启动多个客户端会发现,其他客户端陷入阻塞。优化方式可将业务逻辑处理用线程池来实现异步处理。这里就不展示代码了,比较简单。
chenm1xuexi 发布了40 篇原创文章 · 获赞 17 · 访问量 2万+ 私信 关注