二、软件构架和实现
1. 一些基础数据结构
文件softirq.c
/*PER-CPU变量,每个cpu对应一个,描述当前cpu中关于softirq的一些状态,比如是否有softirq挂起需要执行等等*/
irq_cpustat_t irq_stat[NR_CPUS] ____cacheline_aligned;
typedef struct {
unsigned int __softirq_pending; /*32位,对应Linux中32种softirq是否被上半部触发了(为1表示被触发,为0表示未被触发)*/
unsigned long idle_timestamp;
unsigned int __nmi_count; /* arch dependent */
unsigned int apic_timer_irqs; /* arch dependent */
} ____cacheline_aligned irq_cpustat_t;
/*表示softirq最多有32种类型,实际上Linux只用了6种,见文件interrupt.h*/
static struct softirq_action softirq_vec[32] __cacheline_aligned_in_smp;
/* PLEASE, avoid to allocate new softirqs, if you need not _really_ high
frequency threaded job scheduling. For almost all the purposes
tasklets are more than enough. F.e. all serial device BHs et
al. should be converted to tasklets, not to softirqs.
*/
enum
{
HI_SOFTIRQ=0, /*用于高优先级的tasklet*/
TIMER_SOFTIRQ, /*用于定时器的下半部*/
NET_TX_SOFTIRQ,/*用于网络层发包*/
NET_RX_SOFTIRQ, /*用于网络层收包*/
SCSI_SOFTIRQ, /*用于SCSI设备*/
TASKLET_SOFTIRQ /*用于低优先级的tasklet*/
};
struct softirq_action
{
void (*action)(struct softirq_action *); /*softirq的回调函数*/
void *data; /*传入action的参数*/
};
Struct softirq_action是每个softirq的配置结构,一般在系统启动的时候,6个不同的softirq,会通过函数open_softirq()来注册自己的softirq_action,实现很简单:
void open_softirq(int nr, void (*action)(struct softirq_action*), void *data)
{
softirq_vec[nr].data = data;
softirq_vec[nr].action = action;
}
关键是传入的函数指针,具体指明了该softirq要实现的功能或要做的动作。
这里分开看下这六个注册点:
Net/core/dev.c中的net_dev_init()里面注册了网络层需要用到的收包和发包的两个softirq:
open_softirq(NET_TX_SOFTIRQ, net_tx_action, NULL);
open_softirq(NET_RX_SOFTIRQ, net_rx_action, NULL);
对应的函数指针为net_tx_action和net_rx_action,具体功能就不在本文的范围之内的。
Driver/scsi/scsi.c中init_scsi()注册了SCSI_SOFTIRQ
open_softirq(SCSI_SOFTIRQ, scsi_softirq, NULL);
start_kernel() àsiftirq_init() 中注册了两种tasklet,一种是高优先级的tasklet,一直是低优先级的tasklet:
open_softirq(TASKLET_SOFTIRQ, tasklet_action, NULL);
open_softirq(HI_SOFTIRQ, tasklet_hi_action, NULL);
start_kernel() àinit_timers() 中注册了TIMER_SOFTIRQ
open_softirq(TIMER_SOFTIRQ, run_timer_softirq, NULL);
2. softirq运行时机:
系统在运行过程中,会在合适的地方使用函数local_softirq_pending()检查系统是否有softirq需要处理;需要时会调用函数do_softirq()进行处理。这些检查点主要包括以下几个地方:
(1)中断过程退出函数irq_exit();
(2)内核线程ksoftirqd;
(3)内核网络子系统中显示调用;
(4)函数local_bh_enable().
先前我们分析irq_cpustat_t结构的时候,看到__softirq_pending字段。这是一个32位无符号的变量,对应Linux中32种softirq是否被上半部触发了(为1表示被触发,为0表示未被触发)。那么在softirq运行之前肯定就有地方设置了这个变量的各个位,才会触发到softirq运行。这个触发动作一般是在上半部中进行的,即上半部通过系统,还有下半部需要运行。触发函数如下:
#define __raise_softirq_irqoff(nr) do { or_softirq_pending(1UL << (nr)); } while (0)
#define or_softirq_pending(x) (local_softirq_pending() |= (x))
#define local_softirq_pending() \
__IRQ_STAT(smp_processor_id(), __softirq_pending)
#define __IRQ_STAT(cpu, member) (irq_stat[cpu].member)
由此可以看出, __raise_softirq_irqoff(nr)实际上是把当前cpu的per-cpu变量irq_stat的__softirq_pending 的从右往左数的第nr位置1,这样系统就知道某个softirq需要在某个时刻运行了。
当然,__raise_sofritq_irqoff()被很多地方封装过,不同子系统用自己封装的函数,比如网络子系统就用netif_rx_reschedule和net_rx_action来激活softirq。
3.softirq的执行分析:
我们先提到了softirq在四个地方有可能运行,最常见的就是中断过程退出函数irq_exit(),我们下面分析之,其他的触发点请大家对照代码自行分析:
void irq_exit(void)
{
account_system_vtime(current);
sub_preempt_count(IRQ_EXIT_OFFSET);
/*如果在上半部中设置了per-cpu变量irq_stat的__softirq_pending字段,则运行下半部的处理函数, 从这里也可以看出,上半部和下半部一定是运行在同一个cpu上,因为上半部中是设置了per-cpu变量irq_stat本地cpu副本中的__softirq_pending中的位,而下半部也只是判断本地cpu的__sofrirq_pending中的位。这样有效的利用了cpu cache的特性*/
if (!in_interrupt() && local_softirq_pending())
invoke_softirq();
preempt_enable_no_resched();
}
Invoke_sofirq()àdo_softirq()
asmlinkage void do_softirq(void)
{
__u32 pending;
unsigned long flags;
/*本地cpu中,softirq不能在中断环境中运行,这个中断环境包括了上半部和下半部,所以这里保证了同一个cpu上,下半部是不会被重入的,但不能保证其他cpu上的同时运行同一个softirq处理。所以编程人员必须让自己编写的softirq处理函数可重入,以防SMP系统中同时运行这些softirq导致数据出现不一致性*/
if (in_interrupt())
return;
local_irq_save(flags); /*关中断*/
pending = local_softirq_pending(); /*取得per-cpu变量irq_stat本地cpu副本的__softirq_pending的值*/
if (pending) /*如果有本地cpu有softirq挂起需要处理,则通过__do_softirq()运行之,否则恢复中断并退出*/
__do_softirq();
local_irq_restore(flags); /*恢复开中断*/
}
关键是__do_softirq()
asmlinkage void __do_softirq(void)
{
struct softirq_action *h;
__u32 pending;
int max_restart = MAX_SOFTIRQ_RESTART;
int cpu;
pending = local_softirq_pending();
/*这个地方加local_bh_disable的原因是softirq的处理必须是串行的,又因为softirq的执行期间中断是打开的,所以当另一个中断被执行的话在in_interrupt函数上面就会发现这里已经有在执行了,就会自动退出*/
local_bh_disable();
cpu = smp_processor_id();
restart:
/* Reset the pending bitmask before enabling irqs */
/*把per-cpu变量 irq_stat的本地cpu副本的__softirq_pengding字段清0,
表示代码有信心在这一次处理中把本地cpu上挂起的所有softirq都处理掉(有信心只是开个玩笑;)*/
set_softirq_pending(0);
local_irq_enable(); /*保证下半部要在中断打开的情况下进行,否则下半部就失去意义了*/
h = softirq_vec; /*softirq_vec是一个全局数组(有32个元素),存放了32种softirq的处理函数*/
/*遍历unsigned int pengding的每一位,如果有被置为1,则运行对应的下半部处理函数action*/
do {
if (pending & 1) {
/*action是一个处理队列,对于非tasklet的softirq来说只有一个元素,
但对于tasklet来说,就有N个函数需要处理*/
h->action(h);
rcu_bh_qsctr_inc(cpu);
}
h++;
pending >>= 1;
} while (pending);
local_irq_disable(); /*关闭中断*/
/*因为下半部是在开中断的环境中运行的,
所以有可能在运行了softirq A以后,
然后在运行其他的softirq B,
这时又产生A的硬件中断(A和B在同一个cpu中产生),
而在A的上半部中又设置了per-cpu变量irq_stat的本地
cpu副本的irq_stat的__softirq_pending的对应的bit,
所以代码运行到这里又发现__softirq_pending不0,
所以要重做处理。这样的情况最多执行max_restart次,因为
如果不限次数的运行下去,中断就一直不返回,那么进程
就得不到调度,系统性能会大大受影响。所以运行max_restart次以后,
如果这样的情况还在一直发生,那么就唤醒per-cpu thread来专门执行
这些下半部,注意,在per-cpu thread中处理的中断下半部,是可以睡眠
的,但是编程人员无法掌握他编写的softirq处理程序是在irq_exit()中处理还是
在per-cpu thread中处理,所以一般都不会有睡眠的可能(编程人员需要保证
这一点)*/
pending = local_softirq_pending();
if (pending && --max_restart)
goto restart;
if (pending)
wakeup_softirqd();
__local_bh_enable(); /*enable下半部运行*/
}
这里就不画流程图了,关键是要仔细的分析几个中断关闭和中断使能的时机,以及softirq是否可以重入的问题。
三总结
本文分析了softirq运行的时间点,以及softirq是怎样被cpu调度的。后面还要继续分析tasklet的实现,tasklet实际上就是凌驾在softirq机制上的,它占用了Linux现有6种softirq的2种(优先级最高的和优先级最低的)。
要特别注意的是:softirq处理函数也不能睡眠,因为它也是运行在中断上下文环境中的(不考虑ksoftirqd线程)。
一、为什么要进入tasklet
我们在softirq的文章中分析过,在SMP系统中,任何一个处理器在响应外设中断请求,完成中断上半部处理后,都可以调用函数do_softirq()来处理构建在softirq机制上的下半部。也就是说,softirq处理函数在SMP系统中是可以并行执行的,这要求使用softirq机制的下半部必须是多处理器可重入的。这对于一般的驱动程序开发者而言, 事情会变得复杂化、难度增大。为了降低驱动开发难度必须提供一套有效的机制,tasklet就是为了解决这一问题而出现的。
二、tasklet实现分析
1. 一个实例
#include
#include
#include
#include
#include
#include
#include
static struct tasklet_struct my_tasklet; /*定义自己的tasklet_struct变量*/
static void tasklet_handler (unsigned long data)
{
printk(KERN_ALERT “tasklet_handler is running.\n”);
}
static int __init test_init(void)
{
tasklet_init(&my_tasklet, tasklet_handler, 0); /*挂入钩子函数tasklet_handler*/
tasklet_schedule(&my_tasklet); /* 触发softirq的TASKLET_SOFTIRQ,在下一次运行softirq时运行这个tasklet*/
return 0;
}
static void __exit test_exit(void)
{
tasklet_kill(&my_tasklet); /*禁止该tasklet的运行*/
printk(KERN_ALERT “test_exit running.\n”);
}
MODULE_LICENSE(“GPL”);
module_init(test_init);
module_exit(test_exit);
运行结果如图:
2. 实现分析
我们就从上面这个实例入手来分析tasklet的实现,
在init中,通过函数tasklet_init()来初始化自己需要注册到系统中的tasklet结构:
void tasklet_init(struct tasklet_struct *t,
void (*func)(unsigned long), unsigned long data)
{
t->next = NULL;
t->state = 0;
atomic_set(&t->count, 0);
t->func = func;
t->data = data;
}
很简单,只是初始化tasklet_struct的各个字段,挂上钩子函数。
然后,通过函数tasklet_schedule()来触发该tasklet
static inline void tasklet_schedule(struct tasklet_struct *t)
{
/*如果需要调度的tasklet的state不为TASKLET_STATE_SCHED,则触发之。这样,就保证了多个cpu不可能同时运行同一个tasklet,因为如果一个tasklet被调度过一次,那么它的state字段就会被设置TASKLET_STATE_SCHED标记,然后插入per-cpu变量的链表中。如果这时另外一个cpu也去调度该tasklet,那么就会在下面的if语句中被挡掉,不会运行到__tasklet_schedule(),从而不会插入到另外这个cpu的per-cpu变量的链表中,就不会被运行到。所以这里是保证了tasklet编写的函数不用是可重入的,这样就方便了编程人员。(注意,softirq机制需要编写可重入的函数)*/
if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
__tasklet_schedule(t);
}
我们来看__tasklet_schedule()的实现:
void fastcall __tasklet_schedule(struct tasklet_struct *t)
{
unsigned long flags;
local_irq_save(flags);
/*把需要添加进系统的自己编写的struct tasklet_struc加入
到per-cpu变量tasklet_vec的本地副本的链表的表头中*/
t->next = __get_cpu_var(tasklet_vec).list;
__get_cpu_var(tasklet_vec).list = t;
raise_softirq_irqoff(TASKLET_SOFTIRQ); /*触发softirq的TASKLET_SOFTIRQ*/
local_irq_restore(flags);
}
这段代码也非常简单,只是把自己要注册到系统中的tasklet_struct挂入到per-cpu变量tasklet_vec的list中而已,这里是挂到链表首部。因为需要修改per-cpu变量tasklet_vec的list的值,为了防止中断处理程序也去修改这个值,所以要加自旋锁,为了保持数据的一致性。
然后通过raise_softirq_irqoff()设置低优先级的tasklet对应的softirq标记,以便cpu在运行softirq的时候运行到tasklet,因为tasklet是凌驾在softirq机制之上的。
OK,这里就完成了我们自己的my_tasklet的注册和触发对应的softirq,那我们现在就应该分析tasklet的运行了。
我们前面提到,tasklet是凌驾在softirq机制之上的。还记得前面说到了Linux中有六种softirq,优先级最高的是HI_SOFTIRQ,优先级最低的是TASKLET_SOFTIRQ,一般情况下我们是利用TASKLET_SOFTIRQ来实现tasklet的功能。
open_softirq(TASKLET_SOFTIRQ, tasklet_action, NULL);中定义了处理tasklet的处理函数tasklet_action.所以我们要分析这个函数的实现:
static void tasklet_action(struct softirq_action *a)
{
struct tasklet_struct *list;
/*把per-cpu变量tasklet_vec的本地副本上的list设置为NULL,
由于这里要修改per-cpu变量,为了防止中断处理程序
或者内核抢占造成该数据的不一致性,所以这里禁止中断再修改数据
,然后再开启中断.(注意,关闭本地中断的副作用就是禁止内核抢占,
因为内核抢占只有两个时间点: 1.中断返回到内核态;2.手动使能内核抢占。
明显程序员不会在临界区内手动使能内核抢占,所以关闭本地中断的
副作用就是禁止内核抢占)*/
local_irq_disable();
list = __get_cpu_var(tasklet_vec).list;
__get_cpu_var(tasklet_vec).list = NULL;
local_irq_enable();
/*遍历tasklet链表,让链表上挂入的函数全部执行完成*/
while (list) {
struct tasklet_struct *t = list;
list = list->next;
if (tasklet_trylock(t)) {
if (!atomic_read(&t->count)) {
if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
BUG();
t->func(t->data); /*真正运行user注册的tasklet函数的地方*/
tasklet_unlock(t);
continue;
}
tasklet_unlock(t);
}
/*这里相当于把tasklet的list指针从链表中后移了(可以自行画图分析),
所以刚才运行过的tasklet回调函数以后不会再次运行,除非用于再次
通过tasklet_schedule()注册之*/
local_irq_disable();
t->next = __get_cpu_var(tasklet_vec).list;
__get_cpu_var(tasklet_vec).list = t;
__raise_softirq_irqoff(TASKLET_SOFTIRQ); /*再一次触发tasklet对应的softirq,使下次系统运行softirq时能运行到tasklet*/
local_irq_enable();
}
}
运行流程是不是很简单呢?呵呵。只要注意到加锁的时机就OK了!
三、总结
Tasklet与一般的softirq的比较重要的一个区别在于: softirq处理函数需要被编写成可重入的,因为多个cpu可能同时执行同一个softirq处理函数,为了防止数据出现不一致性,所以softirq的处理函数必须被编写成可重入。最典型的就是要在softirq处理函数中用spinlock保护一些共享资源。而tasklet机制本身就保证了tasklet处理函数不会同时被多个cpu调度到。因为在tasklet_schedule()中,就保证了多个cpu不可能同时调度到同一个tasklet处理函数,这样tasklet就不用编写成可重入的处理函数,这样就大大减轻了kernel编程人员的负担。