Libevent基础之 Reactor模式

个人作品,未经允许禁止转载!

该代码意在说明reactor的原理,没有深究细节。

详见代码中的注释。

#include<stdio.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <ctime>
#include <netinet/in.h>
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <arpa/inet.h>
#include <unistd.h>
#include <iostream>
using namespace std;

// Size in bytes of the per-connection receive buffer (reactor_event_s::buffer).
#define MAX_BUFFER_SIZE 1024
// Number of client slots scanned in reactor_events; "+ 1" of this value is
// used for the epoll_create() hint and the epoll_wait() event batch.
#define MAX_EVENTS_NUMBER (1024)
// TCP port handed to init_listen_socket() by main().
#define SERVER_PORT 9998
// Backlog value passed to listen().
#define MAX_PENDING_CONNECTIONS 1024

// Per-descriptor reactor record. A pointer to it is stored in
// epoll_event.data.ptr when the fd is registered, so when epoll reports the fd
// the event loop gets the record back and, with it, the fd's saved state and
// its callback. This hand-off is the core of the reactor pattern.
struct reactor_event_s{
    int fd;     // Descriptor this record describes
    int events; // Event mask registered with epoll (EPOLLIN or EPOLLOUT)
    void *arg;  // User-defined pointer passed back to the callback
    void (*callback)(int fd, int events, void *arg);    // Invoked by the event loop when a registered event fires
    int valid;  // 1 while the slot is in use (set by event_add), 0 when free (cleared by event_del)
    // Data received so far and its length. If one epoll wake-up does not
    // deliver a complete message, the partial data can stay here until the
    // next wake-up.
    // Author's note: for stateful protocols, looking up an fd's history is
    // expensive; epoll can carry that history for us, but it still costs
    // O(log n). Coroutines avoid the lookup altogether, because a coroutine's
    // context is already on its own stack when it gets scheduled.
    char buffer[MAX_BUFFER_SIZE];
    int len;
    // Time of the last activity on this fd; can be used for timeout handling.
    long last_active;   // Seconds as returned by time()
};

// Descriptor of the epoll instance used by the handlers; assigned in main().
int epoll_fd;
// Table of reactor records.
//  - indices [0, MAX_EVENTS_NUMBER) are the slots scanned for client connections;
//  - index MAX_EVENTS_NUMBER is the slot init_listen_socket() uses for the
//    listening socket, so the array must hold MAX_EVENTS_NUMBER + 1 elements
//    (declaring only MAX_EVENTS_NUMBER made that access out of bounds).
// A more suitable data structure (e.g. a linked list) would be preferable to
// this flat array.
reactor_event_s reactor_events[MAX_EVENTS_NUMBER + 1];

// Initializes a listening socket on `port` and adds it to the epoll instance
// `epfd`. The callback installed for this socket is acceptconn.
void init_listen_socket(int epfd, unsigned short port);

// Fills in the reactor record located at `re` (fd, callback, callback argument).
void event_set(reactor_event_s *re, int fd, void(*callback)(int, int, void*), void* arg);
// Adds the record to the reactor: registers `re->fd` with epoll for `events`.
void event_add(int epfd, int events, reactor_event_s* re);
// Removes the record from the reactor: unregisters `re->fd` from epoll.
void event_del(int epfd, reactor_event_s* re);
// Callback of the listening socket: accepts a connection and registers a new
// reactor record for it. Note: the third parameter receives the record's `arg`
// pointer when dispatched by the event loop, despite being named `callback`.
void acceptconn(int fd, int events, void* callback);
// Receive callback: reads data from `fd` and echoes it back. As above, the
// third parameter receives the record's `arg` pointer.
void recv_data(int fd, int events, void* callback);
// Simple helper that sends `len` bytes starting at `data` on `fd`.
void send_data(int fd, char* data, int len);

