Connection --- 连接管理模块

目录

模块设计

模块实现

shared_from_this

模块测试纠错


模块设计

Connection模块是对通信连接也就是通信套接字的整体的管理模块,对连接的所有操作都是通过这个模块提供的接口来完成的。

那么他具体要进行哪些方面的管理呢?

首先每个通信连接都需要维护用户态的输入缓冲区的输出缓冲区,也就是Conenction内部需要包含两个Buffer对象。 那么针对缓冲区,需要提供什么接口呢?除去缓冲区本身的接口之外,我们的Connection需要提供 Send 接口,这个Send接口其实就是将要发送的数据写入到用户输出缓冲区中,并启动写事件监听。 

其次,还也需要对每个通信套接字本身做管理,能够完成套接字的种种操作,其实就是内部需要包含一个Socket对象,未来关闭套接字时能够直接调用Socket的接口

还需要对套接字的监听事件做管理,每一个通信套接字的Channel对象都是包含在Conenction内部的,那么我们的Conenction就需要提供几个关于事件管理的接口,需要能够设置几个事件的回调函数,以及设置需要监控的事件。

一个Connection 对象还需要保存协议解析的上下文信息,也就是一个Any对象,未来要提供接口用于获取内部的上下文信息的指针。

最后,一个Connection也需要保存回调函数,这些回调函数就是由用户设置给TcpServer模块,TcpServer模块再设置给Connection模块,而Connection内部再加一些自己的处理,在自己内部实现一些回调函数最终设置给 Channel 模块中,用于事件触发之后的回调处理。

那么大致需要提供哪些功能性接口给上层:

1  发送数据接口,其实就是把数据放到发送缓冲区,监听写事件,写事件触发时在调用Channel中的处理方法进行实际的发送,写入内核缓冲区

2 关闭连接接口,这个关闭连接的接口是提供给用户的关闭连接接口,但是他并不是直接调用Socket的Close接口,而是先要将输入和输出缓冲区的数据都处理完之后,再调用一个我们私有的真正关闭连接的接口

3 启动超时销毁功能

4 取消超时销毁功能

这两个接口都是提供给TcpServer模块的,同时TcpServer模块再提供给用户的接口

5 协议切换,协议其实就是规定一个数据到来之后该如何处理,它取决于上下文以及数据的业务处理函数,所以协议切换其实就是重新设置上下文以及重新设置一套回调函数。

这时候我们就需要考虑一个很重要的问题: Connection 对象如何管理? 

首先,所有的Connection 对象肯定是要交给TcpServer 模块进行整体的管理的,这个我们还是很好理解,因为未来我们的Acceptor获取到新连接之后,需要创建Connection对象,而创建Connection对象的接口就只能是TcpServer模块提供给他的,也就是由TcpServer来创建,创建之后,他也需要对所有的Connection进行管理。 

那么怎么管理呢?new一个Connection对象然后使用指针进行管理?那么会不会有这样一种情况:连接有多种操作,如果其中一个操作把连接关闭释放了,后续的操作再调用Conenction的接口不就是野指针访问了吗?那么程序会触发硬件异常而崩溃。这是我们不希望看到的。即便 TcpServer中可能还保存着这个对象的指针,但是指针指向的资源却可能已经释放了。

我们可以使用智能指针 share_ptr 来对Connction 进行管理。这样一来,我们能够保证,任何一个操作在进行的时候,只要函数栈帧已经展开,参数接受了Conenction的shared_ptr对象,那么在函数运行期间,该函数栈帧内的shared_ptr 对象至少还保留了一个Connection 的计数,该资源不会被立即释放,也就不会造成野指针的访问。就算其他的操作将连接释放了,也只是将shared_ptr的计数减一,或者说将TcpServer中所管理的Conncetion的基础的shared_ptr对象释放了,并不会实际释放Connection的资源。 

基于使用sharet_ptr来管理Connection的思想,我们未来设置接口的时候,就需要传递Connection 的智能指针对象或者引用了,而不是直接传递原始指针。

那么Connection类该如何设计?

他需要的成员:

每一个连接都需要有一个在服务器内部的唯一标识,也就是id,为什么不直接使用 fd ?因为我们对fd 的管理也不是直接使用原始的fd 的,而是使用Socket来管理,我们需要将其与系统的IO的部分进行解耦。 同时,未来这个连接的id也是她所对应的定时任务的id。

