NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

1.前言

  以前使用 websocket来实现双向通信,如今深入了解了 NIO 同步非阻塞io模型 ,

优势是 处理效率很高,吞吐量巨大,能很快处理大文件,不仅可以 做 文件io操作,

还可以做socket通信 、收发UDP包、Pipe线程单向数据连接。

这一篇随笔专门讲解 NIO socket通信具体操作  

注意:这是重点!!!

    兴趣集合有4个事件,
分别是:
SelectionKey.OP_ACCEPT 【接收连接就绪,专用于服务端】
SelectionKey.OP_CONNECT 【连接就绪 , 专用于客户端】
SelectionKey.OP_READ 【读就绪 ,通知 对面端 读做读操作】
SelectionKey.OP_WRITE 【写就绪 , 通知 自己端 做写操作】 当信道向选择器注册感兴趣事件SelectionKey.OP_WRITE 时
即源码
sc.register(mselector, SelectionKey.OP_WRITE); 让自己的选择器 触发自己的 key.isWritable() && key.isValid()
然后是让自己做一个写操作,
最后再注册 读就绪事件,用来通知 对方端【可能是客户端 或 服务端,因为是相互的】
做读事件,即让对面的选择器触发 key.isReadable()。

-----------------------------------------------------------
我觉得这就是脱裤子放屁操作。。。
其实是自己通知自己触发的,我不明白为什么要有一个分开的感兴趣事件,
因为响应客户端直接在读操作后直接做写操作,然后注册读就绪事件就行了,没必要分开放写啊
 

2.操作

(1)目录结构

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

只需要红框部分 其他可有可无,这是个maven工程,在测试类实现,之所以使用maven是因为导入依赖包很方便

(2)导入依赖包,需要使用json的生成和解析工具

<!-- 用于生成或解析json-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.56</version>
</dependency>
<!-- 用于在单元测试类可以使用javabean注解-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

(3)服务端【源码里写了很详细的注释,我懒得再写一篇】

服务端实现类源码

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】
package com.example.javabaisc.nio.mysocket;

