Socket网络编程学习笔记 (9)TCP数据发送与接收并行

主要实现:

  • 多线程收发并行
  • TCP多线程收发协作

1. TCP 服务端收发并行重构

1.1 启动main方法重构

原有的main逻辑如下:

Socket网络编程学习笔记 (9)TCP数据发送与接收并行

 重构后如下:

/**
 * @ClassName Server
 * @Description TODO
 * @Author wushaopei
 * @Date 2022/2/27 13:00
 * @Version 1.0
 */
public class Server {

    public static void main(String[] args) throws IOException {

        TCPServer tcpServer = new TCPServer(TCPConstants.PORT_SERVER);
        boolean isSucceed = tcpServer.start();
        if(!isSucceed){
            System.out.println("Start TCP server failed.");
        }
        UDPProvider.start(TCPConstants.PORT_SERVER);

        // 键盘输入:
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        String str;
        do {
            str = bufferedReader.readLine();
            tcpServer.broadcast(str);
        } while (!"00bye00".equalsIgnoreCase(str));

        UDPProvider.stop();
        tcpServer.stop();
    }
}

重构后,从while循环不断读取键盘输入信息,当输入“00bye00” 时退出读取。此处只读取键盘输入数据,客户端发送的数据在会重新拆分出来新的线程单独处理。

1.2 重构分离收发消息的操作

创建 ClientHandler.java 重构收发消息操作:

/**
 * @ClassName ClientHandler
 * @Description TODO
 * @Author wushaopei
 * @Date 2022/2/27 16:44
 * @Version 1.0
 */
public class ClientHandler {
    private final Socket socket;
    private final ClientReadHandler readHandler;
    private final ClientWriteHandler writeHandler;
    private final CloseNotiry closeNotiry;

    public ClientHandler(Socket socket, CloseNotiry closeNotiry ) throws IOException {
        this.socket = socket;
        this.readHandler = new ClientReadHandler(socket.getInputStream());
        this.writeHandler = new ClientWriteHandler(socket.getOutputStream());
        this.closeNotiry = closeNotiry;
        System.out.println("新客户链接: " + socket.getInetAddress() + "\tP:" + socket.getPort());
    } 
}

重构接收消息的操作:

    /**
     * 接收数据
     */
    class ClientReadHandler extends Thread {

        private boolean done = false;
        private final InputStream inputStream;

        ClientReadHandler(InputStream inputStream){
            this.inputStream = inputStream;
        }
        @Override
        public void run(){
            super.run();
            try {
                // 得到输入流,用于接收数据
                BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
                do {
                    // 客户端拿到一条数据
                    String str = socketInput.readLine();
                    if(str == null){
                        System.out.println("客户端已无法读取数据!");
                        // 退出当前客户端
                        ClientHandler.this.exitBySelf();
                        break;
                    }
                    // 打印到屏幕
                    System.out.println(str);
                }while (!done);
                socketInput.close();
            }catch (IOException e){
                if(!done){
                    System.out.println("连接异常断开");
                    ClientHandler.this.exitBySelf();
                }
            }finally {
                // 连接关闭
                CloseUtils.close(inputStream);
            }
        }
        void exit(){
            done = true;
            CloseUtils.close(inputStream);
        }
    }

创建一个单独的线程进行接收消息,该线程不需要关闭。

