多路复用并发模型 -- poll
#include<poll.h>
int poll(struct pollfd *fds, unsigned int nfds, int timeout);
struct pollfd {
int fd; //轮询的文件描述符
short events; //等待的事件
short revents; //实际发生的事件
}
nfds : fds 数组大小
timeout : 超时时间(毫秒), 0立即返回,负数一直等待
超时返回 0, 出错返回 -1, 正常返回 > 0
多路复用并发模型 -- poll
事件类型
POLLIN 有数据可读
POLLRDNORM 有普通数据可读
POLLRDBAND 有优先数据可读
POLLPRI 有紧迫数据可读
POLLOUT 可以写数据(不会阻塞)
POLLWRNORM 可以写普通数据
POLLWRBAND 可以写优先数据
POLLMSGSIGPOLL 消息可用
POLLER 发生错误 (revents 可用)
POLLHUP 文件描述符挂起 (revents 可用)
POLLNVAL 指定的文件描述符非法(revents可用)
多路复用并发模型 -- poll
用法
POLLIN = POLLRDNORM | POLLBAND
监控可读事件 events = POLLIN | POLLPRI
POLLOUT = POLLWRNORM
监控可写事件 events = POLLOUT | POLLWRBAND
同时监控多个事件 events = XX1 | XX2 | XX3 | ...
若干个事件发生,返回值 revents = XX_N | XX_M | ...
多路复用并发模型 -- poll
poll 模型和 select 本质上差不多,都是fd 轮询
优点:
比之 select,木有最大文件描述符限制
缺点:
和select一样,包含大量文件描述符时,系统开销会很大
#include<stdio.h> #include<unistd.h> #include<string.h> #include<sys/socket.h> #include<netinet/in.h> #include<arpa/inet.h> #include<sys/poll.h> #include<sys/time.h> #define SRV_PORT 0xabcd #define CONN_MAX 10000 void PollProcss(struct pollfd *pfds,int *plen) { int i; int fd; int iRet; struct sockaddr_in addr; socklen_t addrlen = sizeof(addr); char szMsg[1000] = { 0 }; char szBuff[1000] = { 0 }; //monitor POLLIN is 1 or 0 if (pfds[0].revents & POLLIN) { read(0,szMsg,1000); for (i = 2; i < *plen; ++i) { write(pfds[i].fd, szMsg, strlen(szMsg)); } } if (pfds[1].revents & POLLIN) { fd = accept(pfds[1].fd, (struct sockaddr*)&addr, &addrlen); if (fd < 0) { perror("Fail to accept!"); return; } if (*plen == CONN_MAX) { printf("\rConnect over limit\n"); write(fd, "Connect over limit", 20); close(fd); } else { write(fd, "Welcome", 8); printf("\r[%d]New connect from %s[%d]\n", fd, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); pfds[*plen].fd = fd; pfds[*plen].events = POLLIN; (*plen)++; } } //TranslateMessage DispatchMessage for (i = 3; i < *plen; ++i) { if (pfds[i].fd.revents & POLLIN) { memset(szBuff, 0, 1000); iRet = read((pfds[i].fd, szBuff, 1000); if (iRet < 0) { perror("Fail to read!"); break; } //Disconnect else if (iRet == 0) { int j; //Back cover front for (j = i; j < *plen; ++j) { pfds[j].fd = pfds[j + 1].fd; } (*plen++); // i--; /* That's the same thing as the top pfds.fd=pfds[*plen-1].fd; //replace pfds[i].fd to last fd (*plen)--; i--; */ } //Normal processing data else { printf("\r[%d]Recv:%s\n", pfds[i].fd, szBuff); } } } return; } void PollServer() { int fd; int iRet; struct sockaddr_in addr; socklen_t addrlen = sizeof(addr); fd = socket(PF_INET, SOCK_STREAM, 0); if (fd < 0) { perror("Fail to socket!"); return; } addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(INADDR_ANY); addr.sin_port = htons(SRV_PORT); iRet = bind(fd, (struct sockaddr*)&addr, addrlen); if (iRet) { perror("Fail to bind!"); close(fd); return; } iRet = listen(fd, 100); if (iRet) { perror("Fail to listen!"); close(fd); return; } // /////////////init pollfd //struct pollfd pfds[CONN_MAX]; struct pollfd *pfds = (struct pollfd*)malloc(CONN_MAX *sizeof(struct pollfd)); int nfds = 0; //fd num of array:pfds //monitor stdin pfds[0].fd = 0; pfds[0].events = POLLIN; nfds++; //monitor tcp server fd pfds[1].fd = fd; pfds[1].events = POLLIN; nfds++; //////////////// printf("Start server ok..\n"); while (1) { fprintf(stderr, "Send:"); iRet = poll(pfds, nfds, -1); if (iRet == 0) { //timeout continue; } eles if (iRet < 0) { perror("Fail to poll!"); break; } else { PollProcss(pfds, &nfds); } } close(fd); return; } int main() { TcpServer(); return 0; }