多路复用并发模型 -- poll
#include<poll.h>
int poll(struct pollfd *fds, unsigned int nfds, int timeout);
struct pollfd {
int fd; //轮询的文件描述符
short events; //等待的事件
short revents; //实际发生的事件
}
nfds : fds 数组大小
timeout : 超时时间(毫秒), 0立即返回,负数一直等待
超时返回 0, 出错返回 -1, 正常返回 > 0
多路复用并发模型 -- poll
事件类型
POLLIN 有数据可读
POLLRDNORM 有普通数据可读
POLLRDBAND 有优先数据可读
POLLPRI 有紧迫数据可读
POLLOUT 可以写数据(不会阻塞)
POLLWRNORM 可以写普通数据
POLLWRBAND 可以写优先数据
POLLMSGSIGPOLL 消息可用
POLLER 发生错误 (revents 可用)
POLLHUP 文件描述符挂起 (revents 可用)
POLLNVAL 指定的文件描述符非法(revents可用)
多路复用并发模型 -- poll
用法
POLLIN = POLLRDNORM | POLLBAND
监控可读事件 events = POLLIN | POLLPRI
POLLOUT = POLLWRNORM
监控可写事件 events = POLLOUT | POLLWRBAND
同时监控多个事件 events = XX1 | XX2 | XX3 | ...
若干个事件发生,返回值 revents = XX_N | XX_M | ...
多路复用并发模型 -- poll
poll 模型和 select 本质上差不多,都是fd 轮询
优点:
比之 select,木有最大文件描述符限制
缺点:
和select一样,包含大量文件描述符时,系统开销会很大
#include<stdio.h>
#include<unistd.h>
#include<string.h> #include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h> #include<sys/poll.h>
#include<sys/time.h> #define SRV_PORT 0xabcd
#define CONN_MAX 10000 void PollProcss(struct pollfd *pfds,int *plen)
{
int i;
int fd;
int iRet;
struct sockaddr_in addr;
socklen_t addrlen = sizeof(addr);
char szMsg[1000] = { 0 };
char szBuff[1000] = { 0 }; //monitor POLLIN is 1 or 0
if (pfds[0].revents & POLLIN)
{
read(0,szMsg,1000);
for (i = 2; i < *plen; ++i)
{
write(pfds[i].fd, szMsg, strlen(szMsg));
}
}
if (pfds[1].revents & POLLIN)
{
fd = accept(pfds[1].fd, (struct sockaddr*)&addr, &addrlen);
if (fd < 0)
{
perror("Fail to accept!");
return;
}
if (*plen == CONN_MAX)
{
printf("\rConnect over limit\n");
write(fd, "Connect over limit", 20);
close(fd);
}
else
{
write(fd, "Welcome", 8);
printf("\r[%d]New connect from %s[%d]\n", fd, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); pfds[*plen].fd = fd;
pfds[*plen].events = POLLIN; (*plen)++;
}
}
//TranslateMessage DispatchMessage
for (i = 3; i < *plen; ++i)
{
if (pfds[i].fd.revents & POLLIN)
{
memset(szBuff, 0, 1000); iRet = read((pfds[i].fd, szBuff, 1000);
if (iRet < 0)
{
perror("Fail to read!");
break;
}
//Disconnect
else if (iRet == 0)
{
int j;
//Back cover front
for (j = i; j < *plen; ++j)
{
pfds[j].fd = pfds[j + 1].fd;
}
(*plen++);
//
i--; /* That's the same thing as the top
pfds.fd=pfds[*plen-1].fd; //replace pfds[i].fd to last fd
(*plen)--;
i--;
*/ }
//Normal processing data
else
{
printf("\r[%d]Recv:%s\n", pfds[i].fd, szBuff);
} }
} return;
} void PollServer()
{
int fd;
int iRet;
struct sockaddr_in addr;
socklen_t addrlen = sizeof(addr); fd = socket(PF_INET, SOCK_STREAM, 0);
if (fd < 0)
{
perror("Fail to socket!");
return;
} addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(SRV_PORT); iRet = bind(fd, (struct sockaddr*)&addr, addrlen);
if (iRet)
{
perror("Fail to bind!");
close(fd);
return;
} iRet = listen(fd, 100);
if (iRet)
{
perror("Fail to listen!");
close(fd);
return;
} // /////////////init pollfd
//struct pollfd pfds[CONN_MAX];
struct pollfd *pfds = (struct pollfd*)malloc(CONN_MAX *sizeof(struct pollfd));
int nfds = 0; //fd num of array:pfds //monitor stdin
pfds[0].fd = 0;
pfds[0].events = POLLIN;
nfds++; //monitor tcp server fd
pfds[1].fd = fd;
pfds[1].events = POLLIN;
nfds++;
//////////////// printf("Start server ok..\n"); while (1)
{
fprintf(stderr, "Send:");
iRet = poll(pfds, nfds, -1);
if (iRet == 0)
{
//timeout
continue;
}
eles if (iRet < 0)
{
perror("Fail to poll!");
break;
}
else
{
PollProcss(pfds, &nfds);
}
}
close(fd);
return;
} int main()
{
TcpServer(); return 0;
}