2.do_select函数分析

do_select函数的运行过程

  • 1 先把全部fd扫一遍
  • 2 如果发现有可用的fd,跳到5
  • 3 如果没有,当前进程去睡眠xx秒
  • 4 xx秒后自己醒了,或者状态变化的fd唤醒了自己,跳到1
  • 5 结束循环体,返回

核心过程

  1. poll_initwait():设置poll_wqueues->poll_table的成员变量poll_queue_proc为__pollwait函数;同时记录当前进程task_struct记在pwq结构体的polling_task。
  2. f_op->poll():会调用poll_wait(),进而执行上一步设置的方法__pollwait();
    __pollwait():设置wait->func唤醒回调函数为pollwake函数,并将poll_table_entry->wait加入等待队列
  3. poll_schedule_timeout():该进程进入带有超时的睡眠状态

之后,当其他进程就绪事件发生时便会唤醒相应等待队列上的进程。比如监控的是可写事件,则会在write()方法中调用wake_up方法唤醒相对应的等待队列上的进程,当唤醒后执行前面设置的唤醒回调函数pollwake函数。

  1. pollwake():详见pollwake函数唤醒过程分析一篇
  2. poll_freewait():当进程唤醒后,将就绪事件结果保存在fds的res_in、res_out、res_ex,然后把该等待队列从该队列头中移除。
  3. 回到core_sys_select(),将就绪事件结果拷贝到用户空间。

原理

在一个循环中对每个需要监听的设备调用它们自己的 poll 支持函数(内核最终会相应调用 poll_wait(), 把当前进程添加到相应设备的等待队列上,然后将该应用程序进程设置为睡眠状态)以使得当前进程被加入各个设备的等待队列。

若当前没有任何被监听的设备就绪,则内核进行调度(调用 schedule)让出 cpu 进入阻塞状态,超时schedule 返回时将再次循环检测是否有操作可以进行,如此反复;若有任意一个设备就绪,调用wake up 唤醒用户当前进程,select/poll 便立即返回,用户进程获得了可读或可写的fd。

do_select函数分析

