两阶段终止设计模式(Two-Phase Termination)
线程被要求结束时,它持有的数据和资源通常仍然存在,需要先释放再真正退出。因此终止过程分为两个阶段:第一阶段发出终止请求并停止正常处理,第二阶段执行清理工作后线程退出,这就是两阶段终止(Two-Phase Termination)设计模式
线程统计数据
package com.ln.concurrent;
import java.util.Random;
/**
 * Worker thread that prints an ever-increasing counter and supports
 * two-phase termination.
 *
 * <p>Phase one: {@link #close()} raises the termination flag and interrupts
 * the thread. Phase two: {@link #run()} leaves its loop and always executes
 * the cleanup step before the thread ends.
 *
 * @ProjectName: java-concurrency
 * @Package: com.ln.concurrent
 * @Name:CounterIncrement
 * @Author:linianest
 * @CreateTime:2021/1/4 11:39
 * @version:1.0
 */
public class CounterIncrement extends Thread {

    /** Source of the random pause between two printed values. */
    private static final Random random = new Random(System.currentTimeMillis());

    /** Set by {@link #close()}; volatile so the worker sees the update. */
    private volatile boolean terminated = false;

    /** Next value to print; touched only by the worker thread itself. */
    private int counter = 0;

    /**
     * Counts until termination is requested or the thread is interrupted,
     * then performs the cleanup step in every case.
     */
    @Override
    public void run() {
        try {
            countUntilTerminated();
        } catch (InterruptedException ignored) {
            // An interrupt is the wake-up signal sent by close(); nothing to
            // report, the finally block below performs the second phase.
        } finally {
            clean();
        }
    }

    /**
     * Prints the counter and pauses for a random time below one second,
     * repeating until the termination flag is observed.
     *
     * @throws InterruptedException if the thread is interrupted while pausing
     */
    private void countUntilTerminated() throws InterruptedException {
        for (;;) {
            if (terminated) {
                return;
            }
            final int current = counter++;
            System.out.println(Thread.currentThread().getName() + " " + current);
            Thread.sleep(random.nextInt(1_000));
        }
    }

    /** Second phase: reports the final counter value. */
    private void clean() {
        System.out.println("do some clean work for the second phase.current counter=" + counter);
    }

    /**
     * First phase: requests termination. The flag stops the loop at its next
     * check, and the interrupt cuts short a pause that is in progress.
     */
    public void close() {
        terminated = true;
        interrupt();
    }
}
两阶段终结线程设计模式
package com.ln.concurrent.chapter15;
import com.ln.concurrent.CounterIncrement;
/**
 * Demo driver for two-phase termination: lets a {@link CounterIncrement}
 * run for a fixed time and then asks it to close.
 *
 * @ProjectName: java-concurrency
 * @Package: com.ln.concurrent.chapter15
 * @Name:CounterTest
 * @Author:linianest
 * @CreateTime:2021/1/4 11:46
 * @version:1.0
 */
public class CounterTest {

    /** How long the counter runs before termination is requested, in milliseconds. */
    private static final long RUN_MILLIS = 10_000L;

    /**
     * Starts the counter thread, waits, then requests its termination.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        final CounterIncrement worker = new CounterIncrement();
        worker.start();

        Thread.sleep(RUN_MILLIS);

        worker.close();
    }
}
案例:请求后台数据,后台出现异常,终结线程并释放请求的线程资源,会用到前面学的Thread-Per-Message模式,可以通过telnet测试
请求后台数据,后台出现异常,终结线程并释放请求的线程资源,会用到前面学的Thread-Per-Message模式
package com.ln.concurrent.chapter16;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Socket server thread that accepts connections and hands each one to a
 * {@link ClientHandle} running on a fixed pool of 10 worker threads.
 *
 * <p>Termination is two-phase: {@link #shutdown()} clears the run flag and
 * closes the listening socket so the accept loop ends, after which
 * {@link #run()} always releases the handlers, the pool and the socket.
 *
 * @ProjectName: java-concurrency
 * @Package: com.ln.concurrent.chapter16
 * @Name:AppServer
 * @Author:linianest
 * @CreateTime:2021/1/4 11:50
 * @version:1.0
 */
public class AppServer extends Thread {

    private static final int DEFAULT_PORT = 12722;

    private final int port;

    /** Run flag; cleared by {@link #shutdown()}, read by the accept loop. */
    private volatile boolean start = true;

    /** Handlers created so far; only touched by the server thread. */
    private final List<ClientHandle> clientHandlers = new ArrayList<>();

    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    /**
     * Listening socket; written by the server thread and read by the thread
     * calling {@link #shutdown()}, hence volatile. Null until run() binds it.
     */
    private volatile ServerSocket server;

    /** Creates a server listening on the default port 12722. */
    public AppServer() {
        this(DEFAULT_PORT);
    }

    /**
     * Creates a server for the given port. The port is bound in
     * {@link #run()}, not here.
     *
     * @param port TCP port to listen on
     */
    public AppServer(int port) {
        this.port = port;
    }

