在上文中我们提到了 libevent 事件循环 event_dispatch 的大致过程。下面以 epoll 为例,看一下事件是如何被加入激活队列的。
/*
 * Called from the backend dispatch routine (e.g. epoll_dispatch, once
 * epoll_wait has returned the ready descriptors): use fd to find the
 * matching I/O slot in base->io and activate every event registered on it
 * that is interested in at least one of the reported event types.
 *
 * base   - event base whose I/O map is searched
 * fd     - descriptor reported as ready
 * events - ready event flags, tested against each event's ev_events
 */
void
evmap_io_active_(struct event_base *base, evutil_socket_t fd, short events)
{
	struct event_io_map *io = &base->io;
	struct evmap_io *ctx;
	struct event *ev;

#ifndef EVMAP_USE_HT
	/* Without the hash-table map, fd is range-checked against nentries. */
	if (fd < 0 || fd >= io->nentries)
		return;
#endif
	/* Locate the I/O slot for this descriptor. */
	GET_IO_SLOT(ctx, io, fd, evmap_io);

	if (NULL == ctx)
		return;

	/*
	 * One descriptor may carry several events, kept on a list; walk them
	 * and check whether any event type each one cares about has occurred.
	 */
	LIST_FOREACH(ev, &ctx->events, ev_io_next) {
		if (ev->ev_events & events) {
			/* Put the event on the active queue. */
			event_active_nolock_(ev, ev->ev_events & events, 1);
		}
	}
}
再看函数event_active_nolock_(ev, ev->ev_events & events, 1)。
/*
 * Mark ev as active with result flags res and hand its callback structure
 * to event_callback_activate_nolock_().
 *
 * ev     - event to activate
 * res    - result flags; merged into ev->ev_res when the event is already
 *          active or active-later, otherwise stored as the new ev_res
 * ncalls - stored in ev->ev_ncalls for EV_SIGNAL events only
 *
 * The base lock is asserted to be held (EVENT_BASE_ASSERT_LOCKED).
 * Events flagged EVLIST_FINALIZING are ignored.
 */
