网络IO多路复用简介

在介绍多路复用之前,简单介绍一下socket。

socket

socket是套接字,中文意思是插座,这个词体现在哪里?因为socket有一个fd,对应了一个网络IO,而这个IO由一个五元组(sip,dip,sport,dport,proto(传输层))决定。这个fd与五元组就像是插座的关系。
操作系统在接受到网卡通知有数据到来时,会通知相应的进程去接收数据,通知的方法是发出SIGIO信号。在应用UDP的场景中,可以利用SIGIO信号收发数据,而TCP不行,原因在于使用TCP的时候,会有很多“冗余数据”,如三次握手等,而这些数据也会产生SIGIO。

select/poll

这里将select和poll一起介绍,因为它们很类似,都是通过定期轮询的方式检验IO是否有数据或准备好(可读可写),区别在于select是使用了三个集合来分别检验可读可写和出错状态,poll是将这三个集合整合为一个来检验。这里直接贴出select和poll应用的代码。
先贴出作为服务器的初始设置。

int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
    perror("socket");
    return -1;
}
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) 
{
    perror("bind");
    return -2;
}
    if (listen(sockfd, 5) < 0) 
{
    perror("listen");
    return -3;
}

使用select

fd_set rfds, rset;//rfds用来存储,rset用于检测
FD_ZERO(&rfds);
FD_SET(sockfd, &rfds);
int max_fd = sockfd;
int i = 0;
while (1) 
{
    rset = rfds;
    int nready = select(max_fd+1, &rset, NULL, NULL, NULL);
    if (nready < 0) 
    {
        printf("select error : %d\n", errno);
        continue;
    }
    if (FD_ISSET(sockfd, &rset)) 
    { 
        //accept
        struct sockaddr_in client_addr;
        memset(&client_addr, 0, sizeof(struct sockaddr_in));
        socklen_t client_len = sizeof(client_addr);
        int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);
        if (clientfd <= 0) continue;
        char str[INET_ADDRSTRLEN] = {0};
        printf("recvived from %s at port %d, sockfd:%d, clientfd:%d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),ntohs(client_addr.sin_port), sockfd, clientfd);
        if (max_fd == FD_SETSIZE) 
        {
            printf("clientfd --> out range\n");
            break;
        }
        FD_SET(clientfd, &rfds);
        if (clientfd > max_fd) max_fd = clientfd;
        printf("sockfd:%d, max_fd:%d, clientfd:%d\n", sockfd, max_fd, clientfd);
        if (--nready == 0) continue;
    }
    for (i = sockfd + 1; i <= max_fd; i ++) 
    {
        if (FD_ISSET(i, &rset)) 
       {
           char buffer[BUFFER_LENGTH] = {0};
           int ret = recv(i, buffer, BUFFER_LENGTH, 0);
           if (ret < 0)
           {
                if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    printf("read all data");
                }
                FD_CLR(i, &rfds);
                close(i);
            } else if (ret == 0) 
            {
                printf(" disconnect %d\n", i);
                FD_CLR(i, &rfds);
                close(i);
                break;
            } else 
            {
                printf("Recv: %s, %d Bytes\n", buffer, ret);
            }
            if (--nready == 0) break;
        }
    }
}

这里INET_ADDRSTRLEN以及BUFFER_LENGTH都是宏定义。
下面是poll代码,同样使用宏定义。

struct pollfd fds[POLL_SIZE] = {0};
fds[0].fd = sockfd;
fds[0].events = POLLIN;
int max_fd = 0;//值最大的fd,用于遍历
int i = 0;
for (i = 1; i < POLL_SIZE; i ++)
{
//初始化
    fds[i].fd = -1;
fds[i].events=POLLIN;
}
while (1) 
{
    int nready = poll(fds, max_fd+1, 5);
    if (nready <= 0) continue;
    if ((fds[0].revents & POLLIN) == POLLIN) //监听fd有数据,说明有新连接
    {
        struct sockaddr_in client_addr;
        memset(&client_addr, 0, sizeof(struct sockaddr_in));
        socklen_t client_len = sizeof(client_addr);
        int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);
        if (clientfd <= 0) continue;
        char str[INET_ADDRSTRLEN] = {0};
        printf("recvived from %s at port %d, sockfd:%d, clientfd:%d\n",inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),ntohs(client_addr.sin_port), sockfd, clientfd);
        fds[clientfd].fd = clientfd;
        fds[clientfd].events = POLLIN;
        if (clientfd > max_fd) max_fd = clientfd;
        if (--nready == 0) continue;
}
    for (i = sockfd + 1; i <= max_fd; i ++) 
    {
        if (fds[i].revents & (POLLIN|POLLERR)) 
        {
            char buffer[BUFFER_LENGTH] = {0};
            int ret = recv(i, buffer, BUFFER_LENGTH, 0);
            if (ret < 0) 
            {
                if (errno == EAGAIN || errno == EWOULDBLOCK) 
                {
                    printf("read all data");
                }
                //close(i);
                fds[i].fd = -1;
            } else if (ret == 0) 
            {
                printf(" disconnect %d\n", i);
                close(i);
                fds[i].fd = -1;
                break;
            } else 
            {
                printf("Recv: %s, %d Bytes\n", buffer, ret);
            }
            if (--nready == 0) break;
        }
    }
}