int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
	ktime_t expire, *to = NULL;
	struct poll_wqueues table;
	poll_table *wait;
	
	·····(Part omitted)

	rcu_read_lock();
	retval = max_select_fd(n, fds); // 
	rcu_read_unlock();

    ·····(Part omitted)

	poll_initwait(&table);
	wait = &table.pt;

    if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
        wait = NULL;
        timed_out = 1;     // 如果系统调用带进来的超时时间为0,那么设置timed_out = 1,表示不阻塞,直接返回。
    }

    if (end_time && !timed_out)
       slack = estimate_accuracy(end_time); // 超时时间转换

	retval = 0; // retval用于保存已经准备好的描述符数,初始为0
	for (;;) {
		·····(Part omitted)

		for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
			unsigned long in, out, ex, all_bits, bit = 1, mask, j;
			unsigned long res_in = 0, res_out = 0, res_ex = 0;

			in = *inp++; out = *outp++; ex = *exp++;
			all_bits = in | out | ex;
			if (all_bits == 0) {  // in、out、exp集合都无监控事件,即退出这一轮的事件检测
				i += BITS_PER_LONG;  // 一轮检测BITS_PER_LONG个bit位
				continue;
			}

			for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
				struct fd f;
				····(Part omitted)
				// 通过fd(即i),主要是获取当前用户进程中的fd对应的文件对象即socket对象,increases the reference count
				f = fdget(i); 
				if (f.file) {
					const struct file_operations *f_op; // 获取socket文件对象的驱动程序文件操作表指针
					f_op = f.file->f_op;
					mask = DEFAULT_POLLMASK;
					if (f_op->poll) { // socket文件对象,其f_op->poll对应的函数是sock_poll
						wait_key_set(wait, in, out,
							     bit, busy_flag);
		                // 在循环中对每个需要监听的设备调用它们自己的poll方法使得当前进程被加入各个设备的等待队列。
						// 事件mask:通过sock_poll函数(socket文件的poll方法)检查文件是否能够进行IO操作,返回当前设备fd的状态(如是否可读可写)
						mask = (*f_op->poll)(f.file, wait); 
					}
					fdput(f); /*Release the reference to file, that is, reduce the reference count f_count */
					/*通过判断socket的可读写状态来把socket放置到合适的返回集合中。
					如果socket可读,那么就把socket放置到可读集合中,如果socket可写,那么就放置到可写集合中*/
					if ((mask & POLLIN_SET) && (in & bit)) {
						res_in |= bit; // 如果是这个描述符可读, 置位
						retval++;      // 返回描述符个数加1
						wait->_qproc = NULL;
					}
					····(Part omitted)
					/* got something, stop busy polling */
					if (retval) {
						can_busy_loop = false;
						busy_flag = 0;

					/*
					 * only remember a returned
					 * POLL_BUSY_LOOP if we asked for it
					 */
					} else if (busy_flag & mask)
						can_busy_loop = true;

				}
			}
			if (res_in)
				*rinp = res_in;
			if (res_out)
				*routp = res_out;
			if (res_ex)
				*rexp = res_ex;
			cond_resched();
		}
		wait->_qproc = NULL;  // 避免应用进程被唤醒之后再次调用pollwait()的时候重复地调用函数__pollwait()
		/*retval保存了检测到的可操作的文件描述符的个数。如果有文件可操作或超时或收到signal,则跳出for(;;)循环系统调用结束,直接返回*/
		/* timed_out为1时,表明进程睡眠延时到期, 系统调用结束*/
		if (retval || timed_out || signal_pending(current))
			break;
		if (table.error) {
			retval = table.error;  // 返回错误码给用户进程,内核会根据错误码判断是否重新执行系统调用
			break;
		}

		/* only if found POLL_BUSY_LOOP sockets && not out of time */
		if (can_busy_loop && !need_resched()) {
			if (!busy_end) {
				busy_end = busy_loop_end_time();
				continue;
			}
			if (!busy_loop_timeout(busy_end))
				continue;
		}
		busy_flag = 0;

		/*
		 * If this is the first loop and we have a timeout
		 * given, then we convert to ktime_t and set the to
		 * pointer to the expiry value.
		 */
		if (end_time && !to) {
			expire = timespec_to_ktime(*end_time);
			to = &expire;
		}
		
       // 第一次循环中,当前用户进程从这里进入休眠,上面传下来的超时时间只是为了用在睡眠超时这里而已
       // 进程睡眠超时,poll_schedule_timeout()返回0;被唤醒时返回-EINTR
		if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
					   to, slack))
			timed_out = 1;  /* 超时后,将其设置成1,方便后面退出循环返回到上层 */
	}
   
	poll_freewait(&table);  // 清理各个驱动程序的等待队列头,同时释放掉所有空出来的poll_table_page页(包含的poll_table_entry)

	return retval;
}

static int max_select_fd(unsigned long n, fd_set_bits *fds)

max_select_fd返回已在fd_set中打开且小于用户指定的最大值的fd

poll_initwait(&table)

该调用的作用:将当前用户进程放入自己的waiting queue table中,并将waiting queue添加到test table 中等待
poll_initwait函数初始化一个poll_wqueues变量table:
poll_initwait —> init_poll_funcptr(&pwq->pt, __pollwait); —> pt->qproc = qproc;
即table->pt->qproc = __pollwait,__pollwait将在驱动的poll函数里用到

void poll_initwait(struct poll_wqueues *pwq)
{
	init_poll_funcptr(&pwq->pt, __pollwait); // 注册_pollwait回调函数
	pwq->polling_task = current;  // 指定当前用户进程
	pwq->triggered = 0;
	pwq->error = 0;
	pwq->table = NULL;
	pwq->inline_index = 0;
}

sock_poll函数

是socket文件对象的poll方法(驱动程序操作),用于查询socket文件对象是否可读可写(继续调用tcp_poll才能得到)