bool running = true;
int main(int argc, char* argv[]){

    epoll_fd = epoll_create(MAX_EVENTS_NUMBER + 1);

    init_listen_socket(epoll_fd, SERVER_PORT);

    epoll_event events[MAX_EVENTS_NUMBER + 1];
    while(running){
        long now = time(nullptr);
        // Check over 10s without data, disconnect
        for (int i=0; i < MAX_EVENTS_NUMBER; i++){
            if (reactor_events[i].valid == 1 && now - reactor_events[i].last_active >= 10){
                close(reactor_events[i].fd);
                printf("[fd=%d] timeout. disconnect...\n", reactor_events[i].fd);
                event_del(epoll_fd, &reactor_events[i]);
            }
        }

        // Check for new event
        int nfd = epoll_wait(epoll_fd, events, MAX_EVENTS_NUMBER + 1, 0);
        if (nfd < 0){
            printf("epoll_awit");
            break;
        }
        else if (nfd > 0){
            // printf("%d\n", nfd);
        }
        for (int i=0; i<nfd; i++){
            reactor_event_s* re= static_cast<reactor_event_s*>(events[i].data.ptr);
            if ((events[i].events & EPOLLIN) && (re->events & EPOLLIN))
                re->callback(re->fd, events[i].events, re->arg);
            if ((events[i].events & EPOLLOUT) && (re->events & EPOLLOUT))
                re->callback(re->fd, events[i].events, re->arg);
        }
    }


}

/**
 * Fill in a reactor record before it is handed to event_add().
 *
 * @param re        record to initialise
 * @param fd        descriptor the record describes
 * @param callback  function the event loop calls when the fd is ready
 * @param arg       opaque pointer passed back to `callback`
 *
 * The `valid` flag is not modified here; event_add() reads it to choose
 * between EPOLL_CTL_ADD and EPOLL_CTL_MOD.
 */
void event_set(reactor_event_s *re, int fd, void(*callback)(int, int, void*), void* arg){
    reactor_event_s& slot = *re;

    // Identity and dispatch target.
    slot.fd = fd;
    slot.callback = callback;
    slot.arg = arg;

    // No event mask yet; event_add() stores the registered mask.
    slot.events = 0;

    // Start with an empty receive buffer.
    slot.len = 0;
    slot.buffer[0] = '\0';

    // Creation counts as activity for the idle timeout.
    slot.last_active = time(nullptr);
}

/**
 * Register (or re-register) a reactor record with epoll.
 *
 * @param epfd    epoll instance
 * @param events  event mask to watch for (e.g. EPOLLIN)
 * @param re      record whose `fd` is registered; the record's address is
 *                stored in epoll_event.data.ptr so the event loop can get it
 *                back when the fd becomes ready
 *
 * A record with valid == 1 is modified (EPOLL_CTL_MOD); otherwise it is added
 * (EPOLL_CTL_ADD). The record's `events` and `valid` fields are updated only
 * after epoll_ctl() succeeds, so a failed registration never leaves behind a
 * slot that looks in use but is unknown to epoll.
 */
void event_add(int epfd, int events, reactor_event_s* re){
    epoll_event event{};
    event.events = events;
    event.data.ptr = re;

    const int op = (re->valid == 1) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
    if (epoll_ctl(epfd, op, re->fd, &event) < 0){
        printf("%s: add event failed!\n", __func__);
        return;
    }

    re->events = events;
    re->valid = 1;
    printf("%s: add event success!\n", __func__);
}



void init_listen_socket(int epfd, unsigned short port){
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    fcntl(listen_fd, F_SETFL, O_NONBLOCK);
    event_set(&reactor_events[MAX_EVENTS_NUMBER], listen_fd, acceptconn, &reactor_events[MAX_EVENTS_NUMBER]);
    event_add(epfd, EPOLLIN, &reactor_events[MAX_EVENTS_NUMBER]);

    sockaddr_in addrin{};
    addrin.sin_family = AF_INET;
    addrin.sin_addr.s_addr = INADDR_ANY;
    addrin.sin_port = htons(port);

    if (bind(listen_fd, (sockaddr*)&addrin, sizeof(addrin)) != 0){
        perror("bind");
        exit(-1);
    }

    if (listen(listen_fd, MAX_PENDING_CONNECTIONS) != 0){
        perror("listen");
        exit(-1);
    }
    return ;
}

