几种服务器端IO模型的简单介绍及实现

几种服务器端IO模型的简单介绍及实现
几种服务器端IO模型的简单介绍及实现
#include <Winsock2.h>
#include <cstdio>
#include <cstdlib>
#include <cassert>
#include <iostream>
#include <string>
using namespace std;

#pragma  comment(lib,"ws2_32.lib")

int init_win_socket()
{
    WSADATA wsaData;

    if(WSAStartup(MAKEWORD(2,2) , &wsaData ) != 0) 
    {
        return -1;
    }

    return 0;
}

#define Server_Port 10286
#define MAX_LINE 16384

#define  FD_SETSIZE 1024

struct fd_state 
{
    char buffer[MAX_LINE];
    size_t buffer_used;

    int writing;
    size_t n_written;
    size_t write_upto;
};

struct fd_state * alloc_fd_state(void)
{
    struct fd_state *state = (struct fd_state *)malloc(sizeof(struct fd_state));
    if (!state)
        return NULL;
    state->buffer_used = state->n_written = state->writing =
        state->write_upto = 0;
    memset(state->buffer,0,MAX_LINE);
    return state;
}

void free_fd_state(struct fd_state *state)
{
    free(state);
}

int set_socket_nonblocking(int fd)
{
    unsigned long mode = 1;
    int result = ioctlsocket(fd, FIONBIO, &mode);
    if (result != 0)
    {
        return -1;
        printf("ioctlsocket failed with error: %ld\n", result);
    }
    return 0;
}

int do_read(int fd, struct fd_state *state)
{
    char buf[1024];
    int i;
    int result;
    while (1) 
    {
        memset(buf,0,1024);
        result = recv(fd, buf, sizeof(buf), 0);
        if (result <= 0)
            break;

        for (i=0; i < result; ++i)  
        {
            if (state->buffer_used < sizeof(state->buffer))
                state->buffer[state->buffer_used++] = buf[i];
        }
    }
    state->writing = 1;
    state->write_upto = state->buffer_used;
    printf("Receive data: %s  size: %d\n",state->buffer+state->n_written,state->write_upto-state->n_written);

    if (result == 0) 
    {
        return 1;
    } 
    else if (result < 0) 
    {
#ifdef WIN32
        if (result == -1 && WSAGetLastError()==WSAEWOULDBLOCK)
            return 0;
#else
        if (errno == EAGAIN)
            return 0;
#endif
        return -1;
    }

    return 0;
}

int do_write(int fd, struct fd_state *state)
{
    while (state->n_written < state->write_upto) 
    {
        int result = send(fd, state->buffer + state->n_written,
            state->write_upto - state->n_written, 0);
        if (result < 0) 
        {
#ifdef WIN32
            if (result == -1 && WSAGetLastError()==WSAEWOULDBLOCK)
                return 0;
#else
            if (errno == EAGAIN)
                return 0;
#endif
            return -1;
        }
        assert(result != 0);
        printf("Send data: %s \n",state->buffer+ state->n_written);
        state->n_written += result;
    }

    if (state->n_written == state->buffer_used)
        state->n_written = state->write_upto = state->buffer_used = 0;

    state->writing = 0;

    return 0;
}

