reactor简介

在IO多路复用中,epoll等IO多路复用工具是对IO进行管理,使用reactor模式,变为对事件的管理。

struct sockitem //socket的中间状态可以保存到这个结构体,如接收了一半的数据包
{
	int sockfd; 
	int (*callback)(int fd, int events, void *arg); //利用回调函数处理,避免区分读写和accpet
	char recvbuffer[1024]; 
	char sendbuffer[1024]; 
};

struct reactor //全局用到的变量
{
	int epfd;
	struct epoll_event events[512];
};
struct reactor *eventloop = NULL;

int recv_cb(int fd, int events, void *arg);

int send_cb(int fd, int events, void *arg) 
{

	struct sockitem *si = (struct sockitem*)arg;
	send(fd, "hello\n", 6, 0);
	struct epoll_event ev;
	ev.events = EPOLLIN | EPOLLET;
	//ev.data.fd = clientfd;
	si->sockfd = fd;
	si->callback = recv_cb;
	ev.data.ptr = si;
	epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
}

int recv_cb(int fd, int events, void *arg) 
{
	//int clientfd = events[i].data.fd;
	struct sockitem *si = (struct sockitem*)arg;
	struct epoll_event ev;
	char buffer[1024] = {0};
	int ret = recv(fd, buffer, 1024, 0);
	if (ret < 0) 
{
		if (errno == EAGAIN || errno == EWOULDBLOCK) 
        { 
			return -1;
		} else 
        {
			//出错操作
		}
		ev.events = EPOLLIN;
		//ev.data.fd = fd;
		epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
		close(fd);
		free(si);
	} else if (ret == 0) 
{
		printf("disconnect %d\n", fd);
		ev.events = EPOLLIN;
		//ev.data.fd = fd;
		epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, fd, &ev);
		close(fd);
		free(si);
	} else
    {
		printf("Recv: %s, %d Bytes\n", buffer, ret);
		struct epoll_event ev;
		ev.events = EPOLLOUT | EPOLLET;
		//ev.data.fd = clientfd;
		si->sockfd = fd;
		si->callback = send_cb;
		ev.data.ptr = si;
		epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, fd, &ev);
	}
}

int accept_cb(int fd, int events, void *arg) 
{
	struct sockaddr_in client_addr;
	memset(&client_addr, 0, sizeof(struct sockaddr_in));
	socklen_t client_len = sizeof(client_addr);
	int clientfd = accept(fd, (struct sockaddr*)&client_addr, &client_len);
	if (clientfd <= 0) return -1;
	char str[INET_ADDRSTRLEN] = {0};
	printf("recv from %s at port %d\n", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),ntohs(client_addr.sin_port));

	struct epoll_event ev;
	ev.events = EPOLLIN | EPOLLET;
	//ev.data.fd = clientfd;
	struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem));
	si->sockfd = clientfd;
	si->callback = recv_cb;
	ev.data.ptr = si;
	epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, clientfd, &ev);
	return clientfd;
}

int main(int argc, char *argv[]) 
{
	if (argc < 2) 
{
		return -1;
	}
	int port = atoi(argv[1]);
	int sockfd = socket(AF_INET, SOCK_STREAM, 0);
	if (sockfd < 0) 
{
		return -1;
	}
	struct sockaddr_in addr;
	memset(&addr, 0, sizeof(struct sockaddr_in));
	addr.sin_family = AF_INET;
	addr.sin_port = htons(port);
	addr.sin_addr.s_addr = INADDR_ANY;
	if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) 
{
		return -2;
	}
	if (listen(sockfd, 5) < 0) 
{
		return -3;
	}
	eventloop = (struct reactor*)malloc(sizeof(struct reactor));
	eventloop->epfd = epoll_create(1);
	struct epoll_event ev;
	ev.events = EPOLLIN;
	//ev.data.fd = sockfd; //int idx = 2000;
	

	struct sockitem *si = (struct sockitem*)malloc(sizeof(struct sockitem));
	si->sockfd = sockfd;
	si->callback = accept_cb;
	ev.data.ptr = si;
	epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, sockfd, &ev);
	pthread_t id;
	pthread_create(&id, NULL, worker_thread, NULL);
	//pthread_cond_waittime();
	while (1) 
{
		int nready = epoll_wait(eventloop->epfd, eventloop->events, 512, -1);
		if (nready < -1) 
        {
			break;
		}
		int i = 0;
		for (i = 0;i < nready;i ++) 
        {
			if (eventloop->events[i].events & EPOLLIN) //读事件
            {
				//printf("sockitem\n");
				struct sockitem *si = (struct sockitem*)eventloop->events[i].data.ptr;
				si->callback(si->sockfd, eventloop->events[i].events, si);
			}
			if (eventloop->events[i].events & EPOLLOUT) //写事件
            {
				struct sockitem *si = (struct sockitem*)eventloop->events[i].data.ptr;
				si->callback(si->sockfd, eventloop->events[i].events, si);
			}
		}
	}
}

