并发服务器--02(基于I/O复用——运用epoll技术)

  本文承接自上一博文I/O复用——运用Select函数

epoll介绍

  epoll是在2.6内核中提出的。和select类似,它也是一种I/O复用技术,是之前的select和poll的增强版本。

  Linux下设计并发网络程序,向来不缺少方法,比如典型的Apache模型(Process Per Connection,简称PPC),TPC(Thread PerConnection)模型,以及select模型和poll模型,那为何还要再引入epoll呢?我们先来看一下常用模型的缺点:

  PPC/TPC模型

  这两种模型思想类似,就是让每一个到来的连接一边自己做事去,别再来烦我。只是PPC是为它开了一个进程,而TPC开了一个线程。可是别烦我是有代价的,它要时间和空间啊,连接多了之后,那么多的进程/线程切换,这开销就上来了;因此这类模型能接受的最大连接数都不会高,一般在几百个左右。

  select模型

  1)最大并发数限制,因为一个进程所打开的FD(文件描述符)是有限制的,由FD_SETSIZE设置,默认值是1024/2048,因此Select模型的最大并发数就被相应限制了。自己改改这个FD_SETSIZE?想法虽好,可是先看看下面吧…

  2)效率问题,select每次调用都会线性扫描全部的FD集合,这样效率就会呈现线性下降,把FD_SETSIZE改大的后果就是,大家都慢慢来,什么?都超时了??!!

  3)内核/用户空间内存拷贝问题,如何让内核把FD消息通知给用户空间呢?在这个问题上select采取了内存拷贝方法。

  poll模型

  基本上效率和select是相同的,select缺点的2和3它都没有改掉。

  epoll的提升

  其实把select的缺点反过来那就是Epoll的优点了:

  1)epoll没有最大并发连接的限制,上限是最大可以打开文件的数目(一般远大于2048),一般跟系统内存关系很大。具体数目可以cat /proc/sys/fs/file-max察看。

  2)效率提升,epoll最大的优点就在于它只管“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,epoll的效率就会远远高于select和poll。

  3)内存拷贝,epoll在这点上使用了“共享内存”,所以这个内存拷贝也省略了。

epoll接口

  epoll操作过程用到的三个接口如下:

#include <sys/epoll.h>
int epoll_create(int size);
// 返回:若成功返回非负epoll描述符,否则返回-1,并设置errno的值
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *events);
// 返回:若成功返回0,否则返回-1,并设置errno的值
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
// 返回:若成功返回在timeout时间内就绪的文件描述符数,否则返回-1,并设置errno的值

epoll_create 

  该函数返回一个epoll的描述符(一个整数)。size用来告诉内核这个监听的数目一共有多大,不同于select()中的第一个参数给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id号/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

  如下图是刚打开服务端程序(本文ET模式服务器程序)的截图:

  并发服务器--02(基于I/O复用——运用epoll技术)

  上图中,前三个/dev/pts/1应该表示系统输入、输出及异常,socket表示监听套接字,eventepoll表示进程创建的epoll。

  另外,当有新的客户连接到服务端时,截图如下:

  并发服务器--02(基于I/O复用——运用epoll技术)

epoll_ctl

  该函数是epoll的事件注册函数。它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里注册要监听的事件类型。

  该函数的参数说明如下(fd是file descriptor的缩写,表示文件描述符):

  1)第一个参数是epoll_create函数的返回值。

  2)第二个参数表示动作:

  EPOLL_CTL_ADD:注册新的fd到epfd中;
  EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
  EPOLL_CTL_DEL:从epfd中删除一个fd。

  3)第三个参数表示需要监听的fd。

  4)第四个参数告诉内核需要监听什么事。struct epoll_event的结构如下:

struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

  events可以是以下几个宏的集合:
  EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
  EPOLLOUT:表示对应的文件描述符可以写;
  EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
  EPOLLERR:表示对应的文件描述符发生错误;
  EPOLLHUP:表示对应的文件描述符被挂断;
  EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
  EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