    /**
     * Binds the listening socket and accepts connections until shutdown is
     * requested, then releases all resources.
     *
     * @throws RuntimeException wrapping the {@link IOException} when binding
     *         or accepting fails for a reason other than a requested shutdown
     */
    @Override
    public void run() {
        try {
            this.server = new ServerSocket(port);
            while (start) {
                Socket client = this.server.accept();
                ClientHandle clientHandler = new ClientHandle(client);
                // Track the handler before submitting it so that it is still
                // released by dispose() if the submission fails.
                this.clientHandlers.add(clientHandler);
                executor.submit(clientHandler);
            }
        } catch (IOException e) {
            // shutdown() closes the listening socket to unblock accept(),
            // which surfaces here as an IOException. That is the expected
            // way out of the loop and must not be reported as a failure.
            if (start) {
                throw new RuntimeException(e);
            }
        } finally {
            this.dispose();
        }
    }

    /**
     * Releases resources: stops every handler, shuts the worker pool down
     * and closes the listening socket if it was created.
     */
    private void dispose() {
        this.clientHandlers.forEach(ClientHandle::stop);
        this.executor.shutdown();
        final ServerSocket listener = this.server;
        if (listener != null) {
            try {
                listener.close();
            } catch (IOException ignored) {
                // Best effort during teardown; there is nothing left to do
                // with a socket that fails to close.
            }
        }
    }

    /**
     * Requests termination of the server thread. Safe to call before the
     * thread has bound its socket: the cleared flag then ends the loop before
     * the first accept.
     *
     * @throws IOException if closing the listening socket fails
     */
    public void shutdown() throws IOException {
        // Clear the flag before reading the socket field: run() writes the
        // field and then reads the flag, so at least one side sees the other.
        this.start = false;
        final ServerSocket listener = this.server;
        if (listener != null) {
            listener.close();
        }
        this.interrupt();
    }
}
客户端连接处理器(服务端为每个客户端连接创建一个)
package com.ln.concurrent.chapter16;
import java.io.*;
import java.net.Socket;
import java.util.stream.Stream;
/**
 * Server-side handler for one client connection: reads lines from the
 * socket and echoes each one back prefixed with {@code "echo "}.
 *
 * <p>The handler ends when the client closes the connection, when an I/O
 * error occurs, or when {@link #stop()} is called from another thread.
 *
 * @ProjectName: java-concurrency
 * @Package: com.ln.concurrent.chapter16
 * @Name:ClientHandle
 * @Author:linianest
 * @CreateTime:2021/1/4 12:07
 * @version:1.0
 */
public class ClientHandle implements Runnable {

    private final Socket socket;

    /** True until {@link #stop()} has been called; volatile for cross-thread use. */
    private volatile boolean running = true;

    /**
     * @param socket the accepted client connection this handler owns
     */
    public ClientHandle(Socket socket) {
        this.socket = socket;
    }

    /**
     * Echo loop. The streams are opened with try-with-resources so they are
     * closed however the loop ends, and {@link #stop()} is always called
     * afterwards to close the socket.
     */
    @Override
    public void run() {
        try (InputStream inputStream = socket.getInputStream();
             OutputStream outputStream = socket.getOutputStream();
             BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
             PrintWriter printWriter = new PrintWriter(outputStream)) {
            while (running) {
                final String message = reader.readLine();
                if (null == message) {
                    // End of stream: the client closed the connection.
                    break;
                }
                System.out.println("Come from client >" + message);
                printWriter.write("echo " + message + "\n");
                // Push the reply out to the client immediately.
                printWriter.flush();
            }
        } catch (IOException e) {
            // stop() closes the socket to unblock a pending read, which ends
            // up here; only an error while still running is unexpected.
            if (running) {
                e.printStackTrace();
            }
        } finally {
            this.stop();
        }
    }

    /**
     * Stops the handler and closes its socket. Calls after the first one
     * return without doing anything.
     */
    public void stop() {
        if (!running) {
            return;
        }
        this.running = false;
        try {
            this.socket.close();
        } catch (IOException ignored) {
            // Best effort: the connection is being abandoned anyway.
        }
    }
}
测试socket server连接
package com.ln.concurrent.chapter16;
import java.io.IOException;
/**
 * Demo driver for {@link AppServer}: runs the server on port 13345 for a
 * fixed time and then shuts it down. No client is created here; connections
 * have to come from outside while the server is up.
 *
 * @ProjectName: java-concurrency
 * @Package: com.ln.concurrent.chapter16
 * @Name:AppServerClient
 * @Author:linianest
 * @CreateTime:2021/1/4 13:08
 * @version:1.0
 */
public class AppServerClient {

    /** Port the demo server listens on. */
    private static final int PORT = 13345;

    /** How long the server stays up before shutdown is requested, in milliseconds. */
    private static final long UPTIME_MILLIS = 45_000L;

    /**
     * Starts the server thread, waits, then requests shutdown.
     *
     * @param args unused
     * @throws IOException if shutting the server down fails
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        final AppServer appServer = new AppServer(PORT);
        appServer.start();

        Thread.sleep(UPTIME_MILLIS);

        appServer.shutdown();
    }
}