QT实现HTTP JSON高效多线程处理服务器
Legahero QQ:1395449850
现在一个平台级的系统光靠web打天下是不太现实的了,至少包含APP和web两部分,在早期APP直接访问web交换数据,后来程序员们发现由于 web界面的变化和数据展现多变,APP需要一个稳定、轻量的数据交互接口协议,重量的web无法满足,http json由于数据扩展性好、数据结构简单、轻量成为首选协议。
最近考虑用QT实现HTTP JSON服务器,主要原因是:使用java (servlet、com.sun.net.httpserver)容易反编译,虽然网上提供了一大把的java加密、混淆方案,但总感觉麻烦和不靠谱;其次单点服务器下运行效率java比C++慢;再有运维部署exe程序比web服务器简单(当然你要保证你的程序需要最够稳定);最后可以使你的json服务与web程序服务完全独立,web的维护不会影响到json服务,这点很重要能够保证你的系统不至于在web运维期间完全瘫痪。
选择QT来实现,编程速度比其他的C++快,windows、linux兼容性好,这里边需要解决几个问题:
- 框架的业务实现部分必须最够简单,业务部分容易实现和扩充;
- 高效多线程并发处理必须最够强;
- 体量小,CPU、内存占用小,能够长时间稳定运行;
处理流程总结如下:
HTTP协议基于TCP协议,首先实现一个异步收发的TCP服务类:
//继承QTCPSERVER以实现多线程TCPscoket的服务器。
class QAsynTcpServer : public QTcpServer
{
Q_OBJECT
public:
explicit QAsynTcpServer(QObject *parent = 0,int numConnections = 10000);
~QAsynTcpServer();
void setMaxPendingConnections(int numConnections);//重写设置最大连接数函数
protected slots:
void sockDisConnectSlot(int handle,const QString & ip, quint16 prot, QThread *th);//断开连接的用户信息
public:
void clear(); //断开所有连接,线程计数器请0
protected:
void incomingConnection(qintptr socketDescriptor);//覆盖已获取多线程
QHash<int,QTcpSocket *> * m_ClientList;//管理连接的map
private:
int maxConnections;
};
再从该QAsynTcpServer类继承实现Http服务类:
class QHttpServer : public QAsynTcpServer
{
Q_OBJECT
public:
QHttpServer(QObject *parent = 0,int numConnections=1000);
virtual ~QHttpServer();
private Q_SLOTS:
void handleRequest(QHttpRequest *request, QHttpResponse *response);
protected slots:
void sockDisConnectSlot(int handle,const QString & ip, quint16 prot, QThread *th);//断开连接的用户信息
protected:
void incomingConnection(qintptr socketDescriptor);//覆盖已获取多线程
};
实现多线程处理必须重写void QTcpServer::incomingConnection(qintptr socketDescriptor)的实现:
void QHttpServer::incomingConnection(qintptr socketDescriptor)
{
qDebug() << "QHttpServer:incomingConnection,ThreadId:"<<QThread::currentThreadId() ;
//继承重写此函数后,QQAsynTcpServer默认的判断最大连接数失效,自己实现
if (m_ClientList->size() > maxPendingConnections())
{
QTcpSocket tcp;
tcp.setSocketDescriptor(socketDescriptor);
tcp.disconnectFromHost();
qDebug() << "tcpClient->size() > maxPendingConnections(),disconnectFromHost";
return;
}
auto th = ThreadHandle::getClass().getThread();
QAsynHttpSocket* tcpTemp = new QAsynHttpSocket(socketDescriptor);
QString ip = tcpTemp->peerAddress().toString();
qint16 port = tcpTemp->peerPort();
//NOTE:断开连接的处理,从列表移除,并释放断开的Tcpsocket,线程管理计数减1,此槽必须实现
connect(tcpTemp,SIGNAL(sockDisConnect(const int ,const QString &,const quint16, QThread *)),
this,SLOT(sockDisConnectSlot(const int ,const QString &,const quint16, QThread *)));
//必须在QAsynHttpSocket的线程中执行
connect(tcpTemp, SIGNAL(newRequest(QHttpRequest *, QHttpResponse *)), this,
SLOT(handleRequest(QHttpRequest *, QHttpResponse *)), Qt::DirectConnection);
tcpTemp->moveToThread(th);//把tcp类移动到新的线程,从线程管理类中获取
m_ClientList->insert(socketDescriptor,tcpTemp);//插入到连接信息中
qDebug() << "QHttpServer m_ClientList add:"<<socketDescriptor ;
}
ThreadHandle::getClass().getThread()为每个连接分配一个处理线程,m_ClientList管理所有的连接。
handleRequest处理Http请求,必须在该连接的线程中执行,所以使用了Qt::DirectConnection,在新的请求发生时触发,在该处实现和扩展json业务,以下为代码样例:
//处理新的http 请求,这里处理业务
void QHttpServer::handleRequest(QHttpRequest *req, QHttpResponse *resp)
{
qDebug() << "QHttpServer:handleRequest,ThreadId:"<<QThread::currentThreadId() ;
qDebug() <<"path:"<< req->path();
QJsonDocument doc=QJsonDocument::fromBinaryData(req->body());
QJsonObject recv_obj=doc.object();//这是接收到的json对象
QJsonObject resp_obj; //返回json对象
resp_obj.insert("man_num",4);
resp_obj.insert("time", "20150601113432");
QByteArray data = QJsonDocument(resp_obj).toJson(QJsonDocument::Compact);
resp->setHeader("Content-Type", "text/html");
resp->setHeader("Content-Length", QString::number(data.length()));
resp->writeHead(200);
resp->end(data);
resp->flush();
req->deleteLater();
resp->deleteLater();
qDebug() <<"handleRequest end";
}
在这里使用QJsonDocument来解析json请求,并给请求回json应答,按应用求求实现自己的业务逻辑。
当客户端关闭连接的时候需要释放连接占用的线程资源,并从连接管理中删除。
//释放线程资源
void QHttpServer::sockDisConnectSlot(int handle,const QString & ip, quint16 prot,QThread * th)
{
qDebug() << "QHttpServer:sockDisConnectSlot,ThreadId:"<<QThread::currentThreadId() ;
qDebug() << "QHttpServer m_ClientList size:"<<m_ClientList->size() ;
qDebug() << "QHttpServer m_ClientList:remove:"<<handle ;
m_ClientList->remove(handle);//连接管理中移除断开连接的socket
ThreadHandle::getClass().removeThread(th); //告诉线程管理类那个线程里的连接断开了,释放数量
qDebug() << "QHttpServer m_ClientList size:"<<m_ClientList->size() ;
}
在这里我们理解为客户端采用一个请求一个应答的往来方式实现http 解析,从QTcpSocket继承建立自己的QAsynHttpSocket类:
class QAsynHttpSocket: public QTcpSocket
{
Q_OBJECT
public:
explicit QAsynHttpSocket(qintptr socketDescriptor, QObject *parent = 0);
~QAsynHttpSocket();
void write(const QByteArray &data);
//void flush();
void waitForBytesWritten();
Q_SIGNALS:
void newRequest(QHttpRequest *, QHttpResponse *);
void allBytesWritten();
signals:
//NOTE:断开连接的用户信息,此信号必须发出!线程管理类根据信号计数的
void sockDisConnect(const int ,const QString &,const quint16, QThread *);
private Q_SLOTS:
void doParseRequest();
void doDisconnected();
void doUpdateWriteCount(qint64);
private:
static int doMessageBegin(http_parser *parser);
static int doMessageComplete(http_parser *parser);
static int doHeaderField(http_parser *parser, const char *at, size_t length);
static int doHeaderValue(http_parser *parser, const char *at, size_t length);
static int doHeadersComplete(http_parser *parser);
static int doBody(http_parser *parser, const char *at, size_t length);
static int doUrl(http_parser *parser, const char *at, size_t length);
static int doStatus(http_parser *parser, const char *at, size_t length);
static int doChunkHeader(http_parser *parser);
static int doChunkComplete(http_parser *parser);
private:
qintptr socketID;
http_parser *m_parser;
http_parser_settings *m_parserSettings;
//临时变量
QByteArray m_currentUrl;
// The ones we are reading in from the parser
HeaderHash m_currentHeaders;
QString m_currentHeaderField;
QString m_currentHeaderValue;
QByteArray m_currentBody;
QByteArray m_currentStatus;
// Keep track of transmit buffer status
qint64 m_transmitLen;
qint64 m_transmitPos;
private:
Q_DISABLE_COPY(QAsynHttpSocket)
};
在本程序中使用nodejs 的http-parser的源代码来解析http 请求,http-parser是一个用C代码编写的HTTP消息解析器,可以解析HTTP请求或者回应消息,该解析器常常在高性能的HTTP应用中使用;在解析的过程中,它不会调用任何系统资源,不会在HEAP上申请内存,不会缓存数据,并且可以在任意时刻打断解析过程,而不会产生任何影响,对于每个HTTP消息(在WEB服务器中就是每个请求),需要的内存占用很少(40字节?)。
http-parser的特性:无第三方依赖、可以处理持久消息(keep-alive)、支持解码chunk编码的消息、支持Upgrade协议升级(如无例外就是WebSocket)、可以防御缓冲区溢出攻击。解析器可以处理以下类型的HTTP消息:头部的字段和值、Content-Length、请求方法、返回的HTTP代码、Transfer-Encoding、HTTP版本、请求的URL、HTTP消息body主体。
使用非常简单,建立回调函数,执行http_parser_execute连续分析数据。可以在QAsynHttpSocket的构造函数中初始化http_parser,绑定回调函数。
QAsynHttpSocket::QAsynHttpSocket(qintptr socketDescriptor, QObject *parent) : //构造函数在主线程执行,lambda在子线程
QTcpSocket(parent),socketID(socketDescriptor),
m_parser(0),
m_parserSettings(0),
m_transmitLen(0),
m_transmitPos(0)
{
this->setSocketDescriptor(socketDescriptor);
m_parser = (http_parser *)malloc(sizeof(http_parser));
http_parser_init(m_parser, HTTP_REQUEST);
m_parserSettings = new http_parser_settings();
m_parserSettings->on_message_begin = doMessageBegin;
m_parserSettings->on_message_complete = doMessageComplete;
m_parserSettings->on_header_field = doHeaderField;
m_parserSettings->on_header_value = doHeaderValue;
//在HeadersComplete完成时激发newRequest
m_parserSettings->on_headers_complete = doHeadersComplete;
m_parserSettings->on_body = doBody;
m_parserSettings->on_status = doStatus;
m_parserSettings->on_url = doUrl;
m_parserSettings->on_chunk_header = doChunkHeader;
m_parserSettings->on_chunk_complete = doChunkComplete;
m_parser->data = this;
connect(this, SIGNAL(readyRead()), this, SLOT(doParseRequest()));
connect(this, SIGNAL(disconnected()), this, SLOT(doDisconnected()));
//当所有的字节写完updateWriteCount激发allBytesWritten信号
connect(this, SIGNAL(bytesWritten(qint64)), this, SLOT(doUpdateWriteCount(qint64)));
qDebug() << "new connect:"<<socketDescriptor ;
}
http_parser有两种类型的回调:
- 通知
typedef int (*http_cb) (http_parser *);
包括:on_message_begin
,on_headers_complete
,on_message_complete
- 数据
typedef int (*http_data_cb) (http_parser *, const char at, size_t length);
包括;(只限与请求)on_uri
, (通用)on_header_field
,on_header_value
,on_body
用户的回调函数应该返回0
表示成功。返回非0
的值,会告诉解析器发生了错误,解析器会立刻退出。
Socket的readyRead()对应的槽实现函数中持续调用http_parser_execute。
void QAsynHttpSocket::doParseRequest()
{
Q_ASSERT(m_parser);
qDebug() << "QHttpConnection:parseRequest,ThreadId:"<<QThread::currentThreadId() ;
while (this->bytesAvailable()) {
qDebug() << "readAll,ThreadId:"<<QThread::currentThreadId() ;
QByteArray arr = this->readAll();
qDebug() << "http_parser_execute begin,ThreadId:"<<QThread::currentThreadId() ;
http_parser_execute(m_parser, m_parserSettings, arr.constData(), arr.size());
qDebug() << "http_parser_execute end,ThreadId:"<<QThread::currentThreadId() ;
}
}
http_parser在每一个http请求消息解析完成后触发MessageComplete回调,这个时候可建立QHttpRequest和QHttpResponse,并发射新请求到来信号,触发handleRequest业务处理。
int QAsynHttpSocket::doMessageComplete(http_parser *parser)
{
qDebug() << "doMessageComplete" ;
// TODO: do cleanup and prepare for next request
QAsynHttpSocket *theConnection = static_cast<QAsynHttpSocket *>(parser->data);
QHttpRequest* request = new QHttpRequest(theConnection);
/** set method **/
request->setMethod(static_cast<QHttpRequest::HttpMethod>(parser->method));
/** set version **/
request->setVersion(
QString("%1.%2").arg(parser->http_major).arg(parser->http_minor));
/** get parsed url **/
struct http_parser_url urlInfo;
int r = http_parser_parse_url(theConnection->m_currentUrl.constData(),
theConnection->m_currentUrl.size(),
parser->method == HTTP_CONNECT, &urlInfo);
Q_ASSERT(r == 0);
Q_UNUSED(r);
request->setUrl(createUrl(theConnection->m_currentUrl.constData(), urlInfo));
// Insert last remaining header,这个已经在doHeadersComplete中执行
//theConnection->m_currentHeaders[theConnection->m_currentHeaderField.toLower()] =
// theConnection->m_currentHeaderValue;
request->setHeaders(theConnection->m_currentHeaders);
/** set client information **/
request->m_remoteAddress = theConnection->peerAddress().toString();
request->m_remotePort = theConnection->peerPort();
qDebug() << "QHttpResponse:new,ThreadId:"<<QThread::currentThreadId() ;
QHttpResponse *response = new QHttpResponse(theConnection);
if (parser->http_major < 1 || parser->http_minor < 1)
response->m_keepAlive = false;
Q_EMIT theConnection->newRequest(request, response);
return 0;
}
本程序框架代码很少,在windows10下占用内存9.5M,经过测试完全满足高并发长期稳定运行。 代码下载
备注:本程序参照网上局部代码并经过大幅度整理修改,大部分代码原创实现。