阻塞通信之Socket编程

Socket通信,主要是基于TCP协议的通信。本文从Socket通信(代码实现)、多线程并发、以及TCP协议相关原理方面 介绍 阻塞Socket通信一些知识。

本文从服务器端的视角,以“Echo Server”程序为示例,描述服务器如何处理客户端的连接请求。Echo Server的功能就是把客户端发给服务器的数据原封不动地返回给客户端。

第一种方式是单线程处理方式:服务器的处理方法如下:

     public void service(){
while (true) {
Socket socket = null;
try {
socket = serverSocket.accept();
System.out.println("new connection accepted " + socket.getInetAddress() + ":" + socket.getPort());
BufferedReader br = getBufferReader(socket);//获得socket输入流,并将之包装成BufferedReader
PrintWriter pw = getWriter(socket);//获得socket输出流,并将之包装成PrintWriter
String msg = null;
while ((msg = br.readLine()) != null) { pw.println(echo(msg));//服务端的处理逻辑,将client发来的数据原封不动再发给client
pw.flush();
if(msg.equals("bye"))//若client发送的是 "bye" 则关闭socket
break;
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try{
if(socket != null)
socket.close();
}catch(IOException e){e.printStackTrace();}
}
}
}

上面用的是while(true)循环,这样,Server不是只接受一次Client的连接就退出,而是不断地接收Client的连接。

1)第5行,服务器线程执行到accept()方法阻塞,直至有client的连接请求到来。

2)当有client的请求到来时,就会建立socket连接。从而在第8、9行,就可以获得这条socket连接的输入流和输出流。输入流(BufferedReader)负责读取client发过来的数据,输出流(PrintWriter)负责将处理后的数据返回给Client。

下面来详细分析一下建立连接的过程:

Client要想成功建立一条到Server的socket连接,其实是受很多因素影响的。其中一个就是:Server端的“客户连接请求队列长度”。它可以在创建ServerSocket对象由构造方法中的 backlog 参数指定:JDK中 backlog参数的解释是: requested maximum length of the queue of incoming connections.

    public ServerSocket(int port, int backlog) throws IOException {
this(port, backlog, null);
}

看到了这个:incoming commections 有点奇怪,因为它讲的是“正在到来的连接”,那什么又是incoming commections 呢?这个就也TCP建立连接的过程有关了。

TCP建立连接的过程可简述为三次握手。第一次:Client发送一个SYN包,Server收到SYN包之后回复一个SYN/ACK包,此时Server进入一个“中间状态”--SYN RECEIVED 状态。

这可以理解成:Client的连接请求已经过来了,只不过还没有完成“三次握手”。因此,Server端需要把当前的请求保存到一个队列里面,直至当Server再次收到了Client的ACK之后,Server进入ESTABLISHED状态,此时:serverSocket 从accpet() 阻塞状态中返回。也就是说:当第三次握手的ACK包到达Server端后,Server从该请求队列中取出该连接请求,同时Server端的程序从accept()方法中返回。

那么这个请求队列长度,就是由 backlog 参数指定。那这个队列是如何实现的呢?这个就和操作系统有关了,感兴趣的可参考:How TCP backlog works in Linux

此外,也可以看出:服务器端能够接收的最大连接数 也与 这个请求队列有关。对于那种高并发场景下的服务器而言,首先就是请求队列要足够大;其次就是当连接到来时,要能够快速地从队列中取出连接请求并建立连接,因此,执行建立连接任务的线程最好不要阻塞。

现在来分析一下上面那个:单线程处理程序可能会出现的问题:

服务器始终只有一个线程执行accept()方法接受Client的连接。建立连接之后,又是该线程处理相应的连接请求业务逻辑,这里的业务逻辑是:把客户端发给服务器的数据原封不动地返回给客户端。

显然,这里一个线程干了两件事:接受连接请求 和 处理连接(业务逻辑)。好在这里的处理连接的业务逻辑不算复杂,如果对于复杂的业务逻辑 而且 有可能在执行业务逻辑过程中还会发生阻塞的情况时,那此时服务器就再也无法接受新的连接请求了。