epoll_wait

  该函数等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合;maxevents告诉内核返回的events的最大大小,这个maxevents的值不能大于创建epoll_create()时的size,也必须大于0;参数timeout是超时时间(毫秒,0会立即返回,-1将永久阻塞)。该函数返回需要处理的数目,如返回0表示已超时。

epoll工作模式

  这部分要详细参考博文Epoll在LT和ET模式下的读写方式(已附于文章最后)。

  在一个非阻塞的socket上调用read/write函数, 返回EAGAIN或者EWOULDBLOCK(注: EAGAIN就是EWOULDBLOCK)

  从字面上看,意思是:
  * EAGAIN:再试一次
  * EWOULDBLOCK:如果这是一个阻塞socket,操作将被block
  * perror输出:Resource temporarily unavailable

  总结:
  这个错误表示资源暂时不够,可能read时,读缓冲区没有数据,或者,write时,写缓冲区满了。
  遇到这种情况,如果是阻塞socket,read/write就要阻塞掉。而如果是非阻塞socket,read/write立即返回-1,同时errno设置为EAGAIN。所以,对于阻塞socket, read/write返回-1代表网络出错了。但对于非阻塞socket, read/write返回-1不一定网络真的出错了,可能是Resource temporarily unavailable。 这时我们应该再试,直到Resource available。
  
  综上,对于non-blocking的socket,正确的读写操作为:
  读:忽略掉errno = EAGAIN的错误,下次继续读。
  写:忽略掉errno = EAGAIN的错误,下次继续写。
  对于select和epoll的LT模式,这种读写方式是没有问题的。 但对于epoll的ET模式,这种方式还有漏洞。

  epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:

  LT模式:只要某个监听中的文件描述符处于readable/writable状态,无论什么时候进行epoll_wait都会返回该描述符;

  ET模式:只有某个监听中的文件描述符从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该描述符。

  ET模式下套接字要设定为non-blocking

  我们先来看一下Linux官方给出的手册关于LT和ET的一个例子

  “

  The epoll event distribution interface is able to behave both as
edge-triggered (ET) and as level-triggered (LT). The difference
between the two mechanisms can be described as follows. Suppose that
this scenario happens: 1. The file descriptor that represents the read side of a pipe (rfd)
is registered on the epoll instance.
2. A pipe writer writes 2 kB of data on the write side of the pipe.
3. A call to epoll_wait(2) is done that will return rfd as a ready
file descriptor.
4. The pipe reader reads 1 kB of data from rfd.
5. A call to epoll_wait(2) is done. If the rfd file descriptor has been added to the epoll interface
using the EPOLLET (edge-triggered) flag, the call to epoll_wait(2)
done in step 5 will probably hang despite the available data still
present in the file input buffer; meanwhile the remote peer might be
expecting a response based on the data it already sent. The reason
for this is that edge-triggered mode delivers events only when
changes occur on the monitored file descriptor. So, in step 5 the
caller might end up waiting for some data that is already present
inside the input buffer. In the above example, an event on rfd will
be generated because of the write done in 2 and the event is consumed
in 3. Since the read operation done in 4 does not consume the whole
buffer data, the call to epoll_wait(2) done in step 5 might block
indefinitely.
An application that employs the EPOLLET flag should use nonblocking
file descriptors to avoid having a blocking read or write starve a
task that is handling multiple file descriptors. The suggested way
to use epoll as an edge-triggered (EPOLLET) interface is as follows: i with nonblocking file descriptors; and
ii by waiting for an event only after read(2) or write(2)
return EAGAIN. By contrast, when used as a level-triggered interface (the default,
when EPOLLET is not specified), epoll is simply a faster poll(2), and
can be used wherever the latter is used since it shares the same
semantics.

  ”

  从上述例子,我们可以看出:

  1. 在LT模式下,只要某个监听中的文件描述符处于readable/writable状态,无论什么时候进行epoll_wait都会返回该描述符。所以,只要读缓冲区有数据或者写缓冲区仍有空间,那就可读或可写, 都不会因套接字设定为blocking或者non-blocking而导致epoll_wait进入无限期等待;

  2. 在ET模式下,只有某个监听中的文件描述符从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该描述符。所以当我们没有把读缓冲区的数据全部读完或者没有把写缓冲区的空间写满就返回,套接字就会一直处于不可读或者不可写的状态,这样read/write会阻塞直到套接字可读或可写,但在这种情况下,套接字不可能变为可读或可写,所以read/write会一直阻塞下去,从而导致epoll_wait无限期阻塞于该套接字而无法返回。所以,要将套接字设定为non-blocking,让epoll_waite可以及时返回。

  下边两图可形象展示LT和ET的区别:

  从socket读数据:
  并发服务器--02(基于I/O复用——运用epoll技术)

  往socket写数据:

  并发服务器--02(基于I/O复用——运用epoll技术)

  所以,在epoll的ET模式下,正确的读写方式为:
  读:只要可读,就一直读,直到返回0,或者 errno = EAGAIN
  写:只要可写,就一直写,直到数据发送完,或者 errno = EAGAIN

