御神楽的学习记录之基于IO、NIO、Netty的TCP聊天程序

文章目录


前言

java 1.4版本推出了一种新型的IO API,与原来的IO具有相同的作用和目的;可代替标准java IO,只是实现的方式不一样,NIO面向缓冲区、基于通道的IO操作;通过NIO可以提高对文件的读写操作。基于这种优势,现在使用NIO的场景越来愈多,很多主流行的框架都使用到了NIO技术,如Tomcat、Netty、Jetty等;所以学习和掌握NIO技术已经是一个java开发的必备技能了。


一、IO与NIO

1.面向流与面向缓冲区

Java IO中读取数据和写入数据是**面向流(Stream)**的,这表示当我们从流中读取数据,写入数据时也将其写入流,流的含义在于没有缓存 ,就好像我们站在流水线前,所有的数据沿着流水线依次到达我们的面前,我们只能读取当前的数据。如果需要获取某个数据的前一项或后一项数据那就必须自己缓存数据,而不能直接从流中获取。

而在Java NIO中数据的读写是面向**缓冲区(Buffer)**的,读取时可以将整块的数据读取到缓冲区中,在写入时则可以将整个缓冲区中的数据一起写入。这就好像是将流水线传输变成了卡车运送,面向流的数据读写只提供了一个数据流切面,而面向缓冲区的IO则使我们能够看到数据的上下文,也就是说在缓冲区中获取某项数据的前一项数据或者是后一项数据十分方便。这种便利是有代价的,因为我们必须管理好缓冲区,这包括不能让新的数据覆盖了缓冲区中还没有被处理的有用数据;将缓冲区中的数据正确的分块,分清哪些被处理过哪些还没有等等。

2.阻塞与非阻塞

传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 read() 或 write()时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务。因此,在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量客户端时,性能急剧下降。

Java NIO非阻塞模式的。当线程从某通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。因此,NIO 可以让服务器端使用一个或有限几个线程来同时处理连接到服务器端的所有客户端。

二、TCP聊天程序

1.基于IO

