这里我们用到了 ET 模式。
-
Reactor.hpp
文件:
#pragma once
#include <string>
#include <unordered_map>
#include "Connection.hpp"
#include "Epoller.hpp"
// TcpServer就是Reactor(反应堆)
// class TcpServer // 对Connection和Epoller管理就行
class Reactor
{
const static int gnum = 64;
public:
Reactor() : _is_running(false) {}
void AddConnection(int sockfd, uint32_t events, func_t recver, func_t sender, func_t excepter)
{
// 1.构建Connection对象
Connection *conn = new Connection(sockfd);
conn->SetEvent(events);
conn->Register(recver, sender, excepter);
conn->SetSelf(this);
// 2.向内核表示对fd的关心
_epoller.AddEvent(conn->SockFd(), conn->Events());
// std::cout << "sockfd : " << sockfd << " , events : " << (events & EPOLLIN) << std::endl;
// 3.向_connections添加Connection对象
_connections.insert(std::make_pair(conn->SockFd(), conn));
}
bool ConnectionIsExist(int sockfd)
{
auto iter = _connections.find(sockfd);
return iter != _connections.end();
}
void EnableReadWrite(int sockfd, bool wr, bool rd)
{
uint32_t events = (wr ? EPOLLOUT : 0) | (rd ? EPOLLIN : 0) | EPOLLET;
if (ConnectionIsExist(sockfd))
{
// 修改对事件的关心
_connections[sockfd]->SetEvent(events);
// 设置到内核
_epoller.ModEvent(sockfd, events);
}
}
void RemoveConnection(int sockfd)
{
if (!ConnectionIsExist(sockfd))
return;
// 解除对文件描述符的关心
_epoller.DelEvent(sockfd);
// 关闭文件描述符
::close(sockfd);
// 去除该连接
delete _connections[sockfd];
_connections.erase(sockfd);
}
// 一次派发
void LoopOnce(int timeout)
{
int n = _epoller.Wait(recv, gnum, timeout); // n个事件就绪
for (int i = 0; i < n; i++)
{
int sockfd = recv[i].data.fd;
uint32_t revents = recv[i].events;
// std::cout << "sockfd : " << sockfd << " , revents : " << revents << std::endl;
// 挂起或者出错了转为读写事件就绪
if (revents & EPOLLHUP)
revents |= (EPOLLIN | EPOLLOUT);
if (revents & EPOLLERR)
revents |= (EPOLLIN | EPOLLOUT);
// 读事件就绪
if (revents & EPOLLIN)
{
// 文件描述符得在_connections存在(比如客户端可能退出了,这个文件描述符就没有了)
if (ConnectionIsExist(sockfd) && (_connections[sockfd]->_recver != nullptr))
_connections[sockfd]->_recver(_connections[sockfd]); // 处理读事件就绪,这里_recver已经在AddConnection注册了!
}
// 写事件就绪
if (revents & EPOLLOUT)
{
if (ConnectionIsExist(sockfd) && (_connections[sockfd]->_sender != nullptr))
_connections[sockfd]->_sender(_connections[sockfd]); // 处理写事件就绪,这里_sender已经在AddConnection注册了!
}
}
}
// 只负责事件派发
void Despatcher()
{
_is_running = true;
int timeout = -1; // 阻塞等
while (true)
{
LoopOnce(timeout);
// 处理其他事情
Debug();
}
_is_running = false;
}
void Debug()
{
for (auto &connection : _connections)
{
std::cout << "------------------------------------" << std::endl;
std::cout << "fd : " << connection.second->SockFd() << " , ";
uint32_t events = connection.second->Events();
if ((events & EPOLLIN) && (events & EPOLLET))
std::cout << "EPOLLIN | EPOLLET";
if ((events & EPOLLIN) && (events & EPOLLET))
std::cout << "EPOLLIN | EPOLLET";
std::cout << std::endl;
}
std::cout << "------------------------------------" << std::endl;
}
~Reactor() {}
private:
std::unordered_map<int, Connection *> _connections; // 保存fd 和 对应的连接
Epoller _epoller;
struct epoll_event recv[gnum];
bool _is_running;
};
-
Socket.hpp
文件:
#pragma once
#include <string.h>
#include <memory>
#include "Log.hpp"
#include "Comm.hpp"
namespace socket_ns
{
const static int gbacklog = 8;
class Socket;
using socket_sptr = std::shared_ptr<Socket>; // 定义智能指针,以便于后面多态
// 使用
// std::unique_ptr<Socket> listensocket = std::make_unique<TcpSocket>();
// listensocket->BuildListenSocket();
// socket_sptr retsock = listensocket->Accepter();
// retsock->Recv();
// retsock->Send();
// std::unique_ptr<Socket> clientsocket = std::make_unique<TcpSocket>();
// clientsocket->BuildClientSocket();
// clientsocket->Send();
// clientsocket->Recv();
class Socket
{
public:
virtual void CreateSocketOrDie() = 0;
virtual void BindSocketOrDie(InetAddr &addr) = 0;
virtual void ListenSocketOrDie() = 0;
virtual int Accepter(InetAddr *addr, int *errcode) = 0;
virtual bool Connector(InetAddr &addr) = 0;
virtual void SetSocketAddrReuse() = 0;
virtual int SockFd() = 0;
virtual ssize_t Recv(std::string *out) = 0;
virtual ssize_t Send(std::string &in) = 0;
// virtual void Other() = 0;
public:
void BuildListenSocket(InetAddr &addr)
{
CreateSocketOrDie();
SetSocketAddrReuse();
BindSocketOrDie(addr);
ListenSocketOrDie();
}
bool BuildClientSocket(InetAddr &addr)
{
CreateSocketOrDie();
return Connector(addr);
}
};
class TcpSocket : public Socket
{
public:
TcpSocket(int sockfd = -1) : _socktfd(sockfd)
{
}
virtual void SetSocketAddrReuse() override
{
int opt = 1;
::setsockopt(_socktfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
}
virtual void CreateSocketOrDie() override
{
// 创建
_socktfd = socket(AF_INET, SOCK_STREAM, 0); // 这个就是文件描述符
if (_socktfd < 0)
{
LOG(FATAL, "create sockfd error, error code : %d, error string : %s", errno, strerror(errno));
exit(CREATE_ERROR);
}
LOG(INFO, "create sockfd success");
}
virtual void BindSocketOrDie(InetAddr &addr) override
{
struct sockaddr_in local;
bzero(&local, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(addr.Port());
local.sin_addr.s_addr = INADDR_ANY;
// 绑定
int n = ::bind(_socktfd, CONV(&local), sizeof(local));
if (n < 0)
{
LOG(FATAL, "bind sockfd error, error code : %d, error string : %s", errno, strerror(errno));
exit(BIND_ERROR);
}
LOG(INFO, "bind sockfd success");
}
virtual void ListenSocketOrDie() override
{
// 监听
int ret = ::listen(_socktfd, gbacklog);
if (ret < 0)
{
LOG(FATAL, "listen error, error code : %d , error string : %s", errno, strerror(errno));
exit(LISTEN_ERROR);
}
LOG(INFO, "listen success!");
}
virtual int Accepter(InetAddr *addr, int *errcode) override
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// 获取新连接
int newsockfd = accept(_socktfd, CONV(&peer), &len); // 建立连接成功,创建新文件描述符进行通信
*errcode = errno;
LOG(DEBUG, "errno : ", errno);
if (newsockfd < 0)
{
LOG(WARNING, "accept error, error code : %d , error string : %s", errno, strerror(errno));
return -1;
}
LOG(INFO, "accept success! new sockfd : %d", newsockfd);
SetNonBlock(_socktfd); // 这里不是newsockfd,这里是对listensock进行非阻塞
*addr = peer;
// socket_sptr sock = std::make_shared<TcpSocket>(newsockfd); // 创建新的文件描述符,传出去以便于后面的Recv和Send
return newsockfd;
}
virtual bool Connector(InetAddr &addr) override
{
struct sockaddr_in local;
bzero(&local, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(addr.Port());
local.sin_addr.s_addr = inet_addr(addr.Ip().c_str());
// 发起连接
int n = ::connect(_socktfd, CONV(&local), sizeof(local));
if (n < 0)
{
LOG(WARNING, "create connect error, error code : %d, error string : %s", errno, strerror(errno));
return false;
}
LOG(INFO, "create connect success");
return true;
}
virtual int SockFd() override
{
return _socktfd;
}
virtual ssize_t Recv(std::string *out) override
{
char buff[1024];
ssize_t n = recv(_socktfd, buff, sizeof(buff) - 1, 0);
if (n > 0)
{
buff[n] = 0;
*out += buff; // 方便当数据到来不是刚好1条数据的时候,进行合并后来的数据
}
return n;
}
virtual ssize_t Send(std::string &in) override
{
ssize_t n = send(_socktfd, in.c_str(), in.size(), 0);
return n;
}
private:
int _socktfd; // 用同一个_socket
};
}
-
Connection.hpp
文件:
#pragma once
#include <string>
#include <functional>
#include <sys/epoll.h>
#include "InetAddr.hpp"
class Reactor;
class Connection;
using func_t = std::function<void(Connection *)>;
class Connection
{
public:
Connection(int sockfd) : _sockfd(sockfd), _R(nullptr) {}
void SetEvent(uint32_t events)
{
_events = events;
}
void Register(func_t recver, func_t sender, func_t excepter)
{
_recver = recver;
_sender = sender;
_excepter = excepter;
}
void SetSelf(Reactor *R)
{
_R = R;
}
int SockFd()
{
return _sockfd;
}
void AppendInbuff(const std::string &buff)
{
_inbuffer += buff;
}
void AppendOutbuff(const std::string &buff)
{
_outbuffer += buff;
}
std::string &Inbuffer() // 返回引用,后面Decode得字符串切割
{
return _inbuffer;
}
std::string &Outbuffer() // 返回引用,后面Decode得字符串切割
{
return _outbuffer;
}
void OutBufferRemove(int n)
{
_outbuffer.erase(0, n);
}
uint32_t Events()
{
return _events;
}
~Connection() {}
private:
int _sockfd;
// 输入输出缓冲区
std::string _inbuffer;
std::string _outbuffer;
// 已经准备好的事件
uint32_t _events;
InetAddr _clientaddr;
public:
// 处理事件
func_t _recver;
func_t _sender;
func_t _excepter;
Reactor *_R;
};
-
Epoller.hpp
文件:
#pragma once
#include <sys/epoll.h>
#include "Log.hpp"
class Epoller
{
bool EventCore(int sockfd, uint32_t event, int type)
{
struct epoll_event ep_event;
ep_event.data.fd = sockfd;
ep_event.events = event;
int n = ::epoll_ctl(_epfd, type, sockfd, &ep_event);
if (n < 0)
{
LOG(ERROR, "epoll_ctl error");
return false;
}
LOG(DEBUG, "epoll_ctl add %d fd success", sockfd);
return true