QT实现HTTP JSON高效多线程处理服务器

QT实现HTTP JSON高效多线程处理服务器

Legahero QQ:1395449850

现在一个平台级的系统光靠web打天下是不太现实的了,至少包含APP和web两部分,在早期APP直接访问web交换数据,后来程序员们发现由于 web界面的变化和数据展现多变,APP需要一个稳定、轻量的数据交互接口协议,重量的web无法满足,http json由于数据扩展性好、数据结构简单、轻量成为首选协议。

最近考虑用QT实现HTTP JSON服务器,主要原因是:使用java (servlet、com.sun.net.httpserver)容易反编译,虽然网上提供了一大把的java加密、混淆方案,但总感觉麻烦和不靠谱;其次单点服务器下运行效率java比C++慢;再有运维部署exe程序比web服务器简单(当然你要保证你的程序需要最够稳定);最后可以使你的json服务与web程序服务完全独立,web的维护不会影响到json服务,这点很重要能够保证你的系统不至于在web运维期间完全瘫痪。

选择QT来实现,编程速度比其他的C++快,windows、linux兼容性好,这里边需要解决几个问题:

  1. 框架的业务实现部分必须最够简单,业务部分容易实现和扩充;
  2. 高效多线程并发处理必须最够强;
  3. 体量小,CPU、内存占用小,能够长时间稳定运行;

处理流程总结如下:

QT实现HTTP JSON高效多线程处理服务器

HTTP协议基于TCP协议,首先实现一个异步收发的TCP服务类:

//继承QTCPSERVER以实现多线程TCPscoket的服务器。

class QAsynTcpServer : public QTcpServer

{

Q_OBJECT

public:

explicit QAsynTcpServer(QObject *parent = 0,int numConnections = 10000);

~QAsynTcpServer();

void setMaxPendingConnections(int numConnections);//重写设置最大连接数函数

protected slots:

void sockDisConnectSlot(int handle,const QString & ip, quint16 prot, QThread *th);//断开连接的用户信息

public:

void clear(); //断开所有连接,线程计数器请0

protected:

void incomingConnection(qintptr socketDescriptor);//覆盖已获取多线程

QHash<int,QTcpSocket *> * m_ClientList;//管理连接的map

private:

int maxConnections;

};

再从该QAsynTcpServer类继承实现Http服务类:

class QHttpServer : public QAsynTcpServer

{

Q_OBJECT

public:

QHttpServer(QObject *parent = 0,int numConnections=1000);

virtual ~QHttpServer();

private Q_SLOTS:

void handleRequest(QHttpRequest *request, QHttpResponse *response);

protected slots:

void sockDisConnectSlot(int handle,const QString & ip, quint16 prot, QThread *th);//断开连接的用户信息

protected:

void incomingConnection(qintptr socketDescriptor);//覆盖已获取多线程

};

实现多线程处理必须重写void QTcpServer::incomingConnection(qintptr socketDescriptor)的实现:

void QHttpServer::incomingConnection(qintptr socketDescriptor)
{
    qDebug() << "QHttpServer:incomingConnection,ThreadId:"<<QThread::currentThreadId()  ;
 
    //继承重写此函数后,QQAsynTcpServer默认的判断最大连接数失效,自己实现
    if (m_ClientList->size() > maxPendingConnections())
    {
        QTcpSocket tcp;
        tcp.setSocketDescriptor(socketDescriptor);        
        tcp.disconnectFromHost();
        qDebug() << "tcpClient->size() > maxPendingConnections(),disconnectFromHost";
        return;
    }
    auto th = ThreadHandle::getClass().getThread();
    QAsynHttpSocket* tcpTemp = new QAsynHttpSocket(socketDescriptor);
    QString ip =  tcpTemp->peerAddress().toString();
    qint16 port = tcpTemp->peerPort();
 
    //NOTE:断开连接的处理,从列表移除,并释放断开的Tcpsocket,线程管理计数减1,此槽必须实现
    connect(tcpTemp,SIGNAL(sockDisConnect(const int ,const QString &,const quint16, QThread *)),
            this,SLOT(sockDisConnectSlot(const int ,const QString &,const quint16, QThread *)));
 
    //必须在QAsynHttpSocket的线程中执行
    connect(tcpTemp, SIGNAL(newRequest(QHttpRequest *, QHttpResponse *)), this,
            SLOT(handleRequest(QHttpRequest *, QHttpResponse *)), Qt::DirectConnection);
 
    tcpTemp->moveToThread(th);//把tcp类移动到新的线程,从线程管理类中获取
    m_ClientList->insert(socketDescriptor,tcpTemp);//插入到连接信息中
 
    qDebug() << "QHttpServer m_ClientList add:"<<socketDescriptor  ;
}

ThreadHandle::getClass().getThread()为每个连接分配一个处理线程,m_ClientList管理所有的连接。

handleRequest处理Http请求,必须在该连接的线程中执行,所以使用了Qt::DirectConnection,在新的请求发生时触发,在该处实现和扩展json业务,以下为代码样例:

