《linux kernel develpoment》第八章

《linux内核设计与实现》

下半部和退后执行的工作

1、下半部

下半部的任务就是执行与中断处理密切相关但中断处理程序本身不执行的工作,理想情况下是所有工作都给下半部,即想让上半部尽快结束,因为在上半部中会锁定中断线,因此能够越快越好。但大部分时候是没办法达到理想情况的,因为他需要做一些工作,如需要操作硬件对中断的到达来确认或者从硬件拷贝数据,这些都对于时间较为敏感,因此需要上半部完成,而下半部主要对数据进行处理分析。
以下几点有助于区分上下部:
1、时间较为敏感,放在中断处理程序
2、与硬件相关,放在中断处理程序
3、某个任务不可被其他中断打断,则放在中断处理程序
4、其他所有的任务放在下半部执行

1.1 为什么要用下半部

总而言之就是要让上半部做的工作尽可能少,从而使得中断处理程序尽快返回退出。

1.2 下半部环境

1、“下半部”起源
最早的linux只提供“bottom half”(BH)机制来实现下半部,但是该机制只是将工作推迟进行。其提供了一个静态创建、32个bottom halves组成的链表,上半部通过一个32位整数中的一位来标识出哪个BH可以执行。每个BH在全局范围内进行同步,但即使处于不同的处理器也不允许两个BH同时执行。该方法虽然简单但却性能不行。
2、任务队列
当驱动程序可以把他们下半部的注册到合适的队列中去,这些函数会在某个时刻执行。虽然较BH好,但是不够灵活,对于一些对性能较高的子系统如网络,不能胜任。
3、软中断和tasklet
在后面的版本中软中断和tasklet完全可以代替BH接口。
软中断是一组静态定义的下半部接口,有32个,可以在所有的处理器上同时执行,及时两个类型相同也可以。
tasklet是一个基于软中断实现的灵活性强、动态创建的下半部实现机制。不同类型的tasklet可以在不同处理器上同时执行,但类型相同的tasklet不能够同时执行。
一般而言,像网络这种对性能要求高的可以使用软中断,而其他的tasklet基本可以满足需求。

在2.5版本后的内核中,BH被完全遗弃了,此后任务队列也被工作队列所取代,在后面的版本中只剩下工作队列、软中断和tasklet。

2、软中断

2.1 软中断的实现

软中断是在编译期间静态分配的,而tasklet是可以动态注册和注销。软中断由softirq_action结构表示,定义在"linux/interrupt.h"中。

struct softirq_action { 
void (*action)(struct softirq_action *);
};
//在kernel/softirq.c中定义了一个包含32个该结构体的数组
static struct softirq_action softirq_vec[NR_SOFTIRQS];

有上述结构体数组大小可知,最多可能有32个软中断,这是一个定值。
1、软中断处理程序

//软中断处理程序action函数原型
void softirq_handler(struct softirq_action *)
//当内核运行一个软中断处理中断程序的时候,会执行action函数,其唯一的参数是指向相应的softirq_action结构体指针
my_softirq->action(my_softirq);

一个软中断不会抢占另外一个软中断,唯一可以抢占软中断的是中断处理程序。
2、执行软中断
一个注册的软中断只有被标记后才可以执行,成为触发软中断,该标记会在中断处理程序返回之前完成,以保证中断处理程序结束后该程序会被继续执行。
在以下情况软中断会被执行:
1、从一个硬件中断代码出返回时
2、在softirqd内核线程中
3、在显示检查和执行待处理的软中断代码中,如网络子系统中
不管是由什么办法换唤起,最终都会在do_softirq中执行,该函数会循环遍历每一个,调用处理他们的函数,其简化后核心部分如下:

	u32 pending;
	pending = local_softirq_pending(); 
	if (pending) {
		struct softirq_action *h;
		/* reset the pending bitmask */ 
		set_softirq_pending(0);
		h = softirq_vec; 
		do {
			if (pending & 1) 
				h->action(h);
			h++; 
			pending >>= 1;
		} while (pending); 
	}