static unsigned int sock_poll(struct file *file, poll_table *wait)
{
	unsigned int busy_flag = 0;
	struct socket *sock;

	/*
	 *      We can't return errors to poll, so it's either yes or no.
	 */
	sock = file->private_data;

	if (sk_can_busy_loop(sock->sk)) {
		/* this socket can poll_ll so tell the system call */
		busy_flag = POLL_BUSY_LOOP;

		/* once, only if requested by syscall */
		if (wait && (wait->_key & POLL_BUSY_LOOP))
			sk_busy_loop(sock->sk, 1);
	}

	return busy_flag | sock->ops->poll(file, sock, wait); // 对应 TCP 类型的socket,这个poll接口对应的是 tcp_poll() 函数
}

tcp_poll函数 - Wait for a TCP event

tcp_poll() 函数通过调用 sock_poll_wait() 函数把进程添加到socket的等待队列中,
然后检测socket是否可读写,并通过mask返回可读写的状态。
所以在 do_select() 函数中的 mask = file->f_op->poll(file, wait); 这行代码其实调用的是tcp文件对象的poll方法(驱动程序操作): tcp_poll() 函数。

unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
	unsigned int mask;
	struct sock *sk = sock->sk;
	const struct tcp_sock *tp = tcp_sk(sk);

	sock_rps_record_flow(sk);
	// sk_sleep(sk)获取socket的wait_queue_head_t等待队列头
    // 每个socket自己都带有一个等待队列socket_wq,所以“设备的等待队列”不止一个
	sock_poll_wait(file, sk_sleep(sk), wait);
	····(Part omitted)
	
	return mask;
}

sock_poll_wait

中间函数,主要作用是调用poll_wait函数,并将memory barrier置于poll_wait调用之后,确保写入完成?

static inline void sock_poll_wait(struct file *filp,
		wait_queue_head_t *wait_address, poll_table *p)
{
	if (!poll_does_not_wait(p) && wait_address) {
		poll_wait(filp, wait_address, p);   // 把文件添加到sk->sleep队列中进行等待
		/* We need to be sure we are in sync with the
		 * socket flags modification.
		 *
		 * This memory barrier is paired in the wq_has_sleeper.
		 */
		smp_mb(); // 完全内存屏障
	}
}

poll_wait函数

poll_wait 函数所做的工作是把当前用户进程添加到 wait 参数指定的等待队列中。
poll_wait 函数并不阻塞,真正的阻塞动作是上层的 select/poll 函数中完成的。

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
    // poll_table在睡眠之前保证其为有效地址,而在唤醒之后保证传入的poll_table地址是NULL  
    // 因为在唤醒之后,再次调用fop->poll()的作用只是为了再次检查设备的事件状态而已  
    // 如果驱动程序中没有提供等待队列头wait_address,那么将不会往下执行p->qproc
	if (p && p->_qproc && wait_address) 
		p->_qproc(filp, wait_address, p); // 回调函数:p->qproc就是之前poll_initwait初始化poll_wqueues时注册的__pollwait
}

__pollwait函数

由__pollwait函数实际执行将用户进程插入socket的等待队列的动作
内核调用系统调用引用的所有文件的poll方法,并将相同的poll_table传递给每个文件,而poll_table的核心就是__pollwait

__pollwait()函数调用时需要3个参数,第一个是特定fd对应的file结构体指针,第二个就是特定fd对应的硬件驱动程序中的等待队列头指针,第3个是调用select()的应用进程中poll_wqueues结构体的poll_table项(该进程监测的所有fd调用fop->poll函数都用这一个poll_table结构体)。