//处理新的http 请求,这里处理业务

void QHttpServer::handleRequest(QHttpRequest *req, QHttpResponse *resp)

{

qDebug() << "QHttpServer:handleRequest,ThreadId:"<<QThread::currentThreadId()  ;

qDebug() <<"path:"<< req->path();

QJsonDocument doc=QJsonDocument::fromBinaryData(req->body());

QJsonObject recv_obj=doc.object();//这是接收到的json对象

QJsonObject resp_obj; //返回json对象

resp_obj.insert("man_num",4);

resp_obj.insert("time", "20150601113432");

QByteArray data = QJsonDocument(resp_obj).toJson(QJsonDocument::Compact);

resp->setHeader("Content-Type", "text/html");

resp->setHeader("Content-Length", QString::number(data.length()));

resp->writeHead(200);

resp->end(data);

resp->flush();

req->deleteLater();

resp->deleteLater();

qDebug() <<"handleRequest end";

}

在这里使用QJsonDocument来解析json请求,并给请求回json应答,按应用求求实现自己的业务逻辑。

当客户端关闭连接的时候需要释放连接占用的线程资源,并从连接管理中删除。

//释放线程资源

void QHttpServer::sockDisConnectSlot(int handle,const QString & ip, quint16 prot,QThread * th)

{

qDebug() << "QHttpServer:sockDisConnectSlot,ThreadId:"<<QThread::currentThreadId()  ;

qDebug() << "QHttpServer m_ClientList size:"<<m_ClientList->size()  ;

qDebug() << "QHttpServer m_ClientList:remove:"<<handle  ;

m_ClientList->remove(handle);//连接管理中移除断开连接的socket

ThreadHandle::getClass().removeThread(th); //告诉线程管理类那个线程里的连接断开了,释放数量

qDebug() << "QHttpServer m_ClientList size:"<<m_ClientList->size()  ;

}

在这里我们理解为客户端采用一个请求一个应答的往来方式实现http 解析,从QTcpSocket继承建立自己的QAsynHttpSocket类:

class QAsynHttpSocket: public QTcpSocket

{

Q_OBJECT

public:

explicit QAsynHttpSocket(qintptr socketDescriptor, QObject *parent = 0);

~QAsynHttpSocket();

void write(const QByteArray &data);

//void flush();

void waitForBytesWritten();

Q_SIGNALS:

void newRequest(QHttpRequest *, QHttpResponse *);

void allBytesWritten();

signals:

//NOTE:断开连接的用户信息,此信号必须发出!线程管理类根据信号计数的

void sockDisConnect(const int ,const QString &,const quint16, QThread *);

private Q_SLOTS:

void doParseRequest();

void doDisconnected();

void doUpdateWriteCount(qint64);

private:

static int doMessageBegin(http_parser *parser);

static int doMessageComplete(http_parser *parser);

static int doHeaderField(http_parser *parser, const char *at, size_t length);

static int doHeaderValue(http_parser *parser, const char *at, size_t length);

static int doHeadersComplete(http_parser *parser);

static int doBody(http_parser *parser, const char *at, size_t length);

static int doUrl(http_parser *parser, const char *at, size_t length);

static int doStatus(http_parser *parser, const char *at, size_t length);

static int doChunkHeader(http_parser *parser);

static int doChunkComplete(http_parser *parser);

private:

qintptr socketID;

http_parser *m_parser;

http_parser_settings *m_parserSettings;

//临时变量

QByteArray m_currentUrl;

// The ones we are reading in from the parser

HeaderHash m_currentHeaders;

QString m_currentHeaderField;

QString m_currentHeaderValue;

QByteArray m_currentBody;

QByteArray m_currentStatus;

// Keep track of transmit buffer status

qint64 m_transmitLen;

qint64 m_transmitPos;

private:

Q_DISABLE_COPY(QAsynHttpSocket)

};

在本程序中使用nodejs 的http-parser的源代码来解析http 请求,http-parser是一个用C代码编写的HTTP消息解析器,可以解析HTTP请求或者回应消息,该解析器常常在高性能的HTTP应用中使用;在解析的过程中,它不会调用任何系统资源,不会在HEAP上申请内存,不会缓存数据,并且可以在任意时刻打断解析过程,而不会产生任何影响,对于每个HTTP消息(在WEB服务器中就是每个请求),需要的内存占用很少(40字节?)。

http-parser的特性:无第三方依赖、可以处理持久消息(keep-alive)、支持解码chunk编码的消息、支持Upgrade协议升级(如无例外就是WebSocket)、可以防御缓冲区溢出攻击。解析器可以处理以下类型的HTTP消息:头部的字段和值、Content-Length、请求方法、返回的HTTP代码、Transfer-Encoding、HTTP版本、请求的URL、HTTP消息body主体。

使用非常简单,建立回调函数,执行http_parser_execute连续分析数据。可以在QAsynHttpSocket的构造函数中初始化http_parser,绑定回调函数。