上述代码中:
1)用局部变量pending保存待处理软中断32位未处理图,若第n位被置为1则表示第n位对应的软中断等待处理。
2)如果待处理软中断位图已经被保存,则将其清零。
3)用h之指针指向softirq_vec第一项
4)pending第一位被置为1,则h->action(h)被调用
5)指针加1,则指向下一项
6)位掩码右移一位,丢弃掉已经处理的第一位
7)重复以上步骤,直到pending为0,则表示没有可处理的软中断了。

2.2 使用软中断

1、分配索引
目前只有两个子系统网络和SCSI在使用软中断,如果我们的任务对于时间要求较高且自己能够高效地完成加锁的应用,再考虑用软中断。
想要建立一个软中断必须在<linux/interrupt.h>中定义的一个枚举类型来静态地声明软中断
《linux kernel develpoment》第八章
2、注册处理程序
在运行时通过以下两个函数注册软中断处理程序,该函数有两个参数:软中断的索引号和处理函数。

open_softirq(NET_TX_SOFTIRQ, net_tx_action); 
open_softirq(NET_RX_SOFTIRQ, net_rx_action);

在处理软中断的时候,可以被中断处理程序打断,但是不能休眠。如果同个软中断在他被执行的同时再次被其他处理器触发了,那么就以为着这两个软中断需要共享数据,这个时候就需要注意锁的使用。而这也是tasklet备受青睐的原因之一。
3、触发软中断
同构在枚举类型的列表中添加新的项以及调用open_softirq()注册后,新的软中断程序即可运行。raise_softirq函数可以将一个软中断设置为挂起状态,使其能够在下次调用do_softirq函数时运行。

raise_softirq(NET_TX_SOFTIRQ);

以上的例子中会触发NET_TX_SOFTIRQ软中断,其处理程序net_txaction会在内核下一次执行软中断时投入运行。如果中断本来就被禁止了,则可调用raise_softirq_irqoff函数。

3、tasklet

3.1 tasklet的实现

tasklet实际上是基于软中断实现的,其有两个中断代表:HI_SOFTIRQ和TASKLET_SOFTIRQ,两者唯一的区别在于HI_SOFTIRQ类型的软中断优先于TASKLET_SOFTIRQ的类型软中断执行。
1、tasklet结构体
tasklet有结构体tasklet_struct结构表示,每个结构体单独表示tasklet,在"linux/interrupt.h"

struct tasklet_struct { 
	struct tasklet_struct *next; /* next tasklet in the list */ 
	unsigned long state; /* state of the tasklet */ 
	atomic_t count; /* reference counter */ 
	void (*func)(unsigned long); /* tasklet handler function */ 
	unsigned long data; /* argument to the tasklet function */
};

结构体中的func成员即tasklet的处理函数,data是唯一的参数。
state只能取值为:0、TASKLET_STATE_SCHED和TASKLET_STATE_RUN之间取值。其中TASKLET_STATE_SCHED表示已经被调度,准备投入运行,TASKLET_STATE_RUN表示正在运行。
count是一个引用计数器,若不为0,则tasklet会被禁止,不允许执行;只有为0时才被挂起。
2、调度tasklet
已调度的tasklet存放在tasklet_vec(普通tasklet)和tasklet_hi_vec(高优先级的tasklet),这两个是两个结构体链表存放的都是tasklet结构体。
tasklet由tasklet_schedule和tasklet_hi_schedule两个函数调度。
tashklet_schedule执行步骤:
1)检查tasklet的state是否为TASKLET_STATE_SCHED状态
2)调用_tasklet_schedule
3)保存中断状态,禁止本地中断
4)将需要调度的tasklet加到每个处理器上的tasklet_vec和tasklet_hi_vec链表头
5)唤起TASKLET_SOFTIRQ和HI_SOFTIRQ软中断,在下次调用do_softirq时就会执行该tasklet
6)恢复中断原状态并返回
通过tasklet_schedule将tasklet设置成TASKLET_STATES_SCHED状态,当中断返回之后,似乎便是一个最佳的调度时刻,此时do_softirq()便会调用相应的处理函数,即tasklet_achtion和tasklet_hi_action,这两个函数执行步骤如下:
1)禁止中断,并设置当前处理器检索tasklet_vsc和tasklet_hig_vec链表
2)将当前处理器上的链表设置为NULL,达到清空的效果
3)允许响应中断
4)循环遍历链表上的每一个待处理的tasklet
5)如果是多处理系统,通过检查TASKLET_STATE_RUN来判断tasklet是否正在其他处理器上执行,如果是则不执行,跳到下一个tasklet
6)如果tasklet没有被执行,则设置其状态为TASKLET_STATE_RUN
7)检查count是否为0,如果非0,则该tasklet被禁止,则跳到下一个
8)执行对应的tasklet处理程序
9)tasklet运行完毕,清楚state域的TASKLET_STATE_RUN值
10)重复执行下一个tasklet,直到没有剩余的tasklet