示例

示例1:回射程序(LT模式)

  这里服务器端程序主要摘自博文IO多路复用之epoll总结,不过修复了部分Bug,具体如下:

 #include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h> #include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h> #define PORT 9877
#define BUFFERSIZ 1024
#define LISTENQ 1024
#define FDSIZE 1024
#define EPOLLEVENTS 100 //函数声明
//创建套接字并进行绑定
static int socket_bind(int port);
//IO多路复用epoll
static void do_epoll(int listenfd);
//事件处理函数
static void
handle_events(int epollfd, struct epoll_event *events, int num, int listenfd, char *buf);
//处理接收到的连接
static void handle_accpet(int epollfd, int listenfd);
//读处理
static void do_read(int epollfd, int fd, char *buf);
//写处理
static void do_write(int epollfd, int fd, char *buf);
//添加事件
static void add_event(int epollfd, int fd, int state);
//修改事件
static void modify_event(int epollfd, int fd, int state);
//删除事件
static void delete_event(int epollfd, int fd, int state);
//close socket
void Close(int fd); int main(int argc, char *argv[])
{
int listenfd;
listenfd = socket_bind(PORT);
listen(listenfd, LISTENQ);
do_epoll(listenfd);
exit();
} static int socket_bind(int port)
{
int listenfd;
struct sockaddr_in servaddr;
listenfd = socket(AF_INET, SOCK_STREAM, );
if (listenfd == -)
{
perror("socket error:");
exit();
}
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(port);
if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -)
{
perror("bind error: ");
exit();
}
return listenfd;
} static void do_epoll(int listenfd)
{
int epollfd;
struct epoll_event events[EPOLLEVENTS];
int num;
char buf[BUFFERSIZ];
memset(buf, , BUFFERSIZ);
//创建一个描述符
epollfd = epoll_create(FDSIZE);
//添加监听描述符事件
add_event(epollfd, listenfd, EPOLLIN);
for ( ; ; )
{
//获取已经准备好的描述符事件
if ((num = epoll_wait(epollfd, events, EPOLLEVENTS, -)) == -)
{
perror("epoll_pwait");
exit();
}
handle_events(epollfd, events, num, listenfd, buf);
}
Close(epollfd);
} static void
handle_events(int epollfd, struct epoll_event *events, int num, int listenfd, char *buf)
{
int i;
int fd;
// 遍历
for (i = ; i < num; i++)
{
fd = events[i].data.fd;
//根据描述符的类型和事件类型进行处理
if ((fd == listenfd) && (events[i].events & EPOLLIN))
handle_accpet(epollfd, listenfd);
else if (events[i].events & EPOLLIN)
do_read(epollfd, fd, buf);
else if (events[i].events & EPOLLOUT)
do_write(epollfd, fd, buf);
}
}
static void handle_accpet(int epollfd, int listenfd)
{
int clifd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
clifd = accept(listenfd, (struct sockaddr*)&cliaddr, &cliaddrlen);
if (clifd == -)
perror("accpet error:");
else
{
printf("accept a new client: %s:%d\n", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port);
//添加一个客户描述符和事件
add_event(epollfd, clifd, EPOLLIN);
}
} static void do_read(int epollfd, int fd, char *buf)
{
int nread;
nread = read(fd, buf, BUFFERSIZ);
if (nread == -)
{
perror("read error:");
//Close(fd);
delete_event(epollfd, fd, EPOLLIN);
}
else if (nread == )
{
fprintf(stderr, "client close.\n");
//Close(fd);
delete_event(epollfd, fd, EPOLLIN);
}
else
{
printf("read message is : %s", buf);
//修改描述符对应的事件,由读改为写
modify_event(epollfd, fd, EPOLLOUT);
}
} static void do_write(int epollfd, int fd, char *buf)
{
int nwrite;
nwrite = write(fd, buf, strlen(buf));
if (nwrite == -)
{
perror("write error:");
//Close(fd);
delete_event(epollfd, fd, EPOLLOUT);
}
else
modify_event(epollfd, fd, EPOLLIN);
memset(buf, , BUFFERSIZ);
} static void add_event(int epollfd, int fd, int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
if((epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev)) == -)
{
perror("epoll_ctl: add");
}
} static void delete_event(int epollfd, int fd, int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
if((epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev)) == -)
{
perror("epoll_ctl: del");
}
Close(fd);
// 如果描述符fd已关闭,再从epoll中删除fd,则会出现epoll failed: Bad file descriptor问题
// 所以要先从epoll中删除fd,在关闭fd. 具体可参考博文http://www.cnblogs.com/scw2901/p/3907657.html
} static void modify_event(int epollfd, int fd, int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
if((epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev)) == -)
{
perror("epoll_ctl: mod");
}
} void Close(int fd)
{
if((close(fd)) < )
{
perror("close socket error");
exit();
}
}

