SocketServer服务端
import java.io.*; import java.net.*; public class SocketServer extends Thread { private int port = 8899; private ServerSocket serverSocket; private ThreadPool threadPool;// 线程池 private final int POOL_SIZE = 1; public SocketServer() throws IOException { serverSocket = new ServerSocket(port); threadPool = new ThreadPool(Runtime.getRuntime().availableProcessors() * POOL_SIZE); System.out.println("服务器启动了"); } public void run() { while (true) { Socket socket = null; try { socket = serverSocket.accept(); threadPool.execute(new Handler(socket)); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String args[]) throws IOException { new SocketServer().start(); } class Handler implements Runnable { private Socket socket; public Handler(Socket socket) { this.socket = socket; } private PrintWriter getWriter(Socket socket) throws IOException { OutputStream socketOut = socket.getOutputStream(); return new PrintWriter(socketOut, true); } private BufferedReader getReader(Socket socket) throws IOException { InputStream socketIn = socket.getInputStream(); return new BufferedReader(new InputStreamReader(socketIn)); } public String echo(String msg) { return msg; } @Override public void run() { try { System.out.println("New connection accepted" + socket.getInetAddress() + ":" + socket.getPort()); char[] data = new char[1024 * 1024]; BufferedReader br = getReader(socket); PrintWriter pw = getWriter(socket); String msg = null; while (true) { int len = br.read(data); msg = String.valueOf(data, 0, len==-1?0:len); // 接收一个字符串数据 if (len == -1 || msg.equalsIgnoreCase("bye")) { System.out.println("客户端发出中断请求"); pw.println("服务器已经关闭本次连接12"); pw.flush(); pw.close(); br.close(); break; } System.out.println(msg); pw.println(msg); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (socket != null) socket.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
ThreadPool线程池
import java.util.LinkedList;

/**
 * A fixed-size thread pool implemented as a {@link ThreadGroup}: worker threads
 * belong to this group and take tasks from a shared FIFO queue.
 *
 * <p>The queue and the closed flag are guarded by this object's monitor.
 */
public class ThreadPool extends ThreadGroup {
    private boolean isClosed = false; // whether the pool has been closed
    private final LinkedList<Runnable> workQueue; // pending tasks, oldest first
    private static int threadPoolId; // id to assign to the next pool
    private int threadID; // id to assign to the next worker thread

    /**
     * Creates the pool and starts its worker threads.
     *
     * @param poolSize number of worker threads to start
     */
    public ThreadPool(int poolSize) {
        super(nextPoolName());
        // Note: the value printed after the label is the worker count.
        System.out.println("线程池id" + poolSize);
        setDaemon(true);
        workQueue = new LinkedList<Runnable>(); // create the work queue before any worker can poll it
        for (int i = 0; i < poolSize; i++) {
            new WorkThread().start();
        }
    }

    /** Returns the next group name; synchronized so pools created concurrently get distinct ids. */
    private static synchronized String nextPoolName() {
        return "ThreadPool" + (threadPoolId++);
    }

    /**
     * Queues a task and wakes one waiting worker.
     *
     * @param task the task to run; {@code null} is ignored
     * @throws IllegalStateException if the pool has been closed
     */
    public synchronized void execute(Runnable task) {
        if (isClosed) {
            throw new IllegalStateException();
        }
        if (task != null) {
            workQueue.add(task);
            notify();
        }
    }

    /**
     * Removes and returns the oldest queued task, waiting while the queue is empty.
     *
     * @return the next task, or {@code null} if the queue is empty and the pool is closed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    protected synchronized Runnable getTask() throws InterruptedException {
        while (workQueue.size() == 0) {
            if (isClosed) {
                return null;
            }
            wait();
        }
        return workQueue.removeFirst();
    }

    /**
     * Closes the pool immediately: discards queued tasks and interrupts every
     * thread in this group. Has no effect if the pool is already closed.
     */
    public synchronized void close() {
        if (!isClosed) {
            isClosed = true;
            workQueue.clear();
            interrupt();
        }
    }

    /**
     * Closes the pool to new tasks and waits for the worker threads to finish
     * the tasks that are already queued.
     *
     * <p>If the calling thread is interrupted while waiting for a worker, the
     * wait for that worker is abandoned, the remaining workers are still
     * joined, and the caller's interrupt status is restored before returning.
     */
    public void join() {
        synchronized (this) {
            isClosed = true;
            notifyAll(); // wake idle workers so they observe the closed flag and exit
        }
        Thread[] threads = new Thread[activeCount()];
        int count = enumerate(threads);
        boolean interrupted = false;
        for (int i = 0; i < count; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException ex) {
                // Remember the interrupt instead of discarding it.
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Worker thread: repeatedly takes a task from the pool and runs it. */
    private class WorkThread extends Thread {
        public WorkThread() {
            super(ThreadPool.this, "WorkThread" + (threadID++));
            System.out.println("工作线程id" + threadID);
        }

        /**
         * Runs tasks until this thread is interrupted or {@code getTask()}
         * reports that the pool is closed and drained. A task that throws is
         * logged and does not terminate the worker.
         */
        @Override
        public void run() {
            while (!isInterrupted()) {
                Runnable task;
                try {
                    task = getTask();
                } catch (InterruptedException ex) {
                    // Interrupted while waiting for work: stop this worker.
                    return;
                }
                if (task == null) {
                    return;
                }
                try {
                    task.run();
                } catch (Throwable e) {
                    e.printStackTrace();
                }
            }
        }
    }
}