在介绍多路复用之前,简单介绍一下socket。
socket
socket是套接字,中文意思是插座,这个词体现在哪里?因为socket有一个fd,对应了一个网络IO,而这个IO由一个五元组(sip,dip,sport,dport,proto(传输层))决定。这个fd与五元组就像是插座的关系。
操作系统在接受到网卡通知有数据到来时,会通知相应的进程去接收数据,通知的方法是发出SIGIO信号。在应用UDP的场景中,可以利用SIGIO信号收发数据,而TCP不行,原因在于使用TCP的时候,会有很多“冗余数据”,如三次握手等,而这些数据也会产生SIGIO。
select/poll
这里将select和poll一起介绍,因为它们很类似,都是通过定期轮询的方式检验IO是否有数据或准备好(可读可写),区别在于select是使用了三个集合来分别检验可读可写和出错状态,poll是将这三个集合整合为一个来检验。这里直接贴出select和poll应用的代码。
先贴出作为服务器的初始设置。
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
perror("socket");
return -1;
}
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0)
{
perror("bind");
return -2;
}
if (listen(sockfd, 5) < 0)
{
perror("listen");
return -3;
}
使用select
fd_set rfds, rset;//rfds用来存储,rset用于检测
FD_ZERO(&rfds);
FD_SET(sockfd, &rfds);
int max_fd = sockfd;
int i = 0;
while (1)
{
rset = rfds;
int nready = select(max_fd+1, &rset, NULL, NULL, NULL);
if (nready < 0)
{
printf("select error : %d\n", errno);
continue;
}
if (FD_ISSET(sockfd, &rset))
{
//accept
struct sockaddr_in client_addr;
memset(&client_addr, 0, sizeof(struct sockaddr_in));
socklen_t client_len = sizeof(client_addr);
int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);
if (clientfd <= 0) continue;
char str[INET_ADDRSTRLEN] = {0};
printf("recvived from %s at port %d, sockfd:%d, clientfd:%d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),ntohs(client_addr.sin_port), sockfd, clientfd);
if (max_fd == FD_SETSIZE)
{
printf("clientfd --> out range\n");
break;
}
FD_SET(clientfd, &rfds);
if (clientfd > max_fd) max_fd = clientfd;
printf("sockfd:%d, max_fd:%d, clientfd:%d\n", sockfd, max_fd, clientfd);
if (--nready == 0) continue;
}
for (i = sockfd + 1; i <= max_fd; i ++)
{
if (FD_ISSET(i, &rset))
{
char buffer[BUFFER_LENGTH] = {0};
int ret = recv(i, buffer, BUFFER_LENGTH, 0);
if (ret < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
printf("read all data");
}
FD_CLR(i, &rfds);
close(i);
} else if (ret == 0)
{
printf(" disconnect %d\n", i);
FD_CLR(i, &rfds);
close(i);
break;
} else
{
printf("Recv: %s, %d Bytes\n", buffer, ret);
}
if (--nready == 0) break;
}
}
}
这里INET_ADDRSTRLEN以及BUFFER_LENGTH都是宏定义。
下面是poll代码,同样使用宏定义。
struct pollfd fds[POLL_SIZE] = {0};
fds[0].fd = sockfd;
fds[0].events = POLLIN;
int max_fd = 0;//值最大的fd,用于遍历
int i = 0;
for (i = 1; i < POLL_SIZE; i ++)
{
//初始化
fds[i].fd = -1;
fds[i].events=POLLIN;
}
while (1)
{
int nready = poll(fds, max_fd+1, 5);
if (nready <= 0) continue;
if ((fds[0].revents & POLLIN) == POLLIN) //监听fd有数据,说明有新连接
{
struct sockaddr_in client_addr;
memset(&client_addr, 0, sizeof(struct sockaddr_in));
socklen_t client_len = sizeof(client_addr);
int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);
if (clientfd <= 0) continue;
char str[INET_ADDRSTRLEN] = {0};
printf("recvived from %s at port %d, sockfd:%d, clientfd:%d\n",inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),ntohs(client_addr.sin_port), sockfd, clientfd);
fds[clientfd].fd = clientfd;
fds[clientfd].events = POLLIN;
if (clientfd > max_fd) max_fd = clientfd;
if (--nready == 0) continue;
}
for (i = sockfd + 1; i <= max_fd; i ++)
{
if (fds[i].revents & (POLLIN|POLLERR))
{
char buffer[BUFFER_LENGTH] = {0};
int ret = recv(i, buffer, BUFFER_LENGTH, 0);
if (ret < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
printf("read all data");
}
//close(i);
fds[i].fd = -1;
} else if (ret == 0)
{
printf(" disconnect %d\n", i);
close(i);
fds[i].fd = -1;
break;
} else
{
printf("Recv: %s, %d Bytes\n", buffer, ret);
}
if (--nready == 0) break;
}
}
}
这里还要提示一下,select和poll函数的阻塞,是阻塞在函数内部。另外,在连接断开之后,要及时把集合中对应的已断开fd删去,避免浪费空间。
epoll
epoll与select/poll不同,内部用红黑树维护。这里有一点先说明,epoll不是任何时候效率都比select/poll高,当IO数少时,select和poll的效率反而更高。
int epoll_fd = epoll_create(EPOLL_SIZE);
struct epoll_event ev, events[EPOLL_SIZE] = {0};
ev.events = EPOLLIN;
ev.data.fd = sockfd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
while (1)
{
int nready = epoll_wait(epoll_fd, events, EPOLL_SIZE, -1);
if (nready == -1)
{
printf("epoll_wait\n");
break;
}
int i = 0;
for (i = 0;i < nready;i ++)
{
if (events[i].data.fd == sockfd)
{
struct sockaddr_in client_addr;
memset(&client_addr, 0, sizeof(struct sockaddr_in));
socklen_t client_len = sizeof(client_addr);
int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len);
if (clientfd <= 0) continue;
char str[INET_ADDRSTRLEN] = {0};
printf("recvived from %s at port %d, sockfd:%d, clientfd:%d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),ntohs(client_addr.sin_port), sockfd, clientfd);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = clientfd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &ev);
} else {
int clientfd = events[i].data.fd;
char buffer[BUFFER_LENGTH] = {0};
int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0);
if (ret < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
printf("read all data");
}
close(clientfd);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = clientfd;
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev);
} else if (ret == 0)
{
printf(" disconnect %d\n", clientfd);
close(clientfd);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = clientfd;
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev);
break;
} else {
printf("Recv: %s, %d Bytes\n", buffer, ret);
}
}
}
}
有几点需要说明一下。一是sockfd没必要设为非阻塞,因为sockfd是否阻塞与epoll没关系。虽然设为非阻塞会快一点,但是那是在epoll检测完成之后,recv/send的过程时由于非阻塞快了一点。第二,epoll_wait()的阻塞相当于是带时间的条件等待,时间到或是条件满足(即检测到有IO)就返回。第三,events数组的值没必要设置得很大,经验值是连接数的1%即可。第四,注意要及时close(),否则服务端会出现大量CLOSE_WAIT。同时也要记得及时从epoll红黑树结构中及时删除节点,避免epoll中存在大量僵尸节点。
最后,还要说明一下水平触发和边沿触发。epoll默认水平触发。边沿触发是当socket接收到数据时会触发一次epoll_wait(),之后不管数据是否读完都不会再触发epoll_wait();水平触发则是socket中只要有数据就一直触发。在实际应用中,对于大数据一般用水平触发,小数据用边沿触发。这里还有两点要注意。一是虽然ET+循环读也可以做到LT的效果,但是对于大数据读写,一直循环会导致无法响应其他IO。另外,对于listenfd,它适用LT,这样可以避免多个连接同时到达而漏掉部分IO的问题,当然也可以ET+循环,但不建议。
当然,这里epoll代码还有可改进的地方,例如改成reactor写法,这里不再介绍。