重构发送消息:

    /**
     * 发送数据
     */
    class ClientWriteHandler {
        private boolean done = false;
        private final PrintStream printStream;
        private final ExecutorService executorService;

        ClientWriteHandler(OutputStream outputStream) {
            this.printStream = new PrintStream(outputStream);
            // 发送消息使用线程池来实现
            this.executorService = Executors.newSingleThreadExecutor();
        }

        void exit(){
            done = true;
            CloseUtils.close(printStream);
            executorService.shutdown();
        }

        void send(String str) {
            executorService.execute(new WriteRunnable(str));
        }

        class WriteRunnable implements  Runnable{
            private final String msg;

            WriteRunnable(String msg){
                this.msg = msg;
            }
            @Override
            public void run(){
                if(ClientWriteHandler.this.done){
                    return;
                }
                try {
                    ClientWriteHandler.this.printStream.println(msg);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }

TCPServer调用发送消息的逻辑:

    public void broadcast(String str) {
        for (ClientHandler client : clientHandlerList){
            // 发送消息
            client.send(str);
        }
    }

1.3 监听客户端链接逻辑重构

    private List<ClientHandler> clientHandlerList = new ArrayList<>();

    /**
     * 监听客户端链接
     */
    private class ClientListener extends Thread {
        private ServerSocket server;
        private boolean done = false;

        private ClientListener(int port) throws IOException {
            server = new ServerSocket(port);
            System.out.println("服务器信息: " + server.getInetAddress() + "\tP:" + server.getLocalPort());
        }

        @Override
        public void run(){
            super.run();

            System.out.println("服务器准备就绪~");
            // 等待客户端连接
            do{
                // 得到客户端
                Socket client;
                try {
                    client = server.accept();
                }catch (Exception e){
                    continue;
                }
                try {
                    // 客户端构建异步线程
                    ClientHandler  clientHandler = new ClientHandler(client, handler -> clientHandlerList.remove(handler));
                    // 启动线程
                    clientHandler.readToPrint();
                    clientHandlerList.add(clientHandler);
                } catch (IOException e) {
                    e.printStackTrace();
                    System.out.println("客户端连接异常: " + e.getMessage());
                }

            }while (!done);
            System.out.println("服务器已关闭!");
        }
        void exit(){
            done = true;
            try {
                server.close();
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }
clientHandlerList作为类变量,用于管理当前用户的信息。接收与发送都使用该变量。

1.4 Socket、流的退出与关闭:

    /**
     * 退出、关闭流
     */
    public void exit(){
        readHandler.exit();
        writeHandler.exit();
        CloseUtils.close(socket);
        System.out.println("客户端已退出:" + socket.getInetAddress() + "\tP:" + socket.getPort());
    }

    /**
     * 发送消息
     * @param str
     */
    public void send(String str){
        writeHandler.send(str);
    }

    /**
     * 接收消息
     */
    public void readToPrint() {
        readHandler.exit();
    }

    /**
     *  接收、发送消息异常,自动关闭
     */
    private void exitBySelf() {
        exit();
        closeNotiry.onSelfClosed(this);
    }
    /**
     *  关闭流
     */
    public interface CloseNotiry{
        void onSelfClosed(ClientHandler handler);
    }

2. TCP 客户端收发并行重构

2.1 客户端 main函数重构

    public static void main(String[] args) {
        // 定义10秒的搜索时间,如果超过10秒未搜索到,就认为服务器端没有开机
        ServerInfo info = UDPSearcher.searchServer(10000);
        System.out.println("Server:" + info);

        if( info != null){
            try {
                TCPClient.linkWith(info);
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }

2.2 客户端接收消息重构

    static class ReadHandler extends Thread{
        private boolean done = false;
        private final InputStream inputStream;

        ReadHandler(InputStream inputStream){
            this.inputStream = inputStream;
        }
        @Override
        public void run(){
            try {
                // 得到输入流,用于接收数据
                BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
                do {
                    // 客户端拿到一条数据
                    String str = null;
                    try {
                        str = socketInput.readLine();
                    }catch (SocketTimeoutException e){

                    }
                    if(str == null){
                        System.out.println("连接已关闭,无法读取数据!");
                        break;
                    }
                    // 打印到屏幕
                    System.out.println(str);
                }while (!done);
                socketInput.close();
            }catch (IOException e){
                if(!done){
                    System.out.println("连接异常断开:" + e.getMessage());
                }
            }finally {
                // 连接关闭
                CloseUtils.close(inputStream);
            }
        }
        void exit(){
            done = true;
            CloseUtils.close(inputStream);
        }
    }

创建ReadHandler用单独的线程去接收服务端的消息。连接关闭则exit() 关闭客户端。

2.3 客户端发送消息重构

    private static void write(Socket client) throws IOException {
        // 构建键盘输入流
        InputStream in = System.in;
        BufferedReader input = new BufferedReader(new InputStreamReader(in));

        // 得到Socket输出流,并转换为打印流
        OutputStream outputStream = client.getOutputStream();
        PrintStream socketPrintStream = new PrintStream(outputStream);

        boolean flag = true;
        do {
            // 键盘读取一行
            String str = input.readLine();
            // 发送到服务器
            socketPrintStream.println(str);

            // 从服务器读取一行
            if("00bye00".equalsIgnoreCase(str)){
                break;
            }
        }while(flag);
        // 资源释放
        socketPrintStream.close();
    }

在linkWith() 中调用write() 发送方法,由 do-while 循环读取本地键盘输入信息进行发送操作。当满足 “00bye00” 时,关闭循环,关闭socket连接,结束该线程。

2.4 客户端 linkWith 主方法重构

     public static void linkWith(ServerInfo info) throws IOException {
        Socket socket = new Socket();
        // 超时时间
        socket.setSoTimeout(3000);
        // 端口2000;超时时间300ms
        socket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()),info.getPort()));//

        System.out.println("已发起服务器连接,并进入后续流程~");
        System.out.println("客户端信息: " + socket.getLocalAddress() + "\tP:" + socket.getLocalPort());
        System.out.println("服务器信息:" + socket.getInetAddress() + "\tP:" + socket.getPort());

        try {
            ReadHandler readHandler = new ReadHandler(socket.getInputStream());
            readHandler.start();
            // 发送接收数据
            write(socket);
        }catch (Exception e){
            System.out.println("异常关闭");
        }

        // 释放资源
        socket.close();
        System.out.println("客户端已退出~");
    }

原有的逻辑里,是调用 todo() 方法,在todo() 方法里同时进行收发操作。现在是进行读写分离。

3. TCP 收发并行重构测试:

服务端重构后日志:

J:\folder\JDK1.8\bin\java -ja…… - Channel\out\production\classes" server.Server
服务器信息: 0.0.0.0/0.0.0.0	P:30401
服务器准备就绪~
UDDProvider Started.
ServerProvider receive from ip:169.254.178.74	port:169.254.178.74	port:49878	dataValid:true
ServerProvider response to:169.254.178.74	port:30202	dataLen: 50
新客户链接: /169.254.178.74	P:51094
ping
pong
00bye0000bye00
客户端已无法读取数据!
客户端已退出:/169.254.178.74	P:51094

客户端重构后执行日志:

J:\folder\JDK1.8\bin\java -javaagent:J:\……- Channel\out\production\classes" client.Client
UDPSearcher Started.
UDPSearcher start listen.
UDPSearcher sendBroadcast started.
UDPSearcher sendBroadcast finished.
UDPSearch receive form ip:169.254.178.74	port:30201	dataValid:true
UDPSearcher Finished.
UDPSearcher listener finished.
Server:ServerInfo{sn='4cd6143b-205f-4e80-9b22-a6eae8e85cdd', port=30401, address='169.254.178.74'}
已发起服务器连接,并进入后续流程~
客户端信息: /169.254.178.74	P:51094
服务器信息:/169.254.178.74	P:30401
ping
pong
00bye00
客户端已退出~

Process finished with exit code 0
上一篇:02 Dos命令


下一篇:java