其次,他需要一个Channel对象、一个Socket对象,两个Buffer对象以及一个Any对象

再有,他需要保存四个回调函数,这四个回调函数是由用户设置的,分别是连接建立时执行的回调函数,新数据到来时执行的回调方法,连接关闭时执行的回调方法以及任意事件到来执行的回调方法。

由于Connection设计到超时的管理,那么我们还需要一个值来表示是否启动了超时管理。

以及每一个Cconnection对象需要和一个EventLoop对象绑定,所以他还需要一个EventLoop的指针。

这是几个简单的成员,我们还需要一个成员就是连接所处的状态。

 这个状态并不是站在内核层面的状态,而是站在应用层角度的状态。

状态有以下几种: 1 连接建立中: 此时我们已经从底层将连接的文件描述符获取上来,并为其创建了Connection对象,但是Connection内部的各项设置还未完成

                              2 连接建立完成,Connection对象的各项设置已经完成,可以进行通信了

                              3 连接待关闭状态,对端或者本端需要关闭这个连接,但是在实际关闭连接之前我们还需要把缓冲区的数据全部处理完

                              4 连接关闭状态,处于这个状态的时候,就可以把套接字关闭,以及资源释放了

那么这个状态我们可以使用枚举来定义。

enum CONN_STATU
{
    CLOSED,             //关闭状态,不再进行任何操作,释放资源
    CONNECTING,         //连接待处理,还需要进行各项设置才能开始监听通信
    CONNECTED,          //连接建立,可以通信
    CLOSING             //连接带关闭,尽快处理缓冲区剩余数据
};

同时还需要一个变量用来保存是否开启了超时释放机制,只需要用一个bool类型就行了。

那么Connection内部的成员大概就这些:

class Connection;
using PtrConnection = std::shared_ptr<Connection>;

class Connection :public std::enable_shared_from_this<Connection>
{
private:
    uint64_t _id;       //连接的id,也是连接对应的超时任务的id
    int _sockfd;        //保存对应的描述符
    EventLoop* _loop;   //绑定的EventLoop
    CONN_STATU _con_statu;  //连接状态
    Socket _socket;     //套接字操作模块
    Channel _channel;   //事件管理模块
    Buffer _in_buffer;  //输入缓冲区
    Buffer _out_buffer; //输出缓冲区
    Any _context;       //连接上下文
    bool _enable_inactive_release;    //标识是否开启非活跃连接超时释放机制

    using EventCallBack = std::function<void(const PtrConnection&)>;        //用户设置的任意事件回调
    using ConnectCallBack = std::function<void(const PtrConnection&)>;    //用户设置的连接建立回调
    using MessageCallBack = std::function<void(const PtrConnection&,Buffer*)>;    //用户设置的新数据回调,需要传递输入缓冲区  
    using CloseCallBack = std::function<void(const PtrConnection&)>;        //用户设置的连接关闭回调
    using SvrCloseCallBack = std::function<void(const PtrConnection&)>;        //TcpServer设置的连接关闭回调
    ConnectCallBack _connect_cb;
    MessageCallBack _message_cb;
    CloseCallBack _close_cb;
    EventCallBack _event_cb;
    SvrCloseCallBack _svr_close_cb;

而对于接口的设计,我们需要在实现的过程中再来一步一步设计,因为Connection的设计有点复杂,我们就算在这里列出来了也没意义。

模块实现

首先,我们的Connection需要基本的构造函数,在构造函数中,我们需要传入 id ,loop ,socket这三个基本的成员。同时,我们需要为channel对象设置四个回调函数。

    void HandlerRead(); //读事件回调
    void HandlerWrite();//写事件回调
    void HandlerClose();//挂断事件回调
    void HandlerError();//错误事件回调
    void HandlerEvent();//任意事件回调
public: //功能接口
    Connection(uint64_t id , int sockfd ,EventLoop* loop)
    :_id(id),_sockfd(sockfd),_loop(loop),_con_statu(CONNECTING),_socket(sockfd),_channel(_sockfd,_loop),_enable_inactive_release(false)
    {
        _channel.SetReadCallBack(std::bind(&Connection::HandlerRead,this));
        _channel.SetWriteCallBack(std::bind(&Connection::HandlerWrite,this));
        _channel.SetCloseCallBack(std::bind(&Connection::HandlerClose,this));
        _channel.SetErrorCallBack(std::bind(&Connection::HandlerError,this));
        _channel.SetEventCallBack(std::bind(&Connection::HandlerEvent,this));
    }

那么我们现在就需要实现五个回调函数,

首先读事件回调,他的思路很简单,只需要从套接字中读取数据,然后放在输入缓冲区,再来调用用户传进来的新数据回调函数就行了。

