Reactor 反应堆模式

这里我们用到了 epoll 的 ET(Edge Triggered,边缘触发)模式

  • Reactor.hpp文件:
#pragma once
#include <string>
#include <unordered_map>
#include "Connection.hpp"
#include "Epoller.hpp"

// Reactor (the "TcpServer" of this project): manages the Connection objects and the
// Epoller, and dispatches ready events to the callbacks registered on each Connection.
class Reactor
{
    // Capacity of the ready-event array handed to Epoller::Wait.
    const static int gnum = 64;

public:
    Reactor() : _is_running(false) {}

    // The reactor owns raw Connection pointers and their fds (see the destructor),
    // so a shallow copy would lead to double delete / double close.
    Reactor(const Reactor &) = delete;
    Reactor &operator=(const Reactor &) = delete;

    // Registers a new connection for `sockfd`.
    //   events   - epoll event mask to watch (stored on the Connection and passed to the Epoller)
    //   recver   - callback invoked when the fd is readable
    //   sender   - callback invoked when the fd is writable
    //   excepter - callback stored on the Connection (not invoked by the reactor itself)
    // If `sockfd` is already registered the call is a no-op, so the existing
    // Connection is neither replaced nor leaked.
    void AddConnection(int sockfd, uint32_t events, func_t recver, func_t sender, func_t excepter)
    {
        if (ConnectionIsExist(sockfd))
            return;

        // 1. Build the Connection object.
        Connection *conn = new Connection(sockfd);
        conn->SetEvent(events);
        conn->Register(recver, sender, excepter);
        conn->SetSelf(this);

        // 2. Tell the kernel which events we care about on this fd.
        _epoller.AddEvent(conn->SockFd(), conn->Events());

        // 3. Track the connection.
        _connections.insert(std::make_pair(conn->SockFd(), conn));
    }

    // Returns true when `sockfd` is currently tracked by the reactor.
    bool ConnectionIsExist(int sockfd)
    {
        return _connections.find(sockfd) != _connections.end();
    }

    // Re-arms `sockfd` for writing (`wr`) and/or reading (`rd`); EPOLLET is always set.
    // Does nothing when the fd is not tracked.
    void EnableReadWrite(int sockfd, bool wr, bool rd)
    {
        auto iter = _connections.find(sockfd);
        if (iter == _connections.end())
            return;

        const uint32_t events = (wr ? EPOLLOUT : 0) | (rd ? EPOLLIN : 0) | EPOLLET;
        iter->second->SetEvent(events);    // remember the new interest set
        _epoller.ModEvent(sockfd, events); // and push it to the kernel
    }

    // Stops watching `sockfd`, closes it and destroys its Connection.
    // Does nothing when the fd is not tracked.
    void RemoveConnection(int sockfd)
    {
        auto iter = _connections.find(sockfd);
        if (iter == _connections.end())
            return;

        _epoller.DelEvent(sockfd);
        ::close(sockfd);
        delete iter->second;
        _connections.erase(iter);
    }

    // Waits once for ready events (up to `timeout`, forwarded to Epoller::Wait)
    // and dispatches each of them to the registered callbacks.
    void LoopOnce(int timeout)
    {
        const int n = _epoller.Wait(_revs, gnum, timeout); // n events are ready
        for (int i = 0; i < n; i++)
        {
            const int sockfd = _revs[i].data.fd;
            uint32_t revents = _revs[i].events;

            // Hang-up and error conditions are turned into read/write readiness so that
            // the regular handlers run and get to observe the failure.
            if (revents & (EPOLLHUP | EPOLLERR))
                revents |= (EPOLLIN | EPOLLOUT);

            if (revents & EPOLLIN)
            {
                auto iter = _connections.find(sockfd);
                if (iter != _connections.end() && iter->second->_recver != nullptr)
                {
                    Connection *conn = iter->second;
                    conn->_recver(conn);
                }
            }

            if (revents & EPOLLOUT)
            {
                // Look the fd up again: the read handler may have removed the connection.
                auto iter = _connections.find(sockfd);
                if (iter != _connections.end() && iter->second->_sender != nullptr)
                {
                    Connection *conn = iter->second;
                    conn->_sender(conn);
                }
            }
        }
    }

    // Event loop: dispatches events until Stop() is called. Only does dispatching.
    void Despatcher()
    {
        _is_running = true;
        const int timeout = -1; // block until something is ready
        while (_is_running)
        {
            LoopOnce(timeout);
            // Other per-iteration work goes here.
            Debug();
        }
        _is_running = false;
    }

    // Asks Despatcher() to return after the current iteration finishes.
    void Stop()
    {
        _is_running = false;
    }

    // Prints every tracked fd together with the edge-triggered events it watches.
    void Debug()
    {
        for (auto &connection : _connections)
        {
            std::cout << "------------------------------------" << std::endl;
            std::cout << "fd : " << connection.second->SockFd() << " , ";
            const uint32_t events = connection.second->Events();
            if ((events & EPOLLIN) && (events & EPOLLET))
                std::cout << "EPOLLIN | EPOLLET";
            if ((events & EPOLLOUT) && (events & EPOLLET))
                std::cout << " EPOLLOUT | EPOLLET";
            std::cout << std::endl;
        }
        std::cout << "------------------------------------" << std::endl;
    }

