I/O复用

select

select系统调用的用途是:在一段指定时间内,监听用户感兴趣的文件描述符上的可读、可写和异常等事件。

#include<sys/select.h>
/* Check the first NFDS descriptors each in READFDS (if not NULL) for read
   readiness, in WRITEFDS (if not NULL) for write readiness, and in EXCEPTFDS
   (if not NULL) for exceptional conditions.  If TIMEOUT is not NULL, time out
   after waiting the interval specified therein.  Returns the number of ready
   descriptors, or -1 for errors.

   This function is a cancellation point and therefore not marked with
   __THROW.  */
extern int select (int __nfds, fd_set *__restrict __readfds,
		   fd_set *__restrict __writefds,
		   fd_set *__restrict __exceptfds,
		   struct timeval *__restrict __timeout);
  1. nfds 参数指定被监听的文件描述符的总数。它通常被设置为sclect监听的所有文件描述符中的最大值加1,因为文件描述符是从0开始计数的。
  2. readfds、writefds 和exceptfds参数分别指向可读、可写和异常等事件对应的文件描述符集合。应用程序调用select函数时,通过这3个参数传人自己感兴趣的文件描述符。select调用返回时,内核将修改它们来通知应用程序哪些文件描述符已经就绪。这3个参数是fd_set结构指针类型。fd_set 结构体的定义如下:
#
#include <typesizes.h>
/* Number of descriptors that can fit in an `fd_set'.  */
#define __FD_SETSIZE		1024

/* The fd_set member is required to be an array of longs.  */
typedef long int __fd_mask;

/* Some versions of <linux/posix_types.h> define this macros.  */
#undef	__NFDBITS
/* It's easier to assume 8-bit bytes than to get CHAR_BIT.  */
#define __NFDBITS	(8 * (int) sizeof (__fd_mask))
#define	__FD_ELT(d)	((d) / __NFDBITS)
#define	__FD_MASK(d)	((__fd_mask) (1UL << ((d) % __NFDBITS)))

/* fd_set for select and pselect.  */
typedef struct
  {
    /* XPG4.2 requires this member name.  Otherwise avoid the name
       from the global namespace.  */
#ifdef __USE_XOPEN
    __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
    __fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
  } fd_set;

由以上定义可见,fd_set 结构体仅包含一个整型数组,该数组的每个元素的每一位(bit)标记一个文件描述符。fd_set 能容纳的文件描述符数量由FD_SETSIZE指定,这就限制了select能同时处理的文件描述符的总量。由于位操作过于烦琐,我们应该使用下面的一系列宏来访问fd_set 结构体中的位:

/* Access macros for `fd_set'.  */
#define	FD_SET(fd, fdsetp)	__FD_SET (fd, fdsetp)	//设置fdset的位fd
#define	FD_CLR(fd, fdsetp)	__FD_CLR (fd, fdsetp)	//清除fdset的位fd
#define	FD_ISSET(fd, fdsetp)	__FD_ISSET (fd, fdsetp)		//测试fdset的位fd是否被设置
#define	FD_ZERO(fdsetp)		__FD_ZERO (fdsetp)	//清除fdset的所有位
  1. timeout 参数用来设置select函数的超时时间。它是一个timeval结构类型的指针,采用指针参数是因为内核将修改它以告诉应用程序select等待了多久。不过我们不能完全信任sclect调用返回后的timeout值,比如调用失败时timeout值是不确定的。timeval 结构体的定由以上定义如下:
/* A time value that is accurate to the nearest
   microsecond but also has a range of years.  */
struct timeval
{
  __time_t tv_sec;		/* 秒.  */
  __suseconds_t tv_usec;	/* 微秒.  */
};

由以上定义可见,select 给我们提供了一个微秒级的定时方式。如果给timeout变量的tv_sec成员和tv_usec成员都传递0,则select将立即返回。如果给timeout传递NULL,则select将一直阻塞, 直到某个文件描述符就绪。

select成功时返回就绪(可读、可写和异常)文件描述符的总数。如果在超时时间内没有任何文件描述符就绪,select 将返回0. select 失败时返回-1并设置ermo。如果在select等待期间,程序接收到信号,则select立即返回-1,并设置ermo为EINTR。

文件描述符就绪条件

