muduo源码解析30-网络库8:tcpserver类

tcpserver:

说明:

之前说的acceptor负责接收连接,tcpconnection负责对这个连接进行操作。

那么这两个合起来就有一个tcpserver的基本架构了

tcpserver使用acceptor来接受一个连接,使用tcpconnection来对这个连接进行处理。

tcpserver.h

/*
TcpServer实现了对于TCP服务的封装,功能是管理accept获得的TcpConnection
TcpServer是供用户直接使用的,生命期由用户控制,用户只需要设置好callback函数,再调用start即可

tcpserver维护当前所有的tcpconenction集合,便于对其进行管理
tcpserver构造函数中完成了对于acceptor类对象的构造,因此socket(),bind()操作在
tcpserver构造时就已经做好,而listen()则在tcpserver::start()中做好


构造函数中acceptor设置发生连接事件的回调函数就是tcpserver::newConnection,也就是说
acceptor中的acceptorsocket发生了可读网络事件(连接到来),首先acceptorcahnnel会调用
acceptor::handleRead()先accept()这个连接,此时连接已经接收完成.
但是还需要调用连接建立完成时的回调函数tcpserver::newConnection,在newConnection中
则是完成新建一个tcpconnection对象,并把它加入tcpconnection集合中来方便对所有的
tcpconnection连接进行管理.

需要注意一点就是,tcpserver并不是单线程的,其内部使用一个eventloopthreadpool
也就是说有多个IO线程,每个IO线程都有一个eventloop对象,因此也就有多个
while(1)
{
poll();
handleEvent();
}

这样的好处就是提高并发性,多个连接到来时,单eventloop可能会来不及处理.
这样子会带来一个问题,怎么统计当前所有连接进来的客户机呢?因此是多线程处理IO,
每个线程都有一个poller::m_pollfds,只对该线程的套接字集合进行管理,而tcpserver
如何知道哪些套接字正处于链接呢?
tcpserver使用tcpconenction来维护一个tcp连接集合,每次acceptor接受一个新的连接时,会
回调tcpserver::newConnection()新建一个tcpconnection加入到tcp连接集合,
每次断开连接时,会回调tcpserver::removeConnection()把退出的tcpconnection从tcp
连接集合中删除.


*/

#ifndef TCPSERVER_H
#define TCPSERVER_H

#include"base/atomic.h"
#include"base/types.h"
#include"net/tcpconnection.h"

#include<map>

namespace mymuduo {

namespace net {

class acceptor;
class eventloop;
class eventloopthreadpool;

class tcpserver
{
public:
    typedef std::function<void(eventloop*)> ThreadInitCallback;

    //端口复用/地址复用
    enum Option
    {
        kNoReusePort,
        kReusePort
    };

    tcpserver(eventloop* loop,const inetaddress& listenAddr,const string& name,
              Option option=kNoReusePort);
    ~tcpserver();

    //获得ipPort,name,eventloop*
    const string& ipPort()const{return m_ipPort;}
    const string& name() const{return m_name;}
    eventloop* getLoop() const{return m_loop;}

    //设置线程池内线程数量,设置线程回调函数
    void setThreadNum(int numThreads);
    void setThreadInitCallback(const ThreadInitCallback& cb){m_threadInitCallback=cb;}
    std::shared_ptr<eventloopthreadpool> threadPool(){return m_threadPool;}

    //tcpserver启动...
    void start();

    //设置连接建立时,读消息时,写完成时的回调函数
    void setConnectionCallback(const ConnectionCallback& cb){m_connectionCallback=cb;}
    void setMessageCallback(const MessageCallback& cb){m_messageCallback=cb;}
    void setWriteCompleteCallback(const WriteCompleteCallback& cb){m_writeCompleteCallback=cb;}


private:

    //新建一个连接,accpetor默认的接受连接时的回调函数
    void newConnection(int sockfd,const inetaddress& peerAddr);
    //移除一个连接
    void removeConnection(const TcpConnectionPtr& conn);
    //loop内部移除一个连接
    void removeConnectionInLoop(const TcpConnectionPtr& conn);

    //每个tcpconnection都有一个名字
    typedef std::map<string, TcpConnectionPtr> ConnectionMap;