    // Releases every connection that is still tracked, mirroring RemoveConnection.
    ~Reactor()
    {
        for (auto &connection : _connections)
        {
            _epoller.DelEvent(connection.first);
            ::close(connection.first);
            delete connection.second;
        }
        _connections.clear();
    }

private:
    std::unordered_map<int, Connection *> _connections; // fd -> connection (owned)
    Epoller _epoller;

    struct epoll_event _revs[gnum]; // ready events filled in by Epoller::Wait
    bool _is_running;
};
  • Socket.hpp文件:
#pragma once

#include <string.h>
#include <memory>

#include "Log.hpp"
#include "Comm.hpp"

namespace socket_ns
{
  const static int gbacklog = 8;

  class Socket;
  using socket_sptr = std::shared_ptr<Socket>; // 定义智能指针,以便于后面多态

  // 使用
  // std::unique_ptr<Socket> listensocket = std::make_unique<TcpSocket>();
  // listensocket->BuildListenSocket();
  // socket_sptr retsock = listensocket->Accepter();
  // retsock->Recv();
  // retsock->Send();

  // std::unique_ptr<Socket> clientsocket = std::make_unique<TcpSocket>();
  // clientsocket->BuildClientSocket();
  // clientsocket->Send();
  // clientsocket->Recv();

  // Abstract socket interface. The pure virtual steps are supplied by a concrete
  // socket type; BuildListenSocket / BuildClientSocket chain them in a fixed order.
  class Socket
  {
  public:
      // Instances are handled through base-class smart pointers (see socket_sptr),
      // so destruction through the base must be well-defined.
      virtual ~Socket() = default;

      virtual void CreateSocketOrDie() = 0;
      virtual void BindSocketOrDie(InetAddr &addr) = 0;
      virtual void ListenSocketOrDie() = 0;
      virtual int Accepter(InetAddr *addr, int *errcode) = 0;
      virtual bool Connector(InetAddr &addr) = 0;
      virtual void SetSocketAddrReuse() = 0;
      virtual int SockFd() = 0;

      virtual ssize_t Recv(std::string *out) = 0;
      virtual ssize_t Send(std::string &in) = 0;
      // virtual void Other() = 0;

  public:
      // Server side: create -> enable address reuse -> bind to `addr` -> listen.
      void BuildListenSocket(InetAddr &addr)
      {
          CreateSocketOrDie();
          SetSocketAddrReuse();
          BindSocketOrDie(addr);
          ListenSocketOrDie();
      }

      // Client side: create the socket, then connect to `addr`.
      // Returns whatever Connector reports.
      bool BuildClientSocket(InetAddr &addr)
      {
          CreateSocketOrDie();
          return Connector(addr);
      }
  };

  class TcpSocket : public Socket
  {
  public:
      TcpSocket(int sockfd = -1) : _socktfd(sockfd)
      {
      }

