libev与多线程

libev与多线程

libev支持多线程,也是one loop per-thread的模型。

但是要手动的启动和管理多个libev的细节。

我采用的方式是: N个libev的数据套接字线程 + 1个libev的监听套接字线程。

即,1个主线程里跑1个libev,称为main_libev。负责驱动accept socket,同时还注册1个timer用于定时统计链接相关的数据。

其他N个线程里分别各自的libev,称为data_libev,负责驱动数据的socket。

libev与多线程

当有新连接到来时,main_libev的回调里通过enqueue操作把新的fd传给N个线程的任意一个,这个操作是线程安全的。

同时给这个线程发送一个异步事件(async_send操作),告知其可以接客了。

data_libev在async的回调里,取出新的fd,然后加入到自己的ev中,并设置相应的回调。

data_libev的封装

data_libev被封装在类中,同时提供ev_run(),receive_fd(),add_newfds()等操作。

typedef std::list<int> FDList;
typedef FDList::iterator FDItr;
class LibEVPerThread
{
public:
    LibEVPerThread()
    {
        m_n_fd_received = 0;
        m_n_fd_added = 0;
        pthread_mutex_init(&m_lock, NULL);
        m_evloop_ptr = ev_loop_new(EVBACKEND_EPOLL);
    }
    bool set_id(unsigned int id) { m_id = id; }
    int ev_run()
    {
        ev_async_init(&m_async, async_cb);
        ev_async_start(m_evloop_ptr, &m_async);
        ev_set_userdata(m_evloop_ptr, this);
        printf("begin to ev_run()\n");
        ::ev_run(m_evloop_ptr, 0);
        printf("end to ev_run()\n");
    }
    bool receive_fd(int fd)
    {
        printf("ID: %u ----> receive a fd:%d\n", m_id, fd);
        enque_fd(fd);
        ev_async_send(m_evloop_ptr, &m_async);
    }
    int add_newfds()
    {
        FDList tmpfds = deque_fd();
        for (FDItr itr=tmpfds.begin(); itr!=tmpfds.end(); ++itr)
        {
            m_n_fd_added ++;
            add_fd(*itr);
            printf("ID: %u ----> added a fd:%d, totoal: %u\n", m_id, *itr, m_n_fd_added);
        }
    }
private:
    int enque_fd(int fd)
    {
        pthread_mutex_lock(&m_lock);
        m_sockfds.push_back(fd);
        pthread_mutex_unlock(&m_lock);
    }
    FDList deque_fd()
    {
        FDList tmp;
        pthread_mutex_lock(&m_lock);
        tmp.swap(m_sockfds);
        pthread_mutex_unlock(&m_lock);
        return tmp;
    }
    bool add_fd(int fd)
    {
        struct ev_io *evio_ptr = new ev_io();
        ev_io_init(evio_ptr, message_cb, fd, EV_READ);
        ev_io_start(m_evloop_ptr, evio_ptr);
    }
private:
    pthread_mutex_t m_lock;
    FDList m_sockfds;
    struct ev_async m_async;
    struct ev_loop *m_evloop_ptr;
    pthread_t m_tid;
    unsigned int m_n_fd_added;
    unsigned int m_n_fd_received;
    unsigned int m_id;
};

ev_run() 启动libev。

receive_fd() 提供给外界的接口,负责在新的fd到来后,添加到这个类中,并同时libev。

add_newfds() 提供给自己使用,负责在async的回调中,把新的fd添加到到自己的libev中。

ev_run()启动的时候只有一个async的事件,在它的回调里调用libev类的add_newfds()。

void async_cb(EV_P_ ev_async *w, int events)
{
    EVThreadInfo *ev_thread_info = (EVThreadInfo*)ev_userdata(EV_A);
    ev_thread_info->add_newfds();
}

main_libev

在main()的线程里使用EV_DEFAULT,监视accept socket的事件。

struct ev_loop *main_evloop = EV_DEFAULT;
    int accept_fd = init_accept_sock();
    ev_io accept_ev;
    ev_io_init (&accept_ev, on_connection_cb, accept_fd, EV_READ);
    ev_io_start (main_evloop, &accept_ev);

在它的回调函数on_connection_cb里,首先accept接收一个新的套接字,然后robin的方式选择一个libev类,调用libev类的receive_fd,把新的套接字发送给libev类,并通知。

void on_connection_cb(EV_P_ ev_io *w, int events)
{
     int newfd = accept(w->fd, NULL, 0);
     EVThreadInfo* ev_thread_info = &g_thread_info_prt[g_robin%g_n_thread];
     ev_thread_info->receive_fd(newfd);
     g_robin++;
}

实验和待处理问题

启动服务端

[zunbao.fengzb@rt1f07470.tbc /home/zunbao.fengzb/echo_client_server_libev]
$./multi_loop_server 6089 24

启动客户端

[CHECK@ump137035.sqa.cm4 ~/rds_proxy_test/proxy/echo_client_server_libev]
$./multi_loop_client 10.98.109.161 6089 24 85

实验结果

in timer callback, qps 1026115
in timer callback, qps 1016615
in timer callback, qps 1025216

client每次发送1个字节。

每个数据包的大小= 1Byte载荷 + IP头部20Byte + tcp头部20Byte+tcp选项12Byte = 53Byte

同时,每个数据包都要回一个ack包,ack包大小= IP头部20Byte + tcp头部20Byte = 40Byte

上一篇:Linux网络解读(4) - 数据包的发送接收之设备层


下一篇:boost::asio的类继承体系