IO复用EPOLL服务器设计
软件层次设计
WebServer :服务器逻辑框架: epoller监听+线程池读写
|
|
Epoller Timer :epoll操作封装, 定时器给连接计时
| |
----------
|
HttpConnection :把监听连接返回的文件描述符封装成一个连接实例, 对readv, write网络数据传输进行封装, 管理连接
| |
HttpRequest HttpResponse :请求操作封装,响应操作封装,业务逻辑
| |
--------------
|
Buffer :读写缓冲区
ThreadPool : 线程池,负责读写操作(上图上两层属于主线程,下三层属于线程池)
Log : 日志类
WebServer设计:
- 按照软件分层设计的草图,WebServer设计目标为:
- 1.监听IO事件
- 2.处理超时连接
-
数据:
int port_; //端口
int timeoutMS_; //毫秒MS,定时器的默认过期时间
bool isClose_; //服务关闭标志(true 表示服务已关闭或初始化失败,主循环不再运行)
int listenFd_; //监听文件描述符
bool openLinger_; //优雅关闭选项
char* srcDir_; //静态资源根目录路径
uint32_t listenEvent_; //初始监听描述符监听设置
uint32_t connectionEvent_; //初始连接描述符监听设置
std::unique_ptr<TimerManager> timer_; //定时器
std::unique_ptr<ThreadPool> threadpool_; //线程池
std::unique_ptr<Epoller> epoller_; //反应堆
std::unordered_map<int, HTTPconnection> users_; //连接队列
-
函数:
- 构造函数: 设置服务器参数 + 初始化定时器/线程池/反应堆/连接队列
- 析构函数: 关闭listenFd_, 销毁 连接队列/定时器/线程池/反应堆
- 主函数start()
- 创建端口,绑定端口,监听端口, 创建epoll反应堆, 将监听描述符加入反应堆
- 等待事件就绪
- 连接事件-->handleListen()
- 写事件-->handleWrite()
- 读事件-->handleRead()
- 事件处理完毕,修改反应堆,再回到"等待事件就绪"这一步循环执行
- handleListen: 新初始化一个HttpConnection对象
- handleWrite: 对应连接对象进行处理-->若处理成功,则监听事件转换成 读 事件
- handleRead: 对应连接对象进行处理-->若处理成功,则监听事件转换成 写 事件
头文件:
#ifndef WEBSERVER_H
#define WEBSERVER_H
#include <unordered_map>
#include <memory>       // std::unique_ptr
#include <fcntl.h> // fcntl()
#include <unistd.h> // close()
#include <assert.h>
#include <errno.h>
#include <stdint.h>     // uint32_t
#include <stdlib.h>     // free(), realloc()
#include <string.h>     // strlen(), memcpy()
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "../epoll/epoller.h"
#include "../log/log.h"
// #include "../timer/heaptimer.h"
// #include "../pool/sqlconnpool.h"
#include "../threadpool/threadpool.hpp"
// #include "../pool/sqlconnRAII.h"
#include "../http/httpconn.h"
/*
 * Epoll-driven HTTP server front end.
 *
 * The thread that calls Start() waits on the Epoller and dispatches events;
 * the actual socket reads/writes are submitted to the ThreadPool as tasks
 * (see DealRead_/DealWrite_). Connections are stored by value in users_,
 * keyed by file descriptor.
 */