      virtual void SetSocketAddrReuse() override
      {
          int opt = 1;
          ::setsockopt(_socktfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
      }

      virtual void CreateSocketOrDie() override
      {
          // 创建
          _socktfd = socket(AF_INET, SOCK_STREAM, 0); // 这个就是文件描述符
          if (_socktfd < 0)
          {
              LOG(FATAL, "create sockfd error, error code : %d, error string : %s", errno, strerror(errno));
              exit(CREATE_ERROR);
          }
          LOG(INFO, "create sockfd success");
      }
      virtual void BindSocketOrDie(InetAddr &addr) override
      {
          struct sockaddr_in local;
          bzero(&local, sizeof(local));
          local.sin_family = AF_INET;
          local.sin_port = htons(addr.Port());
          local.sin_addr.s_addr = INADDR_ANY;
          // 绑定
          int n = ::bind(_socktfd, CONV(&local), sizeof(local));
          if (n < 0)
          {
              LOG(FATAL, "bind sockfd error, error code : %d, error string : %s", errno, strerror(errno));
              exit(BIND_ERROR);
          }
          LOG(INFO, "bind sockfd success");
      }
      virtual void ListenSocketOrDie() override
      {
          // 监听
          int ret = ::listen(_socktfd, gbacklog);
          if (ret < 0)
          {
              LOG(FATAL, "listen error, error code : %d , error string : %s", errno, strerror(errno));
              exit(LISTEN_ERROR);
          }
          LOG(INFO, "listen success!");
      }
      virtual int Accepter(InetAddr *addr, int *errcode) override
      {
          struct sockaddr_in peer;
          socklen_t len = sizeof(peer);

          // 获取新连接
          int newsockfd = accept(_socktfd, CONV(&peer), &len); // 建立连接成功,创建新文件描述符进行通信

          *errcode = errno;
          LOG(DEBUG, "errno : ", errno);

          if (newsockfd < 0)
          {
              LOG(WARNING, "accept error, error code : %d , error string : %s", errno, strerror(errno));
              return -1;
          }
          LOG(INFO, "accept success! new sockfd : %d", newsockfd);

          SetNonBlock(_socktfd); // 这里不是newsockfd,这里是对listensock进行非阻塞

          *addr = peer;
          // socket_sptr sock = std::make_shared<TcpSocket>(newsockfd); // 创建新的文件描述符,传出去以便于后面的Recv和Send
          return newsockfd;
      }

      virtual bool Connector(InetAddr &addr) override
      {
          struct sockaddr_in local;
          bzero(&local, sizeof(local));
          local.sin_family = AF_INET;
          local.sin_port = htons(addr.Port());
          local.sin_addr.s_addr = inet_addr(addr.Ip().c_str());

          // 发起连接
          int n = ::connect(_socktfd, CONV(&local), sizeof(local));
          if (n < 0)
          {
              LOG(WARNING, "create connect error, error code : %d, error string : %s", errno, strerror(errno));
              return false;
          }
          LOG(INFO, "create connect success");
          return true;
      }

      virtual int SockFd() override
      {
          return _socktfd;
      }

      virtual ssize_t Recv(std::string *out) override
      {
          char buff[1024];
          ssize_t n = recv(_socktfd, buff, sizeof(buff) - 1, 0);
          if (n > 0)
          {
              buff[n] = 0;
              *out += buff; // 方便当数据到来不是刚好1条数据的时候,进行合并后来的数据
          }
          return n;
      }
      virtual ssize_t Send(std::string &in) override
      {
          ssize_t n = send(_socktfd, in.c_str(), in.size(), 0);
          return n;
      }

  private:
      int _socktfd; // 用同一个_socket
  };
}
  • Connection.hpp文件:
#pragma once
#include <string>
#include <functional>
#include <sys/epoll.h>
#include "InetAddr.hpp"

class Reactor;
class Connection;
using func_t = std::function<void(Connection *)>;

// Per-fd state used by the Reactor: the descriptor, its input/output buffers, the
// epoll event mask it is registered with, and the callbacks to run when it is ready.
class Connection
{
public:
    // `_events` starts at 0 so that Events() is well-defined before SetEvent is called.
    Connection(int sockfd) : _sockfd(sockfd), _events(0), _R(nullptr) {}

    // Records the epoll event mask associated with this connection.
    void SetEvent(uint32_t events)
    {
        _events = events;
    }

    // Installs the read / write / exception callbacks.
    void Register(func_t recver, func_t sender, func_t excepter)
    {
        _recver = recver;
        _sender = sender;
        _excepter = excepter;
    }

    // Stores a back-pointer to the Reactor that manages this connection.
    void SetSelf(Reactor *R)
    {
        _R = R;
    }

    int SockFd() const
    {
        return _sockfd;
    }

    // Appends received data to the input buffer.
    void AppendInbuff(const std::string &buff)
    {
        _inbuffer += buff;
    }

    // Appends data waiting to be sent to the output buffer.
    void AppendOutbuff(const std::string &buff)
    {
        _outbuffer += buff;
    }

    // Returned by reference so callers can consume (cut) data in place, e.g. when decoding.
    std::string &Inbuffer()
    {
        return _inbuffer;
    }

    // Returned by reference so callers can read and modify the pending output in place.
    std::string &Outbuffer()
    {
        return _outbuffer;
    }

    // Drops the first `n` bytes of the output buffer (clamped to the buffer size).
    // A non-positive `n` (e.g. a failed send's return value) removes nothing instead of
    // being converted to a huge unsigned count that would wipe the whole buffer.
    void OutBufferRemove(int n)
    {
        if (n <= 0)
            return;
        _outbuffer.erase(0, static_cast<std::string::size_type>(n));
    }

    uint32_t Events() const
    {
        return _events;
    }

    ~Connection() {}

private:
    int _sockfd;

    // Input and output buffers.
    std::string _inbuffer;
    std::string _outbuffer;

    // Event mask set through SetEvent.
    uint32_t _events;

    InetAddr _clientaddr;

public:
    // Event handlers.
    func_t _recver;
    func_t _sender;
    func_t _excepter;

    // Reactor that manages this connection (not owned).
    Reactor *_R;
};
  • Epoller.hpp文件:
#pragma once
#include <sys/epoll.h>
#include "Log.hpp"

class Epoller
{
  bool EventCore(int sockfd, uint32_t event, int type)
  {
      struct epoll_event ep_event;
      ep_event.data.fd = sockfd;
      ep_event.events = event;
      int n = ::epoll_ctl(_epfd, type, sockfd, &ep_event);
      if (n < 0)
      {
          LOG(ERROR, "epoll_ctl error");
          return false;
      }
      LOG(DEBUG, "epoll_ctl add %d fd success", sockfd);
      return true
上一篇:Redis生产环境性能优化


下一篇:某星球预约抢票脚本