tcpserver:
说明:
之前说的acceptor负责接收连接,tcpconnection负责对这个连接进行操作。
那么这两个合起来就有一个tcpserver的基本架构了
tcpserver使用acceptor来接受一个连接,使用tcpconnection来对这个连接进行处理。
tcpserver.h
/* TcpServer实现了对于TCP服务的封装,功能是管理accept获得的TcpConnection TcpServer是供用户直接使用的,生命期由用户控制,用户只需要设置好callback函数,再调用start即可 tcpserver维护当前所有的tcpconenction集合,便于对其进行管理 tcpserver构造函数中完成了对于acceptor类对象的构造,因此socket(),bind()操作在 tcpserver构造时就已经做好,而listen()则在tcpserver::start()中做好 构造函数中acceptor设置发生连接事件的回调函数就是tcpserver::newConnection,也就是说 acceptor中的acceptorsocket发生了可读网络事件(连接到来),首先acceptorcahnnel会调用 acceptor::handleRead()先accept()这个连接,此时连接已经接收完成. 但是还需要调用连接建立完成时的回调函数tcpserver::newConnection,在newConnection中 则是完成新建一个tcpconnection对象,并把它加入tcpconnection集合中来方便对所有的 tcpconnection连接进行管理. 需要注意一点就是,tcpserver并不是单线程的,其内部使用一个eventloopthreadpool 也就是说有多个IO线程,每个IO线程都有一个eventloop对象,因此也就有多个 while(1) { poll(); handleEvent(); } 这样的好处就是提高并发性,多个连接到来时,单eventloop可能会来不及处理. 这样子会带来一个问题,怎么统计当前所有连接进来的客户机呢?因此是多线程处理IO, 每个线程都有一个poller::m_pollfds,只对该线程的套接字集合进行管理,而tcpserver 如何知道哪些套接字正处于链接呢? tcpserver使用tcpconenction来维护一个tcp连接集合,每次acceptor接受一个新的连接时,会 回调tcpserver::newConnection()新建一个tcpconnection加入到tcp连接集合, 每次断开连接时,会回调tcpserver::removeConnection()把退出的tcpconnection从tcp 连接集合中删除. */ #ifndef TCPSERVER_H #define TCPSERVER_H #include"base/atomic.h" #include"base/types.h" #include"net/tcpconnection.h" #include<map> namespace mymuduo { namespace net { class acceptor; class eventloop; class eventloopthreadpool; class tcpserver { public: typedef std::function<void(eventloop*)> ThreadInitCallback; //端口复用/地址复用 enum Option { kNoReusePort, kReusePort }; tcpserver(eventloop* loop,const inetaddress& listenAddr,const string& name, Option option=kNoReusePort); ~tcpserver(); //获得ipPort,name,eventloop* const string& ipPort()const{return m_ipPort;} const string& name() const{return m_name;} eventloop* getLoop() const{return m_loop;} //设置线程池内线程数量,设置线程回调函数 void setThreadNum(int numThreads); void setThreadInitCallback(const ThreadInitCallback& cb){m_threadInitCallback=cb;} std::shared_ptr<eventloopthreadpool> threadPool(){return m_threadPool;} //tcpserver启动... void start(); //设置连接建立时,读消息时,写完成时的回调函数 void setConnectionCallback(const ConnectionCallback& cb){m_connectionCallback=cb;} void setMessageCallback(const MessageCallback& cb){m_messageCallback=cb;} void setWriteCompleteCallback(const WriteCompleteCallback& cb){m_writeCompleteCallback=cb;} private: //新建一个连接,accpetor默认的接受连接时的回调函数 void newConnection(int sockfd,const inetaddress& peerAddr); //移除一个连接 void removeConnection(const TcpConnectionPtr& conn); //loop内部移除一个连接 void removeConnectionInLoop(const TcpConnectionPtr& conn); //每个tcpconnection都有一个名字 typedef std::map<string, TcpConnectionPtr> ConnectionMap; eventloop* m_loop; //tcpserver所在的那个eventloop const string m_ipPort; //tcpserver的ipPort const string m_name; //name //acceptor智能指针,用于创建套接字,绑定,监听和接收一个连接, std::unique_ptr<acceptor> m_acceptor; std::shared_ptr<eventloopthreadpool> m_threadPool; //eventloop线程池智能指针 ConnectionCallback m_connectionCallback; //建立连接时的回调函数 MessageCallback m_messageCallback; //读消息时的回调函数 WriteCompleteCallback m_writeCompleteCallback; //写事件完成时的回调函数 ThreadInitCallback m_threadInitCallback; //线程初始化回调 atomicInt32 m_start; //tcpserver是否开始 int m_nextConnId; //先一个连接client的fd ConnectionMap m_connections; //名字到tcpconenction的映射 }; }//namespace net }//namespace mymuduo #endif // TCPSERVER_H
tcpserver.cpp
#include "tcpserver.h" #include"base/logging.h" #include"net/acceptor.h" #include"net/eventloop.h" #include"eventloopthreadpool.h" #include"net/socketsops.h" namespace mymuduo { namespace net { //构造函数,初始化成员,设置默认连接建立回调,读消息回调 //acceptor完成套接字的创建,绑定,监听和接受一个新的连接操作 tcpserver::tcpserver(eventloop* loop,const inetaddress& listenAddr, const string& name,Option option) :m_loop(loop),m_ipPort(listenAddr.toIpPort()),m_name(name), m_acceptor(new acceptor(loop,listenAddr,option==kReusePort)), m_threadPool(new eventloopthreadpool(loop,name)), m_connectionCallback(defaultConnectionCallback), m_messageCallback(defaultMessageCallback), m_nextConnId(1) { //m_acceptor接受一个新的连接时,回调tcpserver::newConnection m_acceptor->setNewConnectionCallback( std::bind(&tcpserver::newConnection,this,_1,_2)); } //析构函数,关闭所有tcpconnection时,让tcpconnection回调&tcpconnection::connectDestroyed关闭连接 tcpserver::~tcpserver() { m_loop->assertInLoopThread(); LOG_TRACE << "TcpServer::~TcpServer [" << m_name << "] destructing"; for (auto& item : m_connections) { TcpConnectionPtr conn(item.second); item.second.reset(); conn->getLoop()->runInLoop( std::bind(&tcpconnection::connectDestroyed, conn)); } } //设置线程池线程数量 void tcpserver::setThreadNum(int numThreads) { assert(0<=numThreads); m_threadPool->setThreadNum(numThreads); } //启动tcpserver,实质上启动内部线程池,并让eventloop回调acceptor::listen(),完成监听操作 void tcpserver::start() { if(m_start.getAndSet(1)==0) { m_threadPool->start(m_threadInitCallback); assert(!m_acceptor->listenning()); m_loop->runInLoop(std::bind(&acceptor::listen,get_pointer(m_acceptor))); } } //在新连接到达时,acceptor会先回调acceptor::handleRead()接受一个连接 //此时连接已经被acceptor接受完成,然后acceptor会调用&tcpserver::newConnection() //调用的目的是把这个连接本身封装成一个tcpconnection,并把它加入到ConnectionMap, //tcpconenction目的是便于对tcp连接进行管理 void tcpserver::newConnection(int sockfd, const inetaddress &peerAddr) { m_loop->assertInLoopThread(); //在线程池内找到一个eventloopthread,让其执行新建一个tcpconenction操作 eventloop* ioloop=m_threadPool->getNextLoop(); char buf[64]; snprintf(buf,sizeof buf,"-%s#%d",ipPort().data(),m_nextConnId); ++m_nextConnId; string connName=m_name+buf; LOG_INFO << "TcpServer::newConnection [" << m_name << "] - new connection [" << connName << "] from " << peerAddr.toIpPort(); inetaddress localAddr(sockets::getLocalAddr(sockfd)); //新建一个tcp连接,这个操作在线程池内的ioloop线程中执行 TcpConnectionPtr conn(new tcpconnection(ioloop,connName,sockfd,localAddr,peerAddr)); m_connections[connName]=conn; //更新tcpserver内部连接集合 //设置tcpconnection的四个回调函数 conn->setConnectionCallback(m_connectionCallback); conn->setMessageCallback(m_messageCallback); conn->setWriteCompleteCallback(m_writeCompleteCallback); conn->setCloseCallback(std::bind(&tcpserver::removeConnection,this,_1)); //让eventloop回调tcpconnection::connectEstablished完成连接建立完成时操作 ioloop->runInLoop(std::bind(&tcpconnection::connectEstablished,conn)); } //conn关闭时eventloop回调tcpserver::removeConnectionInLoop用于关闭tcp连接 void tcpserver::removeConnection(const TcpConnectionPtr &conn) { m_loop->runInLoop(std::bind(&tcpserver::removeConnectionInLoop,this,conn)); } // void tcpserver::removeConnectionInLoop(const TcpConnectionPtr &conn) { m_loop->assertInLoopThread(); LOG_INFO << "TcpServer::removeConnectionInLoop [" << m_name << "] - connection " << conn->name(); //在m_connections中移除conn size_t n=m_connections.erase(conn->name()); eventloop* ioloop=conn->getLoop(); //让eventloop回调tcpconnection::connectDestroyed完成tcpconenction的销毁 ioloop->queueInLoop(std::bind(&tcpconnection::connectDestroyed,conn)); } }//namespace net }//namespace mymuduo
测试:
#include "net/tcpserver.h" #include "base/logging.h" #include "base/thread.h" #include "net/eventloop.h" #include "net/inetaddress.h" #include <utility> #include <stdio.h> #include <unistd.h> using namespace mymuduo; using namespace mymuduo::net; int numthreads = 0; class EchoServer { public: EchoServer(eventloop* loop, const inetaddress& listenAddr) : loop_(loop), server_(loop, listenAddr, "EchoServer") { server_.setConnectionCallback( std::bind(&EchoServer::onConnection, this, _1)); server_.setMessageCallback( std::bind(&EchoServer::onMessage, this, _1, _2, _3)); server_.setThreadNum(numthreads); } void start() { server_.start(); } // void stop(); private: void onConnection(const TcpConnectionPtr& conn) { LOG_TRACE << conn->peerAddress().toIpPort() << " -> " << conn->localAddress().toIpPort() << " is " << (conn->connected() ? "UP" : "DOWN"); LOG_INFO << conn->getTcpInfoString(); conn->send("hello\n"); } void onMessage(const TcpConnectionPtr& conn, buffer* buf, timestamp time) { string msg(buf->retrieveAllAsString()); LOG_TRACE << conn->name() << " recv " << msg.size() << " bytes at " << time.toString(); if (msg == "exit\n") { conn->send("bye\n"); conn->shutdown(); } if (msg == "quit\n") { loop_->quit(); } //打印一下读到的数据 LOG_INFO<<msg.data(); conn->send(msg); } eventloop* loop_; tcpserver server_; }; int main() { LOG_INFO << "pid = " << getpid() << ", tid = " << currentthread::tid(); LOG_INFO << "sizeof TcpConnection = " << sizeof(tcpconnection); numthreads = 8; eventloop loop; inetaddress listenAddr("192.168.1.103",12306); EchoServer server(&loop, listenAddr); server.start(); loop.loop(); }
实现了一个echo服务器,可以接受连接并echo客户机发送过来的消息。