哪些情况下文件描述符可以被认为是可读、可写或者出现异常,对于select的使用非常关键。在网络编程中,下列情况下socket可读:

  • socket内核接收缓存区中的字节数大于或等于其低水位标记SO_RCVLOWAT。此时我们可以无阻塞地读该socket,并且读操作返回的字节数大于0。
  • socket通信的对方关闭连接。此时对该socket的读操作将返回0。口监听socket上有新的连接请求。
  • socket上有未处理的错误。此时我们可以使用getsockopt 来读取和清除该错误。下列情况下socket可写:
    • socket内核发送缓存区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT.此时我们可以无阻塞地写该socket,并且写操作返回的字节数大于0。
    • socket的写操作被关闭。对写操作被关闭的socket执行写操作将触发一个 SIGPIPE信号。
    • socket使用非阻塞connect连接成功或者失败(超时)之后。
    • socket上有未处理的错误。此时我们可以使用getsockopt来读取和清除该错误。

poll

poll系统调用和select类似,也是在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪者。poll 的原型如下:

int poll(pollfd *__fds, nfds_t __nfds, int __timeout)
Poll the file descriptors described by the NFDS structures starting at
FDS. If TIMEOUT is nonzero and not -1, allow TIMEOUT milliseconds for
an event to occur; if TIMEOUT is -1, block until an event occurs.
Returns the number of file descriptors with events, zero if timed out,
or -1 for errors.

This function is a cancellation point and therefore not marked with
__THROW.
  • fds 参数是一个polfd结构类型的数组,它指定所有文件描述符上发生的可读、可写和异常等事件。pollfd 结构体的定义如下:
/* Data structure describing a polling request.  */
struct pollfd
  {
    int fd;			/* 文件描述符。  */
    short int events;		/* 注册的事件类型。  */
    short int revents;		/* 实际发生的事件类型,由内核填充。  */
  };

其中,fd 成员指定文件描述符; events 成员告诉poll监听fd上的哪些事件,它是一系列事件的按位或; revents 成员则由内核修改,以通知应用程序fd上实际发生了哪些事件。poll支持的事件类型如表所示。
I/O复用

  • nfds参数指定被监听事件集合fds的大小。其类型nfds_t的定义如下:
/* Type used for the number of file descriptors.  */
typedef unsigned long int nfds_t;
  • timcout 参数指定poll的超时值,单位是毫秒。当timeout为-1时,poll 调用将永远阻塞,直到某个事件发生;当timcout为0时,poll调用将立即返回。poll系统调用的返回值的含义与select相同。

epoll

epoll是Linux特有的I/O复用函数。它在实现和使用上与select、poll 有很大差异。首先,epoll 使用一组函数来完成任务,而不是单个函数。其次,epoll 把用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传人文件描述符集或事件集。但epoll需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表。这个文件描述符使用如下epoll_create 函数来创建:

#include<sys/epoll.h>

/* Creates an epoll instance.  Returns an fd for the new instance.
   The "size" parameter is a hint specifying the number of file
   descriptors to be associated with the new instance.  The fd
   returned by epoll_create() should be closed with close().  */
extern int epoll_create (int __size) __THROW;

size参数现在并不起作用,只是给内核一个提示, 告诉它事件表需要多大。该函数返回的文件描述符将用作其他所有epoll 系统调用的第一个参 数,以指定要访问的内核事件表。下面的函数用来操作epoll的内核事件表:

#include<sys/epoll.h>

/* Manipulate an epoll instance "epfd". Returns 0 in case of success,
   -1 in case of error ( the "errno" variable will contain the
   specific error code ) The "op" parameter is one of the EPOLL_CTL_*
   constants defined above. The "fd" parameter is the target of the
   operation. The "event" parameter describes which events the caller
   is interested in and any associated user data.  */
extern int epoll_ctl (int __epfd, int __op, int __fd,
		      struct epoll_event *__event) __THROW;
//epoll_ctl 成功时返回0,失败则返回-1并设置ermo.

fd参数是要操作的文件描述符,op 参数则指定操作类型。操作类型有如下3种:

/* Valid opcodes ( "op" parameter ) to issue to epoll_ctl().  */
#define EPOLL_CTL_ADD 1	/* Add a file descriptor to the interface.  */
#define EPOLL_CTL_DEL 2	/* Remove a file descriptor from the interface.  */
#define EPOLL_CTL_MOD 3	/* Change file descriptor epoll_event structure.  */

event参数指定事件,它是epoll_event 结构指针类型。epoll_event 的定义如下:

struct epoll_event
{
  uint32_t events;	/* Epoll events */
  epoll_data_t data;	/* User data variable */
} __EPOLL_PACKED;