class WebServer {
public:
/* Stores the settings, builds the thread pool and epoller, resolves the
 * resource directory, creates the listening socket and optionally
 * initialises logging. */
WebServer(
int port, int trigMode, int timeoutMS, bool OptLinger,
int threadNum,
bool openLog, int logLevel, int logQueSize);
/* Closes the listening descriptor and frees srcDir_. */
~WebServer();
/* Event loop; runs until isClose_ becomes true. */
void Start();
private:
/* Creates, configures, binds and registers the listening socket. */
bool InitSocket_();
/* Translates trigMode (0..3) into listenEvent_/connEvent_ epoll flags. */
void InitEventMode_(int trigMode);
/* Initialises users_[fd] and registers the descriptor with the epoller. */
void AddClient_(int fd, sockaddr_in addr);
/* Accepts pending connections on listenFd_. */
void DealListen_();
/* Queue a write / read for the given connection on the thread pool. */
void DealWrite_(HttpConn* client);
void DealRead_(HttpConn* client);
/* Sends a short message to fd and closes it. */
void SendError_(int fd, const char*info);
/* NOTE(review): declared here, but its definition is commented out in the
 * source file; calling it would fail to link. */
void ExtentTime_(HttpConn* client);
/* Removes the connection from the epoller and closes it. */
void CloseConn_(HttpConn* client);
/* Thread-pool task bodies for read and write events. */
void OnRead_(HttpConn* client);
void OnWrite_(HttpConn* client);
/* Runs client->process() and re-arms the descriptor for EPOLLOUT or EPOLLIN. */
void OnProcess(HttpConn* client);
/* Upper bound on HttpConn::userCount enforced in DealListen_. */
static const int MAX_FD = 65536;
/* Adds O_NONBLOCK to the descriptor via fcntl. */
static int SetFdNonblock(int fd);
/* Listening port. */
int port_;
/* When true, SO_LINGER is enabled on the listening socket. */
bool openLinger_;
int timeoutMS_; /* milliseconds */
/* True once the server is closed or failed to initialise. */
bool isClose_;
/* Listening socket descriptor. */
int listenFd_;
/* Heap buffer holding "<cwd>/resources/"; released with free(). */
char* srcDir_;
/* Epoll flags for the listening descriptor. */
uint32_t listenEvent_;
/* Epoll flags for client connection descriptors. */
uint32_t connEvent_;
// std::unique_ptr<HeapTimer> timer_;
std::unique_ptr<ThreadPool> threadpool_;
std::unique_ptr<Epoller> epoller_;
/* Connection objects keyed by file descriptor. */
std::unordered_map<int, HttpConn> users_;
};
#endif //WEBSERVER_H
源文件:
#include "webserver.h"
using namespace std;
/*
 * Builds a server instance.
 *
 * port        listening port (validated by InitSocket_)
 * trigMode    0..3, selects LT/ET for the listen and connection descriptors
 * timeoutMS   stored in timeoutMS_ (the timer that would use it is disabled)
 * OptLinger   enables SO_LINGER on the listening socket
 * threadNum   forwarded to the ThreadPool constructor
 * openLog     when true, the Log singleton is initialised
 * logLevel    forwarded to Log::init
 * logQueSize  forwarded to Log::init
 *
 * On socket initialisation failure isClose_ is set, so Start() returns
 * immediately.
 */
WebServer::WebServer(
        int port, int trigMode, int timeoutMS, bool OptLinger,
        int threadNum,
        bool openLog, int logLevel, int logQueSize):
        port_(port), openLinger_(OptLinger), timeoutMS_(timeoutMS), isClose_(false),
        listenFd_(-1), srcDir_(nullptr),
        threadpool_(new ThreadPool(threadNum)), epoller_(new Epoller())
{
    /* Size 0 lets getcwd allocate a buffer that fits the whole path; the
     * previous fixed 256-byte request could be overrun by the strncat that
     * appended the resource suffix. */
    srcDir_ = getcwd(nullptr, 0);
    assert(srcDir_);
    static const char kResourceSuffix[] = "/resources/";
    const size_t cwdLen = strlen(srcDir_);
    /* sizeof(kResourceSuffix) already counts the terminating NUL. */
    char* grown = static_cast<char*>(realloc(srcDir_, cwdLen + sizeof(kResourceSuffix)));
    assert(grown);
    srcDir_ = grown;
    memcpy(srcDir_ + cwdLen, kResourceSuffix, sizeof(kResourceSuffix));

    HttpConn::userCount = 0;
    HttpConn::srcDir = srcDir_;
    // SqlConnPool::Instance()->Init("localhost", sqlPort, sqlUser, sqlPwd, dbName, connPoolNum);

    /* Bring logging up before the socket is created so that the messages
     * InitSocket_ emits are issued after the log has been initialised. */
    if(openLog) {
        Log::Instance()->init(logLevel, "./log", ".log", logQueSize);
    }

    InitEventMode_(trigMode);
    if(!InitSocket_()) { isClose_ = true; }

    if(openLog) {
        if(isClose_) { LOG_ERROR("========== Server init error!=========="); }
        else {
            LOG_INFO("========== Server init ==========");
            LOG_INFO("Port:%d, OpenLinger: %s", port_, OptLinger? "true":"false");
            LOG_INFO("Listen Mode: %s, OpenConn Mode: %s",
                            (listenEvent_ & EPOLLET ? "ET": "LT"),
                            (connEvent_ & EPOLLET ? "ET": "LT"));
            LOG_INFO("LogSys level: %d", logLevel);
            LOG_INFO("srcDir: %s", HttpConn::srcDir);
            LOG_INFO("ThreadPool num: %d", threadNum);
        }
    }
}
/* Tears the server down: marks it closed, releases the listening descriptor
 * and frees the resource-directory buffer obtained from getcwd. */