    void HandlerRead() //读事件回调
    {
        // 1 从套接字读取数据
#define READ_SIZE 65535
        char buffer[READ_SIZE] = {0};
        int ret = _socket.Read(buffer,READ_SIZE-1);
        if(ret < 0 )  //对Socket的Read做处理,只有真正出错的时候才返回 -1 ,其他的时候都返回 >= 0 的值
        {
            //说明套接字出错,那么此时也不能直接关闭连接,因为输出缓冲区中还有数据待发送,所以是调用ShutDown接口
            ShutDown();     //先处理剩余数据,再实际关闭连接
        }
        // 2 放入输入缓冲区
        _in_buffer.WriteAndPush(buffer,ret);
        // 3 调用新数据回调 
        if(_in_buffer.ReadSize())  //可能没读到数据,被信号打断了或者其他原因
        {
            if(_message_cb) _message_cb(shared_from_this(),&_in_buffer);   //shared_from_this() 会从当前对象的第一个创建的shared_ptr中进行拷贝
        }
    }

这里我们可能会疑惑,为什么调用完_message_cb 之后不添加写事件监控,也就是这样一行代码:

        if(_out_buffer.ReadSize()) _channel.EnableWrite(); //如果写入了新的数据,那么开启写事件监控

其实是因为,未来用户只能调用我们Connection提供的Send接口来发送数据,但是Send接口我们也懂,只会将数据写入到输出缓冲区中,我们在Send函数的实现中,只有实际写入了再来启动写事件的监控更加合理。 

shared_from_this

这里有一个细节,就是我们前面声明类的时候,Connection 类继承了 std::enable_shared_from_this这个类,这个继承关系是为了我们能够使用 shared_from_this这个功能。

这个用法叫做自我引用

为什么我们需要引进一个shared_from_this这样的接口呢? 

我们说了使用shared_ptr对所有的Connection对象进行管理,这样能够防止在操作的过程中资源被释放。 但是,我们在给 _message_cb 这样的回调函数传参的时候,如何保证传给他的shared_ptr对象是和管理Conenction 的shared_ptr的对象共享计数呢?

因为如果我们直接使用 shared_ptr<Connection> p (this) ,这样创建一个只能指针对象传参的时候,他的计数是独立的,并不会和TcpServer中管理Conenction的shared_ptr共享计数,那么我们就需要一个办法能够创建出一个和Conenction 的管理的shared_ptr对象共享技术的智能指针进行传参,而shared_from_this就可以解决这样的问题。

std::enable_shared_from_this<T> 内部维护了一个 std::weak_ptr<T>。当第一个 std::shared_ptr<T> 开始管理该对象时,这个 weak_ptr 被初始化。之后,当 shared_from_this() 被调用时,它将基于这个已经存在的 weak_ptr 返回一个新的 std::shared_ptr<T>,这个新的 shared_ptr 与原有的 shared_ptr 共享对对象的所有权。

那么使用这个接口,我们就能保证在这些回调函数在执行的时候,即使其他的地方调用了_svr_close_cb把TcpServer模块中的基础计数的智能指针释放了,这份资源也还存在,至少在我们这次函数栈帧内还存在,不会出现野指针的问题。

第二个函数就是HandlerWrite ,首先这是设置给channel的回调函数,也就是说只有当写事件触发时才会调用,那么我们直接调用write接口是不会被阻塞的。 当然我们需要判断Write的返回值,判断是否出错,如果写入数据出错了,那么我们就没必要再继续处理数据了,即使处理了也不可能再发出去,那么这时候我们就需要调用实际关闭连接的接口。

同时,我们还要考虑一种情况,就是,此时其实是读出错之后,调用ShutDown而监听调用的写事件,那么这时候写完这一次数据之后就需要关闭连接。 其实也就是判断连接是否为待关闭状态。