    eventloop* m_loop;                  //tcpserver所在的那个eventloop
    const string m_ipPort;              //tcpserver的ipPort
    const string m_name;                //name
     //acceptor智能指针,用于创建套接字,绑定,监听和接收一个连接,
    std::unique_ptr<acceptor> m_acceptor;
    std::shared_ptr<eventloopthreadpool> m_threadPool;  //eventloop线程池智能指针
    ConnectionCallback m_connectionCallback;        //建立连接时的回调函数
    MessageCallback m_messageCallback;              //读消息时的回调函数
    WriteCompleteCallback m_writeCompleteCallback;  //写事件完成时的回调函数
    ThreadInitCallback m_threadInitCallback;        //线程初始化回调
    atomicInt32 m_start;                            //tcpserver是否开始
    int m_nextConnId;                               //先一个连接client的fd
    ConnectionMap m_connections;     //名字到tcpconenction的映射

};

}//namespace net

}//namespace mymuduo

#endif // TCPSERVER_H

tcpserver.cpp

#include "tcpserver.h"

#include"base/logging.h"
#include"net/acceptor.h"
#include"net/eventloop.h"
#include"eventloopthreadpool.h"
#include"net/socketsops.h"

namespace mymuduo {

namespace net {

//构造函数,初始化成员,设置默认连接建立回调,读消息回调
//acceptor完成套接字的创建,绑定,监听和接受一个新的连接操作
tcpserver::tcpserver(eventloop* loop,const inetaddress& listenAddr,
                     const string& name,Option option)
    :m_loop(loop),m_ipPort(listenAddr.toIpPort()),m_name(name),
      m_acceptor(new acceptor(loop,listenAddr,option==kReusePort)),
      m_threadPool(new eventloopthreadpool(loop,name)),
      m_connectionCallback(defaultConnectionCallback),
      m_messageCallback(defaultMessageCallback),
      m_nextConnId(1)
{
    //m_acceptor接受一个新的连接时,回调tcpserver::newConnection
    m_acceptor->setNewConnectionCallback(
                std::bind(&tcpserver::newConnection,this,_1,_2));
}

//析构函数,关闭所有tcpconnection时,让tcpconnection回调&tcpconnection::connectDestroyed关闭连接
tcpserver::~tcpserver()
{
    m_loop->assertInLoopThread();
    LOG_TRACE << "TcpServer::~TcpServer [" << m_name << "] destructing";

    for (auto& item : m_connections)
    {
      TcpConnectionPtr conn(item.second);
      item.second.reset();
      conn->getLoop()->runInLoop(
        std::bind(&tcpconnection::connectDestroyed, conn));
    }
}

//设置线程池线程数量
void tcpserver::setThreadNum(int numThreads)
{
    assert(0<=numThreads);
    m_threadPool->setThreadNum(numThreads);
}

//启动tcpserver,实质上启动内部线程池,并让eventloop回调acceptor::listen(),完成监听操作
void tcpserver::start()
{
    if(m_start.getAndSet(1)==0)
    {
        m_threadPool->start(m_threadInitCallback);
        assert(!m_acceptor->listenning());
        m_loop->runInLoop(std::bind(&acceptor::listen,get_pointer(m_acceptor)));
    }
}

//在新连接到达时,acceptor会先回调acceptor::handleRead()接受一个连接
//此时连接已经被acceptor接受完成,然后acceptor会调用&tcpserver::newConnection()
//调用的目的是把这个连接本身封装成一个tcpconnection,并把它加入到ConnectionMap,
//tcpconenction目的是便于对tcp连接进行管理
void tcpserver::newConnection(int sockfd, const inetaddress &peerAddr)
{
    m_loop->assertInLoopThread();
    //在线程池内找到一个eventloopthread,让其执行新建一个tcpconenction操作
    eventloop* ioloop=m_threadPool->getNextLoop();
    char buf[64];
    snprintf(buf,sizeof buf,"-%s#%d",ipPort().data(),m_nextConnId);
    ++m_nextConnId;
    string connName=m_name+buf;
    LOG_INFO << "TcpServer::newConnection [" << m_name
             << "] - new connection [" << connName
             << "] from " << peerAddr.toIpPort();
    inetaddress localAddr(sockets::getLocalAddr(sockfd));
    //新建一个tcp连接,这个操作在线程池内的ioloop线程中执行
    TcpConnectionPtr conn(new tcpconnection(ioloop,connName,sockfd,localAddr,peerAddr));
    m_connections[connName]=conn;   //更新tcpserver内部连接集合
    //设置tcpconnection的四个回调函数
    conn->setConnectionCallback(m_connectionCallback);
    conn->setMessageCallback(m_messageCallback);
    conn->setWriteCompleteCallback(m_writeCompleteCallback);
    conn->setCloseCallback(std::bind(&tcpserver::removeConnection,this,_1));
    //让eventloop回调tcpconnection::connectEstablished完成连接建立完成时操作
    ioloop->runInLoop(std::bind(&tcpconnection::connectEstablished,conn));
}

//conn关闭时eventloop回调tcpserver::removeConnectionInLoop用于关闭tcp连接
void tcpserver::removeConnection(const TcpConnectionPtr &conn)
{
    m_loop->runInLoop(std::bind(&tcpserver::removeConnectionInLoop,this,conn));
}

//
void tcpserver::removeConnectionInLoop(const TcpConnectionPtr &conn)
{
    m_loop->assertInLoopThread();
    LOG_INFO << "TcpServer::removeConnectionInLoop [" << m_name
             << "] - connection " << conn->name();
    //在m_connections中移除conn
    size_t n=m_connections.erase(conn->name());
    eventloop* ioloop=conn->getLoop();
    //让eventloop回调tcpconnection::connectDestroyed完成tcpconenction的销毁
    ioloop->queueInLoop(std::bind(&tcpconnection::connectDestroyed,conn));

}

}//namespace net

}//namespace mymuduo

