本文章纯属个人经验总结,伪代码也是写文章的时候顺便白板编码的,可能有逻辑问题,请帮忙指正,谢谢。
Internet(全球互联网)是无数台机器基于TCP/IP协议族相互通信产生的。TCP/IP协议族分了四层实现,链路层、网络层、传输层、应用层。
与我们应用开发者接触最多的应该是应用层了,例如web应用普遍使用HTTP协议,HTTP协议帮助我们开发者做了非常多的事情,通过HTTP足以完成大部分的通信工作了,但是有时候会有一些特殊的场景出现,使得HTTP协议并不能得心应手的完成工作,这个时候就需要我们寻找其他的应用层协议去适应场景了。
在项目启动初期就要基于业务场景和运行环境选择适当的传输协议,例如常见的发布/订阅场景也就是推送业务可以使用MQTT等协议,文件传输可以用FTP等协议。不过我们这次要说的不是如何选择通讯协议,而是如何自己实现一套自定的通讯协议。
上面啰嗦了那么多,现在开始步入正题了
正文
实现自定义的应用层协议,也就是意味着要针对传输层协议进行开发,传输层有TCP、UDP两种协议,两者的区别和适用场景请自行seach,TCP传输具有可靠性,UDP传输不管数据是否送达,一般选择TCP,这篇文章也是讲的TCP方式。
上面说过了TCP/IP是一种协议,也就是一种约定的东西,那怎么针对这个约定编程呢?其实操作系统已经做了这件事了,并且很有风度的为我们提供了方便的使用方式(Socket API),也就是我们常说的Socket。用C/C++可以直接调用操作系统的API进行操作,JAVA等需要虚拟机的语言可调用SDK提供的API进行开发。
嗯不说没用的了,相信很多人已经不耐烦了。我们先来讲讲套路。用到代码的地方用伪代码描述,伪代码可以方便的传递思想逻辑等,嗯,好吧其实是懒(写博文新手询问:此处是不是需要卖个萌?)。
真正正文
常见的坑
1、收到的信息不完整,或者比预期更多(半包,粘包)
2、BIO读写阻塞导致线程挂起
3、物理链路意外断开,程序不能发觉异常导致挂起
4、多线程共享同一socket导致数据错乱
5、长时间占用大量空闲socket
使用阻塞式的Socket通信有一些弊端,因此我总结了几个套路,应该也是一个通讯中间件应该做的事情。
常用套路
1、制定消息格式(解决半粘包)
2、规定通讯工作流程(合理使用队列、线程池、连接池)
3、加入心跳检测机制(解决异常断开导致连接不可用)
4、加入链接回收机制(按空闲或超时时间等规则终止链接)
5、异常处理(不可复用异常发生时及时关闭连接)
6、即时重发(连接不可用时,即时选用另一条连接重发)
我们针对每一步套路进行构思设计,充分考虑其中潜在的问题和可扩展性。
1、制定消息格式(报文)
说到通讯,就必须要有协议,就像人或者其他动物沟通一样,人有语言规则,其他动物也有他们的语言规则,机器沟通也需要“语言”规则。
制定报文就是制定机器的语言规则。机器沟通的目的无非就是:获取数据、发送数据、指令,这两者都是一方发起请求,一方处理并响应。首先,机器不是人,他们不是那么智能,不能理解你啥时候能说完话,所以要让机器B知道机器A的数据发送结束了没有,就需要让他们先约定好,说话前,先告诉对方要说多少内容。然后机器A先告诉机器B,我要说一句话(一行),或者是我要说10个字母(10字节),这个时候机器B就可以根据机器A告知的长度去接受机器A的消息了,可解决半包粘包问题。
上面说到了他们要先约定好的一件事情,就是要告诉对方这次要说多少内容。这个约定的规则之一,就是消息长度。
既然有长度这一个信息了,有一就有二,我们顺便约定个其他的东西吧。比如,我这次要找你获取数据,还是发送给你一点数据,还是要让你执行一个指令还是其他事情。那么这个约定的规则中又加入一个信息,就是行为标识。
消息长度、行为标识、都是一条信息的基本属性,那么还有其他属性吗?当然有,这个就看想把这份规则制定的多详细了,不过也不是越长越好,而是在能解决基本事情后,越简单越好,毕竟东西多了,一是解析慢,二是数据包也会变大。
说到这里大家应该也都明白了报文应该怎么制定,下面给出一个简单的Socket通讯基本报文格式,大家参考一下。可同时用于请求和响应。
顺序
|
字段名 | 长度(字节) |
字段类型
|
描述
|
1
|
消息长度 |
4(32bit)
|
int | socket报文的长度最长2^31-1字节,大文件传输不使用此字段 |
2
|
行为标识
|
1(8bit)
|
byte
|
用于分支处理数据1字节可标识256种行为,一般够用
|
3
|
加密标识 |
1(8bit)
|
byte
|
区分加密方式0不加密
|
4
|
时间戳 |
8(64bit)
|
long | 消息时间戳,其实也没啥用,加着玩的,忽视掉吧 |
5
|
消息体
|
|
String
|
长度为消息长度-10字节,建议使用json,具体解析行为由行为标识字段定义
|
2、规定通信工作流程
因为BIO通讯不是那么灵活,所以我建议使用的时候一个Socket连接同时只被一个线程操作,并且同一个ServerSocket只做主动请求或者只做被动接收,这样能减少网络因素带来的一些乱七八糟我也不知道会变成啥玩意的¥@U!%1#% fa23 &%3 9&+……事情发生。其实也可以用消息分包或对socket对象加锁来用于多线程使用,但我是懒得去处理这样的事情,没必要嘛(好吧,其实还是懒)。
工作流程如下:
1、主动端发送数据,发送完后进入读取状态,等待响应。
2、被动端线程阻塞等待数据,读取到长度等前14个字节后进行初步解析,并根据行为标识或加密标识等字段进行处理,处理结束后,响应一个报文,然后继续等待数据。
代码思路如下:
关于性能方面可以 使用队列+线程池+连接池相互配合,这次先不讨论这些,想要讨论的可以私信我或评论,一起讨论。
1、基本封装
/** 消息包(报文) **/
class SocketPackage {
int length;// 长度
byte action;// 行为标识
byte encryption;// 加密标识
long timestamp;// 时间戳
String data;// 消息体 /** TODO:将此消息包转换为适当的byte数组 **/
byte[] toBytes() { byte[] lengthBytes = int2bytes(length);
// ...将各个字段都做了转换成bytes的操作后,合并byte数组并返回
} /** TODO:读取输入流转换成一个消息包 **/
static SocketPackage parse(InputStream in) throws IOException {
SocketPackage sp = new SocketPackage();
byte[] lengthBytes = new byte[4];
in.read(lengthBytes);// 未收到信息时此步将会阻塞
sp.length = bytes2int(lengthBytes);
// .....其他字段读取就不写了,这里要控制好异常,不要随意catch住,如果发生异常,不是socket坏了就是报文异常了,应当采用拒绝连接的形式向对方跑出异常
} }
/** 封装下socket,使其可以保存更多的连接信息,不要纠结名字,我纠结了好一会儿不知道怎么命名,反正是伪代码,就这样写着吧 **/
class NiuxzSocket {
Socket socket;
volatile long lastUse;// 上次使用时间
// ...这里还可以再加其他属性,比如是否是写状态,写操作开始时间,上次非心跳包时间等 NiuxzSocket(Socket socket) {
this.socket = socket;
this.lastUse = System.currentTimeMillis();
} InputStream getIn() {
return socket.getInputStream();
} void write(byte[] bytes) throws IOException {
this.socket.getOutputStream().write(bytes);
}
}
2、主动端:
主动端的核心是连接池SocketPool和SocketClient服务
大概流程是调用SocketClient发送数据包,SocketClient从连接池中获取一个可用连接,如果没有可用连接,就创建一个。SocketClient根据业务类型或消息类型分别对NiuxzSocket进行操作。
/** 封装一个发送信息的接口,提供常用的发送信息方法。 **/
interface SocketClient {
SocketPackage sendData(SocketPackage sp);// 发送一个消息包,并等待返回的消息包
// TODO:还可以根据双方的业务和协议添加几个更方便使用的接口方法。比如只返回消息体字段,或者直接返回json内容的 void sendHeartBeat(NiuxzSocket socket);// 发送一个心跳包,这个方法后面讲心跳包时会用到
} class DefaultSocketClient implements SocketClient {
SocketPool socketPool;// 先假装有一个socket连接池,用来管理socket。不使用连接池的话,在这里直接注入一个NiuxzSocket就可以了。下面代码中也直接使用socket,但是一定要在使用时进行加锁操作。否则就会造成多线程访问同一个socket导致数据错乱了。 /** 此方法就是主动端工作入口了,业务代码可以直接调用这里进行发送数据 **/
SocketPackage sendData(SocketPackage sp){
NiuxzSocket niuxzSocket = socketPool.get();//获取一个socket,这里可以看到获取的socket并不是原生的socket,其实是我们自己封装后的socket
try{
niuxzSocket.write(sp.toBytes());//阻塞持续写到缓存中
niuxzSocket.lastUse = System.currentTimeMillis();//根据业务方法更新socket的状态信息
SocketPackage sp = SocketPackage.parse(niuxzSocket.getIn());//阻塞读,等待消息的返回,因为是单线程操作socket所以不存在消息插队的情况。
return sp;
}catch(Exception e){
LOG.error("发送消息包失败",e);
socketPool.destroy(niuxzSocket)
//在发生不可复用的异常时才关闭socket,并销毁这个NiuxzSocke。不可复用异常意思是IO操作到了一半不知道具体到哪了所以整个socket都不可用了。
}
finally{
if(socketPool!=null){
socketPool.recycle(niuxzSocket );//使用完这个socket后我们不要关闭,因为还要复用,让连接池回收这个socket。recycle内要判断socket是否是销毁状态。
}
}
}
}
/** 定义一个连接池接口SocketPool **/
interface SocketPool {
/** 获取一个连接 **/
NiuxzSocket get(); /** 回收Socket **/
void recycle(NiuxzSocket ns); /** 销毁Socket **/
void destroy(NiuxzSocket ns);
} /** 实现连接池 **/
class DefaultSocketPool implements SocketPool {
BlockingQueue<NiuxzSocket> sockets;// 存放socket的容器,也可以使用数组 NiuxzSocket get() {
// TODO:池里有就获取,没有就开一个线程去创建 并且等待创建完成,可使用synchronized/wait或Lock/condition
}
// TODO:实现socketPool,实现连接池是属于性能可靠性优化,要做的事情会比较多。偷个懒,大家懂就好,具体实现,等有时间我把我的连接池代码整理后再写一篇文章,有想了解的可以给我评论讨论下。
}
3、被动端
被动端的核心是NiuxzServer和Worker和SocketHandler
大概流程是开启端口等待连接、接受连接创建线程、达到线程最大数,拒绝连接、连接进入开始读取数据、读取到数据后进行分支处理,处理完后把结果响应到主动端,完成一次交互。继续读取。
/**开启一个ServerSocket并等待连接,联入后开启一个线程进行处理**/
class NiuxzServer{
ServerSocket serverSocket;
HashMap<NiuxzSocket> sockets = new HashMap<NiuxzSocket>();
public static AtomicInteger workerCount = 0;
public Object waitLock = new Object();
int maxWorkerCount = 100;//允许100个连接进入
int port;//配置一个端口号 /**工作入口**/
void work(){
serverSocket = new ServerSocket(port);
while(true){
Socket socekt = serverSocket.accept();//阻塞等待连接
NiuxzSocket niuxzSocket = new NiuxzSocket(socket);
sockets.put(niuxzSocket ,1);//将连接放入map中
Worker worker = new Worker(niuxzSocket );//创建一个工作线程
worker.start();//开始线程
while(true){
if(workerCount.incrementAndGet()>=maxWorkerCount){//如果超过了规定的最大线程数,就进入等待,等待其他连接销毁
synchronized(waitLock){
if(workerCount.incrementAndGet()>=maxWorkerCount){//double check 确定进入等待前没有正在断开的socket
waitLock.wait();
}else{
break;
}
}
}else{
break;
}
}
}
} /**销毁一个连接**/
void destroy(NiuxzSocket socket){
synchronized(waitLock){
sockets.remove(socket);//从池子里删除
workerCount.decrementAndGet();//当前连接数减一
waitLock.notify();//通知work方法 可以继续接受请求了
}
} /**创建一个工作者线程类,处理连入的socket**/
class Worker extends Thread{
HashMap<Integer,SocketHandler> handlers;//针对每种行为标识做的消息处理器。
NiuxzSocket socket;
Worker(NiuxzSocketsocket){//构造函数
this.socket = socket;
}
void run(){
try{
while(true){
SocketPackage sp = SocketPackage.parse(socket.getIn());//阻塞读,直到读完一个消息包未知,这样可以解决粘包或半包的问题
SocketHandler handler = handlers.get(sp.getAction());//根据行为标识获取响应的处理器
handler.handle(sp,socket);//处理结果和响应信息都在handler中回写
}
}cache(Exception e){
LOG.error("连接异常中断",e);
NiuxzServer.destroy(socket);
}
}
}
}
/** 创建一个消息处理器 SocketHandler 接收所有内容后 回显 **/
class EchoSocketHandler implements SocketHandler {
/** 处理socket请求 **/
void handle(SocketPackage sp, NiuxzSocket socket) {
sp.setAction(10);// 比如协议中的行为标识10是响应成功的意思
socket.write(sp.toBytes());// 直接回写
}
}
至此两端的工作代码已经初步完成。socket可以按照相互制定的通讯方式进行通讯了。
3、心跳机制:
心跳机制socket长链接通讯中不可或缺的一个机制。主动端可以检测socket是否存活,被动端可以检测对方是否还在线。因为有时候网络并不一定那么完美,会出现链路上的异常,此时应用层可能并不能发现问题,等下次再用这个连接的时候就会抛出异常了,如果是被动端,还会白白占用着一个线程,不如在那之前就发现一部分异常,并销毁连接,下次通讯时出错的概率就降低了很多,被动端也会释放线程,释放资源。
代码可以这样实现:
主动端:
做一个定时任务遍历判断连接池中所有连接的上次使用时间是否超过心跳包间隔时间,超过了就取出这个socket并开启一个线程(最好使用使用线程池),在线程中发送一个心跳包。
@Scheduled(fixedDelay=30*1000)//延时30秒执行一次
void HeartBeat(){
for(NiuxzSocket socket:socketPool.getAllSocket()){
if(System.curTime() - socket.getLastUse() > 30*1000){//如果系统时间减上次使用时间大于30秒
//开启线程,从连接池中取出这个连接remove(socket)移除成功再继续操作,保证不会有其他线程同时使用这个socket。发送一个SocketPackage,socketClient.sendHeartBeat()
if(socketPool.remove(socket)){
socketClient.snedHeartBeat(socket);//socketClient.snedHeartBeat这个方法实现:行为标识设置为心跳包,比如规定1就是心跳包。完事回收这个链接socketPool.recycle(socket),但当中间反生异常,则代表这个连接不可用了,就销毁socketPool.destroy(socket)。
}
}
}
}
被动端:
跟主动端一样,定时扫描连接池,但是发现超过规定的空闲超时时间的连接时不发送心跳包而是直接销毁,关闭socket后,正在read的线程就会读取到EOF(-1),停止线程。规定的超时时间一定要大于约定的心跳包的间隔时间。
4、即时重发:
优化SocketClient,在每次发送的时候,如果发生异常,销毁当前socket后,再次执行一次或两次即可。重试几次后如果不行再把异常抛出。
5、完善填坑:
通过上面的工作,我们其实已经解决了问题1、3、4了。通过报文制定信息长度解决半包粘包问题,通过客户端的连接池或操作socket加锁的方式解决多线程访问socket时会造成数据错乱的问题(好吧,加锁谁不会呢。。所以推荐使用连接池的方式,提高吞吐量)。
还有问题2、5。其实我们可以通过一个Sokcet健康检测任务(也可与心跳检测任务合并,把心跳任务的延迟时间改为100ms或者更低)去遍历连接池,判断每个连接的信息,挨个判断每个状态是否异常,然后再决定要不要关闭socket。
比如问题1,可能在读写操作时对方卡死,导致很久不处理任务或者对方挂起了,压根不会继续接收或回写信息了,这时如果有一个超时机制就比较好了,幸运的是java的socket是有setSoTimeout方法的,可以设置read的超时时间,给主动端设置个30s,被动端迟迟不响应,就会抛出超时异常,这时候我们就销毁这个socket了。
但是,java的socket没有提供write的超时设置,那给被动端写数据时,被动端接收巨缓慢或者出了什么问题导致压根不接收数据了,就会导致这个写入线程一直挂起。我们当然不希望发生这样的事情,那么我们可以在write之前记录下当前时间并把socket变为正在写出状态,然后在Sokcet健康检测任务中判断这个socket是否是写出状态并且时间是否超过xx秒,来决定是否关闭这个socket。
空闲socket关闭就更简单了,在NiuxzSocket再加一个上次非心跳包发送时间,然后在健康检测任务中进行判断就可以了。
以上便是我用同步socket实现第一版分布式文件系统时总结的经验,有些问题其实在NIO中变得不是问题了。NIO和AIO更适合会持有大量连接的服务器端。