void acceptconn(int listen_fd, int events, void* callback){
    sockaddr_in caddr;
    int client_fd;
    socklen_t len = sizeof(caddr);
    if ((client_fd = accept(listen_fd, (sockaddr*)&caddr, &len)) == -1){
        if (errno != EAGAIN && errno != EINTR){
            // handle error here
        }
        printf("[Error]: Failed to accept in %s: %s\n", __func__, strerror(errno));
        return;
    }

    int i;
    // 为什么这里使用 do while(0): https://www.cnblogs.com/lizhenghn/p/3674430.html
    do{
        for (i=0; i < MAX_EVENTS_NUMBER; i++){
            if (reactor_events[i].valid == 0)
                break;
        }
        if (i == MAX_EVENTS_NUMBER){
            printf("%s: max connection limit exceeded!\n", __func__);
            break;
        }
        if (fcntl(client_fd, F_SETFL, O_NONBLOCK) < 0){
            printf("%s: fnctl set nonblocking failed!\n", __func__);
            break;
        }
        event_set(&reactor_events[i], client_fd, recv_data, &reactor_events[i]);
        event_add(epoll_fd, EPOLLIN, &reactor_events[i]);
    } while(0);
    printf("New connection [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(caddr.sin_addr), ntohs(caddr.sin_port), reactor_events[i].last_active, i);
}

void recv_data(int fd, int events, void* arg){
    reactor_event_s *re = static_cast<reactor_event_s*>(arg);
    re->last_active = time(nullptr);
    int len;
    len = recv(fd, re->buffer, MAX_BUFFER_SIZE, 0);
    re->len = len;
    re->buffer[len] = '\0';
    if (len > 0) {
        printf("Recv <= %s. Sending Back...\n", re->buffer);
        send_data(fd, re->buffer, len);
    }
    return ;
}

/**
 * Send `len` bytes starting at `data` on `fd`.
 *
 * @param fd    connected socket
 * @param data  bytes to send
 * @param len   number of bytes; values <= 0 send nothing
 *
 * send() may accept fewer bytes than requested, so the call is repeated until
 * everything has been handed to the kernel. MSG_NOSIGNAL keeps a write to a
 * connection the peer already closed from raising SIGPIPE, which would
 * terminate the process by default. On any error other than EINTR (including
 * EAGAIN on a non-blocking socket) the failure is reported and the rest of
 * the data is dropped; this function has no access to the reactor record and
 * so cannot arm EPOLLOUT to resume later.
 */
void send_data(int fd, char* data, int len){
    int sent = 0;
    while (sent < len){
        const ssize_t n = send(fd, data + sent, static_cast<size_t>(len - sent), MSG_NOSIGNAL);
        if (n < 0){
            if (errno == EINTR)
                continue;   // interrupted before anything was sent, retry
            printf("[fd=%d] send failed after %d/%d bytes: %s\n", fd, sent, len, strerror(errno));
            return;
        }
        if (n == 0)
            return;         // no progress; give up rather than spin
        sent += static_cast<int>(n);
    }
}

/**
 * Unregister a reactor record from epoll and mark its slot as free.
 *
 * @param epfd  epoll instance
 * @param re    record to remove; records that are not in use (valid != 1)
 *              are left untouched
 *
 * The result of epoll_ctl() is ignored: the slot is released either way.
 *
 * About the event argument: EPOLL_CTL_DEL ignores its contents, so a
 * zero-initialised structure is enough and neither data.ptr nor data.fd has
 * to be filled in. A non-null pointer is still passed because Linux kernels
 * before 2.6.9 rejected a null one for this operation.
 */
void event_del(int epfd, reactor_event_s* re){
    if (re->valid == 1){
        epoll_event unused{};
        epoll_ctl(epfd, EPOLL_CTL_DEL, re->fd, &unused);
        re->valid = 0;
    }
}

上一篇:Javascript高级程序设计第一章 | ch1 | 阅读笔记


下一篇:ES6 map数据结构