    void HandlerWrite() //写事件回调
    {
        // 1 向套接字写入数据
        int ret =_socket.Send(_out_buffer.ReadPosition(),_out_buffer.ReadSize());  //直接尝试全部写完
        // 2 判断是否写完
        if(ret < 0)  //在 Socket 的Send接口中做处理,只有真正出错时才返回 -1 ,那么这时候不需要再继续任何处理数据的操作了,直接关闭连接
        {
            Release();  // 这个接口是实际关闭连接的接口
        }
        if(ret == _out_buffer.ReadSize()) //说明写完了,那么可以关闭写事件的监控了
        {
            _channel.DisableWwrite();   //关闭写事件监控,直到下一次
        }
        //否则就表示没写完,那么就先不管比写事件监控
        _out_buffer.MoveWriteOffset(ret); //不管怎么样都要移动读偏移
        //然后判断连接是否是待关闭状态,如果是,写完这次数据我们就必须要关闭连接了
        if(_con_statu == CLOSING) Release();
    }

然后就是任意事件回调,任意事件回调我们只需要判断是否启动了超时连接,如果启动了,那么就需要刷新定时任务。 同时我们也需要执行以下用户的任意回调函数,除此之外就没其他的操作了。

    void HandlerEvent() //任意事件回调
    {
        if(_enable_inactive_release)  //如果启动了非活跃超时释放
        _loop->RefreshTimerTask(_id);
        if(_event_cb) _event_cb(shared_from_this());
    }

然后就是挂断事件回调,挂断事件也很简答,因为可能在挂断事件触发的时候,也触发了读事件,那么我们可以先处理以下数据,然后直接调用Release关闭连接,处不处理都行,反正也无法返回了,但是最好处理以下,因为可能是一些功能性请求。

    void HandlerClose() //挂断事件回调
    {
        if(_in_buffer.ReadSize())
        _message_cb(shared_from_this(),&_in_buffer);
        Release();
    }

最后就是错误事件回调,错误事件触发的时候,我们的处理方法和挂断事件是一样的。

    void HandlerError() //错误事件回调
    {
        HandlerClose();
    }

为什么我们实现这些接口不需要包装一层 _loop->RunInLoop() 呢? 还是那句话,因为这些IO事件的回调一定是在对应的EventLoop线程执行的,所以没有线程安全问题,但是其他的操作就可能有了,比如我们的Release以及ShutDown等操作,这些对连接的操作未来都可能是在其他线程中执行的,可能是在别的模块中被调用的,所以需要包装一层。

再来实现几个简单的接口,也就是设置回调函数的方法:

    void SetMessageCallBack(const MessageCallBack& cb){_message_cb = cb;}
    void SetConnectCallBack(const ConnectCallBack& cb){_connect_cb = cb;}
    void SetCloseCallBack(const CloseCallBack& cb){_close_cb = cb;}
    void SetEventCallBack(const EventCallBack& cb){_event_cb = cb;}
    void SetSvrCloseCallBack(const SvrCloseCallBack& cb){_svr_close_cb = cb;}

目前我们大概就能完成连接待处理状态的操作了。然后设置Connection的属性并让其开始通信。

就如同上面的这几个设置回调函数的接口,我们都是在创建出来一个对象之后,正式通信之前设置的,除此之外,我们还可以设置启动和取消非活跃连接销毁机制。

我们要注意的是,这两个接口其实有可能在通信的过程中被调用,如果是在通信之前被调用,那么是没有线程安全问题的,但是如果是在连接已经开始通信之后再被调用,那么我们要保证线程安全,就需要封装一层函数放到 EventLoop 的RunInLoop 中运行,比如这些接口其实都是在TcpServer中调用的,我们无法确定具体在哪个线程进行执行。

    void EnableInactiveReleaseInLoop(uint64_t delay = 30)  //启动非活跃连接销毁
    {
        _enable_inactive_release = true;
        //判断是否已经有超时任务
        if(_loop->HasTimerTask(_id))   //说明这个连接有定时任务在时间轮,那么直接刷新延迟、
        {
            _loop->RefreshTimerTask(_id); //直接刷新,前面的时间轮的刷新策略需要更新,再刷新的时候需要启动任务,也就是 _is_cancled = true
        }
        else    //没有定时任务,那么需要添加
        {
            _loop->AddTimerTask(_id,delay,std::bind(&Connection::Release,this));  //把销毁任务添加进去
        }
    }
    void DisableInactuveReleaseInLoop()   //取消非活跃连接销毁
    {
        _enable_inactive_release = false;
        //判断时间轮中是否有该任务,如果有,那么需要取消
        if(_loop->HasTimerTask(_id))
        _loop->CancelTimerTask(_id);
    }

