linux网络实现分析(1)——数据包的接收(从网卡到协议栈)
说明:源码参考2.6.32
从网卡到协议栈的skb接收有两种方式:NAPI和非NAPI。其中有公共逻辑,也有区别。首先看下用到的基本数据结构。
1. 基本数据结构
l  softnet_data
每cpu数据
struct softnet_data
{
struct Qdisc *output_queue; //发送帧队列
struct sk_buff_head input_pkt_queue; //接收帧队列(入口队列)
struct list_head poll_list; //这是一个双向链表
struct sk_buff *completion_queue;
struct napi_struct backlog;
};
说明:
1. 可以看到发送帧队列并不是skb的链表,而是Qdisc的链表,这是因为发送一般需要Qos流控,所以发送帧会存入相应dev关联的Qdisc中(Qdisc中有skb的队列),详见“后面链路层数据包发送”分析。
2. poll_list是一个双向链表,每一个节点是一个napi_struct结构,而napi_struct又是net_device的成员,所以这个链表也可以理解为一个net_device链表,这些net_device都带有输入帧等着被处理。
3. input_pkt_queue是设备驱动将数据从物理介质接收后封装成skb后存放的缓存队列,所有非NAPI设备共有这一个输入缓存队列,而NAPI设备有自己的私有队列用于存放输入包。
/*
* Structure for NAPI scheduling similar to tasklet but with weighting
*/
struct napi_struct {
struct list_head poll_list;//通过这个成员和其他napi_struct相连
unsigned long state;
int weight;
int (*poll)(struct napi_struct *, int);
#ifdef CONFIG_NETPOLL
spinlock_t poll_lock;
int poll_owner;
#endif
unsigned int gro_count;
struct net_device *dev;
struct list_head dev_list;
struct sk_buff *gro_list;
struct sk_buff *skb;
};
说明:
/* The poll_list must only be managed by the entity which
* changes the state of the NAPI_STATE_SCHED bit. This means
* whoever atomically sets that bit can add this napi_struct
* to the per-cpu poll_list, and whoever clears that bit
* can remove from the list right before clearing the bit.
*/
1. 每个napi_struct可以通过其poll_list成员链接在softnet_data的poll_list,进而组成链表。
2. 通过state成员是否设置NAPI_STATE_SCHED位,来表示该napi_struct是否已经链在某个cpu的softnet_data上,即是否处于调度状态。所以在链入softnet_data前,需要设置NAPI_STATE_SCHED位,在从softnet_data移除时要清除NAPI_STATE_SCHED位。
2. 处理过程
非NAPI设备驱动,接收数据包,分配skb,封装完skb后会调用netif_rx。而对于 NAPI 方式,它没有使用 input_pkt_queue 队列,而是使用私有的 队列,所以它没有这一个步骤。
netif_rx
netif_rx通常在中断环境中被驱动程序调用,但是也有例外,特别是当此函数是被环回设备调用时。因此,netif_rx在其启动时会关闭本地CPU的中断事件,在完成工作后再开启。
该函数从设备驱动程序中接收skb,并送入每cpu变量softnet_data的队列中,以完成数据包从设备驱动接受进入协议栈。
返回值:NET_RX_SUCCESS(接受入队),NET_RX_DROP(由于队满等原因丢弃)。
int netif_rx(struct sk_buff *skb)
{
struct softnet_data *queue;
unsigned long flags;
/* if netpoll wants it, pretend we never saw it */
if (netpoll_rx(skb)) //NETPOLL是否需要消费该帧
return NET_RX_DROP;
if (!skb->tstamp.tv64) // skb->tstamp.tv64是否设置
net_timestamp(skb);// 初始化skb->tstamp.tv64
local_irq_save(flags); //关闭本地cpu的IRQ
queue = &__get_cpu_var(softnet_data); //获取本cpu的softnet_data
__get_cpu_var(netdev_rx_stat).total++; //更新cpu统计计数
if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {//如果队列不满
if (queue->input_pkt_queue.qlen) {//如果输入队列不空
enqueue:
__skb_queue_tail(&queue->input_pkt_queue, skb); //数据包入队
local_irq_restore(flags);
return NET_RX_SUCCESS;
}
//如果输入队列为空
napi_schedule(&queue->backlog);
goto enqueue;
}
__get_cpu_var(netdev_rx_stat).dropped++;
local_irq_restore(flags);
kfree_skb(skb);
return NET_RX_DROP;
}
可以看到该函数的主要功能时将skb放入queue->input_pkt_queue(非NAPI设备共享queue->input_pkt_queue这个队列,而NAPI设备有自己的私有队列),为什么只有队列为空时才调用napi_schedule而,不空时入队后直接返回呢?有没有想过就这样返回后由谁来处理数据包呢?首先回答后者,处理包的任务由软中断负责,而软中断由napi_schedule来触发(见后面分析)。如果队列不为空,说明之前触发过软中断(最开始肯定是空的),并且软中断还没有把包处理完,这时只需将新包加入队列即可,之前唤醒的软中断稍后会一起处理掉这个新入队的skb。如果队列为空,说明软中断还未被调用过,或者说上次软中断已经处理完成,所以这次需要再次调用napi_schedule。
napi_schedule
static inline void napi_schedule(struct napi_struct *n)
{
if (napi_schedule_prep(n)) //检测napi是否能被scheduled
__napi_schedule(n);
}
l  napi_schedule_prep – 检测napi是否能被调度
测试napi例程是否已经运行,如果没有则将其标记为运行状态(通过设置NAPI_STATE_SCHED位),NAPI_STATE_SCHED标记位用来保证当前只有一个NAPI poll函数在运行。同时要检查当前napi的状态不能是NAPI_STATE_DISABLE的。
static inline int napi_schedule_prep(struct napi_struct *n)
{
return !napi_disable_pending(n) &&
!test_and_set_bit(NAPI_STATE_SCHED, &n->state);
}
l  __napi_schedule
void __napi_schedule(struct napi_struct *n)
{
unsigned long flags;
local_irq_save(flags);
//将napi_struct添加到softnet_data的poll_list
list_add_tail(&n->poll_list, &__get_cpu_var(softnet_data).poll_list);
//调用NET_RX_SOFTIRQ软中断
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
local_irq_restore(flags);
}
至此,中断的上半部已经完成,以下的工作则交由中断的下半部来实现。总之,NON-NAPI 的中断上半部接收过程可以简单的描述为,它首先为新到来的数据帧分 配合适长度的 SKB,再将接收到的数据从 NIC 中拷贝过来,然后将这个 SKB 链入当前 CPU 的 softnet_data 中的链表中,最后进一步触发中断下半部发继续处理。
而NAPI的中断处理函数(驱动)直接调用napi_schedule,和非NAPI相比跳过了netif_rx函数,所以NAPI和非NAPI的最主要区别也就出来了:
(1) NAPI设备接收数据包不需要存入softnet_data的input_pkt_queue,而是由其自身的驱动函数放入设备自身的私有队列中。非NAPI则要由其驱动函数放入softnet_data的input_pkt_queue。
(2) NAPI调用napi_schedule传入的参数是设备自身对应的napi_struct,将来软中断就会调用设备自身初始化给napi_struct的poll方法,而NAPI则通过调用napi_schedule传入的参数是softnet_data结构中的backlog成员,其poll方法被初始化为内核的process_backlog函数。NAPI通过调用自身的poll方法就能直接将后续到达的数据包从自身私用队列送入协议栈,直到私有队列空,而不用再产生中断(非NAPI设备产生中断的作用是将skb收入softnet_data的input_pkt_queue,再触发软中断,而NAPI设备产生中断只为触发软中断)。
处理NET_RX_SOFTIRQ软中断的函数是net_rx_action。
l  net_rx_action
static void net_rx_action(struct softirq_action *h)
{ //获取struct napi_struct链表
struct list_head *list = &__get_cpu_var(softnet_data).poll_list;
unsigned long time_limit = jiffies + 2;//每次net_rx_action的执行时间
int budget = netdev_budget;
void *have;
local_irq_disable();
while (!list_empty(list)) {
struct napi_struct *n;
int work, weight;
// budget为全部变量,为每次net_rx_action调用,最多处理的skb数量
if (unlikely(budget <= 0 || time_after(jiffies, time_limit)))
goto softnet_break;
local_irq_enable();
/* Even though interrupts have been re-enabled, this
* access is safe because interrupts can only add new
* entries to the tail of this list, and only ->poll()
* calls can remove this head entry from the list.
*/
n = list_entry(list->next, struct napi_struct, poll_list);
have = netpoll_poll_lock(n);
weight = n->weight; //weigth为poll方法每次调用每次最多处理skb的数量
work = 0;
if (test_bit(NAPI_STATE_SCHED, &n->state)) {
//调用struct napi_struct的poll方法,每次最多处理weight个skb
work = n->poll(n, weight); //实际处理了work个skb
trace_napi_poll(n);
}
WARN_ON_ONCE(work > weight);
budget -= work;
local_irq_disable();
if (unlikely(work == weight)) {//说明队列中的skb已经处理完
if (unlikely(napi_disable_pending(n))) {
local_irq_enable();
napi_complete(n);
local_irq_disable();
} else
list_move_tail(&n->poll_list, list);
}
// work == weight说明队列中的skb还没处理完,需要将本napi_struct放入链尾,下次继续执行其poll方法处理skb
netpoll_poll_unlock(have);
}
out:
local_irq_enable();
return;
//当net_rx_action*返回(执行时间到或者处理skb数量超过了budget),而入口队列(softnet_data .input_pkt_queue)中仍有skb时则会执行最后一段代码。
softnet_break:
__get_cpu_var(netdev_rx_stat).time_squeeze++;
__raise_softirq_irqoff(NET_RX_SOFTIRQ);// NET_RX_SOFTIRQ再次被调度
goto out;
}
l  虚拟poll方法
不同设备驱动的poll方法不一样,对于非NAPI设备其poll方法被初始化为process_backlog。
static int process_backlog(struct napi_struct *napi, int quota)
{
int work = 0;
struct softnet_data *queue = &__get_cpu_var(softnet_data);
unsigned long start_time = jiffies;
napi->weight = weight_p;
do {
struct sk_buff *skb;
local_irq_disable();
skb = __skb_dequeue(&queue->input_pkt_queue); //取出skb
if (!skb) {//队列已空
__napi_complete(napi);
local_irq_enable();
break;
}
local_irq_enable();
netif_receive_skb(skb);
} while (++work < quota && jiffies == start_time);
return work;
}
对于NAPI设备,使用自己的poll函数,从自己的私有队列取出skb(NAPI设备不使用公用入口队列softnet_data .input_pkt_queue),然后调用netif_receive_skb(skb)方法。而进入netif_receive_skb方法也就正式进入协议栈的处理逻辑了。
整个处理流程如下图所示。