Linux–多路转接之epoll
Reactor反应堆模式
Reactor反应堆模式是一种事件驱动的设计模式,通常用于处理高并发的I/O操作,尤其是在服务器或网络编程中。
基本概念
Reactor模式又称之为响应器模式,基于事件多路复用机制,使得单个线程能够同时管理大量并发连接,而不需要为每个连接创建一个独立的线程。它通过一个事件分发器(Reactor)来监听和管理不同的I/O事件,当事件发生时,分发器会将该事件分发给对应的事件处理器来处理。
核心组件
- 事件分发器(Reactor):负责监听各种事件源(如socket、文件描述符)并将事件分发给相应的处理器。事件分发器通常使用I/O多路复用机制(如select、poll、epoll)来同时监听多个I/O事件。
- 事件处理器(Event Handler):定义了如何处理特定事件。当事件分发器检测到某个事件时,就会触发相应的事件处理器中的回调函数。
- 同步事件分离器(Demultiplexer):本质上是系统调用,用于监听事件源上的事件,并将事件通知给事件分发器。例如,在Linux中,可以使用select、poll或epoll等系统调用来实现同步事件分离器。
工作流程
- 注册事件:事件分发器注册需要监听的I/O事件(如连接、读写),并关联相应的事件处理器。
- 进入循环:事件分发器进入循环,使用I/O多路复用机制来监听注册的I/O事件。
- 分发事件:一旦某个I/O事件发生,事件分发器会将该事件分发给对应的事件处理器。
-
处理事件:事件处理器执行预定义的操作来处理该事件。处理完成后,可能会重新注册事件或关闭连接。
epoll服务器(ET)
服务器监听一个指定的端口,当有新的连接请求到来时,服务器接受连接并将其注册到Reactor中,以便处理后续的数据读写事件。
Socket.hpp
包含了一个抽象基类 Socket 和一个继承自 Socket 的具体实现类 TcpSocket。提供一个面向对象的网络套接字编程接口,允许用户通过继承和实现基类中的纯虚函数来创建不同类型的套接字(例如 TCP 套接字)。
#include <iostream>
#include <string>
#include <functional>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <pthread.h>
#include <sys/types.h>
#include <memory>
#include "InetAddr.hpp"
#include "Log.hpp"
#include "Comm.hpp"
namespace socket_ns
{
class Socket;
const static int gbacklog=8;//默认最大连接数
using socket_sptr=std::shared_ptr<Socket>;//套接字指针
enum
{
SOCKET_ERROR = 1,
BIND_ERROR,
LISTEN_ERROR,
USAGE_ERROR
};
//在基类创建一系列虚函数,只要派生类能用到就在这里创建
class Socket
{
public:
virtual void CreateSocketOrDie() =0; //创建套接字
virtual void BindSocketOrDie(InetAddr& addr) =0; //绑定套接字
virtual void ListenSocketOrDie()=0; //监听套接字
virtual int Accepter(InetAddr* addr,int* code) =0; //接受客户端
virtual bool Connector(InetAddr &addr) = 0; //连接客户端
virtual int SockFd() = 0; //获取Sockfd
virtual int Recv(std::string *out) = 0; //接收对方信息
virtual int Send(const std::string &in) = 0; //发送给对方信息
virtual void Close()=0; //关闭对应文件
public:
//创建监听套接字,将一系列操作细分化,直接引用对应函数直接创建
void BuildListenSocket(InetAddr& addr)
{
CreateSocketOrDie();
BindSocketOrDie(addr);
ListenSocketOrDie();
}
bool BuildClientSocket(InetAddr &addr)
{
CreateSocketOrDie();
return Connector(addr);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket(int sockfd=-1)
:_sockfd(sockfd)
{}
void CreateSocketOrDie() override //override明确的重写基类函数
{
_sockfd=socket(AF_INET,SOCK_STREAM,0);
if(_sockfd<0)
{
LOG(FATAL, "socket error");
exit(SOCKET_ERROR);
}
SetNonBlock(_sockfd);
LOG(DEBUG, "socket create success, sockfd is : %d\n", _sockfd);
}
void BindSocketOrDie(InetAddr& addr) override
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(addr.Port());
local.sin_addr.s_addr = inet_addr(addr.Ip().c_str());
int n=bind(_sockfd,(struct sockaddr*)&local,sizeof(local));
if (n < 0)
{
LOG(FATAL, "bind error");
exit(BIND_ERROR);
}
LOG(DEBUG, "bind success, sockfd is : %d\n", _sockfd);
}
void ListenSocketOrDie() override
{
int n=listen(_sockfd,gbacklog);
if (n < 0)
{
LOG(FATAL, "listen error");
exit(LISTEN_ERROR);
}
LOG(DEBUG, "listen success, sockfd is : %d\n", _sockfd);
}
int Accepter(InetAddr* addr,int* code) override
{
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
int sockfd = accept(_sockfd,(struct sockaddr*)&peer,&len);
*code=errno;
if (sockfd < 0)
{
LOG(WARNING, "accept error\n");
return -1;
}
*addr=peer;
SetNonBlock(sockfd);
//socket_sptr sock=std::make_shared<TcpSocket>(sockfd);
return sockfd;
}
virtual bool Connector(InetAddr& addr)
{
struct sockaddr_in server;
memset(&server,0,sizeof(server));
server.sin_family=AF_INET;
server.sin_addr.s_addr=inet_addr(addr.Ip().c_str());
server.sin_port=htons(addr.Port());
int n=connect(_sockfd,(struct sockaddr*)&server,sizeof(server));
if (n < 0)
{
std::cerr << "connect error" << std::endl;
return false;
}
return true;
}
int Recv(std::string *out) override
{
char inbuffer[1024];
ssize_t n = recv(_sockfd,inbuffer,sizeof(inbuffer)-1,0);
if (n > 0)
{
inbuffer[n] = 0;
*out += inbuffer; // 接收次数可能不只一次,一般是多次的,
}
return n;
}
int Send(const std::string &in) override
{
int n = send(_sockfd,in.c_str(),in.size(),0);
return n;
}
int SockFd() override
{
return _sockfd;
}
void Close() override
{
if (_sockfd > -1)
::close(_sockfd);
}
~TcpSocket()
{}
private:
int _sockfd;
};
}
代码和之前不一样的地方是实现了非阻塞套接字的设置
Calculate.hpp
用于执行基本的算术运算
#pragma once
#include <iostream>
#include "ProToCol.hpp"
using namespace protocol_ns;
class Calculate
{
public:
Calculate()
{
}
//根据输入的请求通过实际计算转换为结果
Response Excute(const Request &req)
{
Response resp(0, 0);
switch (req._oper)
{
case '+':
resp._result = req._x + req._y;
break;
case '-':
resp._result = req._x - req._y;
break;
case '*':
resp._result = req._x * req._y;
break;
case '/':
{
if (req._y == 0)
{
resp._code = 1;
}
else
{
resp._result = req._x / req._y;
}
}
break;
case '%':
{
if (req._y == 0)
{
resp._code = 2;
}
else
{
resp._result = req._x % req._y;
}
}
break;
default:
resp._code = 3;
break;
}
return resp;
}
~Calculate()
{
}
private:
};
protocol.hpp
用于处理网络通信中数据序列化和反序列化、编码和解码以及请求和响应对象生成的类和函数.
#pragma once
#include <iostream>
#include <string>
#include<unistd.h>
#include<memory>
#include<jsoncpp/json/json.h>
namespace protocol_ns
{
// 协议的样子:
// 报文 = 报头+有效载荷
// "有效载荷的长度"\r\n"有效载荷"\r\n
const std::string SEP= "\r\n";
// 解决TCP的粘报问题,TCP 读取不全的问题
std::string Encode(const std::string &json_str)
{
int json_str_len = json_str.size(); //有效载荷的长度
std::string proto_str = std::to_string(json_str_len); //转为string
proto_str += SEP; //+ 分隔符
proto_str += json_str;// + 数据字符串
proto_str += SEP;// + 分隔符
return proto_str; //返回一个报文
}
//将报文分析出数据字符串出来
std::string Decode(std::string &inbuffer)
{
auto pos = inbuffer.find(SEP); //找到分隔符的位置
if (pos == std::string::npos)
return std::string();
std::string len_str = inbuffer.substr(0, pos);//前头的有效数据长度的字符串
if (len_str.empty())
return std::string();
int packlen = std::stoi(len_str);//记录数据字符串的实际长度(传递时的差错主要出在这里)
int total = packlen + len_str.size() + 2 * SEP.size(); //报文总长度
if (inbuffer.size() < total)
return std::string();
std::string package = inbuffer.substr(pos + SEP.size(), packlen); //取出数据字符串
inbuffer.erase(0, total); //删除掉原先的报文
return package;
}
//请求将我们的数据序列化和反序列化(客户端)
class Request
{
public:
Request()
{
}
Request(int x, int y, char oper) : _x(x), _y(y), _oper(oper)
{
}
//序列化:将结构体数据转换为字符串
bool Serialize(std::string* out)
{
Json::Value root; //Json::Value: Json格式的值
root["x"] = _x;
root["y"] = _y;
root["oper"] = _oper;
Json::FastWriter writer;
*out=writer.write(root); //将Json值转换为字符串
return true;
}
//反序列化:将字符串转换为结构体数据
bool DeSerialize(const std::string& in)
{
Json::Value root;
Json::Reader reader;//解析字符串
bool res=reader.parse(in,root);//将字符串转为Json值,存放于root中
if (!res)
return false;
//再将Json值转为结构体数据
_x = root["x"].asInt();
_y = root["y"].asInt();
_oper = root["oper"].asInt();
return true;
}
public:
int _x;
int _y;
char _oper; //操作符 _x 加减乘除 _y
};
//将结果序列化和反序列化(服务端)
class Response
{
public:
Response()
{
}
Response(int result, int code) : _result(result), _code(code)
{
}
bool Serialize(std::string *out)
{
// 转换成为字符串
Json::Value root;
root["result"] = _result;
root["code"] = _code;
Json::FastWriter writer;
// Json::StyledWriter writer;
*out = writer.write(root);
return true;
}
bool Deserialize(const std::string &in)
{
Json::Value root;
Json::Reader reader;
bool res = reader.parse(in, root);
if (!res)
return false;
_result = root["result"].asInt();
_code = root["code"].asInt();
return true;
}
public:
int _result; // 结果
int _code; // 0:success 1: 除0 2: 非法操作 3. 4. 5
};
//创建需求
class Factory
{
public:
Factory()
{
srand(time(nullptr) ^ getpid());
opers = "+-*/%^&|";
}
std::shared_ptr<Request> BuildRequest()
{
int x = rand() % 10 + 1;
usleep(x * 10);
int y = rand() % 5; // [0,1,2,3,4]
usleep(y * x * 5