这里有一个问题,listenfd是和其他fd一样,加入到epoll中监听读事件的循环。这样每次循环就只能accpet一次,只能产生一个新连接,其余都是与listenfd无关的操作。对于截接入量较大的场景,响应速度较慢。可以用一个线程专门处理listenfd,一个线程处理其他clientfd。

void *worker_thread(void *arg) //处理clientfd
{
	while (1) 
{
		int nready = epoll_wait(eventloop->epfd, eventloop->events, 512, -1);
		if (nready < -1) 
        {
			break;
		}
		int i = 0;
		for (i = 0;i < nready;i ++) 
        {
			if (eventloop->events[i].events & EPOLLIN) 
            {
				struct sockitem *si = (struct sockitem*)eventloop->events[i].data.ptr;
				si->callback(si->sockfd, eventloop->events[i].events, si);
			}

			if (eventloop->events[i].events & EPOLLOUT) 
            {
				struct sockitem *si = (struct sockitem*)eventloop->events[i].data.ptr;
				si->callback(si->sockfd, eventloop->events[i].events, si);
			}
		}
	}
}

void *thread_cb(void *arg)//处理listenfd
{
    //lfd,re,si已定义
    struct pollfd pfd={0};//只监听一个fd,没必要用epoll
    pfd.fd=lfd;//lfd是监听fd
    pfd.events=POLLIN;
    while(1)
    {
        int ret=poll(&pfd,lfd+1,-1);
        if(ret==-1)
        {
            printf("poll error,errno=%d\n",errno);
            return NULL;
        }
        if(pfd.revents&POLLIN)
        {
            struct sockaddr_in cli_addr={0};
            struct epoll_event ev={0};
            socklen_t cli_len=sizeof(struct sockaddr_in);
            int cfd=accept(lfd,(struct sockaddr*)&cli_addr,&cli_len);
            if(cfd<0)
            {
                printf("accpet error,errno=%d\n",errno);
                return NULL;
            }
            char ip[64]={0};
            printf("recv from %s at port %d\n",inet_ntop(AF_INET,&client_addr.sin_addr,ip,sizeof(ip)),ntohs(client_addr.sin_port));
            si=(struct sockitem*)malloc(sizeof(struct sockitem));
            si->sockfd=cfd;
            si->cb=recv_cb;
            ev.events=EPOLLIN;
            ev.data.ptr=si;
            epoll_ctl(re->epfd,EPOLL_CTL_ADD,cfd,&ev);
        }
    }
}

如果接入速度还不符合要求,我们还可以设置多个listenfd开多个线程,每个线程监听一个listenfd,同时监听clientfd的线程也开多个,这样就是“多对多”。这样会产生一个问题,如果客户端前后session的数据有关联,会出现A线程调用B线程中的数据,导致不同线程之间数据共享(竞争),这样的话就需要加锁,但是效率会下降。解决方法是,做到把数据和业务逻辑相分离,线程运行时信息不存到服务器本身的进程中,而是存到单独的缓存中,这样就可以做到隔离客户端前后之间的依赖关系。这样一来,就相当于服务器不存储数据,单处理业务。客户端的请求由不同的服务线程响应也没关系,直接根据客户端id去缓存中取数据即可。
对于多进程的结构,方法也是一样,不过还有几个问题。一是,多进程监听同一端口,需要listen()之后直接fork(),listenfd就会被继承到子进程中,与父进程一致。这里再提一下,epoll_create()在fork()之前或之后都可以,虽然效果不同,但是没有太大影响。二是惊群的问题,一个连接来了由哪个进程响应,通过共享内存的一把锁实现,保证任一时刻epoll中只有一个fd加入。

最后,再提一个问题,为什么UDP很少用epoll,而TCP却用的多。UDP对于服务器而言,只有一个fd,没必要使用多路复用。TCP是一个连接就有一个fd,所以需要epoll去管理。

上一篇:Java中对字符串的一些操作


下一篇:泛型 --- 通配符和有条件通配符的使用