第二种方式是:一请求一线程的处理模式:

     public void service() {
while (true) {
Socket socket = null;
try {
socket = serverSocket.accept();//接受client的连接请求
new Thread(new Handler(socket)).start();//每接受一个请求 就创建一个新的线程 负责处理该请求
} catch (IOException e) {
e.printStackTrace();
}
finally {
try{
if(socket != null)
socket.close();
}catch(IOException e){e.printStackTrace();}
}
}
}

再来看Handler的部分实现:Handler是一个implements Runnable接口的线程,在它的run()里面处理连接(执行业务逻辑)

 class Handler implements Runnable{
Socket socket;
public Handler(Socket socket) {
this.socket = socket;
} @Override
public void run() {
try{
BufferedReader br = null;
PrintWriter pw = null;
System.out.println("new connection accepted " + socket.getInetAddress() + ":" + socket.getPort()); br = getBufferReader(socket);
pw = getWriter(socket); String msg = null;
while((msg = br.readLine()) != null){
pw.println(echo(msg));
pw.flush();
if(msg.equals("bye"))
break;
}
}catch(IOException e){
e.printStackTrace();
}
}

从上面的单线程处理模型中看到:如果线程在执行业务逻辑中阻塞了,服务器就不能接受用户的连接请求了。

而对于一请求一线程模型而言,每接受一个请求,就创建一个线程来负责该请求的业务逻辑。尽管,这个请求的业务逻辑执行时阻塞了,只要服务器还能继续创建线程,那它就还可以继续接受新的连接请求。此外,负责建立连接请求的线程 和 负责处理业务逻辑的线程分开了。业务逻辑执行过程中阻塞了,“不会影响”新的请求建立连接。

显然,如果Client发送的请求数量很多,那么服务器将会创建大量的线程,而这是不现实的。有以下原因:

1)创建线程是需要系统开销的,线程的运行系统资源(内存)。因此,有限的硬件资源 就限制了系统中线程的数目。

2)当系统中线程很多时,线程的上下文开销会很大。比如,请求的业务逻辑的执行是IO密集型任务,经常需要阻塞,这会造成频繁的上下文切换。  

3)当业务逻辑处理完成之后,就需要销毁线程,如果请求量大,业务逻辑又很简单,就会导致频繁地创建销毁线程。

那能不能重用已创建的线程? ---这就是第三种方式:线程池处理。

第三种方式是线程池的处理方式:

 public class EchoServerThreadPool {
private int port = 8000;
private ServerSocket serverSocket;
private ExecutorService executorService;
private static int POOL_SIZE = 4;//每个CPU中线程拥有的线程数 public EchoServerThreadPool()throws IOException {
serverSocket = new ServerSocket(port);
executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * POOL_SIZE);
System.out.println("server start");
} public void service(){
while(true){
Socket socket = null;
try{
socket = serverSocket.accept();//等待接受Client连接
executorService.execute(new Handler(socket));//将已经建立连接的请求交给线程池处理
}catch(IOException e){
e.printStackTrace();
}
}
}
public static void main(String[] args)throws IOException{
new EchoServerThreadPool().service();
}
}

采用线程池最大的优势在于“重用线程”,有请求任务来了,从线程池中取出一个线程负责该请求任务,任务执行完成后,线程自动归还到线程池中,而且java.util.concurrent包中又给出了现成的线程池实现。因此,这种方式看起来很完美,但还是有一些问题是要注意的:

1)线程池有多大?即线程池里面有多少个线程才算比较合适?这个要根据具体的业务逻辑来分析,而且还得考虑面对的使用场景。一个合理的要求就是:尽量不要让CPU空闲下来,即CPU的复用率要高。如果业务逻辑是经常会导致阻塞的IO操作,一般需要设置 N*(1+WT/ST)个线程,其中N为可用的CPU核数,WT为等待时间,ST为实际占用CPU运算时间。如果业务逻辑是CPU密集型作业,那么线程池中的线程数目一般为N个或N+1个即可,因为太多了会导致CPU切换开销,太少了(小于N),有些CPU核就空闲了。

2)线程池带来的死锁问题

线程池为什么会带来死锁呢?在JAVA 1.5 之后,引入了java.util.concurrent包。线程池则可以通过如下方式实现:

ExecutorService executor = Executors.newSingleThreadExecutor();
//ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(task);// task implements Runnable executor.shutdown();

Executors可以创建各种类型的线程池。如果创建一个缓存的线程池:

ExecutorService executor = Executors.newCachedThreadPool();

对于高负载的服务器而言,在缓存线程池中,被提交的任务没有排成队列,而是直接交给线程执行。也就是说:只要来一个请求,如果线程池中没有线程可用,服务器就会创建一个新的线程。如果线程已经把CPU用完了,此时还再创建线程就没有太大的意义了。因此,对于高负载的服务器而言,一般使用的是固定数目的线程池(来自Effective Java)

主要有两种类型的死锁:①线程A占有了锁X,等待锁Y,而线程B占用了锁Y,等待锁X。因此,向线程池提交任务时,要注意判断:提交了的任务(Runnable对象)会不会导致这种情况发生?

②线程池中的所有线程在执行各自的业务逻辑时都阻塞了,它们都需要等待某个任务的执行结果,而这个任务还在“请求队列”里面未提交!

3)来自Client的请求实在是太多了,线程池中的线程都用完了(已无法再创建新线程)。此时,服务器只好拒绝新的连接请求,导致Client抛出:ConnectException。

4)线程泄露

导致线程泄露的原因也很多,而且还很难发觉,网上也有很多善于线程池线程泄露的问题。比如说:线程池中的线程在执行业务逻辑时抛异常了,怎么办?是不是这个工作线程就异常终止了?那这样,线程池中可用的线程数就少了一个了?看一下JDK ThreadPoolExecutor 线程池中的线程执行任务的过程如下:

       try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}

从上面源码看出:线程执行出异常后是由 afterExecute(task, thrown) 来处理的。至于对线程有何影响,我也没找到很好的解释。

另外一种引起线程泄露的情况就是:线程池中的工作线程在执行业务逻辑时,一直阻塞下去了。那这也意味着这个线程基本上不干活了,这就影响了线程池中实际可用的线程数目。如何所有的线程都是这种情况,那也无法向线程池提交任务了。此外,关于线程池带来的问题还可参考:Java编程中线程池的风险规避 另外, 关于JAVA线程池使用可参考下:Java的Executor框架和线程池实现原理

到这里,阻塞通信的三种模式都已经介绍完毕了。在网上发现了一篇很好的博文,刚好可以配合我这篇文章的代码演示一起来看:架构设计:系统间通信(1)——概述从“聊天”开始上篇

TCP连接 对 应用层协议(比如 HTTP协议)会产生哪些影响?

主要从以下几个方面描述TCP协议对应用层协议的影响:(结合JAVA网络编程中的 具体SOcket类的 相关参数分析)

1)最大段长度MSS

TCP协议是提供可靠连接的,在建立连接的过程中,会协商一些参数,比如MSS。TCP传输的数据是流,把流截成一段段的报文进行传输,MSS是 每次传输TCP报文的 最大数据分段。

为什么需要MSS呢?如果传输的报文太大,则需要在IP层进行分片,分成了若干片的报文在传输过程中任何一片丢失了,整个报文都得重传。重传直接影响了网络效率。因此,在建立连接时就协商(SYN包)底层的数据链路层最大能传递多大的报文(比如以太网的MTU=1500),然后在传输层(TCP)就对数据进行分段,尽量避免TCP传输的数据在IP层分片。

另外,关于MSS可参考:【网络协议】TCP分段与IP分片 和 IP分片详解

而对于上层应用而言(比如HTTP协议),它只管将数据写入缓冲区,但其实它写入的数据在TCP层其实是被分段发送的。当目的主机收到所有的分段之后,需要重组分段。因此,就会出现所谓的HTTP粘包问题。

2)TCP连接建立过程的“三次握手”

“三次握手”的大概流程如下:

Client发送一个SYN包,Server返回一个SYN/ACK包,然后Client再对 SYN/ACK 包进行一次确认ACK。在对 SYN/ACK 进行确认时,Client就可以向Server端 发送实际的数据了。这种利用ACK确认时顺带发送数据的方式 可以 减少 Client与Server 之间的报文交换。

3)TCP“慢启动”的拥塞控制

什么是“慢启动”呢?因为TCP连接是可靠连接,具有拥塞控制的功能。如果不进行拥塞控制,网络拥堵了导致容易丢包,丢包又得重传,就很难保证可靠性了。