servepoll

  客户端程序:

 #include <netinet/in.h>
#include <sys/socket.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <errno.h> #define BUFFERSIZ 1024
#define SERV_PORT 9877
#define FDSIZE 1024
#define EPOLLEVENTS 100 void handle_connection(int sockfd);
void handle_events(int epollfd, struct epoll_event *events, int num, int sockfd, char *buf);
void add_event(int epollfd, int fd, int state);
void delete_event(int epollfd, int fd, int state);
void modify_event(int epollfd, int fd, int state); ssize_t Read(int fd, void *ptr, size_t nbytes);
void Write(int fd, void *ptr, size_t nbytes);
ssize_t writen(int fd, const void *vptr, size_t n);
void Writen(int fd, void *ptr, size_t nbytes); void Close(int fd); int main(int argc, char **argv)
{
if(argc != )
perror("usage: tcpcli <IPaddress>"); int sockfd;
struct sockaddr_in servaddr;
if( (sockfd = socket(AF_INET, SOCK_STREAM, )) < )
{
perror("socket error\n");
exit();
} bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
if((inet_pton(AF_INET, argv[], &servaddr.sin_addr)) <= )
{
perror("inet_pton error");
exit();
} if((connect(sockfd, (struct sockaddr*) &servaddr, sizeof(servaddr))) < )
{
perror("connect error\n");
exit();
} handle_connection(sockfd); Close(sockfd);
exit();
} void handle_connection(int sockfd)
{
int epollfd;
struct epoll_event events[EPOLLEVENTS];
char buf[BUFFERSIZ];
int num;
epollfd = epoll_create(FDSIZE);
add_event(epollfd, sockfd, EPOLLIN);
add_event(epollfd, STDIN_FILENO, EPOLLIN);
for ( ; ; )
{
num = epoll_wait(epollfd, events, EPOLLEVENTS, -);
handle_events(epollfd, events, num, sockfd, buf);
}
Close(epollfd);
} void handle_events(int epollfd, struct epoll_event *events, int num, int sockfd, char *buf)
{
int fd;
int i;
int stdineof = ;
for (i = ; i < num; i++)
{
fd = events[i].data.fd;
if(fd == sockfd)
{
int nread;
nread = Read(sockfd, buf, BUFFERSIZ);
if (nread == )
{
if(stdineof == )
return;
fprintf(stderr, "server close\n");
Close(sockfd);
Close(epollfd);
exit();
}
Write(fileno(stdout), buf, nread);
}
else
{
int nread;
nread = Read(fd, buf, BUFFERSIZ);
if (nread == )
{
stdineof = ;
fprintf(stderr, "no inputs\n");
Close(fd);
continue;
}
modify_event(epollfd, sockfd, EPOLLOUT);
Writen(sockfd, buf, nread);
modify_event(epollfd, sockfd, EPOLLIN);
memset(buf, , BUFFERSIZ);
}
}
} void add_event(int epollfd, int fd, int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
if(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) == -)
{
perror("epoll_ctl: add");
}
} void delete_event(int epollfd, int fd, int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
if(epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) == -)
{
perror("epoll_ctl: del");
}
} void modify_event(int epollfd, int fd, int state)
{
struct epoll_event ev;
ev.events = state;
ev.data.fd = fd;
if(epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) == -)
{
perror("epoll_ctl: mod");
}
} ssize_t Read(int fd, void *ptr, size_t nbytes)
{
ssize_t n; if ( (n = read(fd, ptr, nbytes)) == -)
perror("read error");
return(n);
} void Write(int fd, void *ptr, size_t nbytes)
{
if (write(fd, ptr, nbytes) != nbytes)
perror("write error");
} /* Write "n" bytes to a descriptor. */
ssize_t writen(int fd, const void *vptr, size_t n)
{
size_t nleft;
ssize_t nwritten;
const char *ptr; ptr = vptr;
nleft = n;
while (nleft > ) {
if ( (nwritten = write(fd, ptr, nleft)) <= ) {
if (nwritten < && errno == EINTR)
nwritten = ; /* and call write() again */
else
return(-); /* error */
} nleft -= nwritten;
ptr += nwritten;
}
return(n);
} void Writen(int fd, void *ptr, size_t nbytes)
{
if (writen(fd, ptr, nbytes) != nbytes)
perror("writen error");
} void Close(int fd)
{
if((close(fd)) < )
{
perror("close error");
exit();
}
}