其中events成员描述事件类型。epoll 支持的事件类型和poll基本相同。表示epoll事件类型的宏是在poll对应的宏前加上“E",比如epoll的数据可读事件是EPOLLIN但epoll有两个额外的事件类型一EPOLLET 和EPOLLONESHOT它们对于epoll的高效运作非常关键,我们将在后面讨论它们。data 成员用于存储用户数据,其类型epoll_data_t的定义如下:

typedef union epoll_data
{
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;

epoll_data_t是一个联合体,其4个成员中使用最多的是fd,它指定事件所从属的目标文件描述符。ptr成员可用来指定与fd相关的用户数据。但由于epoll_data_t 是一个联合体,我们不能同时使用其ptr成员和fd成员,因此,如果要将文件描述符和用户数据关联起来, 以实现快速的数据访问,只能使用其他手段,比如放弃使用epoll_data_t的fd成员,而在ptr指向的用户数据中包含fd。

epoll_wait

epoll系列系统调用的主要接口是epoll_wait函数。它在一段超时时间内等待一组文件描述符上的事件,其原型如下:

/* Wait for events on an epoll instance "epfd". Returns the number of
   triggered events returned in "events" buffer. Or -1 in case of
   error with the "errno" variable set to the specific error code. The
   "events" parameter is a buffer that will contain triggered
   events. The "maxevents" is the maximum number of events to be
   returned ( usually size of "events" ). The "timeout" parameter
   specifies the maximum wait time in milliseconds (-1 == infinite).

   This function is a cancellation point and therefore not marked with
   __THROW.  */
extern int epoll_wait (int __epfd, struct epoll_event *__events,
		       int __maxevents, int __timeout);
//成功时返回就绪的文件描述符的个数,失败时返回-1并设置ermo。

timeout 参数的含义与poll接口的timeout 参数相同。maxevents 参数指定最多监听多少个事件,它必须大于0。

epoll_ wait 函数如果检测到事件,就将所有就绪的事件从内核事件表(由epfd 参数指定)中复制到它的第二个参数events指向的数组中。这个数组只用于输出epoll_wait 检测到的就绪事件,而不像seleet和poll的数组参数那样既用于传入用户注册的事件,又用于输出内核检测到的就绪事件。这就极大地提高了应用程序索引就绪文件描述符的效率。

LT和ET

epoll对文件描述符的操作有两种模式: LT (Level Trigger,电平触发)模式和eT ( Edge Trigger,边沿触发)模式。LT 模式是默认的工作模式,这种模式下epoll相当于一个效率高的poll当往epoll内核事件表中注册一个文件描述符上的EPOLLET事件时,epoll 将以ET模式来操作该文件描述符。ET 模式是epoll的高效工作模式。

对于采用LT工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用epoll_wait 时,epoll_wait 还会再次向应用程序通告此事件,直到该事件被处理。而对于采用ET工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll wait 调用将不再向应用程序通知这一事件。 可见,ET模式在很大程度上降低了同一个epoll事件被重复触发的次数,因此效率要比LT模式高。

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

void addfd(int epollfd, int fd, bool enable_et)
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN;
    if (enable_et)
    {
        event.events |= EPOLLET;
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

void lt(epoll_event *events, int number, int epollfd, int listenfd)
{
    char buf[BUFFER_SIZE];
    for (int i = 0; i < number; i++)
    {
        int sockfd = events[i].data.fd;
        if (sockfd == listenfd)
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
            addfd(epollfd, connfd, false);
        }
        else if (events[i].events & EPOLLIN)
        {
            printf("event trigger once\n");
            memset(buf, '\0', BUFFER_SIZE);
            int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
            if (ret <= 0)
            {
                close(sockfd);
                continue;
            }
            printf("get %d bytes of content: %s\n", ret, buf);
        }
        else
        {
            printf("something else happened \n");
        }
    }
}

void et(epoll_event *events, int number, int epollfd, int listenfd)
{
    char buf[BUFFER_SIZE];
    for (int i = 0; i < number; i++)
    {
        int sockfd = events[i].data.fd;
        if (sockfd == listenfd)
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
            addfd(epollfd, connfd, true);
        }
        else if (events[i].events & EPOLLIN)
        {
            printf("event trigger once\n");
            while (1)
            {
                memset(buf, '\0', BUFFER_SIZE);
                int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
                if (ret < 0)
                {
                    if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
                    {
                        printf("read later\n");
                        break;
                    }
                    close(sockfd);
                    break;
                }
                else if (ret == 0)
                {
                    close(sockfd);
                }
                else
                {
                    printf("get %d bytes of content: %s\n", ret, buf);
                }
            }
        }
        else
        {
            printf("something else happened \n");
        }
    }
}

