1. 网络编程中的四种IO模型
- 阻塞IO模型,默认socket都是阻塞的,就是IO操作都要等待操作完成以后才能返回;
- 非阻塞IO模型,就是IO操作时不等待,立即返回,但需要不断的去询问内核,数据是否准备好了,如果准备好了,就主动调用函数去处理数据,使用fcntl设置socket为非阻塞;
- 多路复用模型,就是事件驱动IO,也就是说检测到描述符上发生了事件,才去处理,典型的就是select和epoll;
- 异步IO模型,就是发起IO操作后,立即返回去做其他的事,然后内核会等待数据准备完成后,将数据拷贝到用户内存中,并给用户进程发送一个信号,告知IO操作已完成;
2. epoll函数
2.1 epoll的两种工作模式
2.1.1 LT模式(又叫水平模式,类似于select/poll):
完全靠内核驱动,只要某个文件描述符有变化,就会一直通知我们的应用程序,直到处理完毕为止。
2.1.2 ET模式(又叫边缘触发模式,必须将socket设置为非阻塞):
此种情况下,当文件描述符有变化时,epoll只会通知应用程序一次,并将描述符从监视队列中清除,直到应用程序处理完该次变化,如果应用程序没有处理,那epoll就不会再次关注该文件描述符,这样其实可能会造成丢包的。
这时应用程序需要自己维护一张fds的表格,把从epoll_wait得到的状态信息登记到这张表格,然后应用程序可以选择遍历这张表格,对处于忙碌状态的fds进行操作。
2.1.3 水平模式和边沿模式的选择
ET比LT对应用程序的要求更多,需要程序员设计的部分也更多,看上去LT好像要简单很多,但是当我们要求对fd有超时控制时,LT也同样需要对fds进行遍历,此时不如使用本来就要遍历的ET。
而且由于epoll_wait每次返回的fds数量是有限的,在大并发的模式下,LT将非常的繁忙,所有的fds都要在它的队列中产生状态信息,而每次只有一部分fds能返回给应用程序。
而ET只要epoll_wait返回一次fds之后,这些fds就会从队列中删除,只有当fd重新变为空闲状态时才重新加入到队列中,这就是说,随着epoll_wait的返回,队列中的fds是在减少的,这样在大并发的情况下,ET模式将会更加具有优势。
2.2 epoll函数原型
2.2.1 epoll_create
int epoll_create(int size); //创建一个epoll的句柄,size用来告诉内核要监听的数目
返回值:
>0 返回创建成功的epoll句柄
-1 失败
2.2.2 epoll_ctl
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数, 注册要监听的事件类型:
参数说明:
- epfd epoll_create返回的句柄
- op 表示动作,用3个宏表示:EPOLL_CTL_ADD 注册新的fd到epfd中,EPOLL_CTL_MOD,修改已经注册的fd的监听事件,EPOLL_CTL_DEL,从epfd中删除一个fd。
- fd 表示要监听的fd(一般就是socket函数生成的文件描述符)
- event 告诉内核要监听什么事
struct epoll_event结构如下:
struct epoll_event { __uint32_t events; //多个宏的集合,表示对应文件描述符可读、可写、紧急可读等等 epoll_data_t data; //一个联合体,详细介绍见下面 }; typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; }epoll_data_t;
2.2.3 epoll_wait
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
参数说明如下:
- events 根据events中事件,决定是调用accept回应一个连接,还是调用read或者write读写文件
- maxevents 告诉内核这个events多大,并且不能大于epoll_create中的size
功能说明:
等侍注册在epfd(epoll生成的文件描述符)上的socket fd的事件的发生,如果发生则将发生的sokct fd和事件类型放入到events数组中。
并 且将注册在epfd上的socket fd的事件类型给清空,所以如果下一个循环你还要关注这个socket fd的话,则需要用epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)来重新设置socket fd的事件类型。这时不用EPOLL_CTL_ADD,因为socket fd并未清空,只是事件类型清空。这一步非常重要。当epoll_wait返回时根据返回值(大于0)调用accept。
2.3 epoll的实现
2.3.1 epoll函数调用过程
socket/bind/listen/epoll_create/epoll_ctl/epoll_wait/accept/read/write/close
2.3.2 代码实现
首先对CTCP类做一下补充,将socket设置为非阻塞:
int CTcp::SetNoblock (int nSock) { assert (m_nSock != -1); int nFlags; if ( nSock == -1 ) { nSock = m_nSock; } if ((nFlags = fcntl (nSock, F_GETFL, 0)) < 0) return 0; nFlags = nFlags | O_NONBLOCK; if (fcntl (nSock, F_SETFL, nFlags) < 0) return 0; return 1; }
然后基于CTCP类,实现CEpollServer类,代码如下:
//EpollServer.h #ifndef __EPOLL_SERVER_H__ #define __EPOLL_SERVER_H__ #include "SxTcp.h" //Tcp类 class CEpollServer { //构造函数 public: CEpollServer (); virtual ~CEpollServer (); //公有成员函数 public: int CreateEpoll(const char* szIp, int nPort, int nSize); int ProcessEpoll(); int CloseEpoll(); //私有成员变量 private: CTcp m_cTcp; int m_nEpollFd; }; #endif
#include "EpollServer.h" #include <sys/epoll.h> #include "TypeError.h" #include <assert.h> #include <string.h> #include <unistd.h> #include <errno.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <stdio.h> CEpollServer::CEpollServer () { m_nEpollFd = -1; } CEpollServer::~CEpollServer () { CloseEpoll(); m_cTcp.Close(); } /*创建epoll句柄 入参: szIp 服务器ip地址 nPort 要绑定的端口 nSize 要监听的文件描述符数量 出参:1: 成功 ; 0: 失败 */ int CEpollServer::CreateEpoll(const char* szIp, int nPort, int nSize) { assert(szIp != nullptr); int iRet = 0; int size = (nSize > 0 ? nSize : DEFAULT_EPOLL_FD_NUM); iRet = m_cTcp.Open(); if ( iRet == 0 ) { return SOCKET_ERROR; } iRet = m_cTcp.Bind(szIp, nPort); if ( iRet == 0 ) { return BIND_ERROR; } iRet = m_cTcp.SetNoblock(); if ( iRet == 0 ) { return SETSOCKOPT_ERROR; } iRet = m_cTcp.Listen(nSize+1);//监听描述符数量要比epoll的多? if ( iRet == 0) { return LISTEN_ERROR; } if ( m_nEpollFd != -1 ) { CloseEpoll(); } m_nEpollFd = epoll_create(size); if ( m_nEpollFd == -1) { return EPOLL_CREATE_ERROR; } return 1; } /*处理epoll事件 出参:1: 成功 ; 0: 失败 */ int CEpollServer::ProcessEpoll() { assert(m_nEpollFd != -1); int nFds = 0; int connFd = -1, readFd = -1, writeFd = -1; int n = 0, nSize = 0; int nListenFd = -1; char buf[MAX_READ_SIZE] = {0}; struct sockaddr_in clientAddr; socklen_t clilen; struct epoll_event ev, events[20]; memset((void*)&ev, 0, sizeof(ev)); nListenFd = m_cTcp.GetHandle(); ev.data.fd = nListenFd; ev.events = EPOLLIN|EPOLLET; if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, nListenFd, &ev) == -1 ) { return EPOLL_CTL_ERROR; } while(1) { n = 0; nSize = 0; nFds = epoll_wait(m_nEpollFd, events, 20, 500); for (int i = 0; i< nFds; ++i) { memset(buf, 0, MAX_READ_SIZE); if (events[i].data.fd == nListenFd ) { while ( (connFd = accept(nListenFd, (sockaddr*)&clientAddr, &clilen)) > 0 ) { m_cTcp.SetNoblock(connFd); //ET模式需设置为非阻塞的 ev.data.fd = connFd; ev.events = EPOLLIN|EPOLLET; if ( epoll_ctl(m_nEpollFd, EPOLL_CTL_ADD, connFd, &ev) == -1 ) { return EPOLL_CTL_ERROR; } } if ( connFd == -1 && errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR ) { return ACCEPT_ERROR; } continue; } else if(events[i].events & EPOLLIN) { readFd = events[i].data.fd; if (readFd < 0) { continue; } //读取数据 while ( (nSize = read(readFd, buf+n, MAX_READ_SIZE - 1)) > 0 ) { n += nSize; } //EAGAIN说明读到结尾了 if (nSize == -1 && errno != EAGAIN ) { fprintf(stderr, "epoll read failed\n"); //ngleLog::WriteLog(ERROR, "%s", "epoll read fialed"); } fprintf(stdout, "read data is:%s\n", buf); ev.data.fd = readFd; ev.events = EPOLLOUT|EPOLLET;//边沿模式(ET) epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, readFd, &ev); } else if(events[i].events & EPOLLOUT) { writeFd = events[i].data.fd; //写数据 strncpy(buf, "hello client", sizeof(buf)-1); int dataSize = strlen(buf); n = dataSize; while(n > 0) { nSize = write(writeFd, buf + dataSize - n, n); if (nSize < n) { if (nSize == -1 && errno != EAGAIN) { break; } } n -= nSize; } ev.data.fd = writeFd; ev.events = EPOLLIN|EPOLLET; epoll_ctl(m_nEpollFd, EPOLL_CTL_MOD, writeFd, &ev); } } } } /* 关闭epoll文件描述符 */ int CEpollServer::CloseEpoll() { if (m_nEpollFd != -1) { close (m_nEpollFd); m_nEpollFd = -1; } return 1; }
将上面CEpollServer类和TCP类结合起来编译成一个动态库,makefile如下:
LIB_DIR=./lib src=$(wildcard *.cpp) obj=$(patsubst %.cpp,%.o,$(src)) PIC=-fPIC LIBSO=-shared #CC=g++ -gdwarf-2 -gstrict-dwarf CC=g++ -g %.o:%.cpp $(CC) -c $< $(PIC) network:$(obj) $(CC) -o libnetwork.so $^ $(LIBSO) cp -f libnetwork.so ../test/lib clean: rm -f *.o *.so
然后实现TestEpollServer.cpp,如下:
注意:下面ConfigIni和SingleLog都是我本人测试时候写的库,如需使用下面代码,需要修改!
#include "../../readini/ConfigIni.h" #include <string> #include "../../network/EpollServer.h" #include "../../log/SingleLog.h" CEpollServer g_clEpollServer; #define FILEDIR "./socket.ini" //epoll server int epoll_server_init() { int iRet = -1; string strIp; int nPort = 0, nEpollNum = 0, nTimeout = 0; ConfigIni::Init(string(FILEDIR)); strIp = ConfigIni::ReadStr(string("SERVER"), string("Addr")); if (strIp == "") { SingleLog::WriteLog(ERROR,"read server addr failed"); return iRet; } nPort = ConfigIni::ReadInt(string("SERVER"), string("Port")); if ( nPort == -1 ) { SingleLog::WriteLog(ERROR,"read server port failed"); return iRet; } nEpollNum = ConfigIni::ReadInt(string("SERVER"), string("MaxEpollNum")); if ( nEpollNum == -1 ) { SingleLog::WriteLog(ERROR,"read server epoll num failed"); return iRet; } nTimeout = ConfigIni::ReadInt(string("SERVER"), string("Timeout")); if ( nTimeout == -1 ) { SingleLog::WriteLog(ERROR,"read server timeout failed"); return iRet; } iRet = g_clEpollServer.CreateEpoll(strIp.c_str(), nPort, nEpollNum); if ( iRet == 0 ) { SingleLog::WriteLog(ERROR, "epoll create failed"); return -1; } return 0; } void epoll_server_run() { g_clEpollServer.ProcessEpoll(); } int main() { SingleLog::Init(); if (epoll_server_init() == -1) { return -1; } epoll_server_run(); return 0; }
//TestClient.cpp #include <stdio.h> #include <iostream> #include <string.h> #include "../../network/SxTcp.h" using namespace std; int main() { CTcp tcp; int iRet = 0; int iFd = 0; char buf[128] = {0}; iRet = tcp.Open(); if (iRet == 0) { perror("socket create failed"); return -1; } iRet = tcp.Connect("192.168.233.250", 6666); if (iRet == 0) { perror("socket connect failed"); return -1; } while(1) { memset(buf, 0, sizeof(buf)); cout << "please input some string:"; cin >> buf; iRet = tcp.Send(buf, strlen(buf)); if (iRet < -1 && errno != EAGAIN) { perror("send failed"); return -1; } else if(iRet == 0) { perror("connect is closed"); return -1; } memset(buf, 0, sizeof(buf)); iRet = tcp.Recv(buf, sizeof(buf)); if (iRet < 0 && errno != EAGAIN) { perror("recv failed"); return -1; } else if(iRet == 0) { perror("socket not connect"); return -1; } fprintf(stdout, "recv data is:%s\n", buf); } return 0; }
分别编译TestEpollServer.cpp和TestClient.cpp,生成服务端和客户端应用程序,即可实现通信。