cliepoll

  程序运行截图如下:

  1)客户端主动连接主动关闭

  客户端:

  并发服务器--02(基于I/O复用——运用epoll技术)

  服务器端:

  并发服务器--02(基于I/O复用——运用epoll技术)

  2)客户端主动连接被动关闭

  客户端:

  并发服务器--02(基于I/O复用——运用epoll技术)

  服务器端:

  并发服务器--02(基于I/O复用——运用epoll技术)

示例2:回射程序(ET模式)

  这里我们只给出服务器端的程序。这部分程序部分参考自博文Epoll在LT和ET模式下的读写方式。具体程序如下:

 #include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h> #include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h> #define PORT 9877
#define BUFFERSIZ 1024
#define LISTENQ 1024
#define FDSIZE 1024
#define EPOLLEVENTS 100 //函数声明
//创建套接字并进行绑定
int socket_bind(int port);
//IO多路复用epoll
void do_epoll(int listenfd);
//事件处理函数
void handle_events(int epollfd, struct epoll_event *events, int num, int listenfd, char *buf);
//处理接收到的连接
void handle_accpet(int epollfd, int listenfd);
//读处理
void do_read(int epollfd, int fd, char *buf);
//写处理
void do_write(int epollfd, int fd, char *buf); //设置socket连接为非阻塞模式
void setnonblocking(int sockfd);
//close socket
void Close(int fd); int main(int argc, char *argv[])
{
int listenfd;
listenfd = socket_bind(PORT);
listen(listenfd, LISTENQ);
do_epoll(listenfd);
exit();
} int socket_bind(int port)
{
int listenfd;
struct sockaddr_in servaddr;
listenfd = socket(AF_INET, SOCK_STREAM, );
if (listenfd == -)
{
perror("socket error:");
exit();
}
setnonblocking(listenfd);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(port);
if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -)
{
perror("bind error: ");
exit();
}
return listenfd;
} void do_epoll(int listenfd)
{
int epollfd;
struct epoll_event ev;
struct epoll_event events[EPOLLEVENTS];
int num;
char buf[BUFFERSIZ];
memset(buf, , BUFFERSIZ);
//创建一个描述符
epollfd = epoll_create(FDSIZE);
//添加监听描述符事件
ev.events = EPOLLIN;
ev.data.fd = listenfd;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &ev) == -)
{
perror("epoll_ctl: add");
exit();
} for ( ; ; )
{
//获取已经准备好的描述符事件
if ((num = epoll_wait(epollfd, events, EPOLLEVENTS, -)) == -)
{
perror("epoll_wait");
exit();
}
handle_events(epollfd, events, num, listenfd, buf);
}
Close(epollfd);
} void handle_events(int epollfd, struct epoll_event *events, int num, int listenfd, char *buf)
{
int i;
int fd;
// 遍历
for (i = ; i < num; i++)
{
fd = events[i].data.fd;
//根据描述符的类型和事件类型进行处理
if ((fd == listenfd) && (events[i].events & EPOLLIN))
handle_accpet(epollfd, listenfd);
else if (events[i].events & EPOLLIN)
do_read(epollfd, fd, buf);
else if (events[i].events & EPOLLOUT)
do_write(epollfd, fd, buf);
}
} void handle_accpet(int epollfd, int listenfd)
{
int connfd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen;
struct epoll_event ev;
while ((connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &cliaddrlen)) > )
{
printf("accept a new client: %s:%d\n", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port);
setnonblocking(connfd);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = connfd;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &ev) == -)
{
perror("epoll_ctl: add");
exit();
}
}
if (connfd == -)
{
if (errno != EAGAIN && errno != ECONNABORTED
&& errno != EPROTO && errno != EINTR)
perror("accept");
}
} void do_read(int epollfd, int fd, char *buf)
{
int nread;
int n = ;
struct epoll_event ev;
while ((nread = read(fd, buf + n, BUFFERSIZ-)) > )
{
n += nread;
}
if (nread == - && errno != EAGAIN)
{
perror("read error"); ev.data.fd = fd;
ev.events = EPOLLIN | EPOLLET;
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) == -)
{
perror("epoll_ctl: mod");
}
Close(fd);
}
else if(nread == )
{
perror("client close"); ev.data.fd = fd;
ev.events = EPOLLIN | EPOLLET;
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) == -)
{
perror("epoll_ctl: mod");
}
Close(fd);
}
else
{
printf("read message is : %s", buf);
ev.data.fd = fd;
ev.events = EPOLLOUT | EPOLLET;
if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) == -)
{
perror("epoll_ctl: mod");
}
}
} void do_write(int epollfd, int fd, char *buf)
{
int nwrite, data_size = strlen(buf);
int n = data_size;
struct epoll_event ev;
int flag = ;
while (n > )
{
nwrite = write(fd, buf + data_size - n, n);
if (nwrite < n)
{
if (nwrite == - && errno != EAGAIN)
{
flag = ;
perror("write error"); ev.data.fd = fd;
ev.events = EPOLLOUT | EPOLLET;
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) == -)
{
perror("epoll_ctl: mod");
}
Close(fd);
}
break;
}
n -= nwrite;
} if(flag != )
{
ev.data.fd = fd;
ev.events = EPOLLIN | EPOLLET;
if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) == -)
{
perror("epoll_ctl: mod");
}
}
// 这句很重要
memset(buf, , BUFFERSIZ);
} void setnonblocking(int sockfd)
{
int opts;
opts = fcntl(sockfd, F_GETFL);
if(opts < )
{
perror("Error: fcntl(F_GETFL)\n");
exit();
}
opts = (opts | O_NONBLOCK);
if(fcntl(sockfd, F_SETFL, opts) < )
{
perror("Error: fcntl(F_SETFL)\n");
exit();
}
} void Close(int fd)
{
if((close(fd)) < )
{
perror("close socket error");
exit();
}
}

