文章目录
eventfd可以用于线程或者父子进程间通信,内核通过eventfd也可以向用户空间进程发消息。其核心实现是在内核空间维护一个计数器,向用户空间暴露一个与之关联的匿名fd。不同线程通过读写该fd通知或等待对方,内核通过写该fd通知用户程序
eventfd用法
- eventfd机制接口简单,核心只有4个,分别是创建eventfd(eventfd),写eventfd(write),读eventfd(read),监听eventfd(poll/select)。
-
int eventfd(unsigned int initval, int flags)
:创建一个eventfd,它的返回值是一个文件fd,可以读写。该接口传入一个初始值initval用于内核初始化计数器,flags用于控制返回的eventfd的read行为。flags如果包含EFD_NONBLOCK
,read eventfd将不会阻塞,如果包含EFD_SEMAPHORE
,read eventfd每次读之后内核计数器都减1。 -
ssize_t write(int fd, const void *buf, size_t count)
:写eventfd,传入一个8字节的buffer,buffer的值增加到内核维护的计数器中。 -
ssize_t read(int fd, void *buf, size_t count)
:读eventfd,如果计数器非0,信号量方式返回1,否则返回计数器的值。如果计数器为0,读失败,阻塞模式下会阻塞直到计数器非0,非阻塞模式下返回EAGAIN错误。 -
int poll(struct pollfd *fds, nfds_t nfds, int timeout)
:监听eventfd是否可读。
demo
- 代码
#include <stdio.h>
#include <stdlib.h>
#include <sys/eventfd.h>
#include <pthread.h>
#include <unistd.h>
int efd;
void *threadFunc()
{
uint64_t buffer;
int rc;
int i = 0;
while(i++ < 2){
/* 如果计数器非0,read成功,buffer返回计数器值。成功后有两种行为:信号量方式计数器每次减,其它每次清0。
* 如果计数器0,read失败,由两种返回方式:EFD_NONBLOCK方式会阻塞,反之返回EAGAIN
*/
rc = read(efd, &buffer, sizeof(buffer));
if (rc == 8) {
printf("notify success, eventfd counter = %lu\n", buffer);
} else {
perror("read");
}
}
}
static void
open_eventfd(unsigned int initval, int flags)
{
efd = eventfd(initval, flags);
if (efd == -1) {
perror("eventfd");
}
}
static void
close_eventfd(int fd)
{
close(fd);
}
/* counter表示写eventfd的次数,每次写入值为2 */
static void test(int counter)
{
int rc;
pthread_t tid;
void *status;
int i = 0;
uint64_t buf = 2;
/* create thread */
if(pthread_create(&tid, NULL, threadFunc, NULL) < 0){
perror("pthread_create");
}
while(i++ < counter){
rc = write(efd, &buf, sizeof(buf));
printf("signal to subscriber success, value = %lu\n", buf);
if(rc != 8){
perror("write");
}
sleep(2);
}
pthread_join(tid, &status);
}
int main()
{
unsigned int initval;
printf("NON-SEMAPHORE BLOCK way\n");
/* 初始值为4, flags为0,默认blocking方式读取eventfd */
initval = 4;
open_eventfd(initval, 0);
printf("init counter = %lu\n", initval);
test(2);
close_eventfd(efd);
printf("change to SEMAPHORE way\n");
/* 初始值为4, 信号量方式维护counter */
initval = 4;
open_eventfd(initval, EFD_SEMAPHORE);
printf("init counter = %lu\n", initval);
test(2);
close_eventfd(efd);
printf("change to NONBLOCK way\n");
/* 初始值为4, NONBLOCK方式读eventfd */
initval = 4;
open_eventfd(initval, EFD_NONBLOCK);
printf("init counter = %lu\n", initval);
test(2);
close_eventfd(efd);
return 0;
}
分析
- demo中创建eventfd使用了三种方式,分别如下:
- 阻塞非信号量:以非信号量方式创建的eventfd,在读eventfd之后,内核的计数器归零,下一次再读就会阻塞,除非有进程再次写eventfd。
内核计数器初始值为4,主线程第1次写入2,计数器增至6
读线程返回6,之后计数器清0,读线程阻塞
下一次主线程写入2,计数器增至2,读线程返回2 - 阻塞信号量:以信号量方式创建的eventfd,在读eventfd之后,内核的计数器减1
内核计数器初始值为4,主线程第一次写入2,计数器增至6
读线程返回1,计数器减1变成5,读线程循环读返回1,计数器再减1变成4
主线程写入2计数器增至6 - 非阻塞非信号量:读eventfd之后,计数器清0,再次读eventfd返回EAGAIN
内核计数器初始值为4,主线程第一次写入2,计数器增至6
读线程返回6,计数器清0,读线程循环非阻塞读返回错误码EAGAIN
主线程写入2计数器增至2
- demo阻塞读模式下,信号量和非信号量方式如下图所示:
eventfd内核实现
创建eventfd
系统调用
- eventfd系统调用有三个步骤:
SYSCALL_DEFINE2(eventfd2, unsigned int, count, int, flags)
{
......
get_unused_fd_flags(flags & EFD_SHARED_FCNTL_FLAGS); /* 1 */
file = eventfd_file_create(count, flags); /* 2 */
fd_install(fd, file); /* 3 */
......
}
1. 从进程的文件描述符表(fdt)中查询可用的fd
2. 在匿名节点文件系统(anon_inodefs)中创建一个文件结构体
3. 安装文件描述符,将fd和file关联起来
eventfd_ctx
- eventfd_ctx结构体是eventfd实现的核心,如下:
struct eventfd_ctx {
struct kref kref;
wait_queue_head_t wqh;
/*
* Every time that a write(2) is performed on an eventfd, the
* value of the __u64 being written is added to "count" and a
* wakeup is performed on "wqh". A read(2) will return the "count"
* value to userspace, and will reset "count" to zero. The kernel
* side eventfd_signal() also, adds to the "count" counter and
* issue a wakeup.
*/
__u64 count;
unsigned int flags;
};
- wqh:等待队列头,所有阻塞在eventfd上的读进程挂在该等待队列上
- count:eventfd计数器,当用户程序write eventfd时内核会将值加在计数器上,用户程序read eventfd之后,内核会将值减1或者清0(由EFD_SEMAPHORE标志决定),当计数器为0时,内核会将read进程挂载等待队列头wqh指向的队列上。
两种方式可以唤醒等待在eventfd上的进程,一个是用户态write,另一个是内核态的eventfd_signal。从这里可以看出eventfd不仅可以用于用户进程相互通信,也可以用作内核通知用户进程的手段。 - flags:决定用户read后内核的处理方式,
EFD_SEMAPHORE,EFD_CLOEXEC,EFD_NONBLOCK
三个取值
- eventfd_ctx的初始化在eventfd_file_create中实现,如下:
struct file *eventfd_file_create(unsigned int count, int flags)
{
struct file *file;
struct eventfd_ctx *ctx;
......
ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
kref_init(&ctx->kref);
init_waitqueue_head(&ctx->wqh); /* 1 */
ctx->count = count;
ctx->flags = flags;
file = anon_inode_getfile("[eventfd]", &eventfd_fops, ctx, /* 2 */
O_RDWR | (flags & EFD_SHARED_FCNTL_FLAGS));
......
return file;
}
- eventfd的file操作由eventfd_fops实现,如下:
static const struct file_operations eventfd_fops = {
......
.read = eventfd_read,
.write = eventfd_write,
......
};
读eventfd
- 用户进程读eventfd时,最终会进入 eventfd_read,如下:
static ssize_t eventfd_read(struct file *file, char __user *buf, size_t count,
loff_t *ppos)
{
struct eventfd_ctx *ctx = file->private_data;
......
res = eventfd_ctx_read(ctx, file->f_flags & O_NONBLOCK, &cnt); /* 如果count非0,将其放在cnt中返回给用户进程 */
......
}
- eventfd_read的核心是eventfd_ctx_read,看一下它的说明和实现:
/**
* eventfd_ctx_read - Reads the eventfd counter or wait if it is zero.
* @ctx: [in] Pointer to eventfd context.
* @no_wait: [in] Different from zero if the operation should not block.
* @cnt: [out] Pointer to the 64-bit counter value.
*
* Returns %0 if successful, or the following error codes:
*
* - -EAGAIN : The operation would have blocked but @no_wait was non-zero.
* - -ERESTARTSYS : A signal interrupted the wait operation.
*
* If @no_wait is zero, the function might sleep until the eventfd internal
* counter becomes greater than zero.
*/
ssize_t eventfd_ctx_read(struct eventfd_ctx *ctx, int no_wait, __u64 *cnt)
{
ssize_t res;
/* 声明一个等待队列项并关联本进程的task
* 等待队列关联的函数为default_wake_function,作为唤醒函数存在
*/
DECLARE_WAITQUEUE(wait, current);
spin_lock_irq(&ctx->wqh.lock);
*cnt = 0;
res = -EAGAIN;
/* 如果count大于0,读进程不阻塞 */
if (ctx->count > 0)
res = 0;
else if (!no_wait) {
__add_wait_queue(&ctx->wqh, &wait); /* 将等待队列项添加到eventfd的等待队列头中 */
for (;;) {
/* 设置阻塞状态 */
set_current_state(TASK_INTERRUPTIBLE);
/* 如果count大于0,break,这种情况在第一次循环之后可能发生
* eventfd的写进程在写完计数器后,会唤醒阻塞在eventfd上的读进程
* 这时读进程重新被调度器调度,进入新一轮的循环,来到这里,重新检查内核计数器
*/
if (ctx->count > 0) {
res = 0;
break;
}
/* 如果有未处理的信号,也break,进行处理 */
if (signal_pending(current)) {
res = -ERESTARTSYS;
break;
}
spin_unlock_irq(&ctx->wqh.lock);
/* 主动请求调度,当前进程被挂起 */
schedule();
/* 挂起的进程重新运行 */
spin_lock_irq(&ctx->wqh.lock);
}
/* 从循环中跳出,将当前进程从eventfd的等待队列中删除 */
__remove_wait_queue(&ctx->wqh, &wait);
/* 设置运行状态 */
__set_current_state(TASK_RUNNING);
}
if (likely(res == 0)) {
/* 读取counter */
eventfd_ctx_do_read(ctx, cnt);
/* 如果eventfd上有阻塞的写进程,将其唤醒 */
if (waitqueue_active(&ctx->wqh))
wake_up_locked_poll(&ctx->wqh, POLLOUT);
}
spin_unlock_irq(&ctx->wqh.lock);
return res;
}
- eventfd_ctx_read函数中有一个地方要注意,当执行eventfd_ctx_do_read函数之后,表示真正获取到了计数器的值,这之后会检查阻塞在eventfd上的写进程,因为写进程可能会因为计数器超过范围而阻塞,读进程完成之后计数器值会变小,这个时候可以唤醒写进程重新检查写入的值是否超过范围。这个在写eventfd中会介绍。
- 看一下真正获取内核计数器的函数:
static void eventfd_ctx_do_read(struct eventfd_ctx *ctx, __u64 *cnt)
{
/* 如果是信号量方式,返回给用户进程的counter始终是1,之后内核counter减1
* 如果是非信号量方式,返回给用户进程的counter就是内核维护的counter
* 之后内核counter清0
*/
*cnt = (ctx->flags & EFD_SEMAPHORE) ? 1 : ctx->count;
ctx->count -= *cnt;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
写eventfd
- 从eventfd_ctx 的数据结构解释中我们了解到,写eventfd有两种场景,一是用户态写,二是内核态写。这两种场景分别用于实现用户态通知和内核态的通知。这节主要介绍用户态发起的eventfd写操作。
- 写eventfd的主要动作是往计数器加用户态传入的值,有一种情况会阻塞写进程,就是计数器值到达或者超出范围。这时写进程阻塞直到计数器在正常范围内,注意,用户态写eventfd也不允许到达计数器的最大值,因为这个最大值要为内核态的写保留,后面的内核态写会介绍
static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count,
loff_t *ppos)
{
struct eventfd_ctx *ctx = file->private_data;
ssize_t res;
__u64 ucnt;
/* 声明等待队列项,default_wake_function为唤醒函数 */
DECLARE_WAITQUEUE(wait, current);
/* 用户态传入的buffer长度必须>=8字节,否则返回错误 */
if (count < sizeof(ucnt))
return -EINVAL;
if (copy_from_user(&ucnt, buf, sizeof(ucnt)))
return -EFAULT;
/* 如果写入的值等于64bit所能表示的最大值,返回错误 */
if (ucnt == ULLONG_MAX)
return -EINVAL;
spin_lock_irq(&ctx->wqh.lock);
res = -EAGAIN;
/* 写入的ucnt后内核计数器不会溢出 */
if (ULLONG_MAX - ctx->count > ucnt)
res = sizeof(ucnt);
/* 写入后内核计数器溢出,并且是阻塞方式打开的eventfd */
else if (!(file->f_flags & O_NONBLOCK)) {
__add_wait_queue(&ctx->wqh, &wait); // 写进程加入等待队列
for (res = 0;;) {
/* 设置阻塞状态 */
set_current_state(TASK_INTERRUPTIBLE);
/* 当有读进程read eventfd之后,内核计数器会减少
* 读进程读取计数器成功后,会唤醒阻塞在evenfd上的写进程
* 这时写进程重新被调度进入新一轮的循环检查,走到这里
* */
if (ULLONG_MAX - ctx->count > ucnt) {
res = sizeof(ucnt);
break;
}
if (signal_pending(current)) {
res = -ERESTARTSYS;
break;
}
spin_unlock_irq(&ctx->wqh.lock);
/* 主动让出CPU,申请调度器调度 */
schedule();
spin_lock_irq(&ctx->wqh.lock);
}
/* 跳出循环,从等待队列中退出 */
__remove_wait_queue(&ctx->wqh, &wait);
/* 设置运行状态 */
__set_current_state(TASK_RUNNING);
}
if (likely(res > 0)) {
/* 增加内核计数器 */
ctx->count += ucnt;
/* 如果eventfd上有阻塞的读进程,将其唤醒 */
if (waitqueue_active(&ctx->wqh))
wake_up_locked_poll(&ctx->wqh, POLLIN);
}
spin_unlock_irq(&ctx->wqh.lock);
return res;
}
- 在读eventfd的流程中,我们提到读完eventfd之后,会唤醒阻塞在其上的写进程,同样,在写eventfd的流程中,写完eventfd之后会唤醒阻塞在其上的读进程。但是写进程阻塞的场景比较少,它只在计数器到达最大值或者溢出的情况才出现。