测试:

#include "net/tcpserver.h"

#include "base/logging.h"
#include "base/thread.h"
#include "net/eventloop.h"
#include "net/inetaddress.h"

#include <utility>

#include <stdio.h>
#include <unistd.h>

using namespace mymuduo;
using namespace mymuduo::net;

int numthreads = 0;

class EchoServer
{
 public:
  EchoServer(eventloop* loop, const inetaddress& listenAddr)
    : loop_(loop),
      server_(loop, listenAddr, "EchoServer")
  {
    server_.setConnectionCallback(
        std::bind(&EchoServer::onConnection, this, _1));
    server_.setMessageCallback(
        std::bind(&EchoServer::onMessage, this, _1, _2, _3));
    server_.setThreadNum(numthreads);
  }

  void start()
  {
    server_.start();
  }
  // void stop();

 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    LOG_TRACE << conn->peerAddress().toIpPort() << " -> "
        << conn->localAddress().toIpPort() << " is "
        << (conn->connected() ? "UP" : "DOWN");
    LOG_INFO << conn->getTcpInfoString();

    conn->send("hello\n");
  }

  void onMessage(const TcpConnectionPtr& conn, buffer* buf, timestamp time)
  {
    string msg(buf->retrieveAllAsString());
    LOG_TRACE << conn->name() << " recv " << msg.size() << " bytes at " << time.toString();
    if (msg == "exit\n")
    {
      conn->send("bye\n");
      conn->shutdown();
    }
    if (msg == "quit\n")
    {
      loop_->quit();
    }
    //打印一下读到的数据
    LOG_INFO<<msg.data();
    conn->send(msg);
  }

  eventloop* loop_;
  tcpserver server_;
};

int main()
{

  LOG_INFO << "pid = " << getpid() << ", tid = " << currentthread::tid();
  LOG_INFO << "sizeof TcpConnection = " << sizeof(tcpconnection);

  numthreads = 8;

  eventloop loop;
  inetaddress listenAddr("192.168.1.103",12306);
  EchoServer server(&loop, listenAddr);

  server.start();

  loop.loop();
}

实现了一个echo服务器,可以接受连接并echo客户机发送过来的消息。

 

上一篇:muduo echo服务器测试


下一篇:muduo库学习笔记十三:base库之ThreadLocalSingleton