qemu的主事件循环分析已经有半个月了,下面来总结下qemu aio相关的api
以下Aio context指的是aio looper线程
首先我们要知道qemu的aio主要实现了下面五种能力
1 bh
1 bh的创建
/**
 * aio_bh_new: Allocate a new bottom half and attach it to an AioContext.
 * @ctx: the AioContext whose bottom-half list receives the new entry
 * @cb: the callback stored in the bottom half
 * @opaque: the opaque pointer stored alongside @cb
 *
 * Bottom halves are lightweight callbacks whose invocation is guaranteed
 * to be wait-free, thread-safe and signal-safe. The #QEMUBH structure
 * is opaque and must be allocated prior to its use.
 *
 * Every field other than @ctx, @cb and @opaque starts out zeroed. The new
 * entry is pushed onto the head of @ctx's list while @ctx->bh_lock is held.
 *
 * Returns: the newly allocated bottom half.
 */
QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *new_bh = g_new(QEMUBH, 1);

    /* Members not named in the initializer are zero-initialized. */
    *new_bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };

    qemu_mutex_lock(&ctx->bh_lock);
    new_bh->next = ctx->first_bh;
    /*
     * The write barrier orders the member initialization above before
     * the store that makes the bottom half reachable from the list head.
     */
    smp_wmb();
    ctx->first_bh = new_bh;
    qemu_mutex_unlock(&ctx->bh_lock);

    return new_bh;
}
bh一创建就会被添加进AioContext。这个函数是线程安全的,所以可以在非Aio context线程中调用
2 调用
/*
 * Mark @bh as an idle bottom half and flag it as scheduled.
 *
 * The idle flag is written first, then the scheduled flag is set with
 * atomic_mb_set(). No aio_notify() call is made here.
 */
void qemu_bh_schedule_idle(QEMUBH *bh)
{
bh->idle = 1;
/* Make sure that idle & any writes needed by the callback are done
* before the locations are read in the aio_bh_poll.
*/
atomic_mb_set(&bh->scheduled, 1);
}
/*
 * Flag @bh as scheduled (non-idle) and notify its AioContext.
 *
 * aio_notify() is only called when the scheduled flag was previously
 * clear; if the bottom half was already scheduled, nothing else is done.
 */
void qemu_bh_schedule(QEMUBH *bh)
{
    /* Load the context before the scheduled flag is published below. */
    AioContext *owner = bh->ctx;

    bh->idle = 0;

    /*
     * The memory barrier implicit in atomic_xchg makes sure that:
     * 1. idle & any writes needed by the callback are done before the
     *    locations are read in the aio_bh_poll.
     * 2. ctx is loaded before scheduled is set and the callback has a
     *    chance to execute.
     */
    if (atomic_xchg(&bh->scheduled, 1) != 0) {
        /* Already scheduled: no notification needed. */
        return;
    }

    aio_notify(owner);
}
调度.两种方式,一种是idle的,也就是如果没有其他事件触发aio 唤醒,要等到10ms后执行, 一种是直接唤醒aio 执行
3 取消/删除
/*
 * Clear the scheduled flag of @bh.
 *
 * This function is asynchronous. The flag is cleared with a plain store;
 * no atomic operation or memory barrier is used here.
 */
void qemu_bh_cancel(QEMUBH *bh)
{
bh->scheduled = 0;
}
/*
 * Clear the scheduled flag of @bh and mark it as deleted.
 *
 * This function is asynchronous: nothing is freed here. The bottom half
 * will do the delete action at the very end.
 */
void qemu_bh_delete(QEMUBH *bh)
{
bh->scheduled = 0;
bh->deleted = 1;
}
注意这两个操作都不是原子的,并且是异步的,所以有可能取消后还是被调度(概率比较小)
另外bh的使用场景更像是一半异步完成,后半段放到aio context线程执行
2 fd(文件描述符事件)
/*
 * Register, update or remove the callbacks associated with a file
 * descriptor in @ctx.
 *
 * Behaves very similarly to qemu_set_fd_handler. Unlike
 * qemu_set_fd_handler, these callbacks will be invoked when using
 * aio_poll(). Code that invokes AIO completion functions should rely on
 * this function instead of qemu_set_fd_handler[2].
 *
 * When both @io_read and @io_write are NULL the handler for @fd is
 * removed (a no-op if none is registered). Otherwise a handler is
 * created if @fd has none yet, and its callbacks, @opaque, @is_external
 * and poll event mask are refreshed.
 */