而“慢启动”就是实现 拥塞控制 的一种机制。也就是说:对于建立的TCP连接而言,它不能立马就发送很多报文,而是:先发送 1个报文,等待对方确认;收到确认后,就可以一次发送2个报文了,再等待对方确认;收到确认后,就一次可以发送4个报文了.....每次可发送的报文数依次增加(指数级增加,当然不会一直增加下去),这个过程就是“打开拥塞窗口”。

那这个慢启动特性有何影响呢?

一般而言,就是“老的”TCP连接 比 新建立的 TCP连接有着更快的发送速度。因为,新的TCP连接有“慢启动”啊。而“老的”TCP连接可能一次允许发送多个报文。

因此,对于HTTP连接而言,选择重用现有连接既可以减少新建HTTP连接的开销,又可以重用老的TCP连接,立即发送数据。

HTTP重用现有的连接,在HTTP1.0的 Connection头部设置"Keep-Alive"属性。在HTTP1.1版本中,默认是打开持久连接的,可参考HTTP1.1中的 persistent 参数。

4)发送数据时,先收集待发送的数据,让发送缓冲区满了之后再发送的Nagle算法

对于一条Socket连接而言,发送方有自己的发送缓冲区。在JAVA中,由java.net.SocketOptions类的 SO_SNFBUF 属性指定。可以调用setSendBufferSize方法来设置发送缓冲区(同理接收缓冲区)

public synchronized void setSendBufferSize(int size)
throws SocketException{
if (!(size > 0)) {
throw new IllegalArgumentException("negative send size");
}
if (isClosed())
throw new SocketException("Socket is closed");
getImpl().setOption(SocketOptions.SO_SNDBUF, new Integer(size));
}

那什么是Negle算法呢?

假设每次发送的TCP分段只包含少量的有效数据(比如1B),而TCP首部加上IP首部至少有40B,每次为了发送1B的数据都要带上一个40B的首部,显然网络利用率是很低的。

因为,Negle算法就是:发送方的数据不是立即就发送,而是先放在缓冲区内,等到缓冲区满了再发送(或者所发送的所有分组都已经返回了确认了)。说白了,就是先把数据“聚集起来”,分批发送。

Negale算法对上层应用会有什么影响呢?

对小批量数据传输的时延影响很大。比如 网络游戏 中的实时捕获 玩家的位置。玩家位置变了,也许只有一小部分数据发送给 服务器,若采用Negale算法,发送的数据被缓冲起来了,服务器会迟迟接收不到玩家的实时位置信息。因此,Negale算法适合于那种大批量数据传输的场景。

因此,SocketOptions类的 TCP_NODELAY 属性用来设置 在TCP连接中是否启用 Negale算法。

    public void setTcpNoDelay(boolean on) throws SocketException {
if (isClosed())
throw new SocketException("Socket is closed");
getImpl().setOption(SocketOptions.TCP_NODELAY, Boolean.valueOf(on));
}

5)在发送数据时捎带确认的延迟确认算法

比如,Server在接收到了Client发送的一些数据,但是Server并没有立即对这些数据进行确认。而是:当Server有数据需要发送到Client时,在发送数据的同时 捎带上 对前面已经接收到的数据的确认。(这其实也是尽量减少Server与Client之间的报文量,毕竟:每发一个报文,是有首部开销的。)

这种方式会影响到上层应用的响应性。可能会对HTTP的请求-响应模式产生很大的时延。

6)TCP的 KEEP_ALIVE