void
event_active_nolock_(struct event *ev, int res, short ncalls)
{
	struct event_base *base;

	event_debug(("event_active: %p (fd "EV_SOCK_FMT"), res %d, callback %p",
		ev, EV_SOCK_ARG(ev->ev_fd), (int)res, ev->ev_callback));

	base = ev->ev_base;
	EVENT_BASE_ASSERT_LOCKED(base);

	if (ev->ev_flags & EVLIST_FINALIZING) {
		/* XXXX debug */
		return;
	}

	/* Is the event already on the active queue or the active-later queue? */
	switch ((ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
	default:
	case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER:
		/* Both flags at once (or anything else) is treated as a bug. */
		EVUTIL_ASSERT(0);
		break;
	case EVLIST_ACTIVE:
		/* We get different kinds of events, add them together */
		ev->ev_res |= res;
		return;
	case EVLIST_ACTIVE_LATER:
		ev->ev_res |= res;
		break;
	case 0:
		ev->ev_res = res;
		break;
	}

	/* Flag the base when this event's priority value is below the one
	 * currently running. */
	if (ev->ev_pri < base->event_running_priority)
		base->event_continue = 1;

	if (ev->ev_events & EV_SIGNAL) {
#ifndef EVENT__DISABLE_THREAD_SUPPORT
		/* If this event's callback is the one currently running and we
		 * are not in the base's thread, wait on current_event_cond. */
		if (base->current_event == event_to_event_callback(ev) &&
		    !EVBASE_IN_THREAD(base)) {
			++base->current_event_waiters;
			EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
		}
#endif
		ev->ev_ncalls = ncalls;
		ev->ev_pncalls = NULL;
	}

	/* Only the callback structure is placed on the active queue. */
	event_callback_activate_nolock_(base, event_to_event_callback(ev));
}

/*
 * Insert evcb into the base's active queue.
 *
 * Returns 1 when the callback was neither active nor active-later and has
 * now been inserted. Returns 0 when it is finalizing, when it was already
 * active (nothing is done), or when it was moved over from the active-later
 * queue.
 */
int
event_callback_activate_nolock_(struct event_base *base,
    struct event_callback *evcb)
{
	int r = 1;

	if (evcb->evcb_flags & EVLIST_FINALIZING)
		return 0;

	switch (evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) {
	default:
		EVUTIL_ASSERT(0);
		/* FALLTHROUGH */
	case EVLIST_ACTIVE_LATER:
		/* The callback was scheduled for later activation. */
		event_queue_remove_active_later(base, evcb);
		r = 0;
		break;
	case EVLIST_ACTIVE:
		return 0;
	case 0:
		break;
	}

	/* Add the callback to the base's active queue. */
	event_queue_insert_active(base, evcb);

	if (EVBASE_NEED_NOTIFY(base))
		evthread_notify_base(base);

	return r;
}
我们在 timeout_process 中也可以看到 event_active_nolock_(ev, EV_TIMEOUT, 1); 它将定时事件也加入了激活队列。在 dispatch 中,我们可以看到对激活队列中的事件逐一调用回调函数:
static int
event_process_active(struct event_base *base)
{
/* Caller must hold th_base_lock */
..........
..........
//遍厉激活对列开始执行回调函数
for (i = ; i < base->nactivequeues; ++i) {
if (TAILQ_FIRST(&base->activequeues[i]) != NULL) {
base->event_running_priority = i;
activeq = &base->activequeues[i];
if (i < limit_after_prio)
//执行回调函数
c = event_process_active_single_queue(base, activeq,
INT_MAX, NULL);
else
c = event_process_active_single_queue(base, activeq,
maxcb, endtime);
if (c < ) {
goto done;
} else if (c > )
break; /* Processed a real event; do not
* consider lower-priority events */
/* If we get here, all of the events we processed
* were internal. Continue. */
}
} done:
base->event_running_priority = -; return c;
} static int
event_process_active_single_queue(struct event_base *base,
struct evcallback_list *activeq,
int max_to_process, const struct timeval *endtime)
{ ........
//执行回调函数
switch (evcb->evcb_closure) {
case EV_CLOSURE_EVENT_SIGNAL:
EVUTIL_ASSERT(ev != NULL);
event_signal_closure(base, ev);
break;
case EV_CLOSURE_EVENT_PERSIST:
EVUTIL_ASSERT(ev != NULL);
event_persist_closure(base, ev);
break;
case EV_CLOSURE_EVENT: {
void (*evcb_callback)(evutil_socket_t, short, void *);
EVUTIL_ASSERT(ev != NULL);
evcb_callback = *ev->ev_callback;
EVBASE_RELEASE_LOCK(base, th_base_lock);
evcb_callback(ev->ev_fd, ev->ev_res, ev->ev_arg); //回调函数
}
break;
case EV_CLOSURE_CB_SELF: {
void (*evcb_selfcb)(struct event_callback *, void *) = evcb->evcb_cb_union.evcb_selfcb;
EVBASE_RELEASE_LOCK(base, th_base_lock);
evcb_selfcb(evcb, evcb->evcb_arg); //
}
break;
case EV_CLOSURE_EVENT_FINALIZE:
case EV_CLOSURE_EVENT_FINALIZE_FREE: {
void (*evcb_evfinalize)(struct event *, void *);
int evcb_closure = evcb->evcb_closure;
EVUTIL_ASSERT(ev != NULL);
base->current_event = NULL;
evcb_evfinalize = ev->ev_evcallback.evcb_cb_union.evcb_evfinalize;
EVUTIL_ASSERT((evcb->evcb_flags & EVLIST_FINALIZING));
EVBASE_RELEASE_LOCK(base, th_base_lock);
evcb_evfinalize(ev, ev->ev_arg);
event_debug_note_teardown_(ev);
if (evcb_closure == EV_CLOSURE_EVENT_FINALIZE_FREE)
mm_free(ev);
}
break;
case EV_CLOSURE_CB_FINALIZE: {
void (*evcb_cbfinalize)(struct event_callback *, void *) = evcb->evcb_cb_union.evcb_cbfinalize;
base->current_event = NULL;
EVUTIL_ASSERT((evcb->evcb_flags & EVLIST_FINALIZING));
EVBASE_RELEASE_LOCK(base, th_base_lock);
evcb_cbfinalize(evcb, evcb->evcb_arg); //
}
break;
default:
EVUTIL_ASSERT();
}
}