static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
	struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt); // 使用container_of求出poll_wqueues的地址  
	struct poll_table_entry *entry = poll_get_entry(pwq); // 分配一个poll_table_entry
	if (!entry)
		return;
	// 初始化一个poll_table_entry
	entry->filp = get_file(filp); // 将filp的引用计数加1,filp这里是socket文件对象
	entry->wait_address = wait_address;  // 设置来自设备驱动程序的等待队列头
	entry->key = p->_key; // 设置对该fd事件关心的mask
	init_waitqueue_func_entry(&entry->wait, pollwake);  // 初始化队列等待项,pollwake是唤醒该队列等待项调用的函数
	entry->wait.private = pwq; // 将poll_wqueues作为该等待队列项的私有数据
	add_wait_queue(wait_address, &entry->wait); // 将该等待队列项添加到等待队列头中去
}
static inline void
init_waitqueue_func_entry(wait_queue_t *q, wait_queue_func_t func)
{
	q->flags	= 0;
	q->private	= NULL;
	q->func		= func; // //设置唤醒回调函数
}
void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)
{
	unsigned long flags;

	wait->flags &= ~WQ_FLAG_EXCLUSIVE;
	spin_lock_irqsave(&q->lock, flags);
	__add_wait_queue(q, wait);
	spin_unlock_irqrestore(&q->lock, flags);
}

static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new)
{
	list_add(&new->task_list, &head->task_list);
}
void poll_freewait(struct poll_wqueues *pwq)
{
    struct poll_table_page * p = pwq->table;
    int i;
    for (i = 0; i < pwq->inline_index; i++)
        free_poll_entry(pwq->inline_entries + i);
    while (p) {
        struct poll_table_entry * entry;
        struct poll_table_page *old;

        entry = p->entry;
        do {
            entry--;
            free_poll_entry(entry);
        } while (entry > p->entries);
        old = p;
        p = p->next;
        free_page((unsigned long) old);
    }
}

static void free_poll_entry(struct poll_table_entry *entry)
{
    //从等待队列中移除wait
    remove_wait_queue(entry->wait_address, &entry->wait);
    fput(entry->filp);
}

poll系统调用的内部数据结构

1. poll_wqueues

每一个select系统调用只有一个 poll_wqueues,记录相关I/O设备的等待队列 ,对外接口是 poll_table pt 的地址
当select()退出循环体返回时,它要把当前进程从全部等待队列中移除——这些设备再也不用着去唤醒当前队列了。

struct poll_wqueues {
	poll_table pt;  //包含一个函数指针,通常指向__pollwait或null
	struct poll_table_page *table;  // 如果inline_entries空间不够用,后续会动态申请物理内存页以链表的形式挂载poll_wqueues.table上统一管理
	struct task_struct *polling_task;  // 指向当前用户进程
	int triggered;  // 当前用户进程被唤醒后置成1,以免该进程接着进睡眠
	int error;  // 错误码
	int inline_index;  // 数组inline_entries的引用下标
	struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES]; // 内嵌的poll_table_entry数组inline_entries[] 的大小有限
};

对每一个fd调用fop->poll() -> poll_wait() -> __pollwait()都会先从poll_wqueues. inline_entries[]中分配一个poll_table_entry结构体,直到该数组用完才会分配物理页挂在链表指针poll_wqueues.table上, 然后才会分配一个poll_table_entry结构体

2. poll_table

poll_table 结构就是为了把进程添加到socket的等待队列中而创造的
poll_table 会被传递给所有的文件对象,最终文件对象的驱动程序操作 poll方法 会调用poll_table指向的__pollwait函数

typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);
typedef struct poll_table_struct {
	poll_queue_proc _qproc;  // 函数指针,指向__pollwait函数
	unsigned long _key; // // 等待特定fd对应硬件设备的事件掩码,如POLLIN、POLLOUT、POLLERR; wait_key_set函数设置_key
} poll_table;
3. poll_table_page 和 poll_table_entry
// poll_table_page是一个包含poll_table_entry结构内存页链表,每一个page占1个PAGE_SIZE大小的区域
struct poll_table_page {  // 申请的物理页都会将起始地址强制转换成该结构体指针 
	struct poll_table_page * next;  // 指向下一个申请的物理页
	struct poll_table_entry * entry;  // 指向entries[]中首个待分配(空的) poll_table_entry地址
	struct poll_table_entry entries[0];  // 该page页后面剩余的空间都是待分配的
};
// 轮询表条目,分配给特定的select调用
struct poll_table_entry {
	struct file *filp;  // 指向特定fd对应的file文件对象
	unsigned long key;  // 等待特定fd对应硬件设备的事件掩码,如POLLIN、POLLOUT、POLLERR;
	wait_queue_t wait;    // 代表调用select()的应用进程,等待在fd对应设备的特定事件(读或者写)的等待队列头上的等待队列项;
	wait_queue_head_t *wait_address;  // 设备驱动程序中特定事件的等待队列头(等待队列有多个wait_queue_t组成,通过双链表连接)
};

