IO四大模型
1、同步阻塞IO(Blocking IO)
2、同步非阻塞IO(Non-blocking IO)
3、IO多路复用模型(IO Multiplexing)
4、异步IO模型(Asynchronous IO)
BIO(blocking I/O) 同步阻塞
- 问题分析
1-每个请求创建独立线程,与对应的客户端进行数据Read,业务处理,数据Write
2-当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大。
3-连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在Read操作上,造成线程资源浪费。
----BIO进行通信
public class BIOServer {
public static void main(String[] args) throws Exception {
// 线程池
// 如果有客户端连狙就创建一个线程与之通信
// 创建一个线程池
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
// 创建serversocket
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服务器启动了。。。");
while (true){
System.out.println("等待连接。。。");
// 监听等待客户端连接
final Socket socket = serverSocket.accept();
System.out.println("----连接:------------------------------------------");
System.out.println("连接到了一个客户端");
//创建一个线程与之通信
newCachedThreadPool.execute(new Runnable() {
@Override
public void run() {
// 可以和客户端通讯 handler(socket);
}
});
}
}
// 编写一个handler方法 和客户端通讯
public static void handler(Socket socket){
try {
System.out.println("线程信息 id="+Thread.currentThread().getId());
System.out.println("线程信息 name="+Thread.currentThread().getName());
System.out.println("等待读取。。。");
byte[] bytes = new byte[1024];
// 通过socket获取一个输入流
InputStream inputStream = socket.getInputStream();
// 循环读取客户端发送的数据
while (true){
System.out.println("----读取信息-------------------------------------");
System.out.println("线程信息 id="+Thread.currentThread().getId());
System.out.println("线程信息 name="+Thread.currentThread().getName());
int read = inputStream.read(bytes);
if (read != -1){
System.out.println(new String(bytes,0,read));
}else {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}finally {
System.out.println("关闭cliant连接");
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
NIO(non-blocking I/O) 同步非阻塞
-
三大核心部分
1-Selector(选择器)
2-Channel(通道)
3-Buffer(缓冲区) -
Selector、Channel、Buffer的关系说明:
1-每个channel都会对应一个Buffer
2-Selector对应一个线程,一个线程对应多个channel
3-程序切换哪个channel由事件决定,Event就是一个重要概念
4-selector会根据不同事件在各个通道切换
5-buffer就是要给内存块,底层是有一个数组
6-数据读取写入通过Buffer
,NIO的Buffer是可以读也可以写,需要flip方法切换。
,BIO中要么是输入流,或者是输出流,不能双向。
7-channerl是双向的,可以返回底层操作系统的情况,如linux底层的操作系统通道是双向的。
Buffer
-
重要属性
【capsity】容量大小
【limit】缓冲区当前终点
【position】位置
【mark】标志 -
buffer方法:
【flip()】就是把position的位置改变回0,以实现读写转换,读写实质就是position位置改变。
【clear()】把标志位重置一下
Channel
channel是Nio中的一个接口
【SoketChannel】 类似 Soket
【ServerSoketChannel】类似ServerSoket
【FileChannel】用于文件的数据读写,
【DatagramChannel】用于UDP的数据读写,
【ServerSocketChannel和 SocketChannel】用于TCP的数据读写。
-
Channel方法:
【.writer()】buffer内容写入channel
【.read()】buffer读取channel内容
【.transferFrom(oherChannel,startPosition,endPosition)】把其他channel数据复制到当前channel
【.transferTo(oherChannel,startPosition,endPosition)】把当前channel数据复制到其他channel -
文件拷贝
----通过buffer进行copy
public class NIOFileChannel03copy {
public static void main(String[] args) throws Exception{
FileInputStream fileInputStream = new FileInputStream("D:\\Users\\xiaoaiying\\IDEAProjects\\netty\\NettyPro\\1.txt");
FileChannel fileChannel_in = fileInputStream.getChannel();
FileOutputStream fileOutputStream = new FileOutputStream("D:\\Users\\xiaoaiying\\IDEAProjects\\netty\\NettyPro\\2.txt");
FileChannel fileChannel_out = fileOutputStream.getChannel();
//buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
while (true){
// 重要步骤,把buffer标志位恢复到初始状态,否则循环读写时如果一次没读完可能会发生错误
byteBuffer.clear();
// 循环将读取到的数据放入buffer,
int read = fileChannel_in.read(byteBuffer);
if (read == -1){
break;
}
// 将buffer内数据写出
// 记得先反转buffer
byteBuffer.flip();
fileChannel_out.write(byteBuffer);
}
}
}
----通过channel封装好的方法copy
public class NIOFileChannel04copy {
public static void main(String[] args) throws Exception{
FileInputStream fileInputStream = new FileInputStream("D:\\Users\\xiaoaiying\\IDEAProjects\\netty\\NettyPro\\1.txt");
FileChannel fileChannel_in = fileInputStream.getChannel();
FileOutputStream fileOutputStream = new FileOutputStream("D:\\Users\\xiaoaiying\\IDEAProjects\\netty\\NettyPro\\2.txt");
FileChannel fileChannel_out = fileOutputStream.getChannel();
fileChannel_in.transferTo(0, fileChannel_in.size(),fileChannel_out);
// 或者
// fileChannel_out.transferFrom(fileChannel_in,0,fileChannel_in.size());
fileChannel_in.close();
fileChannel_out.close();
fileInputStream.close();
fileOutputStream.close();
}
}
-
buffer只读
【.asReadonlyBuffer()】buffer方法,返回一个只读buffer -
MappedByteBuffer
public class MappedByteBufferTest {
public static void main(String[] args) throws Exception{
RandomAccessFile randomAccessFile = new RandomAccessFile("D:\\Users\\xiaoaiying\\IDEAProjects\\netty\\NettyPro\\1.txt","rw");
FileChannel channel = randomAccessFile.getChannel();
/**
* MappedByteBuffer,可以让文件直接在内存(堆外的内存)中进行修改,而如何同步到文件由NIO来完成
* 参数1:Filechannel.MapMode . READ_wRITE使用的读写模式
* 参数2:可以直接修改的起始位置
* 参数3:是映射到内存的大小,即将文件的多少个字节映射到内存
* 可以直接修改的范围就是0-5,不包括5
* 实际类型DirectByteBuffer
*/
MappedByteBuffer mappedByteBuffer = channel.map(
FileChannel.MapMode.READ_WRITE,
0,
5);
// 修改内容
mappedByteBuffer.put(0,(byte)‘H‘);
mappedByteBuffer.put(3,(byte)‘9‘);
}
}
- buffer的 分散和聚集
【Scattering】[分散] 将数据写入到buffer时,可以采用buffer数组,依次写入
【Gathering】[聚集] 从buffer读取数据时,可以采用buffer数组,依次读
public class ScatteringAndGatheringTest {
public static void main(String[] args) throws Exception {
// 使用
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);
// 绑定端口到socket,并启动
serverSocketChannel.socket().bind(inetSocketAddress);
// 创建buffer数组
ByteBuffer[] byteBuffers = new ByteBuffer[2];
byteBuffers[0] = ByteBuffer.allocate(5);
byteBuffers[1] = ByteBuffer.allocate(3);
// 等待客户端连接
SocketChannel socketChannel = serverSocketChannel.accept();
// 假定从客户端接收8字节
int messageLenght = 8;
// 循环读取
while (true){
int byteRead = 0;
while (byteRead < messageLenght){
long l = socketChannel.read(byteBuffers);
byteRead += l; // 累计读取的字节数
System.out.println("byteRead=" + byteRead);
// 使用流打印,看看当前这个buffer的position和limit
Arrays.asList(byteBuffers)
.stream()
.map(buffer -> "postion="+buffer.position()+",limit="+buffer.limit())
.forEach(System.out::println);
// 将所有buffer进行flip
Arrays.asList(byteBuffers).forEach(buffer -> buffer.flip());
// 将数据读出显示到客户端
long byteWrite = 0;
while (byteWrite < messageLenght){
long wlen = socketChannel.write(byteBuffers);
byteWrite += wlen;
}
// 将所有buffer进行flip
Arrays.asList(byteBuffers).forEach(buffer -> buffer.clear());
System.out.println("byteRead="+byteRead +"byteWrie="+byteWrite + ",messagelenght="+messageLenght);
}
}
}
}
selector
selector能够检测多个注册的通道上是否有事件发生,(多个channel以事件的方式可以注册到同一个selector)
- selector
selector方法:
【open()】得到一个选择器对象
【select()】
【selectedKeys()】获得selectKey集合
- selectionKey
selectionkey方法:
【selector()】得到与之关联的selector
【channel()】 得到与之关联的channel
【attachment()】 得到与之关联的buffer,即数据
public abstract SelectionKey interestOps(int ops); //设置或改变监听事件
public final boolean isAcceptable(); //是否可以accept
public final boolean isReadable();//是否可以读
public final boolean isWritable(); //是否可以写
selectionKey对应channel,channel绑定到selector
- nio操作
----服务端
public class NIOserver {
public static void main(String[] args) throws Exception {
// 创建serversoketChaneel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(6666)); // 绑定端口
serverSocketChannel.configureBlocking(false);// 设置非阻塞
// 得到一个selector对象 selector开始监听
Selector selector = Selector.open();
// 把serverSocketChannel注册到selector 关系事件为OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 循环等待客户端连接
while (true){
// 等待1秒,如果没有事件发生
if (selector.select(3000) == 0){
System.out.println("服务器等待了3秒,无连接");
continue;
}
// 如果返回>0 获取到selectionKey集合
// selector.selectedKeys() 返回关注事件集合
// 反向获取通道
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()){// 遍历有操作的selectionkey
SelectionKey key = keyIterator.next();// 获取到selectionkey
if (key.isAcceptable()){ // 如果事件是OP_ACCEPT 有新客户端连接
// 给该客户端生成一个socketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);// 将socketChannel设置为非阻塞
System.out.println("客户端连接成功 生成一个socketChannel:"+socketChannel.hashCode());
// 将当前socketChannel 注册到selector 关注事件为OP_READ 通过socketChannel关联一个buffer
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if (key.isReadable()){ // 如果事件是OP_READ
// 通过key 反向获取channel
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.configureBlocking(false);
// 获取到该channel关联的buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
socketChannel.read(buffer);
System.out.println("form client:"+ new String(buffer.array()));
}
// 手动从集合中移动当前的selectionKey 防止重复操作
keyIterator.remove();
}
// System.out.println("无连接,关闭!!!");
}
}
}
----客户端
public class NIOclient {
public static void main(String[] args) throws Exception {
// 得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
// 设置非阻塞
socketChannel.configureBlocking(false);
// 提供服务器端的ip和端口
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
// 连接服务器
if (!socketChannel.connect(inetSocketAddress)){
while (!socketChannel.finishConnect()){
System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作。。。");
}
}
// 如果连接成功就放数据
String str = "hello,小艾";
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
// 发动数据,将buffer数据写入channel
socketChannel.write(buffer);
System.in.read();
}
}