通过上一篇博客的学习,你应该对基于epoll的事件触发机制有所掌握,并且通过阅读sio.c/sio.h应该也学会了如何封装epoll以及如何通过设计令epoll更加实用(用户回调,用户参数)。
简单回顾一下sio.h中的关键接口,我们接下来将会基于”sio事件触发层“来实现更加上层的“sio_stream(TCP)层”,“sio_dgram(UDP)层”,这种分层设计对于任何架构与程序设计都是非常普遍且有效的。
在sio层,
1,我们可以注册一个fd,提供一个事件回调函数和一个用户参数:
struct sio_fd *sio_add(struct sio *sio, int fd, sio_callback_t callback, void *arg);
2,我们可以修改事件回调函数和用户参数:
void sio_set(struct sio *sio, struct sio_fd *sfd, sio_callback_t callback, void *arg);
3,当然也可以将fd取消注册,注意仅仅是从epoll中取消注册,并不会关闭fd:
void sio_del(struct sio *sio, struct sio_fd *sfd);
4,为了监视事件的发生,我们有如下4个注册读写的接口:
void sio_watch_write(struct sio *sio, struct sio_fd *sfd);
void sio_unwatch_write(struct sio *sio, struct sio_fd *sfd);
void sio_watch_read(struct sio *sio, struct sio_fd *sfd);
void sio_unwatch_read(struct sio *sio, struct sio_fd *sfd);
5,最后,我们用这个函数执行事件循环,我们的回调函数就会随着事件的发生而被sio调用了:
void sio_run(struct sio *sio, int timeout_ms);
快速回顾了"sio事件触发层“的接口后,我们先从sio_dgram(UDP)层入手,看看如何基于sio层实现sio_dgram层,从而给用户提供使用便捷的UDP框架,这一部分代码可以见sio_dgram.h, sio_dgram.c,可以通过git阅读:
sio_dgram.h:https://code.csdn.net/qq120848369/simple_io/tree/master/sio_dgram.h
sio_dgram.c:https://code.csdn.net/qq120848369/simple_io/tree/master/sio_dgram.c
在编写sio_dgram接口的时候,我将自己作为一个sio层的用户看待,同时我希望我的实现能够被复用,给我的上层用户友好的接口,抱着这种思路,很快就可以获知用户的基本需求,设计出最简的接口:
1,首先,我要么希望创建一个UDP服务端套接字,绑定到一个端口上接受请求,或者我希望创建一个udp套接字向其他服务发包,总结起来就是创建udp socket,绑定到一个(ip,port)上,对于服务端套接字,应该由用户指定port,对于客户端来说只需要系统随机选择一个可用端口就足够了。根据socket编程基础,bind时指定ip=0和port=0的含义即令操作系统随机选定IP和Port,所以创建UDP套接字的接口其实可以统一为:
struct sio_dgram *sio_dgram_open(struct sio *sio, const char *ipv4, uint16_t port, sio_dgram_callback_t callback, void *arg);
这个函数将创建一个udp socket并设置成非阻塞,bind到(ipv4,port)上,并且将socket注册到sio中,监视socket的读事件,当读取到udp包的时候回调用户的callback并传入用户参数arg。 听起来非常简单,因为sio层是现成的,所以sio_dgram要做的就是按照sio的接口去注册fd,并在读事件回调的时候执行真正的recvfrom读取一个udp包,并回调用户的sio_dgram_callback_t callback,从而交由用户响应处理,它的实现如下:
struct sio_dgram *sio_dgram_open(struct sio *sio, const char *ipv4, uint16_t port, sio_dgram_callback_t callback, void *arg)
{
int sock = socket(AF_INET, SOCK_DGRAM, );
if (sock == -)
return NULL;
fcntl(sock, F_SETFL, fcntl(sock, F_GETFL) | O_NONBLOCK); int bufsize = ;
setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize));
setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)); struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ipv4);
if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == -) {
close(sock);
return NULL;
}
struct sio_dgram *sdgram = calloc(, sizeof(*sdgram));
sdgram->sock = sock;
sdgram->user_callback = callback;
sdgram->user_arg = arg;
sdgram->sfd = sio_add(sio, sock, _sio_dgram_callback, sdgram);
if (!sdgram->sfd) {
sio_dgram_close(sio, sdgram);
return NULL;
}
sio_watch_read(sio, sdgram->sfd);
return sdgram;
}
这里创建了一个sio_dgram结构,将用户的回调函数和参数存储在里面,并sio_add注册sock到sio,提供了一个_sio_dgram_callback函数,同时将arg设置为sio_dgram,最后设置监视sock的读事件,看一下_sio_dgram_callback函数做了什么:
static void _sio_dgram_read(struct sio *sio, struct sio_dgram *sdgram)
{
struct sockaddr_in source;
socklen_t len = sizeof(source);
int64_t size = recvfrom(sdgram->sock, sdgram->inbuf, , , (struct sockaddr *)&source, &len);
if (size > ) {
sdgram->user_callback(sio, sdgram, &source, sdgram->inbuf, size, sdgram->user_arg);
}
} static void _sio_dgram_callback(struct sio *sio, struct sio_fd *sfd, enum sio_event event, void *arg)
{
struct sio_dgram *sdgram = arg; switch (event) {
case SIO_READ:
_sio_dgram_read(sio, sdgram);
break;
case SIO_WRITE:
break;
case SIO_ERROR:
break;
default:
return;
}
}
_sio_dgram_callback是一个标准sio回调接口,它只处理SIO_READ事件,即udp socket可读,这里arg就是之前注册sio_dgram结构:
struct sio_dgram {
int sock;
struct sio_fd *sfd;
sio_dgram_callback_t user_callback;
void *user_arg;
char inbuf[];
};
接着调用了_sio_dgram_read处理SIO_READ事件,在函数中调用了recvfrom函数读取一个udp包,如果读取成功则回调用户函数,用户可以在回调函数中获知udp包的data和size,也能获取自己的用户参数,同时也可以得知客户端地址。
除此之外,你可以可以通过sio_dgram_set随时修改udp socket的用户回调函数和用户参数:
void sio_dgram_set(struct sio *sio, struct sio_dgram *sdgram, sio_dgram_callback_t callback, void *arg)
{
sdgram->user_callback = callback;
sdgram->user_arg = arg;
}
2,通过上面的接口,我们对sio_dgram层的核心设计有了一个清晰的认识,既然sio_dgram在收到了udp包后回调了用户,那么用户如何回复一个应答呢:
int sio_dgram_response(struct sio *sio, struct sio_dgram *sdgram, struct sockaddr_in *source, const char *data, uint64_t size);
用户在回调函数中,调用这个函数可以向客户端返回一个包,返回值-1表示失败,0表示成功,实现很简单,因为我们对UDP SOCKET设置过非阻塞,所以这里如果socket缓冲区已满,那么发包将立即失败(这个处理方式值得留意,后面将于TCP对比),失败的处理交给用户自己:
int sio_dgram_response(struct sio *sio, struct sio_dgram *sdgram, struct sockaddr_in *source, const char *data, uint64_t size)
{
int64_t ret = sendto(sdgram->sock, data, size, , (struct sockaddr *)source, sizeof(*source));
if (ret > )
ret = ;
return ret; // -1 or 0
}
这个接口使用struct sockaddr_in *指定发送目的地,应该在用户回调函数里配合使用最方便,从函数命名response上也可以看出。 如果用户希望在回调函数之外向某个地址发包又怎么办呢,其实也很简单:
int sio_dgram_write(struct sio *sio, struct sio_dgram *sdgram, const char *ipv4, uint16_t port, const char *data, uint64_t size)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ipv4); int64_t ret = sendto(sdgram->sock, data, size, , (struct sockaddr *)&addr, sizeof(addr));
if (ret > )
ret = ;
return ret; // -1 or 0
}
用户直接指定(ipv4,port)就可以向指定地址发包了,避免用户自己操作struct sockaddr_in,隐藏了繁琐的细节。
3,如果你希望暂时不处理UDP包,或者你希望将udp socket注册到其他的sio中,可以使用sio_dgram_detach接口和sio_dgram_attach接口:
int sio_dgram_attach(struct sio *sio, struct sio_dgram *sdgram)
{
sdgram->sfd = sio_add(sio, sdgram->sock, _sio_dgram_callback, sdgram);
if (!sdgram->sfd)
return -;
sio_watch_read(sio, sdgram->sfd);
return ;
} void sio_dgram_detach(struct sio *sio, struct sio_dgram *sdgram)
{
sio_del(sio, sdgram->sfd);
sdgram->sfd = NULL;
}
detach调用sio_del将udp socket从当前的sio中取消注册,而attach则将udp socket注册到指定的sio上,并监视读事件。
4,如果你希望关闭udp socket,那么调用sio_dgram_close就可以了,它会首先将udp socket从sio中取消注册,这里的if判断是为了保证一个detached的sio_dgram同样可以被正确的close,即便它不属于任何sio。之后将关闭udp socket,释放sio_dgram结构。
void sio_dgram_close(struct sio *sio, struct sio_dgram *sdgram)
{
if (sdgram->sfd) {
sio_del(sio, sdgram->sfd);
}
close(sdgram->sock);
free(sdgram);
}
关于sio_dgram的所有函数都讲解完了,用sio_dgram来作为sio的第一个用户主要是因为UDP足够简单,让我们可以清晰的了解到如何基于sio构建上层的sio_dgram,给用户提供udp编程的便捷接口。有了这个基础,我相信TCP部分的学习将变得足够轻松,TCP同样需要与sio完成类似的交互设计,我们更多的关注点在于TCP自身的复杂性,这一部分将会是学习的重中之重,也是绝大多数人迷惑的地方,废话不多说,让我们轻装上阵!
下面开始讲解如何基于sio设计一套上层的tcp接口,以便简化tcp编程的复杂度。通过上面的sio_dgram,你应该已经对如何基于sio层开发更上层的udp接口有了一个清晰的认识,对于tcp来讲整个过程是大同小异的,复杂性体现在tcp编程自身,因为tcp是有状态的,有连接的可靠协议,所以我们必须对建立连接,监听连接,发送数据,读取数据各个接口进行封装,隐藏实现,并给用户提供编程接口,实现起来要比sio_dgram做更多的工作,下面就一步一步来讲解一下sio_stream,代码可以在这里看到:
sio_stream.h:https://code.csdn.net/qq120848369/simple_io/tree/master/sio_stream.h
sio_stream.c:https://code.csdn.net/qq120848369/simple_io/tree/master/sio_stream.c
1,和udp一样,为了使用tcp,首先要创建一个套接字,要么用于监听连接,要么用于发起连接,即tcp socket有两种不同的用途,需要我们分别对待处理。 这里先从最简单的监听套接字看起,因为它做的事情比较简单,下面的接口用于创建一个监听套接字:
struct sio_stream *sio_stream_listen(struct sio *sio, const char *ipv4, uint16_t port, sio_stream_callback_t callback, void *arg);
这个函数将会创建一个tcp socket并设置为非阻塞模式,之后调用bind绑定到(ipv4,port)上进行监听,并将监听socket向sio注册并设置为监测读事件,设置读事件的回调函数为_sio_accept_callback,当sio发现tcp socket可读的时候会回调_sio_accept_callback进行处理,最后将一个struct sio_steram对象返回,它记录这个tcp socket相关的所有数据。这里需要用户提供一个sio_stream_callback_t用户回调函数和void*arg用户参数,当_sio_accept_callback接受到新连接的时候,会回调用户回调函数通知用户进行处理。
struct sio_stream *sio_stream_listen(struct sio *sio, const char *ipv4, uint16_t port, sio_stream_callback_t callback, void *arg)
{
int sock = socket(AF_INET, SOCK_STREAM, );
if (sock == -)
return NULL;
fcntl(sock, F_SETFL, fcntl(sock, F_GETFL) | O_NONBLOCK);
int on = ;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ipv4);
if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == -) {
close(sock);
return NULL;
}
listen(sock, ); struct sio_stream *stream = _sio_stream_new(sock, SIO_STREAM_LISTEN, callback, arg);
stream->sfd = sio_add(sio, sock, _sio_accept_callback, stream);
if (!stream->sfd) {
sio_stream_close(sio, stream);
return NULL;
}
sio_watch_read(sio, stream->sfd);
return stream;
}
在讲解_sio_accept_callback之前,首先看一下struct sio_stream里存储了哪些字段来描述这个tcp socket:
// TCP流事件
enum sio_stream_event {
SIO_STREAM_ACCEPT, /**< 接收了新连接 */
SIO_STREAM_DATA, /**< 接收了新数据 */
SIO_STREAM_ERROR, /**< 连接出现错误 */
SIO_STREAM_CLOSE, /**< 连接被关闭 */
}; struct sio;
struct sio_fd;
struct sio_stream;
struct sio_buffer; // 用户事件回调
typedef void (*sio_stream_callback_t)(struct sio *sio, struct sio_stream *stream, enum sio_stream_event event, void *arg); enum sio_stream_type {
SIO_STREAM_LISTEN,
SIO_STREAM_CONNECT,
SIO_STREAM_NORMAL,
}; // 封装TCP连接
struct sio_stream {
enum sio_stream_type type; /**< socket类型 */
int sock; /**< socket描述符 */
struct sio_fd *sfd; /**< 注册在sio上的socekt */
sio_stream_callback_t user_callback; /**< 用户回调 */
void *user_arg; /**< 用户参数 */
struct sio_buffer *inbuf; /**< 读缓冲 */
struct sio_buffer *outbuf; /**< 写缓冲 */
};
观察struct sio_stream对象,可以看到enum sio_stream_type标识了这个socket的类型是监听套接字,连接套接字,还是一个已经连接完成的套接字,你可以对应到TCP协议的状态中去,比如LISTEN, SYN_SENT, ESTABLISHED, 这很容易理解,目前还看不到这个标记的任何作用,我们只是创建的时刻标记下来,后面讲解的某个接口会用到这个字段。 sock记录了tcp socket,sfd是注册到sio的句柄,user_callback是用户注册的回调函数,user_arg是用户参数,inbuf和outbuf是读与写的缓冲区,sio_buffer是可以动态扩容的高效缓冲区实现。
这里关注一下用户的回调函数类型sio_stream_callback_t,它接受struct sio, struct sio_stream,其中enum sio_stream_event表示这个tcp socket发生了什么tcp层的事件,主要包含SIO_STREAM_ACCEPT接受了一个连接,SIO_STREAM_DATA读到了新的数据,SIO_STREAM_ERROR连接发生了错误,SIO_STREAM_CLOSE连接被对端关闭,用户回调函数根据event类型对struct sio_stream进行不同的操作即可。
那么回到sio_stream_listen函数,看一下_sio_accept_callback是如何accpet获取新连接并回调上层用户的:
static void _sio_accept_callback(struct sio *sio, struct sio_fd *sfd, enum sio_event event, void *arg)
{
struct sio_stream *acceptor = arg; int sock = accept(sfd->fd, NULL, NULL);
if (sock == -)
return;
fcntl(sock, F_SETFL, fcntl(sock, F_GETFL) | O_NONBLOCK);
int nodelay = ;
setsockopt(sock, SOL_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay)); struct sio_stream *stream = _sio_stream_new(sock, SIO_STREAM_NORMAL, acceptor->user_callback, acceptor->user_arg);
stream->sfd = sio_add(sio, sock, _sio_stream_callback, stream);
if (!stream->sfd) {
return sio_stream_close(sio, stream);
}
sio_watch_read(sio, stream->sfd);
acceptor->user_callback(sio, stream, SIO_STREAM_ACCEPT, acceptor->user_arg);
}
因为sio_stream层向sio层注册sio_fd的时候设置的arg是struct sio_stream对象,所以在这里首先取出sio_stream,从而确定是哪个tcp socket发生了读事件,这里没有校验sio层返回的event是否为SIO_READ,因为对于监听套接字来说,它应该永远不会出现SIO_ERROR。 既然sio层通知sio_stream层监听套接字可读,那么我们首先accept获取一个tcp socket并设置为非阻塞,同时设置TCP的Nodelay选项(一个优化选项,可以百度了解一下TCP_NODELAY),之后会调用_sio_stream_new创建一个struct sio_stream对象并将这个accept得到的sock设置进去,这个struct sio_stream的类型是SIO_STREAM_NORMAL,也就是说连接已建立。这里注意默认accept来的struct sio_stream的用户回调函数会继承监听套接字的用户回调函数,用户参数也一样。 之后会将sock注册到sio层并监听读事件,最后回调用户并传入这个accept来的sio_stream对象,以便上层用户对新连接进行处理。
这里需要注意sio_add时提供一个_sio_stream_callback回调函数,这是处理一个已建立连接套接字数据收发的一个回调函数,在函数内会自动完成数据的接收和发送,从而为用户隐藏网络IO的细节,这个函数留在后面讲解。 我们在看完了监听套接字做的事情之后,接着看一个连接套接字是如何工作的,监听套接字和连接套接字做的事情是非常相似的,放在一起对比更利于理解,在看完这两个过程后再回头去看_sio_stream_callback做的事情就很清晰了。
2,对于tcp来说,想要主动建立到对端的连接需要调用connect函数,这里对应sio_stream中的函数:
struct sio_stream *sio_stream_connect(struct sio *sio, const char *ipv4, uint16_t port, sio_stream_callback_t callback, void *arg)
调用该函数可以向(ipv4,port)发起主动连接返回一个struct sio_stream对象标识这个TCP连接,当该连接接收到数据或者出现错误时,会回调callback函数并传入arg从而由用户决定下一步的动作。 它的实现如下:
struct sio_stream *sio_stream_connect(struct sio *sio, const char *ipv4, uint16_t port, sio_stream_callback_t callback, void *arg)
{
int sock = socket(AF_INET, SOCK_STREAM, );
if (sock == -)
return NULL;
fcntl(sock, F_SETFL, fcntl(sock, F_GETFL) | O_NONBLOCK);
int nodelay = ;
setsockopt(sock, SOL_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay)); struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ipv4);
int ret = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
if (ret == - && errno != EINPROGRESS) {
close(sock);
return NULL;
}
struct sio_stream *stream = _sio_stream_new(sock, ret == 0? SIO_STREAM_NORMAL :SIO_STREAM_CONNECT, callback, arg);
stream->sfd = sio_add(sio, sock, ret == ? _sio_stream_callback : _sio_connect_callback, stream);
if (!stream->sfd) {
sio_stream_close(sio, stream);
return NULL;
}
if (ret == )
sio_watch_read(sio, stream->sfd);
else
sio_watch_write(sio, stream->sfd);
return stream;
}
首先创建一个tcp socket,设置为非阻塞模式,并设置tcp_nodelay选项,然后发起一次非阻塞connect,如果返回-1并且errno为EINPROGRESS那么connect正在异步执行,我们需要监测socket的写事件并在写事件发生时检查连接是否成功,如果errno为其他值则表示连接建立失败。如果返回0表示连接立即建立成功,那么只需要监测socket的读事件等待数据到来即可。对于connect返回0来说,连接已经建立完成,所以设置的sio回调函数是_sio_stream_callback来处理数据读写,否则connect正在内核异步处理,我们注册_sio_connect_callback来等待异步建立的结果,这里通过sio_add即注册到了sio上监听此socket的相关事件了。
这里特别关注一下_sio_connect_callback是如何处理异步connect的结果的,当sio通知_sio_connect_callback写事件发生或者错误发生的时候,我们需要对socket的状态进行变更,并修改监听的事件以便投入正常的数据收发流程:
static void _sio_connect_callback(struct sio *sio, struct sio_fd *sfd, enum sio_event event, void *arg)
{
struct sio_stream *stream = arg; int ret, error;
socklen_t optlen = sizeof(error);
switch (event) {
case SIO_WRITE:
ret = getsockopt(sfd->fd, SOL_SOCKET, SO_ERROR, &error, &optlen);
if (ret == && error == ) {
stream->type = SIO_STREAM_NORMAL;
sio_watch_read(sio, sfd);
if (!sio_buffer_length(stream->outbuf))
sio_unwatch_write(sio, sfd);
sio_set(sio, sfd, _sio_stream_callback, stream);
break;
}
case SIO_ERROR:
stream->user_callback(sio, stream, SIO_STREAM_ERROR, stream->user_arg);
break;
default:
break;
}
}
函数首先取出struct sio_stream对象,如果发生的SIO_WRITE事件那么说明异步connect完成,我们需要调用getsockopt检查连接是否建立成功,需要同时检查返回值ret和error,如果连接建立成功,那么连接的状态变更为SIO_STREAM_NORMAL进入数据收发状态,注册监听读事件,如果写缓冲区已经有用户写入的数据,那么继续维持写事件的注册,否则就取消监听写事件,最后将sio事件回调函数修改为_sio_stream_callback函数,就此连接正式建立完成。 如果getsockopt返回连接建立失败或者sio通知SIO_ERROR发生错误,那么直接回调用户通知SIO_STREAM_ERROR,由用户决定如何处理这个struct sio_stream,用户应该总是将连接关闭。
3,从上面的监听和连接两个接口的实现上来看,最终一个已连接的TCP socket会注册在sio上,并在事件发生时回调_sio_stream_callback,所有流程最终都聚集在这一个函数中:
static void _sio_stream_callback(struct sio *sio, struct sio_fd *sfd, enum sio_event event, void *arg)
{
struct sio_stream *stream = arg;
int error = ;
switch (event) {
case SIO_READ:
error = _sio_stream_read(sio, sfd, stream);
break;
case SIO_WRITE:
error = _sio_stream_write(sio, sfd, stream);
break;
case SIO_ERROR:
error = ;
break;
default:
return;
}
if (error == ) { // error
stream->user_callback(sio, stream, SIO_STREAM_ERROR, stream->user_arg);
} else if (error == ) { // peer-close
stream->user_callback(sio, stream, SIO_STREAM_CLOSE, stream->user_arg);
}
}
如果发生了SIO_READ读事件,那么调用_sio_stream_read处理,如果发生了SIO_WRITE事件则调用_sio_stream_write,如果发生了SIO_ERROR则直接标记error=1。
这里error=0表示连接没有发生任何错误,error=1表示连接发生了错误,error=2表示对端关闭了连接,如果发生了错误那么需要回调用户通知SIO_STREAM_ERROR,用户总是应该将连接关闭,如果对端关闭了连接那么需要回调用户SIO_STREAM_CLOSE,用户总是应该将连接关闭,如果error=0则不需要通知用户任何事情。
详细看一下_sio_stream_read做了什么:
static int _sio_stream_read(struct sio *sio, struct sio_fd *sfd, struct sio_stream *stream)
{
char *space;
sio_buffer_reserve(stream->inbuf, ); /* 4KB per read */
sio_buffer_space(stream->inbuf, &space, NULL); int64_t bytes = read(sfd->fd, space, );
if (bytes == -) {
if (errno != EINTR && errno != EAGAIN)
return ;
} else if (bytes == ) {
return ;
} else {
sio_buffer_seek(stream->inbuf, bytes);
stream->user_callback(sio, stream, SIO_STREAM_DATA, stream->user_arg);
}
return ;
}
这个函数首先要求sio_stream的inbuf(struct sio_buffer类型)至少在末尾保留4096字节的空闲空间,然后调用sio_buffer_space获得实际的缓冲区首地址space,调用read并将数据读入到space开始的4096字节空间内,返回值bytes表示实际读到的字节数,应该总是小于等于4096,如果read返回-1并且errno属于EINTR或者EAGAIN那么表示只是被信号中断或者数据尚未到来,那么本次就相当于什么都没发生,等待sio下次通知即可,其他errno均是致命错误,需要返回1表示连接出现错误。如果bytes==0,表示对端关闭了连接,那么需要返回2。如果bytes > 0则表示读到了实际的数据,需要将sio_buffer的写指针向后移动bytes字节,然后回调用户SIO_STREAM_DATA让用户处理数据并返回0表示没有发生任何错误。
对于_sio_stream_write函数来说,它做的事情就是将堆积在outbuf中的数据写出:
static int _sio_stream_write(struct sio *sio, struct sio_fd *sfd, struct sio_stream *stream)
{
char *data;
uint64_t size;
sio_buffer_data(stream->outbuf, &data, &size);
int64_t bytes = write(sfd->fd, data, size);
if (bytes == -) {
if (errno != EINTR && errno != EAGAIN)
return ;
} else {
sio_buffer_erase(stream->outbuf, bytes);
if (bytes == size)
sio_unwatch_write(sio, sfd);
}
return ;
}
首先取出outbuf中的缓冲区首地址到data中,长度为size,然后调用write向对端发送,如果返回-1并且是致命错误则返回1表示连接出现错误,否则的确发送bytes字节出去,首先将写缓冲区头部的bytes字节擦除,然后判断是否将所有数据发出,如果全部发出则可以取消监听写事件,最后返回0表示没有发生任何错误。
这里补充一个知识点,对于tcp socket来说,阻塞模式下read和write都可能会挂起等待,对于read来说,如果此时此刻对端还没有给本端发送数据或者数据还没到达,read都会阻塞等待,对于write来说,如果对端迟迟不接收数据,或者本端发送了大量数据的同时对端不去read数据,那么根据TCP协议滑动窗口是可以知道两边的TCP滑动窗口都将塞满,write无法继续往内核塞更多的数据就会挂起等待,直到对端调用read从TCP协议栈里取出数据,才能让数据流继续流动。
所以,对于我们编写异步网络通讯的时候,我们从来也不会阻塞read等待数据,而是依靠epoll通知数据到来,对于write来说,我们从来也不会阻塞write直到将所有数据写出,而是在滑动窗口塞满的情况下将数据缓冲在应用层的buffer里并注册写事件,等待epoll通知我们可以写了,那么说明滑动窗口有空闲空间了,我们这时候再调用非阻塞write尝试往里再放一些数据去发送就会成功。
对于这种异步网络通讯模式情景下,由于sio_stream框架层一旦有数据到来会分配更大的读缓冲区去容纳read来的数据,一旦用户向一个已经拥塞的滑动窗口中继续write数据就会失败,从而导致数据在应用层缓冲区不断堆积,所以上层编程用户必须尽快的处理读缓冲区中的请求(每次回调用户通知SIO_STREAM_DATA时,用户应该一次性解析缓冲区中的所有请求),并且主动检查写缓冲区堆积是否过大,及时的发现拥塞的连接进行写流量控制或者剔除连接,对于写缓冲区堆积状况可以通过如下函数检查:
uint64_t sio_stream_pending(struct sio_stream *stream)
{
return sio_buffer_length(stream->outbuf);
}
它返回struct sio_stream中的Outbuf的堆积字节数,如果超过用户的容忍范围,用户应该主动做出一些处理,比如关闭连接或者过等待堆积数量下降到一定值后再继续发送,也就是所谓的流量控制。
TCP的核心已经讲解完毕,下面简单看一下sio_stream_attach和sio_stream_detach接口,它们用于将struct sio_stream从一个struct sio中脱离,并注册到其他的struct sio中,这主要用于完成多线程网络编程用途,这里也将看到struct sio_stream中的type(SIO_STREAM_LISTEN,SIO_STREAM_CONNECT,SIO_STREAM_NORMAL)的用途:
void sio_stream_detach(struct sio *sio, struct sio_stream *stream)
{
sio_del(sio, stream->sfd);
stream->sfd = NULL;
} int sio_stream_attach(struct sio *sio, struct sio_stream *stream)
{
switch (stream->type) {
case SIO_STREAM_LISTEN:
stream->sfd = sio_add(sio, stream->sock, _sio_accept_callback, stream);
if (!stream->sfd)
return -;
sio_watch_read(sio, stream->sfd);
break;
case SIO_STREAM_CONNECT:
stream->sfd = sio_add(sio, stream->sock, _sio_connect_callback, stream);
if (!stream->sfd)
return -;
sio_watch_write(sio, stream->sfd);
break;
case SIO_STREAM_NORMAL:
stream->sfd = sio_add(sio, stream->sock, _sio_stream_callback, stream);
if (!stream->sfd)
return -;
sio_watch_read(sio, stream->sfd);
if (sio_buffer_length(stream->outbuf))
sio_watch_write(sio, stream->sfd);
break;
default:
return -;
}
return ;
}
detach操作很简单,只是将tcp socket从当前的sio上取消注册,并设置struct sio_stream::sfd为NULL,而attach则需要根据sio_stream的类型注册不同的事件到sio,
对于监听套接字只需要监测读事件,对于连接套接字需要注册写事件,对于已连接的套接字需要注册读事件,如果写缓冲区有堆积还需要注册写事件。
用户怎么发送数据的? 别急,是通过调用这个接口发送数据的,你会看到写缓冲区是如何工作的:
int sio_stream_write(struct sio *sio, struct sio_stream *stream, const char *data, uint64_t size)
{
uint64_t len = sio_buffer_length(stream->outbuf);
if (len || stream->type == SIO_STREAM_CONNECT) {
sio_buffer_append(stream->outbuf, data, size);
return ;
}
int64_t bytes = write(stream->sock, data, size);
if (bytes == -) {
if (errno != EINTR && errno != EAGAIN)
return -;
bytes = ;
} else if (bytes == size) {
return ;
}
sio_buffer_append(stream->outbuf, data + bytes, size - bytes);
sio_watch_write(sio, stream->sfd);
return ;
}
如果写缓冲区已经有数据堆积,为了保证数据顺序性,必须将数据继续追加到写缓冲区末尾,等待写事件发生后顺序写出。如果sio_stream是一个连接套接字,那么必须将数据暂时写到写缓冲区并等待连接建立成功后由写事件触发并写出,因为此时此刻socket还没建立完成是不能发送数据的。 其他情况下,说明socket是一个已连接的套接字并且当前写穿缓冲区尚未有堆积,那么直接调用write写出数据,如果写数据发生错误,那么返回-1,用户检查函数返回值后应该总是关闭连接。 如果数据全部写出那么不需要做任何其他动作,如果发送被信号中断或者由于滑动窗口满导致发送失败,或者因为滑动窗口空间不足导致只写出了部分数据,那么剩余的数据需要追加到写缓冲区末尾,并注册写事件等待epoll通知时再继续尝试写出。
最后,如果用户希望关闭一个struct sio_stream,只需要调用这个函数即可:
void sio_stream_close(struct sio *sio, struct sio_stream *stream)
{
if (stream->sfd)
sio_del(sio, stream->sfd);
close(stream->sock);
sio_buffer_free(stream->inbuf);
sio_buffer_free(stream->outbuf);
free(stream);
}
这个函数首先检查了stream->sfd是否为NULL,这与sio_stream_detach有关,如果sio_stream当前注册在sio上那么sfd就不为NULL,所以应该首先从sio中移除,之后在关闭socket,释放读写缓冲区,最后释放sio_stream结构体。 if判断很关键,因为一个sio_stream极有可能从监听套接字所在sio中detach,并交给其他线程的sio管理,但可能因为应用层的控制策略或者因为向其他sio attach的时候发生失败需要关闭掉连接,此时sio_stream不属于任何sio,所以必须能够正确处理这个场景,保证接口语义明确。
关于TCP的部分讲解完成,最后一篇中将会通过使用sio_stream和sio_dgram开发上层网络程序,从而对异步网络编程有更深刻的理解。