void run()
{
    int listener;
    struct fd_state *state[FD_SETSIZE];
    struct sockaddr_in sin;
    int i, maxfd;
    fd_set readset, writeset, exset;

    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(Server_Port);

    for (i = 0; i < FD_SETSIZE; ++i)
        state[i] = NULL;

    listener = socket(AF_INET, SOCK_STREAM, 0);
    set_socket_nonblocking(listener);

    int one = 1;
    setsockopt(listener, SOL_SOCKET, SO_REUSEADDR,(const char *)&one, sizeof(one));

    if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) 
    {
        perror("bind");
        return;
    }

    if (listen(listener, 16)<0) 
    {
        perror("listen");
        return;
    }

    printf("Server is listening ... \n");

    FD_ZERO(&readset);
    FD_ZERO(&writeset);
    FD_ZERO(&exset);

    while (1) 
    {
        maxfd = listener;

        FD_ZERO(&readset);
        FD_ZERO(&writeset);
        FD_ZERO(&exset);

        FD_SET(listener, &readset);

        for (i=0; i < FD_SETSIZE; ++i) 
        {
            if (state[i]) 
            {
                if (i > maxfd)
                    maxfd = i;
                FD_SET(i, &readset);
                if (state[i]->writing) 
                {
                    FD_SET(i, &writeset);
                }
            }
        }

        if (select(maxfd+1, &readset, &writeset, &exset, NULL) < 0) 
        {
            perror("select");
            return;
        }

        //check if listener can accept
        if (FD_ISSET(listener, &readset)) 
        {
            struct sockaddr_in ss;
            int slen = sizeof(ss);
            int fd = accept(listener, (struct sockaddr*)&ss, &slen);
            if (fd < 0) 
            {
                perror("accept");
            } 
            else if(fd > FD_SETSIZE)
            {
                closesocket(fd);
            }
            else 
            {
                printf("Accept socket %d, address %s \n",fd,inet_ntoa(ss.sin_addr));
                set_socket_nonblocking(fd);
                state[fd] = alloc_fd_state();
                assert(state[fd]);
            }
        }

        //process read and write socket
        for (i=0; i < maxfd+1; ++i) 
        {
            int r = 0;
            if (i == listener)
                continue;

            if (FD_ISSET(i, &readset)) 
            {
                r = do_read(i, state[i]);
            }
            if (r == 0 && FD_ISSET(i, &writeset)) 
            {
                r = do_write(i, state[i]);
            }
            if (r) 
            {
                free_fd_state(state[i]);
                state[i] = NULL;
                closesocket(i);
            }
        }
    }
}

int main(int c, char **v)
{
#ifdef WIN32
    init_win_socket();
#endif

    run();
    return 0;
}
几种服务器端IO模型的简单介绍及实现

示意图如下:

几种服务器端IO模型的简单介绍及实现

这里Select监听的socket都是Non-blocking的,所以在do_read() do_write()中对返回为EAGAIN/WSAEWOULDBLOCK都做了处理。

从代码中可以看出使用Select返回后,仍然需要轮训再检测每个socket的状态(读、写),这样的轮训检测在大量连接下也是效率不高的。因为当需要探测的句柄值较大时,select () 接口本身需要消耗大量时间去轮询各个句柄。

很多操作系统提供了更为高效的接口,如 linux 提供 了 epoll,BSD 提供了 kqueue,Solaris 提供了 /dev/poll …。如果需要实现更高效的服务器程序,类似 epoll 这样的接口更被推荐。遗憾的是不同的操作系统特供的 epoll 接口有很大差异,所以使用类似于 epoll 的接口实现具有较好跨平台能力的服务器会比较困难。

5、使用事件驱动库libevent的服务器模型

Libevent 是一种高性能事件循环/事件驱动库。

为了实际处理每个请求,libevent 库提供一种事件机制,它作为底层网络后端的包装器。事件系统让为连接添加处理函数变得非常简便,同时降低了底层IO复杂性。这是 libevent 系统的核心。

创建 libevent 服务器的基本方法是,注册当发生某一操作(比如接受来自客户端的连接)时应该执行的函数,然后调用主事件循环 event_dispatch()。执行过程的控制现在由 libevent 系统处理。注册事件和将调用的函数之后,事件系统开始自治;在应用程序运行时,可以在事件队列中添加(注册)或 删除(取消注册)事件。事件注册非常方便,可以通过它添加新事件以处理新打开的连接,从而构建灵活的网络处理系统。

使用Libevent实现的一个回显服务器如下:

几种服务器端IO模型的简单介绍及实现
几种服务器端IO模型的简单介绍及实现
#include <event2/event.h>