3.2 使用tasklet

1、声明自己的tasklet
有两种方式:

1)静态创建一个tasklet:
DECLARE_TASKLET(name,func,data)
DECLARE_TASKLET_DISABLED(name,func,data);

以上两个函数创建的区别在于设置引用计数器的初始值不同,第一个设置引用计数器为0,第二个设置为1。
如DECLARE_TASKLET(my_tasklet,my_tasklet_handler,dev);等价于
struct tasklet_struct my_tasklet = {NULL,0,ATOMIC_INIT(0),my_tasklet_handler,dev};
2)动态创建

static tasklet_init(tasklet,tasklet_handler,dev);

2、编写自己的处理程序:
声明一个处理函数:

void tasklet_handler(unsigned long data);

3、调度tasklet

tasklet_schedule(&my_tasklet);

以上函数my_tasklet标记为挂起,只要被挂起了,在有机会的时候就会被尽可能早地运行。

//禁止tasklet
tasklet_disbale(&my_tasklet);
//激活tasklet
tasklet_enable(&my_tasklet);
//从挂起的队列中去掉一个tasklet
tasklet_kill(&my_tasklet);

4、ksoftirqd
当内核中出现大量软中断的时候,内核进程就会启动辅助处理软中断(和tasklet)去处理它们。
对于软中断而言,内核会选择在几个特殊时机处理,而软中断被触发有时候的频率很高,类似于像网络通信的的期间频率很高,这个时候,可能软中断会一直被触发,且软中断本身有将自己重新设置为可执行状态的能力,而导致用户进程无法获得足够的处理器时间。
为了解决以上问题有以下两种方法:
1)当在执行软中断时候,有新的软中断发生时,会一并处理完再退出处理软中断。
这样子做只有系统处于低负载的情况下,该方案才有理想的运行效果。
2)选择不处理重新出发的软中断。在聪中断返回的时候,内核会检查所有挂起的软中断并且处理,但是任何自行重复触发的软中断不会马上处理,会等到下一个软中断执行的时候去处理。
这种方案虽保证了用户空间不会处理饥饿状态,却使得软中断可能处于饥饿状态。

为了处理以上的解决方案所出现的问题,提出了一种软中断处理线程,即当大量软中断出现的时候,内核会唤醒一组内核线程来处理这些负载,其nice=19,即优先级最低,不会影响其他线程的运行。在每个处理器上都会有这样的一个线程,其名字都叫做ksoftirq/n,其中n代表的是第n个处理器。当该线程被初始化启动后就会执行以下的操作:

for (;;) { 
	if (!softirq_pending(cpu))
		schedule();
	set_current_state(TASK_RUNNING);
	while (softirq_pending(cpu)) { 
		do_softirq(); 
		if (need_resched()) 
			schedule();
	}
	set_current_state(TASK_INTERRUPTIBLE); 
}