特定的硬件设备驱动程序的事件等待队列头是有限个数的,通常有读事件和写事件的等待队列头;
应用程序可以有多个fd在进行同时监测其各自的事件发生,但该应用进程中每一个fd有多少个poll_table_entry存在,那就取决于fd对应的驱动程序中有几个事件等待队列头了
通常驱动程序的poll函数中需要对每一个事件的等待队列头调用poll_wait()函数。比如,如果有读写两个等待队列头,那么就在这个应用进程中存在两个poll_table_entry结构体,在这两个事件的等待队列头中分别将两个等待队列项加入
如果有多个应用进程使用selcet()方式同时在访问同一个硬件设备,此时硬件驱动程序中加入等待队列头中的等待队列项对每一个应用程序来说都是相同数量的(一个事件等待队列头一个,数量取决于事件等待队列头的个数)

4. wait_queue_t和wait_queue_head_t

等待队列表示一组睡眠的进程,当某一条件为真时,由内核唤醒它们
因为进程经常需要等待某event发生,等待队列实现了在event上的条件等待:
希望等待特定event的进程将自己放入合适的等待队列,并放弃控制权,进入睡眠。

等待队列由循环链表实现,由等待队列头(wait_queue_head_t)和等待队列项(wait_queue_t)组成,其元素(等待队列项)包含指向进程描述符的指针

当一个进程需要在某个wait_queue_head_t上睡眠时,将自己的进程控制块信息封装到wait_queue_t中,然后挂载到wait_queue_t的链表中,如下图:
2.do_select函数分析

typedef struct __wait_queue wait_queue_t;
typedef int (*wait_queue_func_t)(wait_queue_t *wait, unsigned mode, int flags, void *key);
int default_wake_function(wait_queue_t *wait, unsigned mode, int flags, void *key);

// 等待队列项
// wait_queue_t 是sleeping processe的等待队列项,用于引用进程自身
struct __wait_queue {
	unsigned int		flags;  //prepare_to_wait()里有对flags的操作,查看以得出其含义
#define WQ_FLAG_EXCLUSIVE	0x01 //一个常数,在prepare_to_wait()用于修改flags的值
	void			*private;   //通常指向当前任务控制块
	wait_queue_func_t	func;   //唤醒阻塞任务的函数 ,决定了唤醒的方式
	struct list_head	task_list; // 阻塞任务链表
};

每个等待队列都有一个等待队列头(wait queue head),使用等待队列时首先需要定义一个wait_queue_head,这可以通过DECLARE_WAIT_QUEUE_HEAD宏来完成,这是静态定义的方法。该宏会定义一个wait_queue_head,并且初始化结构中的锁以及等待队列

// 等待队列头
// 该结构体变量通常由interruptible_sleep_on分配在stack上,
struct __wait_queue_head {
	spinlock_t		lock;   //自旋锁变量,用于等待队列头 
	struct list_head	task_list; // a linked list of sleeping processes,list中每个item的类型是wait_queue_t
};
typedef struct __wait_queue_head wait_queue_head_t;

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PLQSjEvc-1608088685625)(https://i.stack.imgur.com/LNXqH.gif)]

参考资料

  1. wait_queue_head and wait_queue
  2. LDD
上一篇:kafka Poll轮询机制与消费者组的重平衡分区策略剖析


下一篇:【译】理解Rust中的Futures(二)