import com.example.javabaisc.nio.mysocket.service.EatService;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; @RunWith(SpringRunner.class)
@SpringBootTest
public class ServerSocket { //main方法启动
// public static void main(String[] args) throws IOException {
// //配置选择器
// selector();
// //监听
// listen();
// }
@Test
//单元测试方法启动
public void serverSocket() throws IOException {
//配置选择器
selector();
//监听
listen();
} //服务层接口
@Autowired
private EatService eatService; //选择器作为全局属性
private Selector selector = null; //存储信道对象 ,静态公用的全局变量
//key是ip和端口,如 /127.0.0.1:64578
//value 是 储信道对象
private static final ConcurrentMap<String, SocketChannel> socketChannelMap = new ConcurrentHashMap<>();
//存储ip和端口
// key 是用户名
// value 是ip和端口,如 /127.0.0.1:64578
private static final ConcurrentMap<String, String> ipMap = new ConcurrentHashMap<>();
//存储用户名
// key 是ip和端口
// value 是用户名
private static final ConcurrentMap<String, String> usernameMap = new ConcurrentHashMap<>(); /**
* 配置选择器
* 如果使用 main 启动 ,那么 selector() 需要设为静态,因为main 函数是static的,都在报错
*/
private void selector() throws IOException {
//服务信道
ServerSocketChannel channel = null;
//开启选择器
selector = Selector.open();
//开启服务信道
channel = ServerSocketChannel.open();
//把该channel设置成非阻塞的,【需要手动设置为false】
channel.configureBlocking(false);
//开启socket 服务,由信道开启,绑定端口 8080
channel.socket().bind(new InetSocketAddress(8080));
//管道向选择器注册信息----接收连接就绪
channel.register(selector, SelectionKey.OP_ACCEPT); } /**
* 写了监听事件的处理逻辑
*/
private void listen() throws IOException {
//进入无限循环遍历
while (true) {
//这个方法是阻塞的,是用来收集有io操作通道的注册事件【也就是选择键】,需要收到一个以上才会往下面执行,否则一直等待到超时,超时时间是可以设置的,
//直接输入参数数字即可,单位毫秒 ,如果超时后仍然没有收到注册信息,那么将会返回0 ,然后往下面执行一次后又循环回来
//不写事件将一直阻塞下去
// selector.select();
//这里设置超时时间为3000毫秒
if (selector.select(3000) == 0) {
//如果超时后返回结果0,则跳过这次循环
continue;
}
//使用迭代器遍历选择器里的所有选择键
Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
//当迭代器指针指向下一个有元素是才执行内部代码块
while (ite.hasNext()) {
//获取选择键
SelectionKey key = ite.next();
//选择键操作完成后,必须删除该元素【选择键】,否则仍然存在选择器里面,将会在下一轮遍历再执行一次,形成了脏数据,因此必须删除
ite.remove();
//当选择键是可接受的
if (key.isAcceptable()) {
acceptableHandler(key);
}
//当选择键是可读的
else if (key.isReadable()) {
readHandler(key);
}
//当选择键是可写的且是有效的【其实是自己通知自己触发的,我不明白为什么要有一个分开的感兴趣事件,
// 因为响应客户端直接在读操作后直接做写操作,然后注册读就绪事件就行了,没必要分开放在这里写啊】
//为了演示我还是写了
else if (key.isWritable() && key.isValid()) {
writeHandler(key);
}
//当选择键是可连接的【其实这个是在客户端才会被触发,为了演示这里也可以写,我才写的】
else if (key.isConnectable()) {
System.out.println("选择键是可连接的,key.isConnectable() 是 true");
} }
}
} //当选择键是可接受的处理逻辑
//static静态,可用可不用
private void acceptableHandler(SelectionKey key) throws IOException {
System.out.println("当选择键是可接受的处理逻辑");
//从选择键获取服务信道,需要强转
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
//服务信道监听新进来的连接,返回一个信道
SocketChannel sc = serverSocketChannel.accept(); //信道不为空才执行
if (sc != null) {
//到了这里说明连接成功
//
//获取本地ip地址与端口号
// SocketAddress socketAddress = sc.getLocalAddress();
// System.out.println(socketAddress.toString());
// /127.0.0.1:8080
//获取远程ip地址与端口号
SocketAddress ra = sc.getRemoteAddress();
System.out.println(ra.toString());
// /127.0.0.1:64513
//存储信道对象
socketChannelMap.put(ra.toString(), sc);
System.out.println("当前在线人数:" + socketChannelMap.size());
//将该信道设置为非阻塞
sc.configureBlocking(false);
//获取选择器
Selector mselector = key.selector();
//信道注册到选择器 ---- 读操作就绪
sc.register(mselector, SelectionKey.OP_READ);
//在这里设置字节缓冲区的 关联关系,但是我设置会在读操作报空指针异常,原因未知
// sc.register(mselector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
} //当选择键是可读的处理逻辑
private void readHandler(SelectionKey key) throws IOException {
SocketChannel sc = null;
/*
每当客户端强制关闭了连接,就会发送一条数据过来这里说
java.io.IOException: 远程主机强迫关闭了一个现有的连接。
因此需要这里销毁连接,即关闭该socket通道即可
*/
try {
System.out.println("当选择键是可读的处理逻辑");
//获取信道,需要强转
sc = (SocketChannel) key.channel();
//key 获取 通道 关联的 缓冲区【这里使用是报错,read读操作报空指针异常,奇了怪了】
// ByteBuffer buffer = (ByteBuffer) key.attachment();
//只能自定义了
ByteBuffer buffer = ByteBuffer.allocate(1024);
//获取选择器
Selector mselector = key.selector();
//信道做读操作 ,返回读取数据的字节长度
long mreadSize;
//存储从心得解析出来的字符串
String jsonstr = "";
//当字节长度大于零,说明还有信息没有读完
while ((mreadSize = sc.read(buffer)) > 0) {
System.out.println("=======");
//定位到有效字符的索引【即limit = position ,position = 0 ,capacity 不变】
buffer.flip();
//获取该索引间的数据 ,buffer.get()返回的是节数
byte[] b = buffer.array();
//指定编码将字节流转字符串
jsonstr = new String(b, StandardCharsets.UTF_8);
//打印
System.out.println(jsonstr);
}
//当字节长度为-1时,也就是没有数据可读取了,那么就关闭信道
if (mreadSize == -1) {
sc.close();
}
//检查字符串是否为空
if (!jsonstr.isEmpty()) {
//数据发送过来不为空
//进入业务层
eatService.food(mselector, sc, buffer, jsonstr, ipMap, usernameMap, socketChannelMap);
} } catch (Exception e) {
// e.printStackTrace();
System.out.println("//远程客户端强迫关闭了连接。关闭客户端已经关闭,服务端继续运行"); //发生异常才关闭
if (sc != null) {
//获取ip地址与端口号
// SocketAddress socketAddress = sc.getLocalAddress();
// System.out.println(socketAddress.toString());
// /127.0.0.1:8080
//
//获取远程ip地址与端口号
SocketAddress ra = sc.getRemoteAddress();
System.out.println(ra.toString());
//移除
socketChannelMap.remove(ra.toString());
System.out.println(socketChannelMap);
//
String username = usernameMap.get(ra.toString());
System.out.println("用户名叫:" + username + " 的客户端下线");
usernameMap.remove(ra.toString());
ipMap.remove(username);
//
System.out.println("当前在线人数:" + socketChannelMap.size());
//
System.out.println("打印当前用户信息");
System.out.println(ipMap);
System.out.println(usernameMap);
//
sc.close();
}
//取消该选择键
key.channel();
}
} //当选择键是可写的且是有效的处理逻辑
private void writeHandler(SelectionKey key) throws IOException {
System.out.println("当选择键是可写的且是有效的处理逻辑 ,我被自己通知来写东西啦,虽然不知道为什么要分开读写");
//获取信道,需要强转
SocketChannel sc = (SocketChannel) key.channel();
//设置字节缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//
String str = "我要写东西,你看到了吗" + System.currentTimeMillis();
//清除索引信息【即position = 0 ;capacity = limit】
buffer.clear();
//将字符转成字节流放入缓冲中
buffer.put(str.getBytes(StandardCharsets.UTF_8));
//定位到有效字符的索引【即limit = position ,position = 0 ,capacity 不变】
buffer.flip();
//如果 position < limit ,即仍有缓冲区的数据未写到信道中
while (buffer.hasRemaining()) {
//信道做写操作
sc.write(buffer);
}
//整理索引【即position定位到缓冲区未读的数据末尾 ,capacity = limit】
buffer.compact();
//获取选择器
Selector mselector = key.selector();
//注册读就绪事件
sc.register(mselector, SelectionKey.OP_READ); } }

服务层接口

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】
package com.example.javabaisc.nio.mysocket.service;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ConcurrentMap; public interface EatService {
public void food(Selector mselector, SocketChannel sc, ByteBuffer buffer, String jsonstr,
ConcurrentMap<String, String> ipMap,
ConcurrentMap<String, String> usernameMap,ConcurrentMap<String, SocketChannel> socketChannelMap) throws IOException;
}

服务层接口实现类

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】
package com.example.javabaisc.nio.mysocket.service;

import com.alibaba.fastjson.JSON;
import org.springframework.stereotype.Service; import java.io.IOException;
import java.net.SocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap; /**
* 业务层
*/
@Service
public class EatServiceImpl implements EatService {
@Override
public void food(Selector mselector, SocketChannel sc, ByteBuffer buffer, String jsonstr,
ConcurrentMap<String, String> ipMap,
ConcurrentMap<String, String> usernameMap, ConcurrentMap<String, SocketChannel> socketChannelMap) throws IOException { //解析json串成map
Map<String, Object> map = JSON.parseObject(jsonstr);
System.out.println(map);
int type = (Integer) map.get("type");
if (type == 1) {
//返回结果
String res = "apple,好好吃,我好饿";
Map<String, Object> map2 = new HashMap<>();
map2.put("r-type", 1);
map2.put("data", res);
String jsonStr = JSON.toJSONString(map2);
//
System.out.println(jsonStr);
//
//清除索引信息【即position = 0 ;capacity = limit】
buffer.clear();
//指定编码将符串转字字节流
buffer.put(jsonStr.getBytes(StandardCharsets.UTF_8));
//定位到有效字符的索引【即limit = position ,position = 0 ,capacity 不变】
buffer.flip();
//如果 position < limit ,即仍有缓冲区的数据未写到信道中
while (buffer.hasRemaining()) {
//写操作
sc.write(buffer);
}
//整理索引【即position定位到缓冲区未读的数据末尾 ,capacity = limit】
//这里用不到
//buffer.compact();
//注册读就绪事件,【让客户端读】
sc.register(mselector, SelectionKey.OP_READ);
} else if (type == 3) {
try {
//客户端回应
System.out.println(" //客户端回应 业务类型3,//到了这里不再传输数据,懒得写,以免无限循环");
//获取远程ip地址与端口号
SocketAddress ra = sc.getRemoteAddress();
//获取该客户端的用户名
String username = usernameMap.get(ra.toString());
//懒得写新接口,直接判断如果该客户端如果是cen,则向yue发送信息
if (username.equals("cen")) {
//判断guo是否在线
////获取guo的ip
String ip = ipMap.get("guo");
System.out.println("guo 的ip:" + ip);
if (ip == null || ip.isEmpty()) {
System.out.println("guo 不存在,未上线");
return;
}
System.out.println("向 guo 发送信息");
//存在
//
SocketChannel mchannel = socketChannelMap.get(ip);
String res = "我是cen,我向guo发送消息,看到了吗" + new Date();
//
System.out.println(res);
//
Map<String, Object> map3 = new HashMap<>();
map3.put("r-type", 6);
map3.put("data", res);
String jsonStr = JSON.toJSONString(map3);
//清除索引信息【即position = 0 ;capacity = limit】
ByteBuffer buffer2 = ByteBuffer.allocate(1024);
buffer2.clear();
//指定编码将符串转字字节流
buffer2.put(jsonStr.getBytes(StandardCharsets.UTF_8));
//定位到有效字符的索引【即limit = position ,position = 0 ,capacity 不变】
buffer2.flip();
System.out.println(new String(buffer2.array()));
//如果 position < limit ,即仍有缓冲区的数据未写到信道中
while (buffer2.hasRemaining()) {
//写操作
mchannel.write(buffer2);
}
//整理索引【即position定位到缓冲区未读的数据末尾 ,capacity = limit】
//这里用不到
// buffer.compact();
//注册读就绪事件,【让客户端读】
sc.register(mselector, SelectionKey.OP_READ);
System.out.println("发送成功"); }
} catch (Exception e) {
e.printStackTrace();
} } else if (type == 0) {
System.out.println("type是0");
String username = (String) map.get("username");
System.out.println("用户名叫:" + username + " 的客户端上线");
//注册用户信息[根据用户名获取ip]
ipMap.put(username, sc.getRemoteAddress().toString());
//注册用户信息[根据ip获取用户名]
usernameMap.put(sc.getRemoteAddress().toString(), username);
System.out.println("打印当前用户信息");
System.out.println(ipMap);
System.out.println(usernameMap);
//向选择器注册写就绪事件,是通知自己写东西【让自己写】,一般不会注册OP_WRITE,为了展示用法我才这样写
sc.register(mselector, SelectionKey.OP_WRITE);
}
}
}

分别做了两个客户端,与服务端的代码很相似,但是再小的区别也不能粗心大意,不然直接报错

源码一样,区别是用户名不同【用于测试两个客户端之间发送消息】

用户名为 cen 的客户端

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】
package com.example.javabaisc.nio.mysocket;

import com.alibaba.fastjson.JSON;
import org.junit.jupiter.api.Test; import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map; public class ClientSocket { //用户名
String username = "cen"; @Test
//单元测试方法启动
public void clientSocket() throws IOException {
//配置选择器
selector();
//监听
listen();
} //选择器作为全局属性
private Selector selector = null; /**
* 配置选择器
* 如果使用 main 启动 ,那么 selector() 需要设为静态,因为main 函数是static的,都在报错
*/
private void selector() throws IOException {
//信道
SocketChannel channel = null;
//开启选择器
selector = Selector.open();
//开启信道
channel = SocketChannel.open();
//把该channel设置成非阻塞的,【需要手动设置为false】
channel.configureBlocking(false);
//管道连接互联网socket地址,输入ip和端口号
channel.connect(new InetSocketAddress("localhost", 8080));
//管道向选择器注册信息----连接就绪
channel.register(selector, SelectionKey.OP_CONNECT);
} /**
* 写了监听事件的处理逻辑
*/
private void listen() throws IOException {
out:
//进入无限循环遍历
while (true) {
//这个方法是阻塞的,是用来收集有io操作通道的注册事件【也就是选择键】,需要收到一个以上才会往下面执行,否则一直等待到超时,超时时间是可以设置的,
//直接输入参数数字即可,单位毫秒 ,如果超时后仍然没有收到注册信息,那么将会返回0 ,然后往下面执行一次后又循环回来
//不写事件将一直阻塞下去
// selector.select();
//这里设置超时时间为3000毫秒
if (selector.select(3000) == 0) {
//如果超时后返回结果0,则跳过这次循环
continue;
}
//使用迭代器遍历选择器里的所有选择键
Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
//当迭代器指针指向下一个有元素是才执行内部代码块
while (ite.hasNext()) {
//获取选择键
SelectionKey key = ite.next();
//选择键操作完成后,必须删除该元素【选择键】,否则仍然存在选择器里面,将会在下一轮遍历再执行一次,形成了脏数据,因此必须删除
ite.remove();
//当选择键是可连接的
if (key.isConnectable()) {
if ( connectableHandler(key)) {
System.out.println("//远程主机未上线,退出循环,关闭客户端");
//退出循环,关闭客户端
break out;
}
}
//当选择键是可读的
else if (key.isReadable()) {
if (readHandler(key)) {
System.out.println("//远程主机强迫关闭了连接。退出循环,关闭客户端");
//退出循环,关闭客户端
break out;
}
}
//当选择键是可写的且是有效的【其实是自己通知自己触发的,我不明白为什么要有一个分开的感兴趣事件,
// 因为响应客户端直接在读操作后直接做写操作,然后注册读就绪事件就行了,没必要分开放在这里写啊】
//为了演示我还是写了
else if (key.isWritable() && key.isValid()) {
writeHandler(key);
}
}
}
} //当选择键是可连接的处理逻辑
//static静态,可用可不用
private boolean connectableHandler(SelectionKey key) throws IOException {
System.out.println("当选择键是可连接的处理逻辑");
SocketChannel sc =null;
/*
每当连接远程主机发现未上线,则会在这里报异常
java.net.ConnectException: Connection refused: no further information
*/
try {
sc = (SocketChannel) key.channel();
//如果管道是连接悬挂
if (sc.isConnectionPending()) {
//管道结束连接
sc.finishConnect();
ByteBuffer buffer = ByteBuffer.allocate(1024);
//=============
Map<String, Object> map = new HashMap<>();
map.put("type", 0);
map.put("date", "socket首次握手成功,你好");
map.put("username", username);
String jsonstr = JSON.toJSONString(map);
//清除索引信息【即position = 0 ;capacity = limit】
buffer.clear();
//指定编码将符串转字字节流
buffer.put(jsonstr.getBytes(StandardCharsets.UTF_8));
//定位到有效字符的索引【即limit = position ,position = 0 ,capacity 不变】
buffer.flip();
//如果 position < limit ,即仍有缓冲区的数据未写到信道中
while (buffer.hasRemaining()) {
//写操作
sc.write(buffer);
}
sc.register(selector, SelectionKey.OP_READ);
}
return false;
}catch (Exception e){
// e.printStackTrace();
//取消该选择键
key.channel();
//发生异常才关闭
if (sc != null) {
sc.close();
}
return true;
} } //当选择键是可读的处理逻辑
private boolean readHandler(SelectionKey key) throws IOException {
SocketChannel sc = null;
/*
每当服务端强制关闭了连接,就会发送一条数据过来这里说
java.io.IOException: 远程主机强迫关闭了一个现有的连接。
因此需要这里销毁连接,即关闭该socket通道即可
*/
try {
System.out.println("当选择键是可读的处理逻辑");
//获取信道,需要强转
sc = (SocketChannel) key.channel();
//key 获取 通道 关联的 缓冲区【这里使用是报错,read读操作报空指针异常,奇了怪了】
// ByteBuffer buffer = (ByteBuffer) key.attachment();
//只能自定义了
ByteBuffer buffer = ByteBuffer.allocate(1024);
//获取选择器
Selector mselector = key.selector();
//信道做读操作 ,返回读取数据的字节长度
long mreadSize;
//存储从心得解析出来的字符串
String jsonstr = "";
//当字节长度大于零,说明还有信息没有读完
while ((mreadSize = sc.read(buffer)) > 0) {
System.out.println("=======");
//定位到有效字符的索引【即limit = position ,position = 0 ,capacity 不变】
buffer.flip();
//获取该索引间的数据 ,buffer.get()返回的是节数
byte[] b = buffer.array();
//指定编码将字节流转字符串
jsonstr = new String(b, StandardCharsets.UTF_8);
//打印
System.out.println(jsonstr);
}
//当字节长度为-1时,也就是没有数据可读取了,那么就关闭信道
if (mreadSize == -1) {
sc.close();
}
//检查字符串是否为空
if (!jsonstr.isEmpty()) {
//数据发送过来不为空
//进入业务层 【与服务端的一样写法,我这里就演示服务层了】
// eatService.food(mselector, sc, buffer, jsonstr);
//为了演示响应,我直接用做写就绪事件响应
//注册写就绪事件 ,这句话等同于 直接调用 writeHandler(SelectionKey key)
sc.register(mselector, SelectionKey.OP_WRITE); } return false;
} catch (Exception e) {
// e.printStackTrace();
//取消该选择键
key.channel();
//发生异常才关闭
if (sc != null) {
sc.close();
}
//关闭客户端
return true;
}
} //当选择键是可写的且是有效的处理逻辑
private void writeHandler(SelectionKey key) throws IOException {
System.out.println("当选择键是可写的且是有效的处理逻辑 ,我被自己通知来写东西啦,虽然不知道为什么要分开读写");
//获取信道,需要强转
SocketChannel sc = (SocketChannel) key.channel();
//设置字节缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//=============
Map<String, Object> map = new HashMap<>();
map.put("type", 3);
map.put("date", "我要写东西,你看到了吗" + System.currentTimeMillis());
String jsonstr = JSON.toJSONString(map);
//清除索引信息【即position = 0 ;capacity = limit】
buffer.clear();
//将字符转成字节流放入缓冲中
buffer.put(jsonstr.getBytes(StandardCharsets.UTF_8));
//定位到有效字符的索引【即limit = position ,position = 0 ,capacity 不变】
buffer.flip();
//如果 position < limit ,即仍有缓冲区的数据未写到信道中
while (buffer.hasRemaining()) {
//信道做写操作
sc.write(buffer);
}
//整理索引【即position定位到缓冲区未读的数据末尾 ,capacity = limit】
buffer.compact();
//获取选择器
Selector mselector = key.selector();
//注册读就绪事件
sc.register(mselector, SelectionKey.OP_READ); } }

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

用户名为 guo 的客户端

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】
package com.example.javabaisc.nio.mysocket;

import com.alibaba.fastjson.JSON;
import org.junit.jupiter.api.Test; import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map; public class ClientSocket2 { //用户名
String username = "guo"; @Test
//单元测试方法启动
public void clientSocket() throws IOException {
//配置选择器
selector();
//监听
listen();
} //选择器作为全局属性
private Selector selector = null; /**
* 配置选择器
* 如果使用 main 启动 ,那么 selector() 需要设为静态,因为main 函数是static的,都在报错
*/
private void selector() throws IOException {
//信道
SocketChannel channel = null;
//开启选择器
selector = Selector.open();
//开启信道
channel = SocketChannel.open();
//把该channel设置成非阻塞的,【需要手动设置为false】
channel.configureBlocking(false);
//管道连接互联网socket地址,输入ip和端口号
channel.connect(new InetSocketAddress("localhost", 8080));
//管道向选择器注册信息----连接就绪
channel.register(selector, SelectionKey.OP_CONNECT);
} /**
* 写了监听事件的处理逻辑
*/
private void listen() throws IOException {
out:
//进入无限循环遍历
while (true) {
//这个方法是阻塞的,是用来收集有io操作通道的注册事件【也就是选择键】,需要收到一个以上才会往下面执行,否则一直等待到超时,超时时间是可以设置的,
//直接输入参数数字即可,单位毫秒 ,如果超时后仍然没有收到注册信息,那么将会返回0 ,然后往下面执行一次后又循环回来
//不写事件将一直阻塞下去
// selector.select();
//这里设置超时时间为3000毫秒
if (selector.select(3000) == 0) {
//如果超时后返回结果0,则跳过这次循环
continue;
}
//使用迭代器遍历选择器里的所有选择键
Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
//当迭代器指针指向下一个有元素是才执行内部代码块
while (ite.hasNext()) {
//获取选择键
SelectionKey key = ite.next();
//选择键操作完成后,必须删除该元素【选择键】,否则仍然存在选择器里面,将会在下一轮遍历再执行一次,形成了脏数据,因此必须删除
ite.remove();
//当选择键是可连接的
if (key.isConnectable()) {
if ( connectableHandler(key)) {
System.out.println("//远程主机未上线,退出循环,关闭客户端");
//退出循环,关闭客户端
break out;
}
}
//当选择键是可读的
else if (key.isReadable()) {
if (readHandler(key)) {
System.out.println("//远程主机强迫关闭了连接。退出循环,关闭客户端");
//退出循环,关闭客户端
break out;
}
}
//当选择键是可写的且是有效的【其实是自己通知自己触发的,我不明白为什么要有一个分开的感兴趣事件,
// 因为响应客户端直接在读操作后直接做写操作,然后注册读就绪事件就行了,没必要分开放在这里写啊】
//为了演示我还是写了
else if (key.isWritable() && key.isValid()) {
writeHandler(key);
}
}
}
} //当选择键是可连接的处理逻辑
//static静态,可用可不用
private boolean connectableHandler(SelectionKey key) throws IOException {
System.out.println("当选择键是可连接的处理逻辑");
SocketChannel sc =null;
/*
每当连接远程主机发现未上线,则会在这里报异常
java.net.ConnectException: Connection refused: no further information
*/
try {
sc = (SocketChannel) key.channel();
//如果管道是连接悬挂
if (sc.isConnectionPending()) {
//管道结束连接
sc.finishConnect();
ByteBuffer buffer = ByteBuffer.allocate(1024);
//=============
Map<String, Object> map = new HashMap<>();
map.put("type", 0);
map.put("date", "socket首次握手成功,你好");
map.put("username", username);
String jsonstr = JSON.toJSONString(map);
//清除索引信息【即position = 0 ;capacity = limit】
buffer.clear();
//指定编码将符串转字字节流
buffer.put(jsonstr.getBytes(StandardCharsets.UTF_8));
//定位到有效字符的索引【即limit = position ,position = 0 ,capacity 不变】
buffer.flip();
//如果 position < limit ,即仍有缓冲区的数据未写到信道中
while (buffer.hasRemaining()) {
//写操作
sc.write(buffer);
}
sc.register(selector, SelectionKey.OP_READ);
}
return false;
}catch (Exception e){
// e.printStackTrace();
//取消该选择键
key.channel();
//发生异常才关闭
if (sc != null) {
sc.close();
}
return true;
} } //当选择键是可读的处理逻辑
private boolean readHandler(SelectionKey key) throws IOException {
SocketChannel sc = null;
/*
每当服务端强制关闭了连接,就会发送一条数据过来这里说
java.io.IOException: 远程主机强迫关闭了一个现有的连接。
因此需要这里销毁连接,即关闭该socket通道即可
*/
try {
System.out.println("当选择键是可读的处理逻辑");
//获取信道,需要强转
sc = (SocketChannel) key.channel();
//key 获取 通道 关联的 缓冲区【这里使用是报错,read读操作报空指针异常,奇了怪了】
// ByteBuffer buffer = (ByteBuffer) key.attachment();
//只能自定义了
ByteBuffer buffer = ByteBuffer.allocate(1024);
//获取选择器
Selector mselector = key.selector();
//信道做读操作 ,返回读取数据的字节长度
long mreadSize;
//存储从心得解析出来的字符串
String jsonstr = "";
//当字节长度大于零,说明还有信息没有读完
while ((mreadSize = sc.read(buffer)) > 0) {
System.out.println("=======");
//定位到有效字符的索引【即limit = position ,position = 0 ,capacity 不变】
buffer.flip();
//获取该索引间的数据 ,buffer.get()返回的是节数
byte[] b = buffer.array();
//指定编码将字节流转字符串
jsonstr = new String(b, StandardCharsets.UTF_8);
//打印
System.out.println(jsonstr);
}
//当字节长度为-1时,也就是没有数据可读取了,那么就关闭信道
if (mreadSize == -1) {
sc.close();
}
//检查字符串是否为空
if (!jsonstr.isEmpty()) {
//数据发送过来不为空
//进入业务层 【与服务端的一样写法,我这里就演示服务层了】
// eatService.food(mselector, sc, buffer, jsonstr);
//为了演示响应,我直接用做写就绪事件响应
//注册写就绪事件 ,这句话等同于 直接调用 writeHandler(SelectionKey key)
sc.register(mselector, SelectionKey.OP_WRITE); } return false;
} catch (Exception e) {
// e.printStackTrace();
//取消该选择键
key.channel();
//发生异常才关闭
if (sc != null) {
sc.close();
}
//关闭客户端
return true;
}
} //当选择键是可写的且是有效的处理逻辑
private void writeHandler(SelectionKey key) throws IOException {
System.out.println("当选择键是可写的且是有效的处理逻辑 ,我被自己通知来写东西啦,虽然不知道为什么要分开读写");
//获取信道,需要强转
SocketChannel sc = (SocketChannel) key.channel();
//设置字节缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//=============
Map<String, Object> map = new HashMap<>();
map.put("type", 3);
map.put("date", "我要写东西,你看到了吗" + System.currentTimeMillis());
String jsonstr = JSON.toJSONString(map);
//清除索引信息【即position = 0 ;capacity = limit】
buffer.clear();
//将字符转成字节流放入缓冲中
buffer.put(jsonstr.getBytes(StandardCharsets.UTF_8));
//定位到有效字符的索引【即limit = position ,position = 0 ,capacity 不变】
buffer.flip();
//如果 position < limit ,即仍有缓冲区的数据未写到信道中
while (buffer.hasRemaining()) {
//信道做写操作
sc.write(buffer);
}
//整理索引【即position定位到缓冲区未读的数据末尾 ,capacity = limit】
buffer.compact();
//获取选择器
Selector mselector = key.selector();
//注册读就绪事件
sc.register(mselector, SelectionKey.OP_READ); } }

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

3.测试

(1)分别启动服务端、cen客户端、guo客户端

服务端控制台打印

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

cen客户端

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

guo客户端

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

因为我在服务层设置了如果guo客户端不在线,则不发消息,

不如在线,cen客户端 会发消息给guo客户端,因为cen先上线,因此当时guo还没上线

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

(2)现在换过来

分别启动服务端、guo客户端,【先不启动cen客户端】

服务端控制台打印

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

guo客户端

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

好了,现在启动cen客户端,

希望向guo客户端发送消息

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

【cen客户端控制台打印没什么变化,因为本来就没做什么业务】

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

现在看看guo控制台打印,出来了,多出了几句话,包括cen客户端发来的数据

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

现在查看服务端控制台打印

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

【源码里面的注释够详细了,我懒得再说什么】

(3)现在测试客户端下线,服务端捕获的效果

关闭cen客户端

查看服务端控制台

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

可见,捕获客户端下线成功

源码位置截图 【是在 “当选择键是可读的处理逻辑 ” 方法处捕获的】

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

为什么写在这里?

我在源码的注释详细说明了原因,这里懒得写

(4)反过来,如果服务端突然关闭,客户端会如何

分别开启服务端、cen客户端, 然后在关闭服务端

查看cen客户端控制台打印 【是在 客户端 “当选择键是可读的处理逻辑 ” 方法处捕获的】

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

可检测出服务端关闭了,客户端关闭了【我故意设计的,当服务关闭,客户端也会跟着关闭,其实也可以不关闭,改一下就好了】

源码截图 【是在 客户端 “当选择键是可读的处理逻辑 ” 方法处捕获的】

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

(5)测试当服务端未开启,客户端请求连接会如何

关闭服务端,开启cen客户端

查看cen客户端控制台打印 【是在 客户端 “当选择键是可连接的处理逻辑 ” 方法处捕获的】

NIO【同步非阻塞io模型】关于 NIO socket 的详细总结【Java客户端+Java服务端 + 业务层】【可以客户端间发消息】

------------------------

参考博文原址:

https://www.cnblogs.com/fswhq/p/9788008.html#_label0_2

https://blog.csdn.net/shulianghan/article/details/106411546

https://www.jianshu.com/p/119b11ff837a

https://blog.csdn.net/zhanglong_4444/article/details/89002242

上一篇:c++ 网络编程(九)LINUX/windows-IOCP模型 多线程超详细教程及多线程实现服务端


下一篇:C#开发BIMFACE系列41 服务端API之模型对比