当有待处理的软中断时,即通过softirq_pending函数去发现,就会通过ksoftirq去调用do_softirq去处理。通过该函数循环去执行,重新触发的软中断也会被发现。通过调用schedule让重要的进程得到调度机会。当所有的操作执行后就会将进程状态设置为TASK_INTERRUPTIBLE以便唤起调度程序选择其他进程投入运行。

4、工作队列

工作队列(work queue)是另外一种将工作推后执行的形式,与我们前面的形式都不相同,其是将工作交给一个内核线程去执行,这个下半部分总是会在进程上下文中执行。这样子的工作队列的任务能够拥有上下文的共同的优势,且工作队列允许重新调度甚至睡眠。因此如果我们的任务需要睡眠则选择工作队列,反之则可以选择软中断或者tasklet。

4.1 工作队列的实现

工作队列子系统是一个用于创建内核线程的接口,他创建的线程成为工作者线程,工作队列通过创建一个专门的工作者线程来处理需要推后的工作。
缺省的工作者线程叫做event/n,n代表处理器编号。每个处理器对应一个这样的线程。
1、表示线程的数据结构
工作者线程由workqueue_struct表示:

struct workqueue_struct { 
	struct cpu_workqueue_struct cpu_wq[NR_CPUS]; 
	struct list_head list; 
	const char *name; 
	int singlethread; 
	int freezeable; 
	int rt;
};

由于每个处理器都有一个工作者线程,这就意味着每个处理器对应一个cpu_workqueue_struct结构体,其定义域kernel/workqueue.c中。

struct cpu_workqueue_struct { 
	spinlock_t lock; /* 锁保护这个结构 */
	struct list_head worklist; /* list of work */ 
	wait_queue_head_t more_work; 
	struct work_struct *current_struct;
	struct workqueue_struct *wq; /* 关联工作队列结构 */
	task_t *thread; /* 关联线程 */
};

2、表示工作的数据结构
该结构体定义在linux/workqueue.h中

struct work_struct { 
	atomic_long_t data; 
	struct list_head entry; 
	work_func_t func;	//被推后的任务的具体执行函数
};

每个工作者线程都是由普通的内核线程实现的,都要执行worker_thread函数,当工作队列没有对象时,该函数阻塞,若有操作插入队列后,则开始执行操作。
每个处理器上每种类型的队列对应这样的一个链表,当有操作时,对应的该结构体work_struct对象就会被添加到该链表上。以下是workr_thread函数的核心操作:

for (;;) {
	//将自己设置为休眠状态,且设置为TASK_INTERRUPTIBLE,并把自己加入到等待队列中
	prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE); 
	//当队列为空时则调用schedule调度函数然后开始睡眠
	if (list_empty(&cwq->worklist))
		schedule(); 
	//工作队列中有新的对象加入时则结束睡眠
	finish_wait(&cwq->more_work, &wait);
	//执行被推后的工作 
	run_workqueue(cwq);
}

run_workqueue用于完成被上半部推后的工作

while (!list_empty(&cwq->worklist)) { 
	//当链表不为空,则选取下一个节点对象
	struct work_struct *work; 
	//获取我们希望执行的函数func和参数data
	work_func_t f; 
	void *data;
	//把节点从链表上摘取下来,并且将待处理标志位清零
	work = list_entry(cwq->worklist.next, struct work_struct, entry); 
	f = work->func; 
	list_del_init(cwq->worklist.next); 
	work_clear_pending(work); 
	//调用函数
	f(work);
}

3、工作队列实现机制总结
由下图可知,最高一层是工作者线程。工作类型可以有多种,如内核默认是event类型,每个CPU上都有一个该类的工作者线程,而每个工作者线程都由一个cpu_workequeue_struct结构体表示,而workqueue_struct结构体则表示给定类型的所有工作者线程。
如我们多定义了一个falcon工作者类型,而内核默认有event类型,且工作在四核CPU上,则有四个event类型线程(包括四个cpu_workqueue_struct)和四个falcon类型线程(包括四个cpu_workqueue_struct),同时还有一个event类型的workqueue_struct和falcon类型的workqueue_struct。
《linux kernel develpoment》第八章