#include <assert.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MAX_LINE 16384

void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);

struct fd_state 
{
    char buffer[MAX_LINE];
    size_t buffer_used;

    size_t n_written;
    size_t write_upto;

    struct event *read_event;
    struct event *write_event;
};

struct fd_state * alloc_fd_state(struct event_base *base, evutil_socket_t fd)
{
    struct fd_state *state = (struct fd_state *)malloc(sizeof(struct fd_state));
    if (!state)
    {
        return NULL;
    }

    state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);
    if (!state->read_event) 
    {
        free(state);
        return NULL;
    }

    state->write_event = event_new(base, fd, EV_WRITE, do_write, state);
    if (!state->write_event) 
    {
        event_free(state->read_event);
        free(state);
        return NULL;
    }

    memset(state->buffer,0,MAX_LINE);
    state->buffer_used = state->n_written = state->write_upto = 0;

    return state;
}

void free_fd_state(struct fd_state *state)
{
    event_free(state->read_event);
    event_free(state->write_event);
    free(state);
}

void do_read(evutil_socket_t fd, short events, void *arg)
{
    struct fd_state *state = (struct fd_state *) arg;
    char buf[1024];
    int i;
    int result;
    assert(state->write_event);
    while(1)
    {
        memset(buf,0,1024);
        result = recv(fd, buf, sizeof(buf), 0);
        if (result <= 0)
        {
            break;
        }
        else
        {
            for (i=0; i < result; ++i)  
            {
                if (state->buffer_used < sizeof(state->buffer))
                    state->buffer[state->buffer_used++] = buf[i];
            }
        }
    }
    printf("receive data: %s  size: %d\n",state->buffer+state->n_written,state->write_upto-state->n_written);
    assert(state->write_event);
    event_add(state->write_event, NULL);
    state->write_upto = state->buffer_used;
    
    if (result == 0) 
    {
        printf("connect closed \n");
        free_fd_state(state);
    } 
    else if (result < 0) 
    {
#ifdef WIN32
        if (result == -1 && WSAGetLastError()==WSAEWOULDBLOCK)
            return;
#else
        if (errno == EAGAIN)
            return;
#endif
        perror("recv");
        free_fd_state(state);
    }
}

void do_write(evutil_socket_t fd, short events, void *arg)
{
    struct fd_state *state = (struct fd_state *)arg;

    while (state->n_written < state->write_upto) 
    {
        int result = send(fd, state->buffer + state->n_written,
            state->write_upto - state->n_written, 0);
        if (result < 0) 
        {
#ifdef WIN32
            if (result == -1 && WSAGetLastError()==WSAEWOULDBLOCK)
                return;
#else
            if (errno == EAGAIN)
                return;
#endif
            free_fd_state(state);
            return;
        }
        assert(result != 0);
        printf("send data: %s \n",state->buffer+ state->n_written);

        state->n_written += result;
    }

    //buffer is full
    if (state->n_written == state->buffer_used)
    {
        state->n_written = state->write_upto = state->buffer_used = 0;
        memset(state->buffer,0,MAX_LINE);
    }
}

void do_accept(evutil_socket_t listener, short event, void *arg)
{
    struct event_base *base = (struct event_base *)arg;
    struct sockaddr_in ss;
    int slen = sizeof(ss);
    int fd = accept(listener, (struct sockaddr*)&ss, &slen);
    if (fd > 0) 
    {
        printf("accept socket %d, address %s \n",fd,inet_ntoa(ss.sin_addr));
        struct fd_state *state;
        evutil_make_socket_nonblocking(fd);
        state = alloc_fd_state(base, fd);
        assert(state);
        assert(state->read_event);
        event_add(state->read_event, NULL);
    }
}