IO服务端

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IOServer {

    @SuppressWarnings("resource")
    public static void main(String[] args) throws Exception {

        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        //创建socket服务,监听8081端口
        ServerSocket server=new ServerSocket(8081);
        System.out.println("服务器启动!");
        int count=0;
        while(true){
            //获取一个套接字(阻塞)
            final Socket socket = server.accept();
            System.out.println("欢迎第"+(++count)+"个同学");
            newCachedThreadPool.execute(new Runnable() {

                @Override
                public void run() {
                    //业务处理
                    handler(socket);
                }
            });

        }
    }

  
 //读取数据
    
    public static void handler(Socket socket){
        try {
            byte[] bytes = new byte[1024];
            InputStream inputStream = socket.getInputStream();

            while(true){
                //读取数据(阻塞)
                int read = inputStream.read(bytes);
                if(read != -1){
                    System.out.println(new String(bytes, 0, read));
                }else{
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            try {
                System.out.println("socket关闭");

                socket.close();
            } catch (IOException e) {

                e.printStackTrace();
            }
        }
    }
}

IO客户端

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

public class IOClient {
    public static void main(String[] args) throws IOException {
        //发送十次
        for (int i=0;i<10;i++){
            Socket socket=new Socket("127.0.0.1", 8081);
            //写数据
            OutputStream os=socket.getOutputStream();
            os.write(("御神楽"+i).getBytes());
            //释放资源
            socket.close();
        }

    }

}

效果:
御神楽的学习记录之基于IO、NIO、Netty的TCP聊天程序

2.基于NIO

NIO服务端

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NIOServer {
    // 通道管理器
    private Selector selector;


     //启动服务端测试

    public static void main(String[] args) throws IOException {
        NIOServer server = new NIOServer();
        server.initServer(8081);
        server.listen();
    }



     // 获得一个ServerSocket通道,并对该通道做一些初始化的工作

    public void initServer(int port) throws IOException {
        // 获得一个ServerSocket通道
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 设置通道为非阻塞
        serverChannel.configureBlocking(false);
        // 将该通道对应的ServerSocket绑定到port端口
        serverChannel.socket().bind(new InetSocketAddress(port));
        // 获得一个通道管理器
        this.selector = Selector.open();
        // 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
        // 当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }


     //采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理

    public void listen() throws IOException {
        System.out.println("服务端启动成功!");
        // 轮询访问selector
        while (true) {
            // 当注册的事件到达时,方法返回;否则,该方法会一直阻塞
            selector.select();
            // 获得selector中选中的项的迭代器,选中的项为注册的事件
            Iterator<?> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = (SelectionKey) ite.next();
                // 删除已选的key,以防重复处理
                ite.remove();

                handler(key);
            }
        }
    }


     //处理请求

    public void handler(SelectionKey key) throws IOException {

        // 客户端请求连接事件
        if (key.isAcceptable()) {
            handlerAccept(key);
            // 获得了可读的事件
        } else if (key.isReadable()) {
            handelerRead(key);
        }
    }


     // 处理连接请求

    public void handlerAccept(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        // 获得和客户端连接的通道
        SocketChannel channel = server.accept();
        // 设置成非阻塞
        channel.configureBlocking(false);

        // 在这里可以给客户端发送信息哦
        System.out.println("检测到新客户连接");
        // 在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。
        channel.register(this.selector, SelectionKey.OP_READ);
    }


     // 处理读的事件

    public void handelerRead(SelectionKey key) throws IOException {
        // 服务器可读取消息:得到事件发生的Socket通道
        SocketChannel channel = (SocketChannel) key.channel();
        // 创建读取的缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = channel.read(buffer);
        if(read > 0){
            byte[] data = buffer.array();
            String msg = new String(data).trim();
            System.out.println("用户名为:" + msg);

            //回写数据
            ByteBuffer outBuffer = ByteBuffer.wrap("服务器已接收".getBytes());
            channel.write(outBuffer);// 将消息回送给客户端
        }else{
            System.out.println("客户端关闭");
            key.cancel();
        }
    }
}

NIO客户端

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class NIOClient {
    public static void main(String[] args) throws Exception {
        final int count[]=new int[1];
        count[0]=1;
        for(int i=0;i<5;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    SocketChannel socketChannel = null;
                    //发送的数据
                    String str = "御神楽"+count[0]++;
                    ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes());

                    //接受的数据
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    try {
                        //建立连接
                        socketChannel = SocketChannel.open();
                        socketChannel.configureBlocking(false);
                        if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 8081))) {
                            //等待连接
                            while (!socketChannel.finishConnect()) {
                            }
                        }
                        //写入数据
                        socketChannel.write(byteBuffer);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }

                    //10s后自动断开连接
                    int time=1;
                    while (time<10){
                        time++;
                        try {
                            //读取数据
                            int read=socketChannel.read(buffer);
                            if(read > 0) {
                                byte[] data = buffer.array();
                                String msg = new String(data).trim();
                                System.out.println("客户端收到信息:" + msg);
                            }
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    try {
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            Thread.sleep(100);
        }
    }

}

测试效果:
服务器:
御神楽的学习记录之基于IO、NIO、Netty的TCP聊天程序
客户端:
御神楽的学习记录之基于IO、NIO、Netty的TCP聊天程序

3.基于Netty

Netty服务端:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

public class NettyServer {
    public static void main(String[] args) {
        //用于处理服务器端接收客户端连接
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //进行网络通信(读写)
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //辅助工具类,用于服务器通道的一系列配置
            ServerBootstrap bootstrap = new ServerBootstrap();
            //绑定两个线程组
            bootstrap.group(bossGroup,workerGroup)
                    //设置boss selector建立channel使用的对象
                    .channel(NioServerSocketChannel.class)
                    //boss 等待连接的 队列长度
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //处理消息对象
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //创建管道
                            ChannelPipeline pipeline = ch.pipeline();
                            //解码方式
                            pipeline.addLast("decoder",new StringDecoder());
                            //编码方式
                            pipeline.addLast("encoder",new StringEncoder());
                            //自定义处理消息对象
                            pipeline.addLast(new ServerHandler());
                        }
                    });
            System.out.println("服务器正在启动");
            //绑定端口号
            ChannelFuture cf = bootstrap.bind(8083).sync();

            cf.addListener(cd->{
                if(cd.isSuccess()){
                    System.out.println("启动成功");
                }else{
                    System.out.println("启动失败");
                }
            });
            //服务端给所有客户端发信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()){
                String msg = scanner.nextLine();
                ServerHandler.sendAll(msg);
            }
            //阻塞当前线程
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }



}
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.Date;