void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        bool is_external,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        void *opaque)
{
    AioHandler *handler = find_aio_handler(ctx, fd);
    bool added = false;
    bool free_handler = false;

    if (io_read || io_write) {
        /* Registering or updating a handler. */
        if (!handler) {
            /* Not known yet: allocate it and hook it up. */
            handler = g_new0(AioHandler, 1);
            handler->pfd.fd = fd;
            QLIST_INSERT_HEAD(&ctx->aio_handlers, handler, node);
            g_source_add_poll(&ctx->source, &handler->pfd);
            added = true;
        }

        /* Refresh the handler with the latest information. */
        handler->io_read = io_read;
        handler->io_write = io_write;
        handler->opaque = opaque;
        handler->is_external = is_external;
        handler->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        handler->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
    } else {
        /* Removing the handler; nothing to do if there is none. */
        if (!handler) {
            return;
        }

        g_source_remove_poll(&ctx->source, &handler->pfd);

        if (!ctx->walking_handlers) {
            /*
             * Nobody is walking the list, so unlink the node for real.
             * Merely marking it deleted is not enough because deleted
             * nodes are only cleaned up after the walking_handlers lock
             * is released. The memory is freed at the end, once the
             * epoll update below no longer needs the node.
             */
            QLIST_REMOVE(handler, node);
            free_handler = true;
        } else {
            /* The list is being walked: only mark the node as deleted. */
            handler->deleted = 1;
            handler->pfd.revents = 0;
        }
    }

    aio_epoll_update(ctx, handler, added);
    aio_notify(ctx);

    if (free_handler) {
        g_free(handler);
    }
}
注意这个函数是不持锁的, 另外该方法有两个作用,一个是注册fd,一个是注销fd, 所以现在猜测该方法只能在aio context线程调用
3 timer
/**
 * timer_new_tl:
 * @timer_list: the timer list to attach the timer to
 * @scale: the scale value for the timer
 * @cb: the callback to be called when the timer expires
 * @opaque: the opaque pointer to be passed to the callback
 *
 * Create a new timer and associate it with @timer_list.
 * The memory is allocated (zero-filled) by this function.
 *
 * This is not the preferred interface unless you know you
 * are going to call timer_free. Use timer_init instead.
 *
 * Returns: a pointer to the timer
 */
static inline QEMUTimer *timer_new_tl(QEMUTimerList *timer_list,
                                      int scale,
                                      QEMUTimerCB *cb,
                                      void *opaque)
{
    QEMUTimer *timer = g_malloc0(sizeof(*timer));

    timer_init_tl(timer, timer_list, scale, cb, opaque);

    return timer;
}
/**
 * timer_del:
 * @ts: the timer to delete
 *
 * Delete a timer from the active list.
 *
 * This function is thread-safe, but neither the timer nor its timer list
 * may be freed while this function is running.
 */
void timer_del(QEMUTimer *ts);
4 threadpool
/*
 * Queue a request on @pool.
 *
 * A request element is obtained through qemu_aio_get() with @cb and
 * @opaque, filled in with @func and @arg, and marked THREAD_QUEUED. It is
 * linked into @pool's list of all elements and, under @pool->lock,
 * appended to the request list. If the pool has no idle thread and has
 * not reached max_threads, spawn_thread() is called first. Finally the
 * pool semaphore is posted.
 *
 * Returns: the BlockAIOCB embedded in the request element.
 */
BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
                                   ThreadPoolFunc *func, void *arg,
                                   BlockCompletionFunc *cb, void *opaque)
{
    ThreadPoolElement *elem = qemu_aio_get(&thread_pool_aiocb_info, NULL,
                                           cb, opaque);

    elem->func = func;
    elem->arg = arg;
    elem->state = THREAD_QUEUED;
    elem->pool = pool;

    /* Note: this list insertion happens before pool->lock is taken. */
    QLIST_INSERT_HEAD(&pool->head, elem, all);

    trace_thread_pool_submit(pool, elem, arg);

    qemu_mutex_lock(&pool->lock);
    /* Grow the pool only when nobody is idle and there is still room. */
    if (!pool->idle_threads && pool->max_threads > pool->cur_threads) {
        spawn_thread(pool);
    }
    QTAILQ_INSERT_TAIL(&pool->request_list, elem, reqs);
    qemu_mutex_unlock(&pool->lock);

    /* Post the semaphore only after the lock has been dropped. */
    qemu_sem_post(&pool->sem);

    return &elem->common;
}
这个函数看样子也只能在aio context线程中调用, 另外当没有idle线程且当前线程数未达到max_threads时, 会尝试调用spawn_thread创建新的工作线程
/*
 * Try to cancel a queued thread-pool request.
 *
 * The request is only cancelled if it is still THREAD_QUEUED and a
 * non-blocking wait on the pool semaphore succeeds. In that case it is
 * removed from the request list, the pool's completion bottom half is
 * scheduled, and the element is marked THREAD_DONE with -ECANCELED.
 * Otherwise the request is left untouched.
 */
static void thread_pool_cancel(BlockAIOCB *acb)
{
    ThreadPoolElement *req = (ThreadPoolElement *)acb;
    ThreadPool *owner = req->pool;

    trace_thread_pool_cancel(req, req->common.opaque);

    qemu_mutex_lock(&owner->lock);
    if (req->state == THREAD_QUEUED) {
        /*
         * No thread has yet started working on the request. We can try
         * to "steal" the item from the worker if we can get a signal
         * from the semaphore. Because this is non-blocking, we can do it
         * with the lock taken and ensure that the request will remain
         * THREAD_QUEUED.
         */
        if (qemu_sem_timedwait(&owner->sem, 0) == 0) {
            QTAILQ_REMOVE(&owner->request_list, req, reqs);
            qemu_bh_schedule(owner->completion_bh);

            req->state = THREAD_DONE;
            req->ret = -ECANCELED;
        }
    }
    qemu_mutex_unlock(&owner->lock);
}
取消: 如果请求仍处于THREAD_QUEUED状态(还没有线程开始处理), 就把它从请求队列(request_list)中删除, 并将结果置为-ECANCELED