使用CBrother脚本做TCP服务器与C++客户端通信
工作中总是会遇到一些对于服务器压力不是特别大,但是代码量比较多,用C++写起来很不方便。对于这种需求,我选择用CBrother脚本做服务器,之所以不选择Python是因为python的语法我实在是适应不了,再来CBrother的网络框架也是用C++封装的异步IO,性能还是很有保证的。
废话不多说,先来看下服务器代码,我这里只是记录一个例子,不是全部代码,方便后面做项目的时候直接来自己博客复制代码修改。
import CBSocket.code //加载Socket扩展 var g_tcpModule = null; //全局保存TCP模块对象 const MSG_TYPE_USER = 1; //客户端发来的消息
const MSG_TYPE_CLOSE = 2; //socket断线的消息 const LOGIC_MSG_LOGIN = 1; //登陆消息全局定义
const LOGIC_MSG_GETID = 2; //获取ID消息全局定义 function main(a) //入口函数,cbrother从这里开始执行
{
var thread = new Thread(); //启动一个数据管理线程,串行处理全局数据,可以根据不同业务启动多个
thread.setThreadAction(new ThreadAction());
thread.start(); g_tcpModule = new TcpModule(); //启动一个TCP模块
var tcpAction = new TcpAction();
tcpAction.thread = thread;
g_tcpModule.setTcpAction(tcpAction); //设置TCP的处理类为TcpAction
g_tcpModule.addListenPort(6061,"0.0.0.0"); //监听6061端口
g_tcpModule.start(); print "tcpServer start!"; while(1)
{
Sleep(1000);
}
}
TcpAction主要处理tcpmodule的消息回调
class SocketBuf //这个类会给每个socket创建一个,用来拼包,因为tcp在传输过程中并不保证每次收到都是整包数据
{
var _byteArray = new ByteArray(1024 * 10); //每个socket预留10K的缓冲 function SocketBuf()
{
_byteArray.setLittleEndian(true); //c++编码为低位编址(LE) 如果要高位编码c++里面可以htonl
} function PushData(bytes,len)
{
print "pushdata " + len;
if(!_byteArray.writeBytes(bytes,len))
{
print "socket buf is full!";
return false;
} return true;
} function CheckPack()
{
print "begin CheckPack!";
var writePos = _byteArray.getWritePos();
print "checkpack " + writePos;
if(writePos < 4) //前4个字节表示包的长度
{
print "CheckPack null < 4";
return null;
} var msglen = _byteArray.readInt();
print "checkpack " + msglen;
if(writePos < msglen + 4) //缓存里的数据还不够一包数据,继续等待数据
{
print "CheckPack null writePos < msglen + 4";
_byteArray.setReadPos(0);
return null;
} //够一包数据了,取出数据,到另一个线程里去处理
var newBytes = _byteArray.readBytes(msglen);
newBytes.setLittleEndian(true); var readPos = _byteArray.getReadPos(); _byteArray.copy(_byteArray,0,readPos,writePos - readPos);
_byteArray.setReadPos(0);
_byteArray.setWritePos(writePos - readPos);
print "writePos:" + writePos;
print "readPos:" + readPos;
//XORCode(newBytes); //异或解密一下,这个为了安全性考虑。我在另一篇博客里专门写这个函数与C++加密函数的对应关系
return newBytes;
}
} class TcpAction
{
var thread; //这个是逻辑线程,这个例子里我只启动一个逻辑线程
var sockMap = new Map(); //保存每个socket的消息缓冲,拼包用
var lock = new Lock(); //sockMap的锁 function OnAccept(sock)
{
print "accept " + sock + " " + g_tcpModule.getRemoteIP(sock); //监听到客户端连接,管理起来
lock.lock();
var socketbuf = new SocketBuf();
sockMap.add(sock,socketbuf);
lock.unlock();
} function OnClose(sock)
{
print "onclose " + sock; //断线了,移除掉,并通知逻辑线程
lock.lock();
sockMap.remove(sock);
lock.unlock(); var newmsg = new ThreadMsg(sock,null);
newmsg.type = MSG_TYPE_CLOSE;
thread.addMsg(newmsg);
} function OnRecv(sock,byteArray,len)
{
print "onrecv " + sock + " len:" + len; //收到数据获取socket缓冲
lock.lock();
var socketbuf = sockMap.get(sock);
lock.unlock(); if(socketbuf == null)
{
return; //应该是被关掉了
} if(!socketbuf.PushData(byteArray,len)) //数据压进去
{
g_tcpModule.closeSocket(sock);
return;//buf满了都解不开包,说明数据有问题,关了它
} //把包解出来丢到逻辑线程去处理,循环是因为怕buf里同时又好几包数据
var newBytes = socketbuf.CheckPack();
while(newBytes != null)
{
thread.addMsg(new ThreadMsg(sock,newBytes));
newBytes = socketbuf.CheckPack();
}
}
}
最后是逻辑线程里,其实上面的代码写好了基本上就不动了,后续添加消息号增加处理都是在逻辑线程里去,所以上面的代码可以封装一下,我这里是为了自己好记忆,所以就这样写了。
//这个类是线程消息类,可以理解为一个结构体
class ThreadMsg
{
function ThreadMsg(id,bytes)
{
socketid = id;
byteArray = bytes;
type = MSG_TYPE_USER;
} var socketid;
var type;
var byteArray;
} class ThreadAction
{
var _userMap = new Map(); //用户名索引用户信息
var _socketMsp = new Map(); //Socket索引用户信息 var _funcMap = new Map(); //消息对应的处理函数 function onInit()
{
print "thread init" ; //线程启动时读取数据库数据 因为篇幅问题不写加载数据库的部分了,固定插入两条数据做模拟数据
//LoadDB();
_userMap.add("zhangsan",new UserData("zhangsan","123123",1));
_userMap.add("lisi",new UserData("lisi","321321",2)); _funcMap.add(LOGIC_MSG_LOGIN,onLogin); //注册消息号的处理函数
_funcMap.add(LOGIC_MSG_GETID,onGetID);
} function onMsg(msg)
{
switch(msg.type)
{
case MSG_TYPE_CLOSE:
{
//断线消息
print "MSG_TYPE_CLOSE";
OnClose(msg.socketid);
break;
}
case MSG_TYPE_USER:
{
//客户端发来的消息,客户端发来的数据结构都是从struct MsgBase派生,所以前4个字节就是struct MsgBase的msgid
var msgid = msg.byteArray.readInt();
var func = _funcMap.get(msgid);
print "MSG_TYPE_USER" + msgid;
if(func != null)
{
func.invoke(msg.socketid,msg.byteArray);
}
break;
}
}
} function onEnd()
{
print "thread end";
} function SendData(socketid,byteArray) //发送数据给客户端的函数
{
//XORCode(byteArray); //异或加密一下
var lenByte = new ByteArray();
lenByte.setLittleEndian(true);
lenByte.writeInt(byteArray.getWritePos()); g_tcpModule.sendData(socketid,lenByte);
g_tcpModule.sendData(socketid,byteArray);
} function OnClose(socketid)
{
//关闭的时候从socket管理里移除,清理在线状态
var userdata = _socketMsp.get(socketid);
if(userdata == null)
{
return;
}
userdata._IsOnLine = false;
userdata._SocketID = 0;
_socketMsp.remove(socketid);
} function onLogin(socketid,byteArray)
{
print "onLogin"; var namebuf = byteArray.readBytes(32); //这个长度要跟C++结构体对应 char name[32];
var name = namebuf.readString();
var pwdbuf = byteArray.readBytes(32); //这个长度要跟C++结构体对应 char pwd[32];
var pwd = pwdbuf.readString(); print name;
print pwd; var userdata = _userMap.get(name);
if(userdata == null)
{
//没有找到用户名,用户名错误
var resBytes = new ByteArray();
resBytes.writeInt(LOGIC_MSG_LOGIN);
resBytes.writeInt(1); //回复1表示帐号或者密码错误错误
SendData(socketid,resBytes);
print "name err!";
return;
} if(pwd != userdata._UserPwd)
{
var resBytes = new ByteArray();
resBytes.writeInt(LOGIC_MSG_LOGIN);
resBytes.writeInt(1); //回复1表示帐号或者密码错误错误
SendData(socketid,resBytes);
print "pwd err!";
return;
} if(userdata._IsOnLine) //这个帐号已经登录过了,冲掉
{
g_tcpModule.closeSocket(userdata._SocketID);
OnClose(userdata._SocketID);
} //登陆成功,添加进socket管理,并至在线状态
userdata._IsOnLine = true;
userdata._SocketID = socketid;
_socketMsp.add(socketid,userdata); var resBytes = new ByteArray();
resBytes.setLittleEndian(true);
resBytes.writeInt(LOGIC_MSG_LOGIN);
resBytes.writeInt(2); //回复2表示登录成功
SendData(socketid,resBytes);
print "login suc!";
} function onGetID(socketid,byteArray)
{
var userdata = _socketMsp.get(socketid);
if(userdata == null)
{
return; //该socket不在线,不处理
} var resBytes = new ByteArray();
resBytes.setLittleEndian(true);
resBytes.writeInt(LOGIC_MSG_GETID);
resBytes.writeInt(userdata._UserID);
SendData(socketid,resBytes);
}
}
服务器代码完成了,再来看看C++客户端代码。
enum
{
LOGIC_MSG_LOGIN = , //登陆
LOGIC_MSG_GETID = , //获取ID
}; struct MsgBase //消息基类
{
int msgid;
}; struct MsgLogin : public MsgBase //登陆消息
{
char name[];
char pwd[];
}; struct MsgLoginRet : public MsgBase //登陆返回
{
int res;
}; struct MsgGetID : public MsgBase //获取ID消息
{
}; struct MsgGetIDRet : public MsgBase //获取ID返回
{
int userid;
}; //接收服务器消息,将数据放到recvBuf里
bool RecvData(int sock,char* recvBuf)
{
int alllen = ;
int len = recv(sock,recvBuf,,); //先读4个字节为消息长度
if (len <= )
{
return false; //socket被关闭了
} alllen += len;
while (alllen < )
{
len = recv(sock,recvBuf + len, - len,);
if (len <= )
{
return false; //socket被关闭了
} alllen += len;
} int msglen = *((int*)recvBuf); //再将消息内容读入recvBuf
alllen = ;
len = recv(sock,recvBuf,msglen,);
if (len <= )
{
return false; //socket被关闭了
} alllen += len;
while (alllen < msglen)
{
len = recv(sock,recvBuf + len,msglen - len,);
if (len <= )
{
return false; //socket被关闭了
} alllen += len;
} return true;
} //发送数据
bool SendData(int sock,MsgBase* pbase,int len)
{
//XORBuf((char*)pbase,sizeof(len)); //异或加密,下一篇博客专门写这个函数 send(sock,(const char*)&len,,); //发送长度
send(sock,(const char*)pbase,len,); //发送数据
return true;
} int _tmain(int argc, _TCHAR* argv[])
{
WORD sockVersion = MAKEWORD(, );
WSADATA wsaData;
if (WSAStartup(sockVersion, &wsaData) != )
{
return ;
} int clinetsocket = socket(PF_INET, SOCK_STREAM, );
if (clinetsocket == -)
{
return ;
} struct hostent *hptr = gethostbyname("127.0.0.1"); struct sockaddr_in address;
address.sin_family = AF_INET;
address.sin_addr.s_addr = *(u_long*)hptr->h_addr_list[];
address.sin_port = htons(); int result = connect(clinetsocket, (struct sockaddr *)&address, sizeof(address));
if (result == -)
{
return NULL;
} //定义登陆消息结构
MsgLogin msg;
msg.msgid = LOGIC_MSG_LOGIN;
strcpy(msg.name,"lisi");
strcpy(msg.pwd,"");
int len = sizeof(msg);
SendData(clinetsocket,&msg,len); char recvBuf[] = {}; while (true)
{
if(!RecvData(clinetsocket,recvBuf))
{
printf("socket close!\n"); //连接断了
break;
} //收到的数据先转为pBase 看前4个字节的msgid
MsgBase* pBase = (MsgBase*)recvBuf; switch (pBase->msgid)
{
case LOGIC_MSG_LOGIN:
{
//登陆返回
MsgLoginRet* mlr = (MsgLoginRet*)pBase;
if (mlr->res == )
{
printf("login err!\n");
}
else
{
printf("login suc!\n"); //请求ID
MsgGetID msggetid;
msggetid.msgid = LOGIC_MSG_GETID;
len = sizeof(msggetid);
SendData(clinetsocket,&msggetid,len);
}
break;
}
case LOGIC_MSG_GETID:
{
//请求ID返回
MsgGetIDRet* mgir = (MsgGetIDRet*)pBase;
printf("userid : %d\n",mgir->userid);
break;
}
}
} return ;
}
这样客户端和服务器就都完成了,下面再来记录一下C++消息结构序列化后的二进制流。
MsgBase为所有消息的基类,所以从它派生的结构体前4个字节肯定是整形的msgid。在服务端直接readInt读取前4个字节就表示读取了MsgBase里的msgid。
MsgLogin有两个成员变量都为char[32]数组,所以这个结构体的总字节大小是64,除掉前4个字节是msgid意外,readBytes(32)表示读取这个数组,再readString表示获取\0结尾的字符串
MsgLoginRet只有一个成员变量,所以服务器第一个writeInt表示填充基类的msgid,第二个writeInt表示res。
之后的逻辑就都是添加消息号和消息结构做逻辑了,用脚本做服务器编码效率还是非常高的。