void run()
{
    int listener;
    struct sockaddr_in addr_server;
    struct event_base *base;
    struct event *listener_event;

    base = event_base_new();
    if (!base)
    {
        perror("event_base_new error");
        return; 
    }

    addr_server.sin_addr.S_un.S_addr = ADDR_ANY;
    addr_server.sin_family = AF_INET;
    addr_server.sin_addr.s_addr = 0;
    addr_server.sin_port = htons(10286);

    listener = socket(AF_INET, SOCK_STREAM, 0);
    evutil_make_socket_nonblocking(listener);

    int one = 1;
    setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, (const char *)&one, sizeof(one));

    if (bind(listener, (struct sockaddr*)&addr_server, sizeof(addr_server)) < 0) 
    {
        perror("bind error");
        return;
    }

    if (listen(listener, 10)<0) 
    {
        perror("listen error");
        return;
    }

    printf("server is listening ... \n");

    listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
    event_add(listener_event, NULL);
    event_base_dispatch(base);
}

int init_win_socket()
{
    WSADATA wsaData;

    if(WSAStartup(MAKEWORD(2,2) , &wsaData ) != 0) 
    {
        return -1;
    }

    return 0;
}

int main(int c, char **v)
{

#ifdef WIN32
    init_win_socket();
#endif

    run();

    getchar();
    return 0;
}
几种服务器端IO模型的简单介绍及实现

6、信号驱动IO模型(Signal-driven IO)

使用信号,让内核在描述符就绪时发送SIGIO信号通知应用程序,称这种模型为信号驱动式I/O(signal-driven I/O)。

图示如下:

几种服务器端IO模型的简单介绍及实现

首先开启套接字的信号驱动式I/O功能,并通过sigaction系统调用安装一个信号处理函数。该系统调用将立即返回,我们的进程继续工作,也就是说进程没有被阻塞。当数据报准备好读取时,内核就为该进程产生一个SIGIO信号。随后就可以在信号处理函数中调用recvfrom读取数据报,并通知主循环数据已经准备好待处理,也可以立即通知主循环,让它读取数据报。

无论如何处理SIGIO信号,这种模型的优势在于等待数据报到达期间进程不被阻塞。主循环可以继续执行 ,只要等到来自信号处理函数的通知:既可以是数据已准备好被处理,也可以是数据报已准备好被读取。

7、异步IO模型(asynchronous IO)

异步I/O(asynchronous I/O)由POSIX规范定义。演变成当前POSIX规范的各种早起标准所定义的实时函数中存在的差异已经取得一致。一般地说,这些函数的工作机制是:告知内核启动某个操作,并让内核在整个操作(包括将数据从内核复制到我们自己的缓冲区)完成后通知我们。这种模型与前一节介绍的信号驱动模型的主要区别在于:信号驱动式I/O是由内核通知我们何时可以启动一个I/O操作,而异步I/O模型是由内核通知我们I/O操作何时完成。

示意图如下:

几种服务器端IO模型的简单介绍及实现

我们调用aio_read函数(POSIX异步I/O函数以aio_或lio_开头),给内核传递描述符、缓冲区指针、缓冲区大小(与read相同的三个参数)和文件偏移(与lseek类似),并告诉内核当整个操作完成时如何通知我们。该系统调用立即返回,并且在等待I/O完成期间,我们的进程不被阻塞。本例子中我们假设要求内核在操作完成时产生某个信号,该信号直到数据已复制到应用进程缓冲区才产生,这一点不同于信号驱动I/O模型。

 

参考:

《UNIX网络编程》

使用 libevent 和 libev 提高网络应用性能:http://www.ibm.com/developerworks/cn/aix/library/au-libev/

使用异步 I/O 大大提高应用程序的性能:https://www.ibm.com/developerworks/cn/linux/l-async/

 

 

作者:阿凡卢
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。http://www.cnblogs.com/luxiaoxun/p/3691800.html
上一篇:微信Web开发者工具 移动调试 手机连接不上


下一篇:Exchange与ADFS单点登录 PART 5:添加ADFS信赖方信任