WebServer::~WebServer() {
    isClose_ = true;
    close(listenFd_);
    free(srcDir_);
    // SqlConnPool::Instance()->ClosePool();
}
/*
 * Derives the epoll flags for the listening and connection descriptors.
 *
 *   trigMode 0      : neither descriptor is edge-triggered
 *   trigMode 1      : connections edge-triggered
 *   trigMode 2      : listener edge-triggered
 *   anything else   : both edge-triggered
 */
void WebServer::InitEventMode_(int trigMode) {
    const bool listenEdge = (trigMode != 0 && trigMode != 1);
    const bool connEdge = (trigMode != 0 && trigMode != 2);

    listenEvent_ = EPOLLRDHUP;
    connEvent_ = EPOLLONESHOT | EPOLLRDHUP;
    if(listenEdge) {
        listenEvent_ |= EPOLLET;
    }
    if(connEdge) {
        connEvent_ |= EPOLLET;
    }
    /* Publish the connection trigger mode to HttpConn. */
    HttpConn::isET = (connEvent_ & EPOLLET);
}
void WebServer::Start() {
int timeMS = -1; /* epoll wait timeout == -1 无事件将阻塞 */
if(!isClose_) { LOG_INFO("========== Server start =========="); }
while(!isClose_) {
// if(timeoutMS_ > 0) {
// timeMS = timer_->GetNextTick();
// }
int eventCnt = epoller_->Wait(timeMS);
for(int i = 0; i < eventCnt; i++) {
/* 处理事件 */
int fd = epoller_->GetEventFd(i);
uint32_t events = epoller_->GetEvents(i);
if(fd == listenFd_) {
DealListen_();
}
else if(events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {
assert(users_.count(fd) > 0);
CloseConn_(&users_[fd]);
}
else if(events & EPOLLIN) {
assert(users_.count(fd) > 0);
DealRead_(&users_[fd]);
}
else if(events & EPOLLOUT) {
assert(users_.count(fd) > 0);
DealWrite_(&users_[fd]);
} else {
LOG_ERROR("Unexpected event");
}
}
}
}
/* Sends the NUL-terminated message `info` to `fd` (logging a warning if the
 * send fails) and then closes the descriptor. */
void WebServer::SendError_(int fd, const char*info) {
    assert(fd > 0);
    const int sent = send(fd, info, strlen(info), 0);
    const bool sendFailed = (sent < 0);
    if(sendFailed) {
        LOG_WARN("send error to client[%d] error!", fd);
    }
    close(fd);
}
/* Logs the departure, unregisters the connection's descriptor from the
 * epoller and closes the connection. */
void WebServer::CloseConn_(HttpConn* client) {
    assert(client);
    HttpConn& conn = *client;
    LOG_INFO("Client[%d] quit!", conn.GetFd());
    epoller_->DelFd(conn.GetFd());
    conn.Close();
}
void WebServer::AddClient_(int fd, sockaddr_in addr) {
assert(fd > 0);
users_[fd].init(fd, addr);
// if(timeoutMS_ > 0) {
// timer_->add(fd, timeoutMS_, std::bind(&WebServer::CloseConn_, this, &users_[fd]));
// }
epoller_->AddFd(fd, EPOLLIN | connEvent_);
SetFdNonblock(fd);
LOG_INFO("Client[%d] in!", users_[fd].GetFd());
}
/*
 * Accepts pending connections on the listening socket.
 *
 * With a level-triggered listener one connection is accepted per call; with
 * an edge-triggered listener the loop keeps accepting until accept() fails.
 * When the connection cap (MAX_FD) is reached the new peer is told the
 * server is busy and the function returns.
 */
void WebServer::DealListen_() {
    struct sockaddr_in addr;
    do {
        /* accept() treats the length as a value-result argument, so it has
         * to be reset to the buffer size before every call. */
        socklen_t len = sizeof(addr);
        int fd = accept(listenFd_, reinterpret_cast<struct sockaddr *>(&addr), &len);
        if(fd <= 0) { return; }
        if(HttpConn::userCount >= MAX_FD) {
            SendError_(fd, "Server busy!");
            LOG_WARN("Clients is full!");
            return;
        }
        AddClient_(fd, addr);
    } while(listenEvent_ & EPOLLET);
}
/* Queues the read handling for `client` on the thread pool; the work itself
 * is done by OnRead_. */
void WebServer::DealRead_(HttpConn* client) {
    assert(client);
    // ExtentTime_(client);
    threadpool_->submit([this, client]() { OnRead_(client); });
}
/* Queues the write handling for `client` on the thread pool; the work itself
 * is done by OnWrite_. */
void WebServer::DealWrite_(HttpConn* client) {
    assert(client);
    // ExtentTime_(client);
    threadpool_->submit([this, client]() { OnWrite_(client); });
}
// void WebServer::ExtentTime_(HttpConn* client) {
// assert(client);
// if(timeoutMS_ > 0) { timer_->adjust(client->GetFd(), timeoutMS_); }
// }
/*
 * Read task submitted by DealRead_. Pulls data from the connection; a
 * non-positive result that is not EAGAIN closes the connection, anything
 * else proceeds to request processing.
 */
void WebServer::OnRead_(HttpConn* client) {
    assert(client);
    int readErrno = 0;
    const int bytesRead = client->read(&readErrno);
    const bool wouldBlock = (readErrno == EAGAIN);
    if(bytesRead <= 0 && !wouldBlock) {
        CloseConn_(client);
        return;
    }
    OnProcess(client);
}
/* Runs the connection's process() step and re-arms its descriptor: for
 * writing when process() returns true, for reading otherwise. */
void WebServer::OnProcess(HttpConn* client) {
    const bool responseReady = client->process();
    const uint32_t direction = responseReady ? EPOLLOUT : EPOLLIN;
    epoller_->ModFd(client->GetFd(), connEvent_ | direction);
}
/*
 * Write task submitted by DealWrite_.
 *
 *  - everything sent and the connection is keep-alive: go back to processing;
 *  - data still pending and the write failed with EAGAIN: re-arm for EPOLLOUT
 *    to continue later;
 *  - every other outcome closes the connection.
 */
void WebServer::OnWrite_(HttpConn* client) {
    assert(client);
    int writeErrno = 0;
    const int written = client->write(&writeErrno);
    const bool allSent = (client->ToWriteBytes() == 0);

    if(allSent && client->IsKeepAlive()) {
        /* Transfer complete on a persistent connection. */
        OnProcess(client);
        return;
    }
    if(!allSent && written < 0 && writeErrno == EAGAIN) {
        /* Continue the transfer once the socket is writable again. */
        epoller_->ModFd(client->GetFd(), connEvent_ | EPOLLOUT);
        return;
    }
    CloseConn_(client);
}
/* Create listenFd */
bool WebServer::InitSocket_() {
int ret;
struct sockaddr_in addr;
if(port_ > 65535 || port_ < 1024) {
LOG_ERROR("Port:%d error!", port_);
return false;
}
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(port_);
struct linger optLinger = { 0 };
if(openLinger_) {
/* 优雅关闭: 直到所剩数据发送完毕或超时 */
optLinger.l_onoff = 1;
optLinger.l_linger = 1;
}
listenFd_ = socket(AF_INET, SOCK_STREAM, 0);
if(listenFd_ < 0) {
LOG_ERROR("Create socket error!", port_);
return false;
}
ret = setsockopt(listenFd_, SOL_SOCKET, SO_LINGER, &optLinger, sizeof(optLinger));
if(ret < 0) {
close(listenFd_);
LOG_ERROR("Init linger error!", port_);
return false;
}
int optval = 1;
/* 端口复用 */
/* 只有最后一个套接字会正常接收数据。 */
ret = setsockopt(listenFd_, SOL_SOCKET, SO_REUSEADDR, (const void*)&optval, sizeof(int));
if(ret == -1) {
LOG_ERROR("set socket setsockopt error !");
close(listenFd_);
return false;
}
ret = bind(listenFd_, (struct sockaddr *)&addr, sizeof(addr));
if(ret < 0) {
LOG_ERROR("Bind Port:%d error!", port_);
close(listenFd_);
return false;
}
ret = listen(listenFd_, 6);
if(ret < 0) {
LOG_ERROR("Listen port:%d error!", port_);
close(listenFd_);
return false;
}
ret = epoller_->AddFd(listenFd_, listenEvent_ | EPOLLIN);
if(ret == 0) {
LOG_ERROR("Add listen error!");
close(listenFd_);
return false;
}
SetFdNonblock(listenFd_);
LOG_INFO("Server port:%d", port_);
return true;
}
/*
 * Puts `fd` into non-blocking mode.
 *
 * The current file STATUS flags are read with F_GETFL (the previous code read
 * the descriptor flags with F_GETFD, which is a different flag set) and
 * written back with O_NONBLOCK added, so existing status flags are preserved.
 *
 * Returns the result of the F_SETFL call, or -1 if the flags could not be
 * read.
 */
int WebServer::SetFdNonblock(int fd) {
    assert(fd > 0);
    const int statusFlags = fcntl(fd, F_GETFL, 0);
    if(statusFlags < 0) {
        return -1;
    }
    return fcntl(fd, F_SETFL, statusFlags | O_NONBLOCK);
}
引用
- epoll [https://subingwen.cn/linux/epoll/]
- code [https://github.com/Aged-cat/WebServer]