在前面我们介绍了循环服务器,并发服务器模型。简单的循环服务器每次只能处理一个请求,即处理的请求是串行的,效率过低;并发服务器可以通过创建多个进程或者是线程来并发的处理多个请求。但是当客户端增加时,就需要创建更多的进程或者线程,就会导致系统负载最终转移到进程或线程的切换开销上。
为了减少这类开销,而使系统处理能力集中在核心业务上,就要求我们降低并发的进程或线程数目,因此又实现了一个更高级的IO复用循环服务器。I/O复用的循环服务器一般创建两个线程,一个是客户端连接处理线程,专门用来处理客户端的连接,当有客户端到来的时候,此线程把客户端的套接字描述符放到一块公共的区域中。另一个是业务处理线程,此线程轮循(select)客户端套接字描述符集合中有没有数据到来,如果有数据到来,那么就进行处理。这样,客户 端的增加并不会造成系统进程或线程数的明显增加,而使其处理能力与CPU和内存直接相关。
TCP并发服务器模型 I/O多路复用模型伪代码
/* TCP并发服务器模型 I/O多路复用 */ /* 服务器主进程 */ socket(); bind(); listen(); pthread_create( ); //创建客户端连接线程和业务处理线程 pthread_join( ); //等待线程结束 close( ); //关闭服务器套接字 /* 连接处理线程 */ while(1) { accept( ); //接受一个客户端连接 store();//存储客户端套接字描述符到一个公共集合中 } /* 业务处理线程 */ while(1) { get( ); //取出可用的客户端套接字描述符 select( ); //设置监听读写文件描述符集合 recv( ); process( ); send( ); close( ); }
一个I/O多路复用模型的例子
#include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <time.h> #include <string.h> #include <stdio.h> #include <pthread.h> #include <sys/select.h> #define BUFFLEN 1024 #define SERVER_PORT 12349 #define BACKLOG 5 #define CLIENTNUM 1024 /**最大支持客户端数量*/ /*可连接客户端的文件描述符数组*/ int connect_host[CLIENTNUM]; int connect_number = 0; static void *handle_request(void *argv) { time_t now; /*时间*/ char buff[BUFFLEN];/*收发数据缓冲区*/ int n = 0; int maxfd = -1;/*最大侦听文件描述符*/ fd_set scanfd; /*侦听描述符集合*/ struct timeval timeout; /*超时*/ timeout.tv_sec = 1; /* 阻塞1秒后超时返回 */ timeout.tv_usec = 0; int i = 0; int err = -1; for(;;) { /*最大文件描述符值初始化为-1*/ maxfd = -1; FD_ZERO(&scanfd);/*清零文件描述符集合*/ for(i=0;i<CLIENTNUM;i++)/*将文件描述符放入集合*/ { if(connect_host[i] != -1)/*合法的文件描述符*/ { FD_SET(connect_host[i], &scanfd);/*放入集合*/ if(maxfd < connect_host[i])/*更新最大文件描述符值*/ { maxfd = connect_host[i]; } } } /*select等待*/ err = select(maxfd + 1, &scanfd, NULL, NULL, &timeout) ; switch(err) { case 0:/*超时*/ break; case -1:/*错误发生*/ break; default:/*有可读套接字文件描述符*/ if(connect_number<=0) break; for(i = 0;i<CLIENTNUM;i++) { /*查找激活的文件描述符*/ if(connect_host[i] != -1) if(FD_ISSET(connect_host[i],&scanfd)) { memset(buff, 0, BUFFLEN);/*清零*/ n = recv(connect_host[i], buff, BUFFLEN,0);/*接收发送方数据*/ if(n > 0 && !strncmp(buff, "TIME", 4))/*判断是否合法接收数据*/ { memset(buff, 0, BUFFLEN);/*清零*/ now = time(NULL);/*当前时间*/ sprintf(buff, "%24s\r\n",ctime(&now));/*将时间拷贝入缓冲区*/ send(connect_host[i], buff, strlen(buff),0);/*发送数据*/ } /*关闭客户端*/ close(connect_host[i]); /*更新文件描述符在数组中的值*/ connect_host[i] = -1; connect_number --; /*客户端计数器减1*/ } } break; } } return NULL; } static void *handle_connect(void *argv) { int s_s = *((int*)argv) ;/*获得服务器侦听套接字文件描述符*/ int s_c = -1;/*连接客户端文件描述符*/ struct sockaddr_in from; int len = sizeof(from); /*接收客户端连接*/ for(;;) { int i = 0; int s_c = accept(s_s, (struct sockaddr*)&from, &len);/*接收客户端的请求*/ printf("a client connect, from:%s\n",inet_ntoa(from.sin_addr)); /*查找合适位置,将客户端的文件描述符放入*/ for(i=0;i<CLIENTNUM;i++) { if(connect_host[i] == -1)/*找到*/ { /*放入*/ connect_host[i]= s_c; /*客户端计数器加1*/ connect_number ++; /*继续轮询等待客户端连接*/ break; } } } return NULL; } int main(int argc, char *argv[]) { int s_s; /*服务器套接字文件描述符*/ struct sockaddr_in local; /*本地地址*/ int i = 0; memset(connect_host, -1, CLIENTNUM); /*建立TCP套接字*/ s_s = socket(AF_INET, SOCK_STREAM, 0); /*初始化地址接哦股*/ memset(&local, 0, sizeof(local));/*清零*/ local.sin_family = AF_INET;/*AF_INET协议族*/ local.sin_addr.s_addr = htonl(INADDR_ANY);/*任意本地地址*/ local.sin_port = htons(SERVER_PORT);/*服务器端口*/ /*将套接字文件描述符绑定到本地地址和端口*/ int err = bind(s_s, (struct sockaddr*)&local, sizeof(local)); err = listen(s_s, BACKLOG);/*侦听*/ pthread_t thread_do[2];/*线程ID*/ /*创建线程处理客户端连接*/ pthread_create(&thread_do[0],/*线程ID*/ NULL,/*属性*/ handle_connect,/*线程回调函数*/ (void*)&s_s); /*线程参数*/ /*创建线程处理客户端请求*/ pthread_create(&thread_do[1],/*线程ID*/ NULL,/*属性*/ handle_request,/*线程回调函数*/ NULL); /*线程参数*/ /*等待线程结束*/ for(i=0;i<2;i++) pthread_join(thread_do[i], NULL); close(s_s); return 0; }