public class ServerHandler extends SimpleChannelInboundHandler<String> {
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress()+" == " +msg);
        channelGroup.forEach(ch->{
            if (channel!=ch) {
                ch.writeAndFlush("[ 客户端 ]" + channel.remoteAddress() + "发送了消息 : " + msg + "\n");
            }else{
                ch.writeAndFlush("[ 客户 ] 发送了消息: " + msg + "\n");
            }
        });

    }
    //用于服务端发信息给所有客户端
    public static void sendAll(String msg){
        channelGroup.forEach(channel -> {
            channel.writeAndFlush("服务器: "+msg+"\n");
        });
    }


     // 当有新的用户连接触发

    public void channelActive(ChannelHandlerContext ctx){
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[ 客户端]"+channel.remoteAddress()+" 已连接 "+sf.format(new Date())+"\n");
        //把新来的连接加入
        channelGroup.add(channel);
        System.out.println(ctx.channel().remoteAddress()+" 上线了" + "\n");
    }


     //当用户断开连接触发

    public void channelInactive(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[ 客户端 ] " +channel.remoteAddress()+ " 断开连接"+"\n");
        System.out.println(channel.remoteAddress()+" 下线了.\n");
        System.out.println("channelGroup size = "+ channelGroup.size());
    }


}

Netty客户端:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.ArrayList;
import java.util.List;

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        List<ChannelFuture> channelFutures = new ArrayList<ChannelFuture>();
        try {
            Bootstrap bootstrap = new Bootstrap();
            //服务器可以主动断开连接
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            //地址复用
            bootstrap.option(ChannelOption.SO_REUSEADDR, true);
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("decoder",new StringDecoder());
                            pipeline.addLast("encoder",new StringEncoder());
                            pipeline.addLast(new ClientHandler());
                        }
                    });
            final int count[] =new int[1];
            count[0]=0;
            for(int i=0;i<3;i++){
                //添加连接
                channelFutures.add(bootstrap.connect("127.0.0.1",8083).sync());
                //新建线程模拟多用户
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        int index=count[0]++;
                        //获取对应管道
                        Channel channel = channelFutures.get(index).channel();
                        System.out.println( "======"+channel.localAddress()+"======");
                        int time=0;
                        while (time++<3){
                            //发送数据
                            String msg =" 御神楽 "+(index)+": "+time;
                            channel.writeAndFlush(msg);
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        //关闭连接
                        channel.close();

                    }
                }).start();

            }

            //阻塞主线程,否则会直接执行finally关闭EventLoopGroup
            int time=0;
            while (time++<5){
                Thread.sleep(1000);
            }

        } finally {
            //关闭EventLoopGroup
            group.shutdownGracefully();
        }

    }

}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        System.out.println(msg.trim());
    }

}

演示
服务端:
御神楽的学习记录之基于IO、NIO、Netty的TCP聊天程序
客户端
御神楽的学习记录之基于IO、NIO、Netty的TCP聊天程序


参考

https://blog.csdn.net/linjpg/article/details/80962453
https://blog.csdn.net/qq_47281915/article/details/121802536

上一篇:迅为RK3399开发板Android 固件烧写(一)


下一篇:# fireflyrk3399 linux4.19移植笔记