多线程设计模式-两阶段结束设计模式(Two Phase Termination)

两个阶段结束设计模式(Two Phase Termination)

线程停止运行后,它持有的数据和资源往往仍然存在,需要显式释放。因此结束线程分为两个阶段:第一阶段发出终止请求,让线程停止工作;第二阶段执行清理、释放资源。这就是两阶段终止(Two Phase Termination)设计模式。

线程统计数据

package com.ln.concurrent;

import java.util.Random;

/**
 * @ProjectName: java-concurrency
 * @Package: com.ln.concurrent
 * @Name:CounterIncrement
 * @Author:linianest
 * @CreateTime:2021/1/4 11:39
 * @version:1.0
 * @Description TODO: Counting worker thread that demonstrates two-phase termination.
 * Phase one is the stop request issued through {@link #close()}; phase two is the
 * clean-up step that {@link #run()} always executes before the thread exits.
 */
public class CounterIncrement extends Thread {

    /** Source of the random pause (0-999 ms) between two printed values. */
    private static final Random random = new Random(System.currentTimeMillis());

    /** Stop flag: set by {@link #close()}, polled by the loop in {@link #run()}. */
    private volatile boolean terminated = false;

    /** Next value to print; only touched by the worker thread itself. */
    private int counter = 0;

    /**
     * Prints the thread name and an increasing counter until termination is requested,
     * pausing a random amount of time after each line. The clean-up step runs on every
     * exit path.
     */
    @Override
    public void run() {
        try {
            for (; ; ) {
                if (terminated) {
                    break;
                }
                final String line = Thread.currentThread().getName() + " " + counter;
                counter++;
                System.out.println(line);
                Thread.sleep(random.nextInt(1_000));
            }
        } catch (InterruptedException ignored) {
            // The interrupt sent by close() is the stop signal itself: nothing to report,
            // just fall through to the clean-up below.
        } finally {
            clean();
        }
    }

    /** Second phase: reports the final counter value. */
    private void clean() {
        System.out.println("do some clean work for the second phase.current counter=" + counter);
    }

    /**
     * First phase: raises the stop flag and interrupts the thread so that a pending
     * sleep ends immediately.
     */
    public void close() {
        terminated = true;
        interrupt();
    }
}

两阶段终结线程设计模式

package com.ln.concurrent.chapter15;

import com.ln.concurrent.CounterIncrement;

/**
 * @ProjectName: java-concurrency
 * @Package: com.ln.concurrent.chapter15
 * @Name:CounterTest
 * @Author:linianest
 * @CreateTime:2021/1/4 11:46
 * @version:1.0
 * @Description TODO: Demo driver for the two-phase termination pattern.
 */
public class CounterTest {

    /**
     * Starts a {@link CounterIncrement} worker, lets it run for ten seconds and then
     * asks it to terminate.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        final CounterIncrement worker = new CounterIncrement();
        worker.start();

        // Give the worker ten seconds of counting before requesting the stop.
        Thread.sleep(10_000L);

        worker.close();
    }
}

案例:请求后台数据,后台出现异常时,终结线程并释放请求占用的线程资源。这里会用到前面学的 Thread-Per-Message 模式,可以通过 telnet 测试

请求后台数据,后台出现异常时,终结线程并释放请求占用的线程资源,会用到前面学的 Thread-Per-Message 模式

package com.ln.concurrent.chapter16;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @ProjectName: java-concurrency
 * @Package: com.ln.concurrent.chapter16
 * @Name:AppServer
 * @Author:linianest
 * @CreateTime:2021/1/4 11:50
 * @version:1.0
 * @Description TODO: Socket server thread that hands every accepted connection to a
 * {@link ClientHandle} running on a fixed pool of 10 threads (Thread-Per-Message style),
 * and releases the handlers and the pool when the accept loop ends.
 */
public class AppServer extends Thread {
    private static final int DEFAULT_PORT = 12722;

    private final int port;

    /** Cleared by {@link #shutdown()}; the accept loop runs while it is {@code true}. */
    private volatile boolean start = true;

    /** Handlers created so far; only read and written by the server thread itself. */
    private final List<ClientHandle> clientHandlers = new ArrayList<>();
    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    /** Written by the server thread in run(), read by whichever thread calls shutdown(). */
    private volatile ServerSocket server;

    /** Creates a server listening on the default port (12722). */
    public AppServer() {
        this(DEFAULT_PORT);
    }

    /**
     * Creates a server for the given port; the socket is opened when the thread runs.
     *
     * @param port TCP port to listen on
     */
    public AppServer(int port) {
        this.port = port;
    }