servepoll2

参考资料

  IO多路复用之epoll总结

  Epoll在LT和ET模式下的读写方式

  epoll精髓

  Linux Epoll介绍和程序实例

特附博文Epoll在LT和ET模式下的读写方式

  在一个非阻塞的socket上调用read/write函数,返回EAGAIN或者EWOULDBLOCK(注: EAGAIN就是EWOULDBLOCK)。从字面上看,EAGAIN,再试一次;EWOULDBLOCK,如果这是一个阻塞socket,操作将被block,perror输出: Resource temporarily unavailable。

  总结:
  这个错误表示资源暂时不够,能read时,读缓冲区没有数据,或者write时,写缓冲区满了。遇到这种情况,如果是阻塞socket,read/write就要阻塞掉;而如果是非阻塞socket,read/write则立即返回-1, 同时errno设置为EAGAIN。
  所以,对于阻塞socket,read/write返回-1代表网络出错了。但对于非阻塞socket,read/write返回-1不一定网络真的出错了。可能是Resource temporarily unavailable。这时你应该再试,直到Resource available。

  综上,对于non-blocking的socket,正确的读写操作为:
  读:忽略掉errno = EAGAIN的错误,下次继续读
  写:忽略掉errno = EAGAIN的错误,下次继续写

  对于select和epoll的LT模式,这种读写方式是没有问题的。但对于epoll的ET模式,这种方式还有漏洞。

  epoll的两种模式LT和ET
  二者的差异在于level-trigger模式下只要某个socket处于readable/writable状态,无论什么时候进行epoll_wait都会返回该socket;而edge-trigger模式下只有某个socket从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该socket。

  所以,在epoll的ET模式下,正确的读写方式为:

  读:只要可读,就一直读,直到返回0,或者 errno = EAGAIN
  写:只要可写,就一直写,直到数据发送完,或者 errno = EAGAIN

  正确的读

 n = ;