    void EnableInactiveRelease(uint64_t delay)  //启动非活跃连接销毁
    {
        _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop,this,delay));
    }
    void DisableInactuveRelease()   //取消非活跃连接销毁
    {
        _loop->RunInLoop(std::bind(&Connection::DisableInactuveReleaseInLoop,this));
    }

当设置完这些之后,我们就可以启动连接,开始通信了,我们可以设置一个 Established () 接口以供外界或者说上层的TcpServer调用。 但是为了线程安全,我们需要将实际的操作封装成一个任务交给loop。

    void EstablishedInLoop()
    {
        if(_con_statu!=CONNECTING) abort(); //出错,因为这个函数调用时一定是处于连接待处理阶段的,不可能是其他的状态
        _con_statu = CONNECTED;  //更新状态
        _channel.EnableRead();  //启动读事件监听
        if(_connect_cb) _connect_cb(shared_from_this()); //调用用户设置的连接建立回调
    }


    void Established()
    {
        _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop,this));
    }

同时,在准备阶段也需要设置上下文的接口,这个接口我们就不关心线程安全的问题了,因为一般来说我们的逻辑没问题的话是不会重复设置的。

    void SetContext(const Any& context){_context = context;} //设置上下文

那么已经步入通信阶段之后,我们需要提供一个接口给用户用于发送数据,也就是向输出缓冲区写入数据,这个接口也不是线程安全的,所以也需要封装成任务交给 RunInLoop

    void SendInLoop(const char* in , size_t len)       //发送数据(用户层面)
    {
       _out_buffer.WriteAndPush(in,len);
       //启动写事件监控
       if(_out_buffer.ReadSize()) _channel.EnableWrite(); 
    }

    void Send(const char* in,size_t len)
    {
        _loop->RunInLoop(std::bind(&Connection::SendInLoop,this,in,len));
    }

然后就是设置上下文的接口,同时要注意线程安全问题。

设置上下文其实就是重新设置上下文以及四个回调函数。

    void UpgradeInLoop(const Any& context,const ConnectCallBack& con , const MessageCallBack& msg, const CloseCallBack& clo,const EventCallBack& evt)
    {
        _context = context;
        _connect_cb = con;
        _message_cb = msg;
        _event_cb = evt;
        _close_cb = clo;
    }

 具体执行的操作就是这样的,但是 Upgrade 这个接口有点特殊,他是线程不安全的,所以必须在EventLoop线程中执行,同时,这还不够,他必须被立马执行,不能放到任务队列中,因为如果如果放到任务队列中,那么如果此时有读事件到来,那么就会先进行事件处理,也就是会先使用老的处理函数进行处理,然后才更新回调方法,这是不对的,我们必须在调用Upgrade的时候立即将协议的回调函数和上下文进行替换。或许换一种说法:我们必须在EventLoop线程中调用Upgrade函数,如果在其他线程内调用,那么直接报错。 绝对不能在其他的线程中调用这个函数。

那么我们可以在EventLoop中再提供一个接口

    void AssertInLoop()const{assert(_thread_id == std::this_thread::get_id());}

而我们的Upgrade接口的实现就是这样的:

    void Upgrade(const Any& context,const ConnectCallBack& con , const MessageCallBack& msg, const CloseCallBack& clo,const EventCallBack& evt)
    {
       _loop->AssertInLoop();
        _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop,this,context,con,msg,clo,evt));
    }

我们还需要提供一个接口用于获取上下文,这个接口可以不进行封装,

    Any* GetContext() {return &_context;}

然后就是关闭连接的接口了,我们有两套接口,首先来实现ShutDown,也就是用户关闭连接的接口,这个接口也是需要注意线程安全问题,需要封装成任务。

在这个接口中,我们需要处理输入缓冲区和输出缓冲区的数据,然后再调用Release接口关闭连接。

    void ShutDownInLoop()
    {
        _con_statu = CLOSING;   //设置连接待关闭
        if(_in_buffer.ReadSize()) //有数据待处理
            if(_message_cb) _message_cb(shared_from_this(),&_in_buffer);
        //所有数据都处理完之后,处理待发送数据
        if(_out_buffer.ReadSize())  //如果有数据待发送
        {
            _channel.EnableWrite();    //启动写事件监听
            //触发写事件之后,在写事件回调中会调用Release进行连接释放
        }
        else Release(); //如果没有数据待发送就直接关闭
    }

    void ShutDown()
    {
        _loop->RunInLoop(std::bind(&Connection::ShutDownInLoop,this));
    } 