    /**
     * Opens the listening socket and accepts clients until {@link #shutdown()} is called.
     * Resources are released on every exit path.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the socket fails for
     *                          any reason other than a requested shutdown
     */
    @Override
    public void run() {
        try {
            this.server = new ServerSocket(port);
            while (start) {
                Socket client = this.server.accept();
                ClientHandle clientHandler = new ClientHandle(client);
                executor.submit(clientHandler);
                this.clientHandlers.add(clientHandler);
            }
        } catch (IOException e) {
            // shutdown() closes the listening socket to unblock accept(), which surfaces
            // here as an IOException. That is the expected stop path, not a failure.
            if (start) {
                throw new RuntimeException(e);
            }
        } finally {
            this.dispose();
        }
    }

    /**
     * Second phase: closes the listening socket, stops every handler and shuts the pool down.
     */
    private void dispose() {
        final ServerSocket listening = this.server;
        if (listening != null) {
            try {
                // No-op when shutdown() already closed it; needed when the loop ended
                // for another reason.
                listening.close();
            } catch (IOException ignored) {
                // Best effort: the server is going away regardless.
            }
        }
        this.clientHandlers.forEach(ClientHandle::stop);
        this.executor.shutdown();
    }

    /**
     * First phase: requests termination. Clears the run flag, closes the listening socket
     * (if it has been opened yet) so a blocked accept() returns, and interrupts the thread.
     *
     * @throws IOException if closing the listening socket fails
     */
    public void shutdown() throws IOException {
        this.start = false;
        final ServerSocket listening = this.server;
        if (listening != null) {
            listening.close();
        }
        this.interrupt();
    }
}

客户端连接处理器(运行在服务端,负责处理单个客户端连接)

package com.ln.concurrent.chapter16;

import java.io.*;
import java.net.Socket;
import java.util.stream.Stream;

/**
 * @ProjectName: java-concurrency
 * @Package: com.ln.concurrent.chapter16
 * @Name:ClientHandle
 * @Author:linianest
 * @CreateTime:2021/1/4 12:07
 * @version:1.0
 * @Description TODO: Server-side handler for one client connection: echoes every line
 * it receives until the client disconnects or {@link #stop()} is called.
 */
public class ClientHandle implements Runnable {
    private final Socket socket;

    /** Cleared by {@link #stop()}; the echo loop runs while it is {@code true}. */
    private volatile boolean running = true;

    /**
     * @param socket the accepted client connection this handler serves and closes
     */
    public ClientHandle(Socket socket) {
        this.socket = socket;
    }

    /**
     * Reads lines from the client and answers each with {@code "echo " + line}. The loop
     * ends at end-of-stream, on an I/O error, or once a stop has been requested; the
     * socket is closed on every exit path.
     */
    @Override
    public void run() {
        // try-with-resources: the streams are closed even when an exception is thrown.
        try (InputStream inputStream = socket.getInputStream();
             OutputStream outputStream = socket.getOutputStream();
             final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
             final PrintWriter printWriter = new PrintWriter(outputStream)) {
            while (running) {
                final String message = reader.readLine();
                if (null == message) {
                    // End of stream: the client closed the connection.
                    break;
                }
                System.out.println("Come from client >" + message);
                printWriter.write("echo " + message + "\n");
                // Push the reply out to the socket right away.
                printWriter.flush();
            }
        } catch (IOException e) {
            // stop() closes the socket to unblock readLine(); that error is expected.
            // Only report failures that happened while the handler was meant to run.
            if (running) {
                e.printStackTrace();
            }
        } finally {
            this.stop();
        }
    }

    /**
     * Requests termination: clears the run flag and closes the socket so that a read
     * blocked in {@link #run()} returns. Safe to call repeatedly and from any thread.
     */
    public void stop() {
        this.running = false;
        try {
            // Closing an already-closed socket is a no-op, so no guard is needed.
            this.socket.close();
        } catch (IOException ignored) {
            // Best effort: the connection is being torn down anyway.
        }
    }
}

测试socket server连接

package com.ln.concurrent.chapter16;

import java.io.IOException;

/**
 * @ProjectName: java-concurrency
 * @Package: com.ln.concurrent.chapter16
 * @Name:AppServerClient
 * @Author:linianest
 * @CreateTime:2021/1/4 13:08
 * @version:1.0
 * @Description TODO: Demo driver that runs the socket server for a fixed time.
 */
public class AppServerClient {

    /**
     * Starts an {@link AppServer} on port 13345, keeps it up for 45 seconds (time to
     * connect with a client such as telnet) and then shuts it down.
     *
     * @param args unused
     * @throws IOException          if shutting the server down fails
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        final AppServer appServer = new AppServer(13345);
        appServer.start();

        // Window during which clients may connect.
        Thread.sleep(45_000L);

        appServer.shutdown();
    }
}

上一篇:线程池


下一篇:一个科长能收396万回扣,耗材集采降价空间有多大?