这个在JDK源码中解释的非常好了。故直接贴上来:

    /**
* When the keepalive option is set for a TCP socket and no data
* has been exchanged across the socket in either direction for
* 2 hours (NOTE: the actual value is implementation dependent),
* TCP automatically sends a keepalive probe to the peer. This probe is a
* TCP segment to which the peer must respond.
* One of three responses is expected:
* 1. The peer responds with the expected ACK. The application is not
* notified (since everything is OK). TCP will send another probe
* following another 2 hours of inactivity.
* 2. The peer responds with an RST, which tells the local TCP that
* the peer host has crashed and rebooted. The socket is closed.
* 3. There is no response from the peer. The socket is closed.
*
* The purpose of this option is to detect if the peer host crashes.
*
* Valid only for TCP socket: SocketImpl

当TCP连接设置了KEEP-ALIVE时,如果这条socket连接在2小时(视情况而定)内没有数据交换,然后就会发一个“探测包”,以判断对方的状态。

然后,等待对方发送这个探测包的响应。一共会出现以上的三种情况,并根据出现的情况作出相应的处理。

①对方(peer)收到了正常的 ACK,说明一切正常,上层应用并不会注意到这个过程(发送探测包的过程)。再等下一个2个小时时继续探测连接是否存活。

②对方返回一个RST包,表明对方已经crashed 或者 rebooted,socket连接关闭。

③未收到对方的响应,socket连接关闭。

这里需要注意的是:在HTTP协议中也有一个KEEP-ALIVE,可参考:HTTP长连接

7)TCP连接关闭时的影响

TCP关闭连接有“四次挥手”,主动关闭连接的一方会有一个 TIME_WAIT 状态。也就是说,在Socket的close()方法执行后,close()方法立即返回了,但是底层的Socket连接并不会立即关闭,而是会等待一段时间,将剩余的数据都发送完毕再关闭连接。可以用SocketOptions的 SO_LINGER 属性来控制sockect的关闭行为。

看JDK中 SO_LINGER的解释如下:

    /**
* Specify a linger-on-close timeout. This option disables/enables
* immediate return from a <B>close()</B> of a TCP Socket. Enabling
* this option with a non-zero Integer <I>timeout</I> means that a
* <B>close()</B> will block pending the transmission and acknowledgement
* of all data written to the peer, at which point the socket is closed
* <I>gracefully</I>. Upon reaching the linger timeout, the socket is
* closed <I>forcefully</I>, with a TCP RST. Enabling the option with a
* timeout of zero does a forceful close immediately. If the specified
* timeout value exceeds 65,535 it will be reduced to 65,535.
* <P>
* Valid only for TCP: SocketImpl
*
* @see Socket#setSoLinger
* @see Socket#getSoLinger
*/
public final static int SO_LINGER = 0x0080;

因此,当调用Socket类的 public void setSoLinger(boolean on, int linger)设置了 linger 时间后,执行 close()方法不会立即返回,而是进入阻塞状态。

然后,Socket会 等到所有的数据都已经确认发送了 peer 端。(will block pending the transmission and acknowledgement of all data written to the peer)【第四次挥手时client 发送的ACK到达了Server端】

或者:经过了 linger 秒之后,强制关闭连接。( Upon reaching the linger timeout, the socket is closed forcefully)

那为什么需要一个TIME_WAIT时延呢?即:执行 close()方法 时需要等待一段时间再 真正关闭Socket?这也是“四次挥手”时,主动关闭连接的一方会 持续 TIME_WAIT一段时间(一般是2MSL大小)

①确保“主动关闭端”(Client端)最后发送的ACK能够成功到达“被动关闭端”(Server端)

因为,如何不能确保ACK是否成功到达Server端的话,会影响Server端的关闭。假设最后第四次挥手时 Client 发送给 Server的ACK丢失了,若没有TIME_WAIT,Server会认为是自己FIN包没有成功发送给Client(因为Server未收到ACK啊),就会导致Server重传FIN,而不能进入 closed 状态。

②旧的TCP连接包会干扰新的TCP连接包,导致新的TCP连接收到的包乱序。

若没有TIME_WAIT,本次TCP连接(为了更好的阐述问题,记本次TCP连接为TCP_连接1)断开之后,又立即建立新的一条TCP连接(TCP_连接2)。

TCP_连接1 发送的包 有可能在网络中 滞留了。而现在又新建了一条 TCP_连接2 ,如果滞留的包(滞留的包是无效的包了,因为TCP_连接1已经关闭了) 又 重新到达了 TCP_连接2,由于 滞留的包的(源地址,源端口,目的地址,目的端口)与 TCP_连接2 中发送的包 是一样的,因此会干扰 TCP_连接2 中的包(序号)。

如果有TIME_WAIT,由于TIME_WAIT的长度是 2MSL。因此,TCP_连接1中的滞留的包,经过了2MSL时间之后,已经失效了。就不会干扰新的TCP_连接2了。

此外,这也是为什么在Linux中,你Kill了某个连接进程之后,又立即重启连接进程,会报 端口占用错误,因为在底层,其实它的端口还未释放。

上一篇:json格式化工具


下一篇:python wheel 包命名规则和 abi 兼容