while ((nread = read(fd, buf + n, BUFSIZ-)) > ) {
n += nread;
}
if (nread == - && errno != EAGAIN) {
perror("read error");
}

  正确的写

 int nwrite, data_size = strlen(buf);
n = data_size;
while (n > ) {
nwrite = write(fd, buf + data_size - n, n);
if (nwrite < n) {
if (nwrite == - && errno != EAGAIN) {
perror("write error");
}
break;
}
n -= nwrite;
}

  正确的accept,accept 要考虑 2 个问题
  1)阻塞模式 accept 存在的问题
  考虑这种情况:TCP连接被客户端夭折,即在服务器调用accept之前,客户端主动发送RST终止连接,导致刚刚建立的连接从就绪队列中移出,如果套接口被设置成阻塞模式,服务器就会一直阻塞在accept调用上,直到其他某个客户建立一个新的连接为止。但是在此期间,服务器单纯地阻塞在accept调用上,就绪队列中的其他描述符都得不到处理。

  解决办法是把监听套接口设置为非阻塞,当客户在服务器调用accept之前中止某个连接时,accept调用可以立即返回-1,这时源自Berkeley的实现会在内核中处理该事件,并不会将该事件通知给epool,而其他实现把errno设置为ECONNABORTED或者EPROTO错误,我们应该忽略这两个错误。

  2)ET模式下accept存在的问题
  考虑这种情况:多个连接同时到达,服务器的TCP就绪队列瞬间积累多个就绪连接,由于是边缘触发模式,epoll只会通知一次,accept只处理一个连接,导致TCP就绪队列中剩下的连接都得不到处理。

  解决办法是用while循环抱住accept调用,处理完TCP就绪队列中的所有连接后再退出循环。如何知道是否处理完就绪队列中的所有连接呢?accept返回-1并且errno设置为EAGAIN就表示所有连接都处理完。

  综合以上两种情况,服务器应该使用非阻塞地accept,accept在ET模式下的正确使用方式为:

 while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote, (size_t *)&addrlen)) > ) {
handle_client(conn_sock);
}
if (conn_sock == -) {
if (errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR)
perror("accept");
}

  一道腾讯后台开发的面试题
  使用Linux epoll模型,水平触发模式;当socket可写时,会不停的触发socket可写的事件,如何处理?

  第一种最普遍的方式:
  需要向socket写数据的时候才把socket加入epoll,等待可写事件。
  接受到可写事件后,调用write或者send发送数据。
  当所有数据都写完后,把socket移出epoll。

  这种方式的缺点是,即使发送很少的数据,也要把socket加入epoll,写完后在移出epoll,有一定操作代价。

  一种改进的方式:
  开始不把socket加入epoll,需要向socket写数据的时候,直接调用write或者send发送数据。如果返回EAGAIN,把socket加入epoll,在epoll的驱动下写数据,全部数据发送完毕后,再移出epoll。

  这种方式的优点是:数据不多的时候可以避免epoll的事件处理,提高效率。

  最后贴一个使用epoll、ET模式的简单HTTP服务器代码:

 #include <sys/socket.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <sys/sendfile.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <fcntl.h>