QAsynHttpSocket::QAsynHttpSocket(qintptr socketDescriptor, QObject *parent) : //构造函数在主线程执行,lambda在子线程

QTcpSocket(parent),socketID(socketDescriptor),

m_parser(0),

m_parserSettings(0),

m_transmitLen(0),

m_transmitPos(0)

{

this->setSocketDescriptor(socketDescriptor);

m_parser = (http_parser *)malloc(sizeof(http_parser));

http_parser_init(m_parser, HTTP_REQUEST);

m_parserSettings = new http_parser_settings();

m_parserSettings->on_message_begin = doMessageBegin;

m_parserSettings->on_message_complete = doMessageComplete;

m_parserSettings->on_header_field = doHeaderField;

m_parserSettings->on_header_value = doHeaderValue;

//在HeadersComplete完成时激发newRequest

m_parserSettings->on_headers_complete = doHeadersComplete;

m_parserSettings->on_body = doBody;

m_parserSettings->on_status = doStatus;

m_parserSettings->on_url = doUrl;

m_parserSettings->on_chunk_header = doChunkHeader;

m_parserSettings->on_chunk_complete = doChunkComplete;

m_parser->data = this;

connect(this, SIGNAL(readyRead()), this, SLOT(doParseRequest()));

connect(this, SIGNAL(disconnected()), this, SLOT(doDisconnected()));

//当所有的字节写完updateWriteCount激发allBytesWritten信号

connect(this, SIGNAL(bytesWritten(qint64)), this, SLOT(doUpdateWriteCount(qint64)));

qDebug() << "new connect:"<<socketDescriptor ;

}

http_parser有两种类型的回调:

  • 通知 typedef int (*http_cb) (http_parser *);包括:on_message_begin,on_headers_complete,on_message_complete
  • 数据 typedef int (*http_data_cb) (http_parser *, const char at, size_t length);包括;(只限与请求)on_uri, (通用) on_header_fieldon_header_value,on_body

用户的回调函数应该返回0表示成功。返回非0的值,会告诉解析器发生了错误,解析器会立刻退出。

Socket的readyRead()对应的槽实现函数中持续调用http_parser_execute。

void QAsynHttpSocket::doParseRequest()

{

Q_ASSERT(m_parser);

qDebug() << "QHttpConnection:parseRequest,ThreadId:"<<QThread::currentThreadId()  ;

while (this->bytesAvailable()) {

qDebug() << "readAll,ThreadId:"<<QThread::currentThreadId()  ;

QByteArray arr = this->readAll();

qDebug() << "http_parser_execute begin,ThreadId:"<<QThread::currentThreadId()  ;

http_parser_execute(m_parser, m_parserSettings, arr.constData(), arr.size());

qDebug() << "http_parser_execute end,ThreadId:"<<QThread::currentThreadId()  ;

}

}

http_parser在每一个http请求消息解析完成后触发MessageComplete回调,这个时候可建立QHttpRequest和QHttpResponse,并发射新请求到来信号,触发handleRequest业务处理。

int QAsynHttpSocket::doMessageComplete(http_parser *parser)

{

qDebug() << "doMessageComplete" ;

// TODO: do cleanup and prepare for next request

QAsynHttpSocket *theConnection = static_cast<QAsynHttpSocket *>(parser->data);

QHttpRequest* request = new QHttpRequest(theConnection);

/** set method **/

request->setMethod(static_cast<QHttpRequest::HttpMethod>(parser->method));

/** set version **/

request->setVersion(

QString("%1.%2").arg(parser->http_major).arg(parser->http_minor));

/** get parsed url **/

struct http_parser_url urlInfo;

int r = http_parser_parse_url(theConnection->m_currentUrl.constData(),

theConnection->m_currentUrl.size(),

parser->method == HTTP_CONNECT, &urlInfo);

Q_ASSERT(r == 0);

Q_UNUSED(r);

request->setUrl(createUrl(theConnection->m_currentUrl.constData(), urlInfo));

// Insert last remaining header,这个已经在doHeadersComplete中执行

//theConnection->m_currentHeaders[theConnection->m_currentHeaderField.toLower()] =

//   theConnection->m_currentHeaderValue;

request->setHeaders(theConnection->m_currentHeaders);

/** set client information **/

request->m_remoteAddress = theConnection->peerAddress().toString();

request->m_remotePort = theConnection->peerPort();

qDebug() << "QHttpResponse:new,ThreadId:"<<QThread::currentThreadId()  ;

QHttpResponse *response = new QHttpResponse(theConnection);

if (parser->http_major < 1 || parser->http_minor < 1)

response->m_keepAlive = false;

Q_EMIT theConnection->newRequest(request, response);

return 0;

}

本程序框架代码很少,在windows10下占用内存9.5M,经过测试完全满足高并发长期稳定运行。 代码下载

备注:本程序参照网上局部代码并经过大幅度整理修改,大部分代码原创实现。

上一篇:网易云课堂_程序设计入门-C语言_第一周:简单的计算程序_1逆序的三位数


下一篇:黑信 socket即时通讯 示例