传统的BIO模式
对于传统IO模型,其主要是一个Server对接N个客户端,在客户端连接之后,为每个客户端都分配一个执行线程。
特点:
① 每个客户端连接到达之后,都会分配一个线程给客户端。用于读写数据,编码解码以及业务计算。
② 服务端创建的连接数与服务端可创建线程数量呈线性关系
缺点(根据特点而定):
① 服务端的并发量与服务器可创建线程数有依赖关系
② 服务端线程不仅要IO读写数据,还要业务计算
③ 服务端先进行获取客户端连接,然后读取数据,最后写入数据的这些过程中都是阻塞的。在网络不好的情况下,降低了对线程的利用率,减少了服务器的吞吐量。
IO练习练习一
单发单收:客户端发送一行数据,服务端接收一行数据,通信架构的规则服务端与客户端的通信机制要一致
public class client {
public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1",9999);
OutputStream os = socket.getOutputStream();
//因为接受的时候是接受一行字符打印
//使用打印流比较好
PrintStream ps = new PrintStream(os);
ps.println("Hello world");
ps.flush();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class Server {
public static void main(String[] args) {
try {
//1.创建ServerSocket对象用作服务端端口注册
ServerSocket ss = new ServerSocket(9999);
//2.创建服务端socket
Socket server = ss.accept();
//3.创建服务端输入流
InputStream is = server.getInputStream();
//创建字符输入流读取client传入的字符数据
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg;
if ((msg = br.readLine()) != null){
System.out.println("服务端接收到的信息" + msg);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
IO练习二
多发多收:客户端发送多条消息,服务端接收多条消息(通过创建线程的方式实现)
缺点:
① 每个Socket接收到都会创建一个线程,线程的竞争,切换上下文影响性能
② 每个线程都会占用栈空间和CPU资源
③ 并不是每个Socket都进行IO操作,无意义的线程处理
④ 客户端并发增加时,服务端呈现线性线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务
package javabasic.moreclient;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;
/**
* @author Lyyyys
* @version 1.0
* @date 2021/7/12 9:21
*/
public class Client {
public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1",9999);
OutputStream os = socket.getOutputStream();
PrintStream ps = new PrintStream(os);
Scanner scanner = new Scanner(System.in);
while (true){
System.out.printf("客户端:");
String msg = scanner.nextLine();
ps.println(msg);
ps.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
package javabasic.moreclient;
import javabasic.iodemo.Server;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author Lyyyys
* @version 1.0
* @date 2021/7/12 9:08
*/
/*
* 需求:实现服务端收取多个客户端发送的请求信息
* 思路:服务端每接收到一个客户端的请求的时候都建立一个线程给予通信
* */
public class server {
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket(9999);
while (true){
Socket socket = ss.accept();
ServerThreadSocket threadSocket = new ServerThreadSocket(socket);
threadSocket.run();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
package javabasic.moreclient;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
/**
* @author Lyyyys
* @version 1.0
* @date 2021/7/12 9:15
*/
public class ServerThreadSocket extends Thread{
private Socket socket;
public ServerThreadSocket(Socket socket){
this.socket = socket;
}
@Override
public void run() {
try {
InputStream is = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg;
while ((msg = br.readLine()) != null){
System.out.printf("服务端接收到的消息为" + msg);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
IO练习三:改进IO练习二
多发多收:客户端发送多条消息,服务端接收多条消息(通过创建线程池的消息队列实现伪异步io)
package javabasic.fakenio;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;
/**
* @author Lyyyys
* @version 1.0
* @date 2021/7/12 10:09
*/
public class client {
public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1",9999);
OutputStream os = socket.getOutputStream();
PrintStream ps = new PrintStream(os);
Scanner scanner = new Scanner(System.in);
while (true){
System.out.printf("客户端:");
String msg = scanner.nextLine();
ps.println(msg);
ps.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
package javabasic.fakenio;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Lyyyys
* @version 1.0
* @date 2021/7/12 9:58
*/
public class HandlerSocketPool {
private ExecutorService executorService;
/*
*public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
* */
public HandlerSocketPool(int maximumPoolSize,int queueSize){
executorService = new ThreadPoolExecutor(3,maximumPoolSize,120
, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(queueSize));
}
public void execute(Runnable target){
executorService.execute(target);
}
}
package javabasic.fakenio;
import jdk.net.Sockets;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author Lyyyys
* @version 1.0
* @date 2021/7/12 9:56
*/
/*
* 实现伪异步io通信
* */
public class server {
public static void main(String[] args) {
try {
//1.创建服务端注册端口
ServerSocket ss = new ServerSocket(9999);
//2.创建线程池对象
HandlerSocketPool pool = new HandlerSocketPool(10,12);
while (true){
//3.接收到客户端的请求,连接对象
Socket socket = ss.accept();
ServerRunnableTarget target = new ServerRunnableTarget(socket);
pool.execute(target);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
package javabasic.fakenio;
import jdk.net.Sockets;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
/**
* @author Lyyyys
* @version 1.0
* @date 2021/7/12 10:06
*/
public class ServerRunnableTarget implements Runnable{
private Socket socket;
public ServerRunnableTarget(Socket socket){
this.socket = socket;
}
@Override
public void run() {
try {
InputStream is = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg;
while ((msg = br.readLine()) != null){
System.out.println("服务端接收到的数据是" + msg);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
IO练习四
基于BIO模式下实现文件上传的案例,服务端使用根据连接数线性创建线程
思路:先用data流读取文件后缀名,然后再读取文件的数据。
package javabasic.fileupload;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
/**
* @author Lyyyys
* @version 1.0
* @date 2021/7/12 10:27
*/
/*
* 需求:实现客户端发送任意类型的文件数据给服务端保存起来
*
* */
public class Client {
public static void main(String[] args) {
FileInputStream fis = null;
DataOutputStream dos = null;
try {
fis = new FileInputStream("D:\\照片\\01.txt");
Socket socket = new Socket("127.0.0.1",9999);
OutputStream os = socket.getOutputStream();
dos = new DataOutputStream(os);
dos.writeUTF(".txt");
byte[] buffer = new byte[1024];
int len = 0;
while ((len = fis.read(buffer)) > 0){
dos.write(buffer,0,len);
}
dos.flush();
socket.shutdownOutput();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(fis != null){
try {
fis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(dos != null){
try {
dos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
package javabasic.fileupload;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author Lyyyys
* @version 1.0
* @date 2021/7/12 10:27
*/
/*
* 需求:服务端可以接收客户端任意类型数据的文件,并保存到服务端的磁盘中去
* */
public class Server {
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket(9999);
while (true){
Socket socket = ss.accept();
SocketThread thread = new SocketThread(socket);
thread.run();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
package javabasic.fileupload;
import java.io.DataInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.util.UUID;
/**
* @author Lyyyys
* @version 1.0
* @date 2021/7/12 11:24
*/
public class SocketThread extends Thread{
private Socket socket;
public SocketThread(Socket socket){
this.socket = socket;
}
@Override
public void run() {
FileOutputStream fos = null;
DataInputStream dis = null;
try {
InputStream is = socket.getInputStream();
dis = new DataInputStream(is);
String suffix = dis.readUTF();
fos = new FileOutputStream("D:\\照片\\server\\" + UUID.randomUUID() + suffix);
int len = 0;
byte[] buffer = new byte[1024];
while ((len = dis.read(buffer)) > 0){
fos.write(buffer,0,len);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if(fos != null){
try {
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
IO练习练习常用方法
public class Demo1 {
public static void main(String[] args) {
}
//递归地列出一个目录下所有文件:
private static void listAllFiles(File dir){
if(dir == null || !dir.exists()){
return;
}
if(dir.isFile()){
System.out.println(dir.getName());
return;
}
for (File file:dir.listFiles()) {
listAllFiles(file);
}
}
//复制文件
public static void copyFile(String src,String dest) throws IOException {
FileInputStream in = new FileInputStream(src);
FileOutputStream out = new FileOutputStream(dest);
byte[] buffer = new byte[20*1024];
while (in.read(buffer,0,buffer.length) != -1){
out.write(buffer);
}
in.close();
out.close();
}
//实现逐行输出文本文件的内容
public static void readFileContent(String filePath) throws IOException {
BufferedReader br = new BufferedReader(new FileReader(filePath));
String line ="";
while(br.readLine() != null){
System.out.println(line = br.readLine());
}
//为什么只需要关闭一个窗口即可因为bufferReader采用的是装饰器模式
//调用BufferReader的close方法也会调用FileReader的close方法
br.close();
}
}
NIO三大重点
一、通道
通道与流的不同之处在于
流只能在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类),而通道是双向的,可以用于读、写或者同时用于读写。 通道包括以下类型:
① FileChannel: 从文件中读写数据;
② DatagramChannel: 通过 UDP 读写网络中数据;
③ SocketChannel: 通过 TCP 读写网络中数据;
④ ServerSocketChannel: 可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。
二、缓冲区
发送给一个通道的所有数据都必须首先放到缓冲区中,同样地,从通道中读取的任何数据都要先读到缓冲区中。也就是说,不会直接对通道进行读写数据,而是要先经过缓冲区。 缓冲区实质上是一个数组,但它不仅仅是一个数组。缓冲区提供了对数据的结构化访问,而且还可以跟踪系统的读/写进程。
public static ByteBuffer allocate(int capacity)
public static ByteBuffer allocateDirect(int capacity)
第一种分配方式产生的内存开销是在JVM中的,而另外一种的分配方式产生的开销在JVM之外,以就是系统级的内存分配。当Java程序接收到外部传来的数据时,首先是被系统内存所获取,然后在由系统内存复制复制到JVM内存*Java程序使用。
区别:数据量小第一种比较快因为直接就分配到JVM内存中,但当数据量大的时候使用后者较好,因为第一种还要从系统内存当中复制到JVM内存中。
public class demo2 {
public static void main(String[] args) throws IOException {
FileInputStream fio = new FileInputStream("src");
//获取输入字节流的通道
FileChannel fiochannel = fio.getChannel();
FileOutputStream fos = new FileOutputStream("dist");
//获取输出字节流的通道
FileChannel foschannel = fos.getChannel();
//为缓冲区分配1024字节
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
while (true){
//从输入通道中读取数据到缓冲区中
int r = fiochannel.read(byteBuffer);
if(r == -1)
break;
//缓冲区切换读写
byteBuffer.flip();
//由输出通道去写缓冲区的内容
foschannel.write(byteBuffer);
//清空缓冲区的内容
byteBuffer.clear();
}
}
}
三、选择器
NIO 实现了 IO 多路复用中的 Reactor 模型,一个线程 Thread 使用一个选择器 Selector 通过轮询的方式去监听多个通道 Channel 上的事件,从而让一个线程就可以处理多个事件。
ServerSocketChannel ssChannel = ServerSocketChannel.open();
ssChannel.configureBlocking(false);
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
通过配置监听的通道 Channel 为非阻塞,那么当 Channel 上的 IO 事件还未到达时,就不会进入阻塞状态一直等待,而是继续轮询其它 Channel,找到 IO 事件已经到达的 Channel 执行
在将通道注册到选择器上时,还需要指定要注册的具体事件,主要有以下几类:
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
① SelectionKey.OP_ACCEPT —— 接收连接继续事件,表示服务器监听到了客户连接,服务器可以接收这个连接了
② SelectionKey.OP_CONNECT —— 连接就绪事件,表示客户与服务器的连接已经建立成功
③ SelectionKey.OP_READ —— 读就绪事件,表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了)
④ SelectionKey.OP_WRITE —— 写就绪事件,表示已经可以向通道写数据了(通道目前可以用于写操作)
1.当向通道中注册SelectionKey.OP_READ事件后,如果客户端有向缓存中write数据,下次轮询时,则会 isReadable()=true;
2.当向通道中注册SelectionKey.OP_WRITE事件后,这时你会发现当前轮询线程中isWritable()一直为ture,如果不设置为其他事件
4.IO多路复用
NIOClient实例
package javabasic.niodemo;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
/**
* @author Lyyyys
* @version 1.0
* @date 2021/7/18 11:39
*/
public class NIOClient {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 8888);
OutputStream out = socket.getOutputStream();
String s = "hello world";
//前后IO传输格式一致,byte字节
out.write(s.getBytes());
out.close();
}
}
NIOServer实例
package javabasic.niodemo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @author Lyyyys
* @version 1.0
* @date 2021/7/18 11:39
*/
public class NIOServer {
public static void main(String[] args) throws Exception {
//1.创建选择器
Selector selector = Selector.open();
//2.讲通道注册到选择器上
ServerSocketChannel sschannel = ServerSocketChannel.open();
sschannel.register(selector, SelectionKey.OP_ACCEPT);
sschannel.configureBlocking(false);
//设置绑定ServerSocket的地址
ServerSocket ssocket = sschannel.socket();
InetSocketAddress address = new InetSocketAddress("127.0.0.1",8888);
ssocket.bind(address);
//4.事件循环
while (true){
//3.监听事件
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
if(iterator.hasNext()){
SelectionKey key = iterator.next();
if(key.isAcceptable()){
ServerSocketChannel sschannel1 = (ServerSocketChannel) key.channel();
//服务器为每一个连接创建SocketChannel
SocketChannel sChannel = sschannel1.accept();
sChannel.configureBlocking(false);
sChannel.register(selector,SelectionKey.OP_READ);
}else if(key.isReadable()){
SocketChannel sChannel = (SocketChannel) key.channel();
System.out.println(readDataFromSocketChannel(sChannel));
sChannel.close();
}
iterator.remove();
}
}
}
private static String readDataFromSocketChannel(SocketChannel sChannel) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
StringBuffer data = new StringBuffer();
while (true){
int n = sChannel.read(buffer);
if(n == -1){
break;
}
buffer.flip();
int limit = buffer.limit();
char[] dst = new char[limit];
for (int i = 0; i < limit; i++) {
dst[i] = (char) buffer.get(i);
}
data.append(dst);
buffer.clear();
}
return data.toString();
}
}