#include <errno.h> #define MAX_EVENTS 10
#define PORT 8080 //设置socket连接为非阻塞模式
void setnonblocking(int sockfd) {
int opts; opts = fcntl(sockfd, F_GETFL);
if(opts < ) {
perror("fcntl(F_GETFL)\n");
exit();
}
opts = (opts | O_NONBLOCK);
if(fcntl(sockfd, F_SETFL, opts) < ) {
perror("fcntl(F_SETFL)\n");
exit();
}
} int main(){
struct epoll_event ev, events[MAX_EVENTS];
int addrlen, listenfd, conn_sock, nfds, epfd, fd, i, nread, n;
struct sockaddr_in local, remote;
char buf[BUFSIZ]; //创建listen socket
if( (listenfd = socket(AF_INET, SOCK_STREAM, )) < ) {
perror("sockfd\n");
exit();
}
setnonblocking(listenfd);
bzero(&local, sizeof(local));
local.sin_family = AF_INET;
local.sin_addr.s_addr = htonl(INADDR_ANY);;
local.sin_port = htons(PORT);
if( bind(listenfd, (struct sockaddr *) &local, sizeof(local)) < ) {
perror("bind\n");
exit();
}
listen(listenfd, ); epfd = epoll_create(MAX_EVENTS);
if (epfd == -) {
perror("epoll_create");
exit(EXIT_FAILURE);
} ev.events = EPOLLIN;
ev.data.fd = listenfd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) == -) {
perror("epoll_ctl: listen_sock");
exit(EXIT_FAILURE);
} for (;;) {
nfds = epoll_wait(epfd, events, MAX_EVENTS, -);
if (nfds == -) {
perror("epoll_pwait");
exit(EXIT_FAILURE);
} for (i = ; i < nfds; ++i) {
fd = events[i].data.fd;
if (fd == listenfd) {
while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote,
(size_t *)&addrlen)) > ) {
setnonblocking(conn_sock);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_sock;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock,
&ev) == -) {
perror("epoll_ctl: add");
exit(EXIT_FAILURE);
}
}
if (conn_sock == -) {
if (errno != EAGAIN && errno != ECONNABORTED
&& errno != EPROTO && errno != EINTR)
perror("accept");
}
continue;
}
if (events[i].events & EPOLLIN) {
n = ;
while ((nread = read(fd, buf + n, BUFSIZ-)) > ) {
n += nread;
}
if (nread == - && errno != EAGAIN) {
perror("read error");
}
ev.data.fd = fd;
ev.events = events[i].events | EPOLLOUT;
if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == -) {
perror("epoll_ctl: mod");
}
}
if (events[i].events & EPOLLOUT) {
sprintf(buf, "HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\nHello World", );
int nwrite, data_size = strlen(buf);
n = data_size;
while (n > ) {
nwrite = write(fd, buf + data_size - n, n);
if (nwrite < n) {
if (nwrite == - && errno != EAGAIN) {
perror("write error");
}
break;
}
n -= nwrite;
}
close(fd);
}
}
} return ;
}
上一篇:关于doctype


下一篇:CentOS6.5 配置防火墙+允许指定ip访问端口