目录
模块设计
模块实现
模块测试纠错
模块设计
Connection模块是对通信连接也就是通信套接字的整体的管理模块,对连接的所有操作都是通过这个模块提供的接口来完成的。
那么他具体要进行哪些方面的管理呢?
首先每个通信连接都需要维护用户态的输入缓冲区的输出缓冲区,也就是Conenction内部需要包含两个Buffer对象。 那么针对缓冲区,需要提供什么接口呢?除去缓冲区本身的接口之外,我们的Connection需要提供 Send 接口,这个Send接口其实就是将要发送的数据写入到用户输出缓冲区中,并启动写事件监听。
其次,还也需要对每个通信套接字本身做管理,能够完成套接字的种种操作,其实就是内部需要包含一个Socket对象,未来关闭套接字时能够直接调用Socket的接口
还需要对套接字的监听事件做管理,每一个通信套接字的Channel对象都是包含在Conenction内部的,那么我们的Conenction就需要提供几个关于事件管理的接口,需要能够设置几个事件的回调函数,以及设置需要监控的事件。
一个Connection 对象还需要保存协议解析的上下文信息,也就是一个Any对象,未来要提供接口用于获取内部的上下文信息的指针。
最后,一个Connection也需要保存回调函数,这些回调函数就是由用户设置给TcpServer模块,TcpServer模块再设置给Connection模块,而Connection内部再加一些自己的处理,在自己内部实现一些回调函数最终设置给 Channel 模块中,用于事件触发之后的回调处理。
那么大致需要提供哪些功能性接口给上层:
1 发送数据接口,其实就是把数据放到发送缓冲区,监听写事件,写事件触发时在调用Channel中的处理方法进行实际的发送,写入内核缓冲区
2 关闭连接接口,这个关闭连接的接口是提供给用户的关闭连接接口,但是他并不是直接调用Socket的Close接口,而是先要将输入和输出缓冲区的数据都处理完之后,再调用一个我们私有的真正关闭连接的接口
3 启动超时销毁功能
4 取消超时销毁功能
这两个接口都是提供给TcpServer模块的,同时TcpServer模块再提供给用户的接口
5 协议切换,协议其实就是规定一个数据到来之后该如何处理,它取决于上下文以及数据的业务处理函数,所以协议切换其实就是重新设置上下文以及重新设置一套回调函数。
这时候我们就需要考虑一个很重要的问题: Connection 对象如何管理?
首先,所有的Connection 对象肯定是要交给TcpServer 模块进行整体的管理的,这个我们还是很好理解,因为未来我们的Acceptor获取到新连接之后,需要创建Connection对象,而创建Connection对象的接口就只能是TcpServer模块提供给他的,也就是由TcpServer来创建,创建之后,他也需要对所有的Connection进行管理。
那么怎么管理呢?new一个Connection对象然后使用指针进行管理?那么会不会有这样一种情况:连接有多种操作,如果其中一个操作把连接关闭释放了,后续的操作再调用Conenction的接口不就是野指针访问了吗?那么程序会触发硬件异常而崩溃。这是我们不希望看到的。即便 TcpServer中可能还保存着这个对象的指针,但是指针指向的资源却可能已经释放了。
我们可以使用智能指针 share_ptr 来对Connction 进行管理。这样一来,我们能够保证,任何一个操作在进行的时候,只要函数栈帧已经展开,参数接受了Conenction的shared_ptr对象,那么在函数运行期间,该函数栈帧内的shared_ptr 对象至少还保留了一个Connection 的计数,该资源不会被立即释放,也就不会造成野指针的访问。就算其他的操作将连接释放了,也只是将shared_ptr的计数减一,或者说将TcpServer中所管理的Conncetion的基础的shared_ptr对象释放了,并不会实际释放Connection的资源。
基于使用sharet_ptr来管理Connection的思想,我们未来设置接口的时候,就需要传递Connection 的智能指针对象或者引用了,而不是直接传递原始指针。
那么Connection类该如何设计?
他需要的成员:
每一个连接都需要有一个在服务器内部的唯一标识,也就是id,为什么不直接使用 fd ?因为我们对fd 的管理也不是直接使用原始的fd 的,而是使用Socket来管理,我们需要将其与系统的IO的部分进行解耦。 同时,未来这个连接的id也是她所对应的定时任务的id。
其次,他需要一个Channel对象、一个Socket对象,两个Buffer对象以及一个Any对象
再有,他需要保存四个回调函数,这四个回调函数是由用户设置的,分别是连接建立时执行的回调函数,新数据到来时执行的回调方法,连接关闭时执行的回调方法以及任意事件到来执行的回调方法。
由于Connection设计到超时的管理,那么我们还需要一个值来表示是否启动了超时管理。
以及每一个Cconnection对象需要和一个EventLoop对象绑定,所以他还需要一个EventLoop的指针。
这是几个简单的成员,我们还需要一个成员就是连接所处的状态。
这个状态并不是站在内核层面的状态,而是站在应用层角度的状态。
状态有以下几种: 1 连接建立中: 此时我们已经从底层将连接的文件描述符获取上来,并为其创建了Connection对象,但是Connection内部的各项设置还未完成
2 连接建立完成,Connection对象的各项设置已经完成,可以进行通信了
3 连接待关闭状态,对端或者本端需要关闭这个连接,但是在实际关闭连接之前我们还需要把缓冲区的数据全部处理完
4 连接关闭状态,处于这个状态的时候,就可以把套接字关闭,以及资源释放了
那么这个状态我们可以使用枚举来定义。
enum CONN_STATU
{
CLOSED, //关闭状态,不再进行任何操作,释放资源
CONNECTING, //连接待处理,还需要进行各项设置才能开始监听通信
CONNECTED, //连接建立,可以通信
CLOSING //连接带关闭,尽快处理缓冲区剩余数据
};
同时还需要一个变量用来保存是否开启了超时释放机制,只需要用一个bool类型就行了。
那么Connection内部的成员大概就这些:
class Connection;
using PtrConnection = std::shared_ptr<Connection>;
class Connection :public std::enable_shared_from_this<Connection>
{
private:
uint64_t _id; //连接的id,也是连接对应的超时任务的id
int _sockfd; //保存对应的描述符
EventLoop* _loop; //绑定的EventLoop
CONN_STATU _con_statu; //连接状态
Socket _socket; //套接字操作模块
Channel _channel; //事件管理模块
Buffer _in_buffer; //输入缓冲区
Buffer _out_buffer; //输出缓冲区
Any _context; //连接上下文
bool _enable_inactive_release; //标识是否开启非活跃连接超时释放机制
using EventCallBack = std::function<void(const PtrConnection&)>; //用户设置的任意事件回调
using ConnectCallBack = std::function<void(const PtrConnection&)>; //用户设置的连接建立回调
using MessageCallBack = std::function<void(const PtrConnection&,Buffer*)>; //用户设置的新数据回调,需要传递输入缓冲区
using CloseCallBack = std::function<void(const PtrConnection&)>; //用户设置的连接关闭回调
using SvrCloseCallBack = std::function<void(const PtrConnection&)>; //TcpServer设置的连接关闭回调
ConnectCallBack _connect_cb;
MessageCallBack _message_cb;
CloseCallBack _close_cb;
EventCallBack _event_cb;
SvrCloseCallBack _svr_close_cb;
而对于接口的设计,我们需要在实现的过程中再来一步一步设计,因为Connection的设计有点复杂,我们就算在这里列出来了也没意义。
模块实现
首先,我们的Connection需要基本的构造函数,在构造函数中,我们需要传入 id ,loop ,socket这三个基本的成员。同时,我们需要为channel对象设置四个回调函数。
void HandlerRead(); //读事件回调
void HandlerWrite();//写事件回调
void HandlerClose();//挂断事件回调
void HandlerError();//错误事件回调
void HandlerEvent();//任意事件回调
public: //功能接口
Connection(uint64_t id , int sockfd ,EventLoop* loop)
:_id(id),_sockfd(sockfd),_loop(loop),_con_statu(CONNECTING),_socket(sockfd),_channel(_sockfd,_loop),_enable_inactive_release(false)
{
_channel.SetReadCallBack(std::bind(&Connection::HandlerRead,this));
_channel.SetWriteCallBack(std::bind(&Connection::HandlerWrite,this));
_channel.SetCloseCallBack(std::bind(&Connection::HandlerClose,this));
_channel.SetErrorCallBack(std::bind(&Connection::HandlerError,this));
_channel.SetEventCallBack(std::bind(&Connection::HandlerEvent,this));
}
那么我们现在就需要实现五个回调函数,
首先读事件回调,他的思路很简单,只需要从套接字中读取数据,然后放在输入缓冲区,再来调用用户传进来的新数据回调函数就行了。
void HandlerRead() //读事件回调
{
// 1 从套接字读取数据
#define READ_SIZE 65535
char buffer[READ_SIZE] = {0};
int ret = _socket.Read(buffer,READ_SIZE-1);
if(ret < 0 ) //对Socket的Read做处理,只有真正出错的时候才返回 -1 ,其他的时候都返回 >= 0 的值
{
//说明套接字出错,那么此时也不能直接关闭连接,因为输出缓冲区中还有数据待发送,所以是调用ShutDown接口
ShutDown(); //先处理剩余数据,再实际关闭连接
}
// 2 放入输入缓冲区
_in_buffer.WriteAndPush(buffer,ret);
// 3 调用新数据回调
if(_in_buffer.ReadSize()) //可能没读到数据,被信号打断了或者其他原因
{
if(_message_cb) _message_cb(shared_from_this(),&_in_buffer); //shared_from_this() 会从当前对象的第一个创建的shared_ptr中进行拷贝
}
}
这里我们可能会疑惑,为什么调用完_message_cb 之后不添加写事件监控,也就是这样一行代码:
if(_out_buffer.ReadSize()) _channel.EnableWrite(); //如果写入了新的数据,那么开启写事件监控
其实是因为,未来用户只能调用我们Connection提供的Send接口来发送数据,但是Send接口我们也懂,只会将数据写入到输出缓冲区中,我们在Send函数的实现中,只有实际写入了再来启动写事件的监控更加合理。
shared_from_this
这里有一个细节,就是我们前面声明类的时候,Connection 类继承了 std::enable_shared_from_this这个类,这个继承关系是为了我们能够使用 shared_from_this这个功能。
这个用法叫做自我引用。
为什么我们需要引进一个shared_from_this这样的接口呢?
我们说了使用shared_ptr对所有的Connection对象进行管理,这样能够防止在操作的过程中资源被释放。 但是,我们在给 _message_cb 这样的回调函数传参的时候,如何保证传给他的shared_ptr对象是和管理Conenction 的shared_ptr的对象共享计数呢?
因为如果我们直接使用 shared_ptr<Connection> p (this) ,这样创建一个只能指针对象传参的时候,他的计数是独立的,并不会和TcpServer中管理Conenction的shared_ptr共享计数,那么我们就需要一个办法能够创建出一个和Conenction 的管理的shared_ptr对象共享技术的智能指针进行传参,而shared_from_this就可以解决这样的问题。
std::enable_shared_from_this<T> 内部维护了一个 std::weak_ptr<T>。当第一个 std::shared_ptr<T> 开始管理该对象时,这个 weak_ptr 被初始化。之后,当 shared_from_this() 被调用时,它将基于这个已经存在的 weak_ptr 返回一个新的 std::shared_ptr<T>,这个新的 shared_ptr 与原有的 shared_ptr 共享对对象的所有权。
那么使用这个接口,我们就能保证在这些回调函数在执行的时候,即使其他的地方调用了_svr_close_cb把TcpServer模块中的基础计数的智能指针释放了,这份资源也还存在,至少在我们这次函数栈帧内还存在,不会出现野指针的问题。
第二个函数就是HandlerWrite ,首先这是设置给channel的回调函数,也就是说只有当写事件触发时才会调用,那么我们直接调用write接口是不会被阻塞的。 当然我们需要判断Write的返回值,判断是否出错,如果写入数据出错了,那么我们就没必要再继续处理数据了,即使处理了也不可能再发出去,那么这时候我们就需要调用实际关闭连接的接口。
同时,我们还要考虑一种情况,就是,此时其实是读出错之后,调用ShutDown而监听调用的写事件,那么这时候写完这一次数据之后就需要关闭连接。 其实也就是判断连接是否为待关闭状态。
void HandlerWrite() //写事件回调
{
// 1 向套接字写入数据
int ret =_socket.Send(_out_buffer.ReadPosition(),_out_buffer.ReadSize()); //直接尝试全部写完
// 2 判断是否写完
if(ret < 0) //在 Socket 的Send接口中做处理,只有真正出错时才返回 -1 ,那么这时候不需要再继续任何处理数据的操作了,直接关闭连接
{
Release(); // 这个接口是实际关闭连接的接口
}
if(ret == _out_buffer.ReadSize()) //说明写完了,那么可以关闭写事件的监控了
{
_channel.DisableWwrite(); //关闭写事件监控,直到下一次
}
//否则就表示没写完,那么就先不管比写事件监控
_out_buffer.MoveWriteOffset(ret); //不管怎么样都要移动读偏移
//然后判断连接是否是待关闭状态,如果是,写完这次数据我们就必须要关闭连接了
if(_con_statu == CLOSING) Release();
}
然后就是任意事件回调,任意事件回调我们只需要判断是否启动了超时连接,如果启动了,那么就需要刷新定时任务。 同时我们也需要执行以下用户的任意回调函数,除此之外就没其他的操作了。
void HandlerEvent() //任意事件回调
{
if(_enable_inactive_release) //如果启动了非活跃超时释放
_loop->RefreshTimerTask(_id);
if(_event_cb) _event_cb(shared_from_this());
}
然后就是挂断事件回调,挂断事件也很简答,因为可能在挂断事件触发的时候,也触发了读事件,那么我们可以先处理以下数据,然后直接调用Release关闭连接,处不处理都行,反正也无法返回了,但是最好处理以下,因为可能是一些功能性请求。
void HandlerClose() //挂断事件回调
{
if(_in_buffer.ReadSize())
_message_cb(shared_from_this(),&_in_buffer);
Release();
}
最后就是错误事件回调,错误事件触发的时候,我们的处理方法和挂断事件是一样的。
void HandlerError() //错误事件回调
{
HandlerClose();
}
为什么我们实现这些接口不需要包装一层 _loop->RunInLoop() 呢? 还是那句话,因为这些IO事件的回调一定是在对应的EventLoop线程执行的,所以没有线程安全问题,但是其他的操作就可能有了,比如我们的Release以及ShutDown等操作,这些对连接的操作未来都可能是在其他线程中执行的,可能是在别的模块中被调用的,所以需要包装一层。
再来实现几个简单的接口,也就是设置回调函数的方法:
void SetMessageCallBack(const MessageCallBack& cb){_message_cb = cb;}
void SetConnectCallBack(const ConnectCallBack& cb){_connect_cb = cb;}
void SetCloseCallBack(const CloseCallBack& cb){_close_cb = cb;}
void SetEventCallBack(const EventCallBack& cb){_event_cb = cb;}
void SetSvrCloseCallBack(const SvrCloseCallBack& cb){_svr_close_cb = cb;}
目前我们大概就能完成连接待处理状态的操作了。然后设置Connection的属性并让其开始通信。
就如同上面的这几个设置回调函数的接口,我们都是在创建出来一个对象之后,正式通信之前设置的,除此之外,我们还可以设置启动和取消非活跃连接销毁机制。
我们要注意的是,这两个接口其实有可能在通信的过程中被调用,如果是在通信之前被调用,那么是没有线程安全问题的,但是如果是在连接已经开始通信之后再被调用,那么我们要保证线程安全,就需要封装一层函数放到 EventLoop 的RunInLoop 中运行,比如这些接口其实都是在TcpServer中调用的,我们无法确定具体在哪个线程进行执行。
void EnableInactiveReleaseInLoop(uint64_t delay = 30) //启动非活跃连接销毁
{
_enable_inactive_release = true;
//判断是否已经有超时任务
if(_loop->HasTimerTask(_id)) //说明这个连接有定时任务在时间轮,那么直接刷新延迟、
{
_loop->RefreshTimerTask(_id); //直接刷新,前面的时间轮的刷新策略需要更新,再刷新的时候需要启动任务,也就是 _is_cancled = true
}
else //没有定时任务,那么需要添加
{
_loop->AddTimerTask(_id,delay,std::bind(&Connection::Release,this)); //把销毁任务添加进去
}
}
void DisableInactuveReleaseInLoop() //取消非活跃连接销毁
{
_enable_inactive_release = false;
//判断时间轮中是否有该任务,如果有,那么需要取消
if(_loop->HasTimerTask(_id))
_loop->CancelTimerTask(_id);
}
void EnableInactiveRelease(uint64_t delay) //启动非活跃连接销毁
{
_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop,this,delay));
}
void DisableInactuveRelease() //取消非活跃连接销毁
{
_loop->RunInLoop(std::bind(&Connection::DisableInactuveReleaseInLoop,this));
}
当设置完这些之后,我们就可以启动连接,开始通信了,我们可以设置一个 Established () 接口以供外界或者说上层的TcpServer调用。 但是为了线程安全,我们需要将实际的操作封装成一个任务交给loop。
void EstablishedInLoop()
{
if(_con_statu!=CONNECTING) abort(); //出错,因为这个函数调用时一定是处于连接待处理阶段的,不可能是其他的状态
_con_statu = CONNECTED; //更新状态
_channel.EnableRead(); //启动读事件监听
if(_connect_cb) _connect_cb(shared_from_this()); //调用用户设置的连接建立回调
}
void Established()
{
_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop,this));
}
同时,在准备阶段也需要设置上下文的接口,这个接口我们就不关心线程安全的问题了,因为一般来说我们的逻辑没问题的话是不会重复设置的。
void SetContext(const Any& context){_context = context;} //设置上下文
那么已经步入通信阶段之后,我们需要提供一个接口给用户用于发送数据,也就是向输出缓冲区写入数据,这个接口也不是线程安全的,所以也需要封装成任务交给 RunInLoop
void SendInLoop(const char* in , size_t len) //发送数据(用户层面)
{
_out_buffer.WriteAndPush(in,len);
//启动写事件监控
if(_out_buffer.ReadSize()) _channel.EnableWrite();
}
void Send(const char* in,size_t len)
{
_loop->RunInLoop(std::bind(&Connection::SendInLoop,this,in,len));
}
然后就是设置上下文的接口,同时要注意线程安全问题。
设置上下文其实就是重新设置上下文以及四个回调函数。
void UpgradeInLoop(const Any& context,const ConnectCallBack& con , const MessageCallBack& msg, const CloseCallBack& clo,const EventCallBack& evt)
{
_context = context;
_connect_cb = con;
_message_cb = msg;
_event_cb = evt;
_close_cb = clo;
}
具体执行的操作就是这样的,但是 Upgrade 这个接口有点特殊,他是线程不安全的,所以必须在EventLoop线程中执行,同时,这还不够,他必须被立马执行,不能放到任务队列中,因为如果如果放到任务队列中,那么如果此时有读事件到来,那么就会先进行事件处理,也就是会先使用老的处理函数进行处理,然后才更新回调方法,这是不对的,我们必须在调用Upgrade的时候立即将协议的回调函数和上下文进行替换。或许换一种说法:我们必须在EventLoop线程中调用Upgrade函数,如果在其他线程内调用,那么直接报错。 绝对不能在其他的线程中调用这个函数。
那么我们可以在EventLoop中再提供一个接口
void AssertInLoop()const{assert(_thread_id == std::this_thread::get_id());}
而我们的Upgrade接口的实现就是这样的:
void Upgrade(const Any& context,const ConnectCallBack& con , const MessageCallBack& msg, const CloseCallBack& clo,const EventCallBack& evt)
{
_loop->AssertInLoop();
_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop,this,context,con,msg,clo,evt));
}
我们还需要提供一个接口用于获取上下文,这个接口可以不进行封装,
Any* GetContext() {return &_context;}
然后就是关闭连接的接口了,我们有两套接口,首先来实现ShutDown,也就是用户关闭连接的接口,这个接口也是需要注意线程安全问题,需要封装成任务。
在这个接口中,我们需要处理输入缓冲区和输出缓冲区的数据,然后再调用Release接口关闭连接。
void ShutDownInLoop()
{
_con_statu = CLOSING; //设置连接待关闭
if(_in_buffer.ReadSize()) //有数据待处理
if(_message_cb) _message_cb(shared_from_this(),&_in_buffer);
//所有数据都处理完之后,处理待发送数据
if(_out_buffer.ReadSize()) //如果有数据待发送
{
_channel.EnableWrite(); //启动写事件监听
//触发写事件之后,在写事件回调中会调用Release进行连接释放
}
else Release(); //如果没有数据待发送就直接关闭
}
void ShutDown()
{
_loop->RunInLoop(std::bind(&Connection::ShutDownInLoop,this));
}
最后就是实际释放连接的Release接口了
这个接口需要做很多事情:
首先,连接状态设置为 CLOSED, 然后取消定时任务,移除所有事件监控,关闭文件描述符,以及调用用户的回调函数,最后释放掉TcpServer所管理的基础计数
void ReleaseinLoop() //实际关闭连接
{
assert(_con_statu == CLOSING);
_con_statu = CLOSED; //更新状态
if(_loop->HasTimerTask(_id)) _loop->CancelTimerTask(_id); //取消定时任务
_channel.Remove(); //移除事件监控
_socket.Close(); //关闭套接字
if(_close_cb) _close_cb(shared_from_this()); //先调用用户设置的关闭连接调用
if(_svr_close_cb) _svr_close_cb(shared_from_this()); //再调用TcpServer提供的接口用于移除管理,删除基础计数
//注意调用的顺序,因为Release接口并不需要保留一个shared_ptr参数,那么就意味着,可能调用完 _svr_close_cb之后,计数就为0,把资源释放了,那么这时候已经无法找到_close_cb了
}
void Release() //不提供给外部
{
_loop->RunInLoop(std::bind(&Connection::ReleaseinLoop,this));
}
最后我们在设置两个接口,用于获取 sockfd 和 id
int Fd()const {return _sockfd;}
uint64_t Id()const {return _id;}
模块测试纠错
未来主Reactor线程获取到一个新连接之后,就不是创建一个Channel对象了,而是创建一个Connection对象,至于如何创建一个Connection对象,这需要有TcpServer提供并进行管理。不过我们这里由于还没有实现TcpServer ,我们就直接使用全局函数来模拟了。
首先模拟用户设置的四个回调函数
void Message(const PtrConnection&con,Buffer* inbuffer) //模拟用户新数据回调
{
char buf[1024] = {0};
inbuffer->ReadAndPop(buf,inbuffer->ReadSize()); //先不嵌入协议以及业务逻辑,我们直接读出来然后发回去
NORMAL_LOG("read: %s and reply it",buf);
con->Send(buf,strlen(buf));
return ;
}
void Close(const PtrConnection&con)
{
NORMAL_LOG("连接关闭: %d",con->Fd());
}
void Connect(const PtrConnection&con)
{
NORMAL_LOG("连接建立: %d",con->Fd());
}
//任意事件回调
void Event(const PtrConnection&con)
{
NORMAL_LOG("事件到来:%d",con->Fd());
}
然后模拟TcpServer模块的管理Connection对象的unordered_map 以及设置的连接id,直接用全局的模拟一下
std::unordered_map<uint64_t,PtrConnection> _conns; //管理Connection的基础计数,使用_id作为key值,有TcpServer维护
uint64_t auto_id = 0; //模拟TcpServer维护的一个CON_ID
模拟一个主Reactor的EventLoop模块,我们目前测试只用一个线程来测试
//简单使用一个全局的 EventLoop 对象来进行测试
EventLoop loop;
然后就是模拟服务器设置给Connection的移除管理基础计数的函数
void ServerClose(const PtrConnection& con) //模拟TcpServer设置的关闭连接回调函数
{
_conns.erase(con->Id());
}
最后就是模拟TcpServer设置给Acceptor模块的创建连接的回调函数。
void CreateConnection(int fd) //模拟TcpServer提供给Acceptor模块的创建Connection的回调
{
PtrConnection pt (new Connection(auto_id,fd,&loop));
_conns.insert(std::make_pair(auto_id,pt)); //添加管理
auto_id++; //后续可能会有线程安全,我们需要互斥以及同步机制
//那么接下来就是设置一些属性
//1 设置用户的处理函数,设置上下文
pt->SetMessageCallBack(Message);
pt->SetCloseCallBack(Close);
pt->SetConnectCallBack(Connect);
pt->SetEventCallBack(Event);
Any a; //后续也会在TcpServer中管理
pt->SetContext(a);
//2 设置非活跃超时销毁
pt->EnableInactiveRelease(5);
//3 设置服务器的关闭连接函数
//其实就是删除基础计数的shared_ptr
pt->SetSvrCloseCallBack(ServerClose);
//4 开始通信
pt->Established();
}
最后由于我们还是没有实现Acceptor模块,那么我们也还是只能够用一个Socket来模拟实现
首先还是模拟Acceptor模块的读回调函数
void Acceptor(Socket socket)
{
NORMAL_LOG("enter acceptor");
int newfd = socket.Acceptor();
if(newfd <= 0)
{
NORMAL_LOG("newfd == -1");
if(errno ==EAGAIN ||errno ==EWOULDBLOCK || errno ==EINTR) return;
ERROR_LOG("acceptor failed");
abort();
}
//获取到新连接之后就直接调用TcpServer的函数就行了
CreateConnection(newfd);
}
而在我们的主函数中的逻辑就是设置好一个监听模块,设置好监听新连接,然后启动我们的EventLoop模块
int main()
{
Socket lstsock;
lstsock.CreatServerSocket(8080);
//为lstsock设置事件监听
Channel lstchannel(lstsock.Fd(),&loop);
//启动读事件监听
lstchannel.SetReadCallBack(std::bind(Acceptor,lstsock));
lstchannel.EnableRead();
loop.Start();//开启EventLoop
return 0;
}
这里要注意的一个问题,就是我们使用std::bind绑定我们的监听套接字对象的话,那么我们的bind会在函数空间中拷贝一个lstsock对象,而这个临时的函数对象未来在销毁的时候,我们的临时的lstsock对象也会析构,那么就会调用析构函数中的Close将文件描述符关闭,那么后续就完蛋了。 由于我们对连接套接字的管理是通过Connection来进行的,Connection的Release中会有调用Close的逻辑,既然如此,我们的Socket的析构函数中不如就直接析构函数什么也不干了。
也就是把Socket的析构函数关闭套接字的逻辑删除。
还有一个逻辑上的小问题就是:
注意我们Connection的Send函数,我们前面写的的Send函数中,是直接将数据的指针传递给SendInLoop,而这个SendInLoop是被包装成任务放在EventLoop中的任务队列中的,任务队列中的任务并不会立马执行,那么就意味着,等到我们的EventLoop执行这个SendInLoop任务的时候,该指针指向的数据早就被释放了,因为一般我们都是拿一些临时的数据直接写到缓冲区的,这时候再去执行SendInLoop就会出现野指针问题。
所以我们的Send函数在拿到数据之后,首先要把这份数据拷贝一份,最简单的,我们可以用一个Buffer来保存一份要写入的数据,然后将Buffer传递绑定给SendInLoop。
void Send(const char* in,size_t len)
{
Buffer buf;
buf.WriteAndPush(in,len);
_loop->RunInLoop(std::bind(&Connection::SendInLoop,this,buf));
}
还要注意的就是,我们的SendInLoop接收参数的时候也需要用值传参接收,而不能是引用或者指针,还是那个问题,我们的SendInLoop执行的时候,Send的临时对象也可能早就销毁了。
void SendInLoop(Buffer buf) //发送数据(用户层面)
{
if(_con_statu == CLOSED) return;
_out_buffer.WriteBufferAndPush(buf);
//启动写事件监控
if(_out_buffer.ReadSize()) _channel.EnableWrite();
}
如此一来,我们就需要补充一个WriteBufferAndPush函数了,这个函数也很好实现,可以复用WirteAndPush(buffer.ReadPosition(),buffer.ReadSize());
那么我们现在就可以测试一下这段代码了:
客户端还是每秒发送一个数据,五次之后就休眠,我们测试非活跃10秒之后能不能正确释放连接。
这时候我们会发现一个逻辑上的小问题,就是ReleaseInLoop中,也就是实际上关闭连接的函数中,我们进来之后进行了一个assert(_con_statu==CLOSING) , 但是由非活跃超时而引起的调用Release并没有经过ShutDown函数,也就是并不会将状态设置为CLOSING,那么这个断言就会出问题。
所以其实我们的ReleaseInLoop中不应该有这个断言,因为超时释放并不会存在CLOSING状态,而是服务器端的强制行为。
那么修正之后接着来测试:
那么目前就已经能够将连接建立起来并且完成Connection的超时释放了。