4.2 使用工作队列

1、创建推后的工作

//静态创建推后的工作
//静态创建一个名为name的处理函数为func,参数为data的work_struct结构体
DECLARE_WORK(name, void (*func)(void *), void *data);
//动态创建推后的工作
//动态创建并初始化一个指向的工作的work func为处理函数 data为传入参数
INIT_WORK(struct work_struct *work, void (*func)(void *), void *data);

2、工作队列处理函数

void work_handler(void *data)

该函数会由一个工作者线程执行,因此会运行于一个进程上下文中,可以响应中断且不持有任何锁,可以休眠,不可访问用户空间。
3、对工作进行调度
当我们工作被创建之后可以调用schedule_work(&work)去调度该工作,此时work马上就会被调度。
若不想被马上执行,则可以调用schedule_delayed_work(&work,delay)去延时一段时间执行
4、刷新操作
排入队列的工作会在工作者线程下一次被唤醒的实话执行,在下一步之前,必须保证一些操作已经完成,如在卸载之前,就需要确保一下我们指定的工作已经处理完毕,因此需要调用以下函数:
void flush_scheduled_work(void);
函数会一直地等待,知道队列所有对象都执行完毕才返回。若想取消掉因为延迟而被挂起的任务,可以调用以下函数:
int cancle_delayed_work(struct work_struct *work);
5、创建新的工作队列
如果缺省队列无法满足我们的需要,则需要创建一个新的工作队列和与之相应的工作者线程,这样子会在每个处理器上都创建一个工作者线程。

//name参数用于该内核线程的命名
struct workqueue_struct *create_workqueue(const char *name);
//下面两个函数与schedule_work相似,区别就在于他是针对指定的队列而非缺省队列
int queue_work(struct workqueue_struct *wq,struct work_struct *work);
int queue_delayed_work(struct workqueue_struct *wq,struct work_strcut *work,unsigned long delay)
//刷新指定的工作队列
flush_workqueue(struct workqueue_struct *wq)

5、下半部机制的选择

简单地说我们在选择做驱动程序的时候需要有两种选择:
1、考虑是否需要休眠,如果需要则可以选择工作队列不然就是用tasklet
2、是否一定有注重性能且自己在资源共享这方面锁已经做得很好了就可以使用软中断

6、在下半部之间加锁

1、在tasklet中,会负责执行序列化的保障,即两个类型相同的tasklet不允许同时执行,即使在不同的处理器上也不行,这就意味着我们不必担心线程同步的问题,即加不加锁。
2、而对于软中断,我们需要加上锁,以保证数据读写不会冲突,但也要防止死锁的出现。

7、禁止下半部

在编写驱动程序的时候,为了保证共享数据的安全,这个时候需要通过锁去保证,一般做法为获取得到一个锁后,然后禁止所有下半部的执行,接口如下:
《linux kernel develpoment》第八章
注意一次disable要对应一次enable,即当我们disable了3次后,只有enable四次,下半部全部才会被重新激活。
这两个函数通过preempt_count为每个进程维护一个计数器,当计数器为0的时候,下半部才能被处理。
以下是这两个函数近似的表示:

/*
* disable local bottom halves by incrementing the preempt_count
*/ 
void local_bh_disable(void) 
{
	struct thread_info *t = current_thread_info();
	t->preempt_count += SOFTIRQ_OFFSET; 
}
/*
* decrement the preempt_count - this will ‘automatically’ enable
* bottom halves if the count returns to zero
*
* optionally run any bottom halves that are pending
*/ 
void local_bh_enable(void) 
{
	struct thread_info *t = current_thread_info();
	t->preempt_count -= SOFTIRQ_OFFSET;
	/*
	* preempt_count是否为0并且是否有挂起的下半部,如果都满足则执行下半部
	*/ 
	if (unlikely(!t->preempt_count && softirq_pending(smp_processor_id())))
		do_softirq();
}
上一篇:tasklet和workqueue区别


下一篇:【原创】Linux中断子系统(三)-softirq和tasklet