最后就是实际释放连接的Release接口了

这个接口需要做很多事情:

首先,连接状态设置为 CLOSED, 然后取消定时任务,移除所有事件监控,关闭文件描述符,以及调用用户的回调函数,最后释放掉TcpServer所管理的基础计数

    void ReleaseinLoop()        //实际关闭连接
    {
        assert(_con_statu == CLOSING);
        _con_statu = CLOSED;    //更新状态
        if(_loop->HasTimerTask(_id)) _loop->CancelTimerTask(_id);   //取消定时任务
        _channel.Remove();      //移除事件监控
        _socket.Close();        //关闭套接字
        if(_close_cb)   _close_cb(shared_from_this());        //先调用用户设置的关闭连接调用
        if(_svr_close_cb) _svr_close_cb(shared_from_this()); //再调用TcpServer提供的接口用于移除管理,删除基础计数
        //注意调用的顺序,因为Release接口并不需要保留一个shared_ptr参数,那么就意味着,可能调用完 _svr_close_cb之后,计数就为0,把资源释放了,那么这时候已经无法找到_close_cb了
    }

    void Release()  //不提供给外部
    {
        _loop->RunInLoop(std::bind(&Connection::ReleaseinLoop,this));
    }

最后我们在设置两个接口,用于获取 sockfd 和 id

    int Fd()const {return _sockfd;}
    uint64_t Id()const {return _id;}

模块测试纠错

未来主Reactor线程获取到一个新连接之后,就不是创建一个Channel对象了,而是创建一个Connection对象,至于如何创建一个Connection对象,这需要有TcpServer提供并进行管理。不过我们这里由于还没有实现TcpServer ,我们就直接使用全局函数来模拟了。

首先模拟用户设置的四个回调函数

void Message(const PtrConnection&con,Buffer* inbuffer)    //模拟用户新数据回调
{
    char buf[1024] = {0};
    inbuffer->ReadAndPop(buf,inbuffer->ReadSize()); //先不嵌入协议以及业务逻辑,我们直接读出来然后发回去
    NORMAL_LOG("read: %s  and reply it",buf);
    con->Send(buf,strlen(buf));
    return ;
}

void Close(const PtrConnection&con)  
{
    NORMAL_LOG("连接关闭: %d",con->Fd());
}
void Connect(const PtrConnection&con)  
{
    NORMAL_LOG("连接建立: %d",con->Fd());
}

//任意事件回调
void Event(const PtrConnection&con) 
{
    NORMAL_LOG("事件到来:%d",con->Fd());
}

然后模拟TcpServer模块的管理Connection对象的unordered_map 以及设置的连接id,直接用全局的模拟一下

std::unordered_map<uint64_t,PtrConnection> _conns; //管理Connection的基础计数,使用_id作为key值,有TcpServer维护 
uint64_t auto_id = 0; //模拟TcpServer维护的一个CON_ID

模拟一个主Reactor的EventLoop模块,我们目前测试只用一个线程来测试

//简单使用一个全局的 EventLoop 对象来进行测试
EventLoop loop;

然后就是模拟服务器设置给Connection的移除管理基础计数的函数

void ServerClose(const PtrConnection& con) //模拟TcpServer设置的关闭连接回调函数
{
    _conns.erase(con->Id());   
}

最后就是模拟TcpServer设置给Acceptor模块的创建连接的回调函数。

void CreateConnection(int fd)     //模拟TcpServer提供给Acceptor模块的创建Connection的回调
{
    PtrConnection pt (new Connection(auto_id,fd,&loop));
    _conns.insert(std::make_pair(auto_id,pt));      //添加管理
    auto_id++;  //后续可能会有线程安全,我们需要互斥以及同步机制
    //那么接下来就是设置一些属性
    //1 设置用户的处理函数,设置上下文
    pt->SetMessageCallBack(Message);
    pt->SetCloseCallBack(Close);
    pt->SetConnectCallBack(Connect);
    pt->SetEventCallBack(Event);
    Any a;  //后续也会在TcpServer中管理
    pt->SetContext(a);
    //2 设置非活跃超时销毁
    pt->EnableInactiveRelease(5);
    //3 设置服务器的关闭连接函数
    //其实就是删除基础计数的shared_ptr
    pt->SetSvrCloseCallBack(ServerClose);
    //4 开始通信
    pt->Established();
}