int main(int argc, char *argv[])
{
    if (argc <= 2)
    {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char *ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);

    ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != -1);

    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    assert(epollfd != -1);
    addfd(epollfd, listenfd, true);

    while (1)
    {
        int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if (ret < 0)
        {
            printf("epoll failure\n");
            break;
        }
        lt(events, ret, epollfd, listenfd);
        //et( events, ret, epollfd, listenfd );
    }
    close(listenfd);
    return 0;
}

EPOLLONESHOT

即使我们使用ET模式,一个socket上的某个事件还是可能被触发多次。这在并发程序中就会引起一个问题。比如一个线程(或进程,下同)在读取完某个socket.上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN 再次被触发),此时另外一个线程被唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个socket的局面。这当然不是我们期望的。我们期望的是一个socket连接在任一时 刻都只被一个线程处理。这-一点可以使用epoll的EPOLLONESHOT事件实现。

对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次, 除非我们使用epoll_ctl 函数重置该文件描述符上注册的EPOLLONESHOT事件。这样,当一个线程在处理某个socket时,其他线程是不可能有机会操作该socket的。但反过来思考,注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN事件能被触发,进而让其他工作线程有机会继续处理这个socket。

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024

struct fds
{
    int epollfd;
    int sockfd;
};

int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

void addfd(int epollfd, int fd, bool oneshot)
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    if (oneshot)
    {
        event.events |= EPOLLONESHOT;
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

void reset_oneshot(int epollfd, int fd)
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}

void *worker(void *arg)
{
    int sockfd = ((fds *)arg)->sockfd;
    int epollfd = ((fds *)arg)->epollfd;
    printf("start new thread to receive data on fd: %d\n", sockfd);
    char buf[BUFFER_SIZE];
    memset(buf, '\0', BUFFER_SIZE);
    while (1)
    {
        int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
        if (ret == 0)
        {
            close(sockfd);
            printf("foreiner closed the connection\n");
            break;
        }
        else if (ret < 0)
        {
            if (errno == EAGAIN)
            {
                reset_oneshot(epollfd, sockfd);
                printf("read later\n");
                break;
            }
        }
        else
        {
            printf("get content: %s\n", buf);
            sleep(5);
        }
    }
    printf("end thread receiving data on fd: %d\n", sockfd);
}

int main(int argc, char *argv[])
{
    if (argc <= 2)
    {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char *ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &address.sin_addr);
    address.sin_port = htons(port);

    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);

    ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
    assert(ret != -1);

    ret = listen(listenfd, 5);
    assert(ret != -1);

    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    assert(epollfd != -1);
    addfd(epollfd, listenfd, false);

    while (1)
    {
        int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if (ret < 0)
        {
            printf("epoll failure\n");
            break;
        }

        for (int i = 0; i < ret; i++)
        {
            int sockfd = events[i].data.fd;
            if (sockfd == listenfd)
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof(client_address);
                int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
                addfd(epollfd, connfd, true);
            }
            else if (events[i].events & EPOLLIN)
            {
                pthread_t thread;
                fds fds_for_new_worker;
                fds_for_new_worker.epollfd = epollfd;
                fds_for_new_worker.sockfd = sockfd;
                pthread_create(&thread, NULL, worker, (void *)&fds_for_new_worker);
            }
            else
            {
                printf("something else happened \n");
            }
        }
    }
    close(listenfd);
    return 0;
}

从工作线程函数worker来看,如果一个工作线程处理完某个socket上的一次请求(我们用休眠5s来模拟这个过程)之后,又接收到该socket上新的客户请求,则该线程将继续为这个socket服务。并且因为该socket上注册了EPOLLONESHOT事件,其他线程没有机会接触这个socket,如果工作线程等待5s后仍然没收到该socket.上的下一批客户数据,则它将放弃为该socket服务。同时,它调用reset_oneshot 函数来重置该socket上的注册事件,这将使epoll有机会再次检测到该socket上的EPOLLIN事件,进而使得其他线程有机会为该socket服务。

由此看来,尽管一个socket在不同时间可能被不同的线程处理,但同一时刻肯定只有一个线程在为它服务。这就保证了连接的完整性,从而避免了很多可能的竞态条件。

三种I/O复用函数的比较

I/O复用

上一篇:mysql left join中on后加条件判断和where中加条件的区别


下一篇:监听器Listener