这里还要提示一下,select和poll函数的阻塞,是阻塞在函数内部。另外,在连接断开之后,要及时把集合中对应的已断开fd删去,避免浪费空间。

epoll

epoll与select/poll不同,内部用红黑树维护。这里有一点先说明,epoll不是任何时候效率都比select/poll高,当IO数少时,select和poll的效率反而更高。

int epoll_fd = epoll_create(EPOLL_SIZE);
struct epoll_event ev, events[EPOLL_SIZE] = {0};
ev.events = EPOLLIN;
ev.data.fd = sockfd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
while (1) 
{
int nready = epoll_wait(epoll_fd, events, EPOLL_SIZE, -1);
	if (nready == -1) 
{
		printf("epoll_wait\n");
		break;
	}
	int i = 0;
	for (i = 0;i < nready;i ++) 
{
		if (events[i].data.fd == sockfd) 
        {
			struct sockaddr_in client_addr;
			memset(&client_addr, 0, sizeof(struct sockaddr_in));
			socklen_t client_len = sizeof(client_addr);
			int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);
			if (clientfd <= 0) continue;
			char str[INET_ADDRSTRLEN] = {0};
			printf("recvived from %s at port %d, sockfd:%d, clientfd:%d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),ntohs(client_addr.sin_port), sockfd, clientfd);

			ev.events = EPOLLIN | EPOLLET;
			ev.data.fd = clientfd;
			epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &ev);
		} else {
			int clientfd = events[i].data.fd;
			char buffer[BUFFER_LENGTH] = {0};
			int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);
			if (ret < 0) 
            {
				if (errno == EAGAIN || errno == EWOULDBLOCK) 
                {
			    	printf("read all data");
				}	
				close(clientfd);	
				ev.events = EPOLLIN | EPOLLET;
				ev.data.fd = clientfd;
				epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev);
			} else if (ret == 0) 
            {
				printf(" disconnect %d\n", clientfd);
					
				close(clientfd);
				ev.events = EPOLLIN | EPOLLET;
				ev.data.fd = clientfd;
				epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev);	
				break;
			} else {
				printf("Recv: %s, %d Bytes\n", buffer, ret);
			}	
		}
	}
}

有几点需要说明一下。一是sockfd没必要设为非阻塞,因为sockfd是否阻塞与epoll没关系。虽然设为非阻塞会快一点,但是那是在epoll检测完成之后,recv/send的过程时由于非阻塞快了一点。第二,epoll_wait()的阻塞相当于是带时间的条件等待,时间到或是条件满足(即检测到有IO)就返回。第三,events数组的值没必要设置得很大,经验值是连接数的1%即可。第四,注意要及时close(),否则服务端会出现大量CLOSE_WAIT。同时也要记得及时从epoll红黑树结构中及时删除节点,避免epoll中存在大量僵尸节点。
最后,还要说明一下水平触发和边沿触发。epoll默认水平触发。边沿触发是当socket接收到数据时会触发一次epoll_wait(),之后不管数据是否读完都不会再触发epoll_wait();水平触发则是socket中只要有数据就一直触发。在实际应用中,对于大数据一般用水平触发,小数据用边沿触发。这里还有两点要注意。一是虽然ET+循环读也可以做到LT的效果,但是对于大数据读写,一直循环会导致无法响应其他IO。另外,对于listenfd,它适用LT,这样可以避免多个连接同时到达而漏掉部分IO的问题,当然也可以ET+循环,但不建议。
当然,这里epoll代码还有可改进的地方,例如改成reactor写法,这里不再介绍。

上一篇:【TcaplusDB知识库】[Generic表]根据部分Key字段值读取数据示例代码


下一篇:Network - 网络io与select/poll/epoll