最后由于我们还是没有实现Acceptor模块,那么我们也还是只能够用一个Socket来模拟实现

首先还是模拟Acceptor模块的读回调函数

void Acceptor(Socket socket)
{  
    NORMAL_LOG("enter acceptor");
    int newfd = socket.Acceptor();
    if(newfd <= 0) 
    {
        NORMAL_LOG("newfd == -1");
        if(errno ==EAGAIN ||errno ==EWOULDBLOCK || errno ==EINTR) return;
        ERROR_LOG("acceptor failed"); 
        abort();
    }
    //获取到新连接之后就直接调用TcpServer的函数就行了
    CreateConnection(newfd);
}

而在我们的主函数中的逻辑就是设置好一个监听模块,设置好监听新连接,然后启动我们的EventLoop模块

int main()
{
    Socket lstsock;
    lstsock.CreatServerSocket(8080);
    //为lstsock设置事件监听
    Channel lstchannel(lstsock.Fd(),&loop);
    //启动读事件监听
    lstchannel.SetReadCallBack(std::bind(Acceptor,lstsock));
    lstchannel.EnableRead();    
    loop.Start();//开启EventLoop
    return 0;
}

这里要注意的一个问题,就是我们使用std::bind绑定我们的监听套接字对象的话,那么我们的bind会在函数空间中拷贝一个lstsock对象,而这个临时的函数对象未来在销毁的时候,我们的临时的lstsock对象也会析构,那么就会调用析构函数中的Close将文件描述符关闭,那么后续就完蛋了。 由于我们对连接套接字的管理是通过Connection来进行的,Connection的Release中会有调用Close的逻辑,既然如此,我们的Socket的析构函数中不如就直接析构函数什么也不干了。 

也就是把Socket的析构函数关闭套接字的逻辑删除。

还有一个逻辑上的小问题就是:

注意我们Connection的Send函数,我们前面写的的Send函数中,是直接将数据的指针传递给SendInLoop,而这个SendInLoop是被包装成任务放在EventLoop中的任务队列中的,任务队列中的任务并不会立马执行,那么就意味着,等到我们的EventLoop执行这个SendInLoop任务的时候,该指针指向的数据早就被释放了,因为一般我们都是拿一些临时的数据直接写到缓冲区的,这时候再去执行SendInLoop就会出现野指针问题。 

所以我们的Send函数在拿到数据之后,首先要把这份数据拷贝一份,最简单的,我们可以用一个Buffer来保存一份要写入的数据,然后将Buffer传递绑定给SendInLoop。 

    void Send(const char* in,size_t len)
    {
        Buffer buf;
        buf.WriteAndPush(in,len);
        _loop->RunInLoop(std::bind(&Connection::SendInLoop,this,buf));
    }

还要注意的就是,我们的SendInLoop接收参数的时候也需要用值传参接收,而不能是引用或者指针,还是那个问题,我们的SendInLoop执行的时候,Send的临时对象也可能早就销毁了。

    void SendInLoop(Buffer buf)       //发送数据(用户层面)
    {
        if(_con_statu == CLOSED) return;
       _out_buffer.WriteBufferAndPush(buf);
       //启动写事件监控
       if(_out_buffer.ReadSize()) _channel.EnableWrite(); 
    }

如此一来,我们就需要补充一个WriteBufferAndPush函数了,这个函数也很好实现,可以复用WirteAndPush(buffer.ReadPosition(),buffer.ReadSize());        

那么我们现在就可以测试一下这段代码了:

客户端还是每秒发送一个数据,五次之后就休眠,我们测试非活跃10秒之后能不能正确释放连接。

这时候我们会发现一个逻辑上的小问题,就是ReleaseInLoop中,也就是实际上关闭连接的函数中,我们进来之后进行了一个assert(_con_statu==CLOSING) , 但是由非活跃超时而引起的调用Release并没有经过ShutDown函数,也就是并不会将状态设置为CLOSING,那么这个断言就会出问题。

所以其实我们的ReleaseInLoop中不应该有这个断言,因为超时释放并不会存在CLOSING状态,而是服务器端的强制行为。

那么修正之后接着来测试:

那么目前就已经能够将连接建立起来并且完成Connection的超时释放了。

上一篇:车辆管理的SpringBoot技术革新


下一篇:spring boot 3.3.4 网关(gateway) 集成knife4j 4.4.0