回顾一下“"开篇 -- 知其然,知其所以然"”中的两段代码,第一段虽然只使用1个线程但却也只能处理一个socket,第二段虽然能处理成百上千个socket但却需要创建同等数量的线程,分开来看都不完美,如果1个线程能够处理成百上千个socket就太好了!
问题在于,当前的实现中1个线程只能阻塞的recv等待网络数据的到来,recv在数据到来之前会挂起并让出cpu直到数据到来后才能继续执行,在此之前cpu是空闲的,并且你也无法获得cpu使用权。
如果可以趁着这个socket数据没到来之前先处理其他socket而不是苦苦等待一个socket,那一个线程是不是就可以处理多个socket的呢?答案是肯定的。
通过设置socket为非阻塞模式(O_NONBLOCK),我们在调用recv的时候就不会因为没有数据而挂起了,recv会立即返回并在没有数据的情况下设置errno=EWOULDBLOCK,通过检查返回值和errno,我们便可以获知recv发生了什么。
【初窥门径】在这个前提下,我们如何在1个线程中同时管理多个socket呢?没那么复杂,我们只需要写一个while(1)死循环,不停的遍历所有的socket,对每个socket调用非阻塞的recv尝试读取一段数据进行处理,并通过send返回应答即可,大致代码如下:
int main() { ... fcntl(listen_fd, O_NONBLOCK...); /* 设置非阻塞 */ listen(listen_fd); /* 监听套接字*/ int fd_array[10000] = {0}; /*fd下标的数组*/ while (1) {
sleep(1); // 睡眠1秒, 避免cpu负载过高 new_fd = accept(listen_fd); /* 尝试accept一个新socket*/ if (new_fd >= 0) { ... ... fcntl(new_fd, O_NONBLOCK...); fd_array[new_fd]= new_fd; ... } foreach(fd in fd_array) { if (fd == 0) continue; int n = recv(fd, request); /* 尝试从socket recv一段数据*/ if (n > 0) { ... /* 处理request */ send(fd, response); } if (fd has error) { close(fd); fd_array[fd] = 0; } } } }
首先启动了监听套接字,并设置了非阻塞,然后进入while(1)死循环。在每次循环头部首先调用accept尝试获取一个socket,由于非阻塞的原因,如果没有新连接会立即返回-1,否则设置新socket为非阻塞并放入fd_array数组中记录。接着,由于你不知道哪些socket有数据,于是只能遍历所有曾经accept获得到socket,调用非阻塞recv尝试读取数据,如果的确读到了数据则处理并send返回应答,如果socket发生了错误则关闭socket。
这段代码成功的实现了1个线程处理多个socket的目标,是完全可行的,但并不完美。你可以回顾一下代码,其中的while(1)死循环将导致这个线程毫不停歇的对socket一遍又一遍的轮询,无论socket是否真的有数据到来,这样简单粗暴的实现会让程序总是100%cpu满负载运转,造成不必要的资源浪费(假设机器只有1颗cpu,还有一堆进程等待cpu调度,势必会对其他进程造成极大的影响)。
我们还是思考怎么解决这个现状,切忌天马行空。既然while(1)忙轮询造成cpu负载高,那是否可以在while(1)头部sleep一会呢,当然可以通过sleep让出cpu给其他进程使用,但如果sleep太久导致socket数据不被及时处理也会是一个大问题,所以还必须保证sleep挂起的时间足够短,索性就sleep 1毫秒,问题差不多就解决了。
讲到这里,总算抛足了砖头该看看玉了。回顾一下我们一步一步改进的过程,总算到了这个节骨眼上,貌似基于手头上的工具很难再有所改进了。其实,linux内核开发者也注意到了这一点,为了解决这个切实的问题在内核中实现了一系列的api,目的就是避免我们忙轮询所有socket,转而由内核主动通知哪些socket有数据可读,我们在编码时就不必为遍历socket和sleep多少秒纠结了,新的api会sleep直到某些socket有数据可读才返回,并且直接告诉我们具体是哪些socket可读从而避免了遍历所有socket。
为了避免误导,这里提示一下:上述只提到了非阻塞模式下的recv操作,没有提到send。实际上,阻塞模式下的socket调用send同样会阻塞,这是由于TCP协议栈滑动窗口已满造成的,可以简单理解为数据拥塞的情况下导致send同样阻塞。在非阻塞模式下,调用socket的send会因为数据拥塞而返回失败,errno同样为EWOULDBLOCK,数据没发送出去只能不停的重试去send,和轮询recv的道理是类似的。为了避免引入太多阅读理解负担,所以在这里理解到这个程度已经完全足够了。
了解了背景,接下来直奔主题,看看新的api怎么用,怎么结合到之前的代码中。这里有个背景需要介绍一下,linux内核在实现这个功能的时候也并不是一步就做到了今天的样子,它至少经历了select,poll 两个版本的API实现后,才有了今天广泛使用的API:kqueue(freebsd), epoll(linux)。由于我们主要接触的都是linux系统,并且两者从原理上大同小异,所以对freebsd上的kqueue不做介绍,而对于select和poll两个版本的实现由于已经基本没有实用价值,所以暂时不做介绍,有兴趣可以在看完epoll之后搜索引擎简单了解一下。
【春暖花开】我们马上看一段epoll的使用片段(通过man epoll你可以在manpage里看到epoll这段代码),并与我们上面的非阻塞版本代码进行比较,看清两份代码实现之间的差异,然后逐个介绍其中涉及的API:
struct epoll_event ev, *events; for(;;) { // 相当于我们的while(1) nfds = epoll_wait(kdpfd, events, maxevents, -1); // 相当于我们的sleep(1) for(n = 0; n < nfds; ++n) { // 相当于我们的for遍历所有socket if(events[n].data.fd == listener) { // 相当于我们尝试accept新连接 client = accept(listener, (struct sockaddr *) &local, &addrlen); if(client < 0){ perror("accept"); continue; } setnonblocking(client); ev.events = EPOLLIN | EPOLLET; ev.data.fd = client; if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) { fprintf(stderr, "epoll set insertion error: fd=%d0, client); return -1; } } else do_use_fd(events[n].data.fd); // 相当于我们recv,处理,send一个socket } }
可以看一下代码中的注释, 比对我们实现的非阻塞忙轮询版本代码, 会发现代码逻辑基本能够一一对应,一方面要accept新的socket,一方面要处理已有socket的读与写。为了学习epoll,我们需要关注差异在哪里,以及差异带来了什么好处,解决了什么问题。
首先笨拙的sleep被换成了epoll_wait,它的第1参数kdpfd是epoll的句柄(epoll_create创建),这个句柄中此前被注册了希望被epoll管理的socket(epoll_ctl注册)。当epoll_wait被调用后,会检查注册其上的socket是否有数据到来或者是否有剩余空间发送数据,如果都没有则会挂起,就像sleep一样睡眠,但与sleep的最大区别在于sleep多久是我们拍脑袋指定一个很小的数值,而epoll_wait会在任意socket可读或者可写的时候返回,这是由内核检测注册其上的socket并在满足条件时唤醒epoll_wait返回的,这就解决了sleep少则cpu繁忙sleep多则增加socket处理延迟的麻烦问题。
epoll_wait的第2,3个参数分别指定了一个struct epoll_event数组events和数组的大小maxevents,这是一个in/out参数,也就是epoll_wait在返回前会对数组内容进行赋值,其中记录的是发生了可读或者可写或者错误事件的socket以及具体发生的事件类型。这里的新名词”可读事件"表示有数据到来,"可写事件"表示内核缓冲区有剩余发送空间,“错误事件“表示socket发生了一些网络错误。既然epoll_wait在返回时把发生读写事件的socket写到了数组里,我们还需要遍历所有socket吗?当然不必了!借助epoll_wait,我们不必在那些没有任何动静的socket上做无用的recv和send尝试,只要是epoll_wait记录在数组里的socket一定是发生了特定的事件,这又帮我们解决了一个大麻烦。
for(n = 0; n < nfds; ++n) 遍历struct epoll_event数组,对于listener这个监听socket,调用accept得到新连接,并通过调用epoll_ctl注册到epoll句柄上以便之后的epoll_wait可以检测该socket的读写事件,对于非监听socket则调用do_use_fd函数去读写与请求处理,这里manpage并没有给出什么实际的代码,因为那些与epoll已经没有必然联系了。
现在你对epoll应该有了一个差不多的认识,但涉及到的结构体和API还没有详细的去看参数与返回值,使用上要注意什么也没有涉及。 在详细学习API之前,首先记住一点概念,epoll监听的是fd(文件描述符)的读,写,错误事件,与socket或者说tcp socket还是udp socket没有必然联系,epoll负责的仅仅是”事件触发“,正合本篇博客标题。
1,创建一个epoll句柄:
int epoll_create(int size)
这里的size参数意义为epoll管理的fd个数的一个建议值,简单说就是预分配多少个fd的管理空间,如果不足会扩容,所以称为建议值,一般填个1000,10000的都无所谓。
2,向epoll句柄注册,删除,修改socket:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t; struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; EPOLL_CTL_ADD // 注册fd EPOLL_CTL_MOD // 修改fd EPOLL_CTL_DEL // 删除fd
这个函数有3种功能,一个是注册(EPOLL_CTL_ADD)fd到epoll,一个是从epoll删除fd(EPOLL_CTL_DEL),一个是向epoll修改一个已注册的fd(EPOLL_CTL_MOD)。
第1个参数epfd是epoll句柄,第二个参数op是指上述3个操作类型之一,第三个参数是一个结构体,epoll_event的第一个成员events表示希望epoll监测fd的什么事件,常用包含:EPOLLIN(可读),EPOLLOUT(可写),EPOLLERR(错误),EPOLLHUP(也是错误),你可以通过位或的方式同时包含多个事件。data是一个union,你可以使用其中的一个字段记录一些信息,也就是一个用户参数,在epoll_wait返回的epoll_event数组中会返回给调用者使用。
3,检测fd事件并返回相关信息:
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
epfd是epoll句柄(epoll_create),events是用户分配的数组,maxevents是数组的大小,timeout表示多少毫秒没有任何socket发生事件则超时返回的时间,-1表示不超时,函数返回在events数组里填充了多少个fd,于是我们就可以访问events数组里特定数量的fd进行处理了。
4,关闭epoll句柄:
int close(int fd);
也就是关闭epoll_create的返回值,注册其上的fd不会被关闭,仅仅是从这个句柄上取消了注册。
我们使用epoll完成事件触发所需要做的所有操作都是依靠上述4个接口而已,在上面的代码示例里也对其使用方法和时机有大概的了解了。参考manpage,你应该有能力实现一个用epoll监听fd=0(终端标准输入)的程序,并将读到的文本回显到终端上的小demo,如果你感兴趣可以在这里停下自己去探索一下再回来。
再次回顾一下上述manpage里的示例代码,在for循环遍历epoll_wait返回的fd数组的时候有一处if else的判定,对于fd=listener则调用了accept相关的逻辑,对于其他的则调用了do_use_fd的逻辑,也就是用户使用epoll的时候必须对epoll_wait返回的fd属于什么应用逻辑进行区分对待,从代码来看会令代码比较冗长,缺乏共性的提取和问题的抽象,用起来并不方便。代码里为了像epoll注册一个fd,需要对struct epoll_event结构各字段赋值,然后调用epoll_ctl,显得过于繁琐。
为了方便自己使用,我们考虑对epoll进行一定程度的封装和抽象,对接口进行简化,对过程进行抽象,对细节进行隐藏,让epoll用起来轻松愉快~