qemu中的eventfd——用法与原理

文章目录

 

eventfd可以用于线程或者父子进程间通信,内核通过eventfd也可以向用户空间进程发消息。其核心实现是在内核空间维护一个计数器,向用户空间暴露一个与之关联的匿名fd。不同线程通过读写该fd通知或等待对方,内核通过写该fd通知用户程序

eventfd用法

  • eventfd机制接口简单,核心只有4个,分别是创建eventfd(eventfd),写eventfd(write),读eventfd(read),监听eventfd(poll/select)。
  1. int eventfd(unsigned int initval, int flags):创建一个eventfd,它的返回值是一个文件fd,可以读写。该接口传入一个初始值initval用于内核初始化计数器,flags用于控制返回的eventfd的read行为。flags如果包含EFD_NONBLOCK,read eventfd将不会阻塞,如果包含EFD_SEMAPHORE,read eventfd每次读之后内核计数器都减1。
  2. ssize_t write(int fd, const void *buf, size_t count):写eventfd,传入一个8字节的buffer,buffer的值增加到内核维护的计数器中。
  3. ssize_t read(int fd, void *buf, size_t count):读eventfd,如果计数器非0,信号量方式返回1,否则返回计数器的值。如果计数器为0,读失败,阻塞模式下会阻塞直到计数器非0,非阻塞模式下返回EAGAIN错误。
  4. int poll(struct pollfd *fds, nfds_t nfds, int timeout):监听eventfd是否可读。

demo

  • 代码
#include <stdio.h>
#include <stdlib.h>
#include <sys/eventfd.h>
#include <pthread.h>
#include <unistd.h>

int efd;

void *threadFunc()
{
    uint64_t buffer;
    int rc;
    int i = 0;
    while(i++ < 2){
        /* 如果计数器非0,read成功,buffer返回计数器值。成功后有两种行为:信号量方式计数器每次减,其它每次清0。
         * 如果计数器0,read失败,由两种返回方式:EFD_NONBLOCK方式会阻塞,反之返回EAGAIN 
         */
        rc = read(efd, &buffer, sizeof(buffer));

        if (rc == 8) {
            printf("notify success, eventfd counter = %lu\n", buffer);
        } else {
            perror("read");
        }
    }
}

static void
open_eventfd(unsigned int initval, int flags)
{
    efd = eventfd(initval, flags);
    if (efd == -1) {
        perror("eventfd");
    }
}

static void
close_eventfd(int fd)
{
    close(fd);
}
/* counter表示写eventfd的次数,每次写入值为2 */
static void test(int counter)
{
    int rc;
    pthread_t tid;
    void *status;
    int i = 0;
    uint64_t buf = 2;

    /* create thread */
    if(pthread_create(&tid, NULL, threadFunc, NULL) < 0){
        perror("pthread_create");
    }

    while(i++ < counter){
        rc = write(efd, &buf, sizeof(buf));
        printf("signal to subscriber success, value = %lu\n", buf);

        if(rc != 8){
            perror("write");
        }
        sleep(2);
    }

    pthread_join(tid, &status);
}

int main()
{
    unsigned int initval;

    printf("NON-SEMAPHORE BLOCK way\n");
    /* 初始值为4, flags为0,默认blocking方式读取eventfd */
    initval = 4;
    open_eventfd(initval, 0);
    printf("init counter = %lu\n", initval);

    test(2);

    close_eventfd(efd);

    printf("change to SEMAPHORE way\n");

    /* 初始值为4, 信号量方式维护counter */
    initval = 4;
    open_eventfd(initval, EFD_SEMAPHORE);
    printf("init counter = %lu\n", initval);

    test(2);

    close_eventfd(efd);

    printf("change to NONBLOCK way\n");

    /* 初始值为4, NONBLOCK方式读eventfd */
    initval = 4;
    open_eventfd(initval, EFD_NONBLOCK);
    printf("init counter = %lu\n", initval);

    test(2);

    close_eventfd(efd);

    return 0;
}

分析

  • demo中创建eventfd使用了三种方式,分别如下:
  1. 阻塞非信号量:以非信号量方式创建的eventfd,在读eventfd之后,内核的计数器归零,下一次再读就会阻塞,除非有进程再次写eventfd。
    qemu中的eventfd——用法与原理
    内核计数器初始值为4,主线程第1次写入2,计数器增至6
    读线程返回6,之后计数器清0,读线程阻塞
    下一次主线程写入2,计数器增至2,读线程返回2
  2. 阻塞信号量:以信号量方式创建的eventfd,在读eventfd之后,内核的计数器减1
    qemu中的eventfd——用法与原理
    内核计数器初始值为4,主线程第一次写入2,计数器增至6
    读线程返回1,计数器减1变成5,读线程循环读返回1,计数器再减1变成4
    主线程写入2计数器增至6
  3. 非阻塞非信号量:读eventfd之后,计数器清0,再次读eventfd返回EAGAIN
    qemu中的eventfd——用法与原理
    内核计数器初始值为4,主线程第一次写入2,计数器增至6
    读线程返回6,计数器清0,读线程循环非阻塞读返回错误码EAGAIN
    主线程写入2计数器增至2
  • demo阻塞读模式下,信号量和非信号量方式如下图所示:
    qemu中的eventfd——用法与原理

eventfd内核实现

创建eventfd

系统调用

  • eventfd系统调用有三个步骤:
SYSCALL_DEFINE2(eventfd2, unsigned int, count, int, flags)
{
	......
	get_unused_fd_flags(flags & EFD_SHARED_FCNTL_FLAGS);		/* 1 */
    file = eventfd_file_create(count, flags);					/* 2 */
    fd_install(fd, file);										/* 3 */
    ......
}
1. 从进程的文件描述符表(fdt)中查询可用的fd
2. 在匿名节点文件系统(anon_inodefs)中创建一个文件结构体
3. 安装文件描述符,将fd和file关联起来

qemu中的eventfd——用法与原理

eventfd_ctx

  • eventfd_ctx结构体是eventfd实现的核心,如下:
struct eventfd_ctx {
    struct kref kref;
    wait_queue_head_t wqh;
    /*            
     * Every time that a write(2) is performed on an eventfd, the
     * value of the __u64 being written is added to "count" and a
     * wakeup is performed on "wqh". A read(2) will return the "count"
     * value to userspace, and will reset "count" to zero. The kernel
     * side eventfd_signal() also, adds to the "count" counter and
     * issue a wakeup.
     */
    __u64 count;
    unsigned int flags;
}; 
  1. wqh:等待队列头,所有阻塞在eventfd上的读进程挂在该等待队列上
  2. count:eventfd计数器,当用户程序write eventfd时内核会将值加在计数器上,用户程序read eventfd之后,内核会将值减1或者清0(由EFD_SEMAPHORE标志决定),当计数器为0时,内核会将read进程挂载等待队列头wqh指向的队列上。
    两种方式可以唤醒等待在eventfd上的进程,一个是用户态write,另一个是内核态的eventfd_signal。从这里可以看出eventfd不仅可以用于用户进程相互通信,也可以用作内核通知用户进程的手段。
  3. flags:决定用户read后内核的处理方式,EFD_SEMAPHORE,EFD_CLOEXEC,EFD_NONBLOCK三个取值
  • eventfd_ctx的初始化在eventfd_file_create中实现,如下:
struct file *eventfd_file_create(unsigned int count, int flags)
{   
    struct file *file;
    struct eventfd_ctx *ctx;
	......    
    ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
    kref_init(&ctx->kref);
    init_waitqueue_head(&ctx->wqh);									/* 1 */
    ctx->count = count;
    ctx->flags = flags;
    file = anon_inode_getfile("[eventfd]", &eventfd_fops, ctx,		/* 2 */
                  O_RDWR | (flags & EFD_SHARED_FCNTL_FLAGS));
	......
    return file;
}
  •  
  • eventfd的file操作由eventfd_fops实现,如下:
static const struct file_operations eventfd_fops = {
	......
    .read       = eventfd_read,
    .write      = eventfd_write,
	......
};  

读eventfd

  • 用户进程读eventfd时,最终会进入 eventfd_read,如下:
static ssize_t eventfd_read(struct file *file, char __user *buf, size_t count,
                loff_t *ppos)
{   
    struct eventfd_ctx *ctx = file->private_data;
	......
    res = eventfd_ctx_read(ctx, file->f_flags & O_NONBLOCK, &cnt);	/* 如果count非0,将其放在cnt中返回给用户进程 */
	......
}
  • eventfd_read的核心是eventfd_ctx_read,看一下它的说明和实现:
/**
 * eventfd_ctx_read - Reads the eventfd counter or wait if it is zero.
 * @ctx: [in] Pointer to eventfd context.
 * @no_wait: [in] Different from zero if the operation should not block.
 * @cnt: [out] Pointer to the 64-bit counter value.
 *
 * Returns %0 if successful, or the following error codes:
 *
 *  - -EAGAIN      : The operation would have blocked but @no_wait was non-zero.
 *  - -ERESTARTSYS : A signal interrupted the wait operation.
 *
 * If @no_wait is zero, the function might sleep until the eventfd internal
 * counter becomes greater than zero.
 */ 
ssize_t eventfd_ctx_read(struct eventfd_ctx *ctx, int no_wait, __u64 *cnt)
{               
    ssize_t res;
    /* 声明一个等待队列项并关联本进程的task 
	 * 等待队列关联的函数为default_wake_function,作为唤醒函数存在
	 */
    DECLARE_WAITQUEUE(wait, current);
    
    spin_lock_irq(&ctx->wqh.lock);
    *cnt = 0;
    res = -EAGAIN;
    /* 如果count大于0,读进程不阻塞 */
    if (ctx->count > 0)
        res = 0;	
    else if (!no_wait) {
        __add_wait_queue(&ctx->wqh, &wait);	/* 将等待队列项添加到eventfd的等待队列头中 */
        for (;;) {
        	/* 设置阻塞状态 */
            set_current_state(TASK_INTERRUPTIBLE);
            /* 如果count大于0,break,这种情况在第一次循环之后可能发生 
			 * eventfd的写进程在写完计数器后,会唤醒阻塞在eventfd上的读进程
			 * 这时读进程重新被调度器调度,进入新一轮的循环,来到这里,重新检查内核计数器
			 */
            if (ctx->count > 0) {
                res = 0;
                break;
            }
            /* 如果有未处理的信号,也break,进行处理 */
            if (signal_pending(current)) {
                res = -ERESTARTSYS;
                break;
            }
          
            spin_unlock_irq(&ctx->wqh.lock);
            /* 主动请求调度,当前进程被挂起 */
            schedule();		  
            /* 挂起的进程重新运行 */
            spin_lock_irq(&ctx->wqh.lock);
        }
      	/* 从循环中跳出,将当前进程从eventfd的等待队列中删除 */
        __remove_wait_queue(&ctx->wqh, &wait);
          /* 设置运行状态 */
        __set_current_state(TASK_RUNNING);
    }
    if (likely(res == 0)) {
    	/* 读取counter */
        eventfd_ctx_do_read(ctx, cnt);
        /* 如果eventfd上有阻塞的写进程,将其唤醒 */
        if (waitqueue_active(&ctx->wqh))
            wake_up_locked_poll(&ctx->wqh, POLLOUT);
    }
    spin_unlock_irq(&ctx->wqh.lock);

    return res;
}
  • eventfd_ctx_read函数中有一个地方要注意,当执行eventfd_ctx_do_read函数之后,表示真正获取到了计数器的值,这之后会检查阻塞在eventfd上的写进程,因为写进程可能会因为计数器超过范围而阻塞,读进程完成之后计数器值会变小,这个时候可以唤醒写进程重新检查写入的值是否超过范围。这个在写eventfd中会介绍。
  • 看一下真正获取内核计数器的函数:
static void eventfd_ctx_do_read(struct eventfd_ctx *ctx, __u64 *cnt)
{           
	/* 如果是信号量方式,返回给用户进程的counter始终是1,之后内核counter减1
	 * 如果是非信号量方式,返回给用户进程的counter就是内核维护的counter
	 * 之后内核counter清0
	 */
    *cnt = (ctx->flags & EFD_SEMAPHORE) ? 1 : ctx->count;
    ctx->count -= *cnt;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

写eventfd

  • 从eventfd_ctx 的数据结构解释中我们了解到,写eventfd有两种场景,一是用户态写,二是内核态写。这两种场景分别用于实现用户态通知和内核态的通知。这节主要介绍用户态发起的eventfd写操作。
  • 写eventfd的主要动作是往计数器加用户态传入的值,有一种情况会阻塞写进程,就是计数器值到达或者超出范围。这时写进程阻塞直到计数器在正常范围内,注意,用户态写eventfd也不允许到达计数器的最大值,因为这个最大值要为内核态的写保留,后面的内核态写会介绍
static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count,
                 loff_t *ppos)
{   
    struct eventfd_ctx *ctx = file->private_data;
    ssize_t res;
    __u64 ucnt;
    /* 声明等待队列项,default_wake_function为唤醒函数 */
    DECLARE_WAITQUEUE(wait, current);
	/* 用户态传入的buffer长度必须>=8字节,否则返回错误 */
    if (count < sizeof(ucnt))
        return -EINVAL;
    if (copy_from_user(&ucnt, buf, sizeof(ucnt)))
        return -EFAULT;
    /* 如果写入的值等于64bit所能表示的最大值,返回错误 */
    if (ucnt == ULLONG_MAX)
        return -EINVAL;
    spin_lock_irq(&ctx->wqh.lock);
    res = -EAGAIN;
    /* 写入的ucnt后内核计数器不会溢出 */
    if (ULLONG_MAX - ctx->count > ucnt)
        res = sizeof(ucnt);
    /* 写入后内核计数器溢出,并且是阻塞方式打开的eventfd */
    else if (!(file->f_flags & O_NONBLOCK)) {
        __add_wait_queue(&ctx->wqh, &wait);	// 写进程加入等待队列
        for (res = 0;;) {
        	/* 设置阻塞状态 */
            set_current_state(TASK_INTERRUPTIBLE);
            /* 当有读进程read eventfd之后,内核计数器会减少
             * 读进程读取计数器成功后,会唤醒阻塞在evenfd上的写进程
             * 这时写进程重新被调度进入新一轮的循环检查,走到这里
             * */
            if (ULLONG_MAX - ctx->count > ucnt) {
                res = sizeof(ucnt);
                break;
            }
            if (signal_pending(current)) {
                res = -ERESTARTSYS;
                break;
            }
            spin_unlock_irq(&ctx->wqh.lock);
            /* 主动让出CPU,申请调度器调度 */
            schedule();
            spin_lock_irq(&ctx->wqh.lock);
        }
        /* 跳出循环,从等待队列中退出 */
        __remove_wait_queue(&ctx->wqh, &wait);
        /* 设置运行状态 */
        __set_current_state(TASK_RUNNING);
    }
    if (likely(res > 0)) {
    	/* 增加内核计数器 */
        ctx->count += ucnt;
        /* 如果eventfd上有阻塞的读进程,将其唤醒 */
        if (waitqueue_active(&ctx->wqh))
            wake_up_locked_poll(&ctx->wqh, POLLIN);
    }
    spin_unlock_irq(&ctx->wqh.lock);

    return res;
}
  •  
  • 在读eventfd的流程中,我们提到读完eventfd之后,会唤醒阻塞在其上的写进程,同样,在写eventfd的流程中,写完eventfd之后会唤醒阻塞在其上的读进程。但是写进程阻塞的场景比较少,它只在计数器到达最大值或者溢出的情况才出现。
上一篇:kvm vcpu run


下一篇:QEMU MIPS虚拟机