Overview
从宏观的角度来看,一个packet从网卡到socket接收缓冲区的路径如下所示:
- 驱动加载并初始化
- packet到达网卡
- packet通过DMA被拷贝到内核中的一个ring buffer
- 产生一个硬件中断,让系统知道已经有个packet到达内存
- 驱动会调用NAPI启动一个poll loop,如果它还没启动的话
- 系统的每个CPU上都有一个ksoftirqd进程,它们都是在系统启动的时候就已经注册了的。ksoftirqd进程会调用NAPI的poll函数从ring buffer中将packet取出,而poll函数是设备驱动程序在初始化的时候注册的。
- 那些已经写入数据的ring buffer的内存区域会被unmapped
- 那些通过DMA写入内存的数据会以"skb"的形式传递给网络层进行进一步的处理
- 如果packet steering功能开启或者网卡有多个receive queue,则接收到的packet会被分发到多个CPU上
- 队列中的数据会被传递到protocol layer
- protocol layer会对数据进行处理
- 数据最终会通过protocl layers加入所属socket的receive buffer
整个流程会在下文的各个章节中进行详细的描述,而下文中的protocol layer会以IP和UDP作为例子,但是其中的很多内容,对于其他protocol layer都是通用的。
Detailed Look
本文将会以igb驱动程序作为例子,并用它来控制一个比较常见的服务器网卡Intel I350。因此,我们首先来看看igb设备驱动程序是怎么工作的。
Network Device Driver
Initialization
驱动程序会利用module_init宏注册一个初始化函数,当内核加载驱动程序时,该函数就会被调用。igb初始化函数(igb_init_module)和它利用module_init进行注册的代码如下:
/**
* igb_init_module - Driver Registration Routine
*
* igb_init_module is the first routine called when the driver is
* loaded. All it does is register with the PCI subsystem.
**/
static int __init igb_init_module(void)
{
int ret;
pr_info("%s - version %s\n", igb_driver_string, igb_driver_version);
pr_info("%s\n", igb_copyright); /* ... */ ret = pci_register_driver(&igb_driver);
return ret;
} module_init(igb_init_module);
其中初始化设备的大部分工作都是由pci_register_driver来完成的。我们将在下面详细介绍。
PCI initialization
Intel I350是一个PCI express设备。PCI设备通过PCI Configuration Space中的一些寄存器标识自己。
当一个设备驱动程序被编译时,会用一个叫做MODULE_DEVICE_TABLE的宏来创建一个table,用该table来包含该设备驱动程序可以控制的PCI设备的设备ID。接着这个table也会被注册为一个结构的一部分,我们在下面马上就能看到。
而内核最终将会使用这个table来决定该加载哪个驱动程序来控制该设备。
操作系统就是这样确定哪个设备和系统连接了,以及该使用哪个驱动来和该设备进行交互。
static DEFINE_PCI_DEVICE_TABLE(igb_pci_tbl) = {
{ PCI_VDEVICE(INTEL, E1000_DEV_ID_I354_BACKPLANE_1GBPS) },
{ PCI_VDEVICE(INTEL, E1000_DEV_ID_I354_SGMII) },
{ PCI_VDEVICE(INTEL, E1000_DEV_ID_I354_BACKPLANE_2_5GBPS) },
{ PCI_VDEVICE(INTEL, E1000_DEV_ID_I211_COPPER), board_82575 },
{ PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_COPPER), board_82575 },
{ PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_FIBER), board_82575 },
{ PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_SERDES), board_82575 },
{ PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_SGMII), board_82575 },
{ PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_COPPER_FLASHLESS), board_82575 },
{ PCI_VDEVICE(INTEL, E1000_DEV_ID_I210_SERDES_FLASHLESS), board_82575 }, /* ... */
};
MODULE_DEVICE_TABLE(pci, igb_pci_tbl);
从上文已知,在驱动的初始化函数中会调用pci_register_driver函数。
该函数会注册一个满是指针的结构,其中大多数的指针都是函数指针,不过包含PCI device ID的table同样会被注册。内核会利用这些驱动注册的函数来启动PCI设备。
static struct pci_driver igb_driver = {
.name = igb_driver_name,
.id_table = igb_pci_tbl,
.probe = igb_probe,
.remove = igb_remove, /* ... */
};
PCI probe
一旦一个设备通过它的PCI ID被识别,内核就会选择合适的驱动程序来控制该设备。每一个PCI设备驱动程序都在内核的PCI子系统中注册了一个probe function。对于还没有驱动控制的设备,内核会调用该函数,直到和某个驱动程序相匹配。大多数驱动程序都有大量的代码用来控制设备。具体的操作各个驱动也有所不同。但是一些典型的操作如下所示:
- 启动PCI设备
- 申请内存和IO端口
- 设置DMA mask
- 注册设备驱动程序支持的ethtool function
- 有必要的话,启动watch dog task(比如,e1000 有一个watchdog task来确认硬件是否挂起)
- 处理一些该设备特有的问题
- 创建,初始化和注册一个struct net_device_ops结构,该结构包含一系列的函数指针,指向各种例如打开设备,发送数据,设置mac地址以及其他一些功能
- 创建,初始化和注册一个struct net_device用来代表一个网络设备
让我们来快速浏览一下,igd里对应的igb_probe是如何完成上述操作的
A peek into PCI initialization
接下来的这些代码取自igb_probe函数,主要用于一些基本的PCI配置
err = pci_enable_device_mem(pdev); /* ... */ err = dma_set_mask_and_coherent(&pdev->dev, DMA_BIT_MASK(64)); /* ... */ err = pci_request_selected_regions(pdev, pci_select_bars(pdev,
IORESOURCE_MEM),
igb_driver_name); pci_enable_pcie_error_reporting(pdev); pci_set_master(pdev);
pci_save_state(pdev);
首先,设备会由pci_enable_device_mem初始化,如果该设备处于暂停状态就会被唤醒,获取内存资源以及其他一些工作。接着会对DMA mask进行设置,因为该设备会读写64位的内存地址,因此dma_set_mask_and_coherent的参数为DMA_BIT_MASK(64)。然后调用pci_request_selected_regions获取内存,同时使能PCI Express Advanced Error Reporting功能,最终调用pci_set_master使能DMA并且调用pci_save_state保存PCI configuration space。
Network device initialization
igb_probe函数做了大量关于网络设备初始化的工作。除了一些针对PCI的工作以外,它还需要做如下这些工作:
- 注册struct net_device_ops结构
- 注册ethtool的相关操作
- 从网卡中获取默认的mac地址
- 设置net_device中的feature flags
- 以及其他一些工作
下面我们对上述的每一部分进行详细的分析。
struct net_device_ops
struct net_device_ops中包含许多函数指针指向一些网络子系统用来操作设备的重要功能。我们将在接下来的内容中多次提及此结构。在igb_probe中net_device_ops结构将会和struct net_device绑定,代码如下:
static int igb_probe(struct pci_dev *pdev, const struct pci_device_id *ent)
{
/* ... */ netdev->netdev_ops = &igb_netdev_ops;
而net_device_ops结构中的各个指针指向的函数也定义在同一个文件中:
static const struct net_device_ops igb_netdev_ops = {
.ndo_open = igb_open,
.ndo_stop = igb_close,
.ndo_start_xmit = igb_xmit_frame,
.ndo_get_stats64 = igb_get_stats64,
.ndo_set_rx_mode = igb_set_rx_mode,
.ndo_set_mac_address = igb_set_mac,
.ndo_change_mtu = igb_change_mtu,
.ndo_do_ioctl = igb_ioctl, /* ... */
我们可以看到,这个结构中包含很多有趣的字段,例如ndo_open,ndo_stop,ndo_start_xmit和ndo_get_stats64,他们都包含了igb驱动实现的对应函数的地址。我们下面会对其中的某些内容做进一步的分析。
ethtool registration
ethtool是一个命令行工具,用来获取和设置各种驱动和硬件相关的选项。通常会利用ethtool来从网络设备收集一些详细的数据。
ethtool通过ioctl系统调用和设备驱动程序交互。设备驱动程序注册了一系列的函数用于ethtool的操作。当ethtool发出一个ioctl调用时,内核会找到对应驱动的ethtool结构并且执行相应的注册函数。驱动的ethtool函数可以做许多事情,包括修改驱动中一个简单的falg,乃至通过写设备的寄存器来调整真实设备。
igb驱动通过在igb_probe中调用igb_set_ethtool_ops来注册ethtool的各个操作。
static int igb_probe(struct pci_dev *pdev, const struct pci_device_id *ent)
{
/* ... */ igb_set_ethtool_ops(netdev);
void igb_set_ethtool_ops(struct net_device *netdev)
{
SET_ETHTOOL_OPS(netdev, &igb_ethtool_ops);
}
static const struct ethtool_ops igb_ethtool_ops = {
.get_settings = igb_get_settings,
.set_settings = igb_set_settings,
.get_drvinfo = igb_get_drvinfo,
.get_regs_len = igb_get_regs_len,
.get_regs = igb_get_regs,
/* ... */
每个驱动都能自己决定哪些ethtool函数和自己有关并且决定实现其中的哪些。并不是每个驱动都需要实现所有的ethtool函数。其中一个比较有趣的ethtool函数是get_ethtool_stats,它会创建一些非常详细的计数器进行追踪,它们要么位于驱动中,要么位于设备内。
IRQs
当一个数据帧通过DMA被写入RAM时,网卡是如何通知系统的其余部分,已经有数据可以处理了呢?
一般网卡会产生一个interrupt request(IRQ)表示有数据到了。有以下三种IRQ类型:MSI-X,MSI和legacy IRQ。但是如果有大量的数据帧到达时,就会导致产生大量的IRQ。而产生的IRQ越多,那么用于high level task,例如用户进程的CPU时间就越少。
于是创建了New API(NAPI)这种机制,用于减少数据包的到来导致设备产生中断的数目。尽管NAPI可以减少IRQ的数目,但是并不能完全避免。下面的章节会告诉我们原因。
NAPI
NAPI在许多方面和获取数据传统的方式不同。NAPI允许设备驱动程序注册一个poll函数,NAPI子系统会调用它来获取数据帧。
NAPI一般的使用方式如下:
- NAPI由驱动使能,但是开始仍处于关闭状态
- 一个packet到达并由网卡通过DMA到内存
- 网卡产生一个IRQ,从而触发了驱动中的IRQ handler
- 驱动利用一个softirq唤醒NAPI子系统,它会在另一个线程中调用驱动注册的poll函数来获取packet
- 驱动接着会屏蔽网卡发出的所有IRQ,因为这能让NAPI子系统处理packet并且不受来自设备的中断的影响
- 一旦没有更多的工作需要做了,NAPI子系统会被关闭,而来自设备的IRQ又会被开启
- 跳到步骤2
上述这种收集数据的方式和传统方式相比能够有效减少overhead,因为一次能处理很多数据,而不需要每个数据帧产生一次IRQ。设备驱动程序实现了poll函数并通过调用netif_napi_add将它注册到NAPI中。当通过netif_napi_add向NAPI注册poll时,驱动同时会声明一个weight,大多数驱动都会将它固定为64。该值的意义将会在下文讨论。
通常,驱动程序会在初始化的时候注册他们的NAPI poll函数。
NAPI initialization in the igb driver
igb驱动通过如下一个长长的调用链来实现NAPI的初始化:
- igb_probe调用igb_sw_init
- igb_sw_init调用igb_init_interrupt_scheme
- igb_init_interrupt_scheme调用igb_alloc_q_vectors
- igb_alloc_q_vectors调用igb_alloc_q_vector
- igb_alloc_q_vector调用netif_napi_add
这个调用链会导致一些上层的事发生:
- 如果支持MSI-X,它会通过调用pci_enable_msix使能
- 许多设置被初始化:尤其是设备和驱动用来发送接收包的发送和接收队列的数目
- 每次创建一个传输或者接收队列都会调用一次igb_alloc_q_vector
- 每次调用igb_alloc_q_vector都会调用netif_napi_add为该队列注册一个poll函数,并且每次调用poll函数接收数据时都会传递给它一个struct napi_struct的实例。
让我们来看一看igb_alloc_q_vector是如何注册poll回调函数以及它的私有数据的
static int igb_alloc_q_vector(struct igb_adapter *adapter,
int v_count, int v_idx,
int txr_count, int txr_idx,
int rxr_count, int rxr_idx)
{
/* ... */ /* allocate q_vector and rings */
q_vector = kzalloc(size, GFP_KERNEL);
if (!q_vector)
return -ENOMEM; /* initialize NAPI */
netif_napi_add(adapter->netdev, &q_vector->napi, igb_poll, 64); /* ... */
上述代码为receive queue分配了内存,并且向NAPI子系统注册了igb_poll函数。其中的参数包含了新创建的receive queue相关的struct napi_struct的引用(&q_vector->napi)。当需要从receive queue中接收数据包时,NAPI子系统会将它传输给igb_poll函数。这对于我们以后研究数据流从驱动发送网络栈的过程是非常重要的。
Bring a network device up
回忆一下之前说过的net_device_ops结构,它注册了一系列函数用于启动设备,传输包,设置mac地址等等。当一个网络设备被启动时(比如,调用ifconfig eth0 up) ,和net_device_ops结构中的ndo_open域相关的函数就会被调用。
ndo_open函数会做如下操作:
- 获取receive queue和send queue的内存
- 使能NAPI
- 注册interrupt handler
- 使能hardware interrupts
- 以及其他一些工作
在igb驱动中,和net_device_ops结构中的ndo_open相关的函数为igb_open。
Preparing to receive data from the network
现在大多数的网卡都利用DMA直接将数据写入RAM,从而让操作系统能直接获取数据进行处理。许多网卡为此使用的数据结构类似于创建在环形缓冲区上的队列。为了实现DMA,设备驱动程序必须和操作系统合作,保留一些内存可供网卡使用。一旦区域确定,网卡会得到关于通知,并且会将收到的数据都写入其中。之后这些数据会被取出并交由网络子系统处理。
这些都非常简单,但是如果数据包到达的过快,单个CPU不能很好地处理所有的数据包怎么办?因为该数据结构是基于一个固定大小的内存区域,因此接收到的包将会被丢弃。这个时候,像Receive Side Scaling(RSS)或multiqueue就能派上用场了。有的设备有能力同时将包写入不同的RAM,每个区域都是一个单独的队列。这就允许操作系统在硬件层面使用多个CPU并行处理获取的数据。但是这个特性并不是被所有网卡支持的。不过Intel I350是支持multiple queue的。我们可以看到在igb驱动中,它在启动时就是调用一个叫igb_setup_all_rx_resources的函数。而它又会调用另一个函数,igb_setup_rx_resources,用于让receive queue处理DMA内存。事实上,receive queue的数目和长度可以通过ethtool进行调整。对该这些值进行调整,我们可以看到对于已处理包的数目和已丢弃包数目比例的影响。
网卡一般使用基于packet header field(源地址,目的地址,端口)的哈希函数来确定某个包该发往哪个receive queue。有的网卡还允许你调整某些receive queue的权重,从而让某些特定的队列处理更多的流量。还有的网卡甚至允许你调整哈希函数。这样的话,你可以将特定的数据流发往特定的receive queue进行处理,甚至在硬件层面就将包丢弃。接下来我们很快会看到如何对这些设置进行调整。
Enable NAPI
当打开一个设备时,驱动通常会使能NAPI。之前我们看到了驱动是如何注册NAPI的poll函数的,但是直到打开设备前,NAPI都不是使能的。使能NAPI其实非常简单直接,调用napi_enable翻转struct napi_struct 中的一个位就表示NAPI使能了。如上所述,尽管NAPI使能了,但是它仍然可能处于关闭状态。在igb driver中,每个q_vector的NAPI都会在设备打开或者利用ethtool改变队列的数目或大小时使能。
for (i = 0; i < adapter->num_q_vectors; i++)
napi_enable(&(adapter->q_vector[i]->napi));
Register an interrupt handler
使能了NAPI之后,下一步就是注册一个interrupt handler。现在有好几种方式用于发生一个中断:MSI-X,MSI以及legacy interrupts。因此,这一部分的代码对于不同的驱动都是不同的,这取决于特定的硬件支持哪种中断方式。驱动必须确定设备支持哪种中断方式,并且注册合适的处理方法,从而能在中断发生时进行处理。有些驱动,例如igb会为每种方法注册一个interrupt handler,一种方法失败就换另一种。对于支持multiple receive queue的网卡来说,MSI-X是更好的方法。这样的话,每个receive queue都有自己的hardware interrupt,从而能被特定的CPU处理(通过irqbalance或修改/proc/irq/IRQ_NUMBER/smp_affinity)。我们很快就能看到,处理中断的CPU也将是对包进行处理的CPU。这样一来,收到的包就能从hardware interrupt开始直到整个网络栈都由不同的CPU处理。
如果MSI-X不能用,MSI仍然要优于legacy interrupts。在igb驱动中,函数igb_msix_ring,igb_intr_msi和igb_intr分别是MSI-X,MSI和legacy interrupt对应的interrupt handler。
static int igb_request_irq(struct igb_adapter *adapter)
{
struct net_device *netdev = adapter->netdev;
struct pci_dev *pdev = adapter->pdev;
int err = 0; if (adapter->msix_entries) {
err = igb_request_msix(adapter);
if (!err)
goto request_done;
/* fall back to MSI */ /* ... */
} /* ... */ if (adapter->flags & IGB_FLAG_HAS_MSI) {
err = request_irq(pdev->irq, igb_intr_msi, 0,
netdev->name, adapter);
if (!err)
goto request_done; /* fall back to legacy interrupts */ /* ... */
} err = request_irq(pdev->irq, igb_intr, IRQF_SHARED,
netdev->name, adapter); if (err)
dev_err(&pdev->dev, "Error %d getting interrupt\n", err); request_done:
return err;
}
从上面的代码我们可以看到,驱动会首先尝试利用igb_request_msix设置MSI-X的interrupt handler,如果失败的话,进入MSI。request_irq用于注册MSI的interrupt handler,igb_intr_mis。如果这也失败了,则会进入legacy interrupts。这个时候会再次使用request_irq注册legacy interrupt的interrupt handler,igb_intr。igb的驱动就是这样注册一个函数用于处理,当网卡发出中断说明有数据到达并已经准备好接受处理了。
Enable Interrupts
到现在为止,基本上所有事情都设置完毕了。唯一剩下的就是打开中断并且等待数据的到来。打开中断对于每个设备都是不一样的,对于igb驱动,它是在__igb_open中通过调用igb_irq_enable完成的。一般,打开中断都是通过写设备的寄存器完成的:
static void igb_irq_enable(struct igb_adapter *adapter)
{ /* ... */ wr32(E1000_IMS, IMS_ENABLE_MASK | E1000_IMS_DRSTA);
wr32(E1000_IAM, IMS_ENABLE_MASK | E1000_IMS_DRSTA); /* ... */
}
The network device is now up
驱动可能还需要做另外一些事,例如启动定时器,work queue,或者其他硬件相关的设置。一旦这些都完成了,那么设备就已经启动并准备好投入使用了。
SoftIRQs
在深入网络栈之前,我们先要了解一下Linux内核中一个叫做SoftIRQ的东西
What is a softirq
Linux内核中的softirq system是一种能够让代码到interrupt handler上下文之外执行的一种机制。它非常重要,因为在几乎所有的interrupt handler的执行过程中,hardware interrupts都是关闭的。而中断关闭的时间越长,那么就越有可能丢失某些event。因此我们可以把一些执行时间较长的代码放到interrupt handler之外执行,这样就能让它快点完成从而恢复中断。在内核中,还有其他的机制能够延迟代码的执行,但是对于网络栈来说,我们选择softirqs。
softirq system可以被看成是一系列的kernel thread(每个CPU一个),它们会对不同的softirq event运行不同的处理函数。如果你观察过top命令的输出,并且在一系列的kernel threads中看到了一个ksoftirqd/0,那么它就是运行在CPU 0上的一个softirq kernel thread。
内核子系统可以通过运行open_softirq函数来注册一个softirq handler。我们下面将看到的是网络子系统如何注册它的softirq handlers。现在,我们先来学习一下softirq是如何工作的。
ksoftirqd
因为softirq对于推迟设备驱动工作的执行太过重要了,你可以想象,它一定在内核整个生命周期中很早的时候就开始执行了。下面我们来看看ksoftirqd系统是如何初始化的:
static struct smp_hotplug_thread softirq_threads = {
.store = &ksoftirqd,
.thread_should_run = ksoftirqd_should_run,
.thread_fn = run_ksoftirqd,
.thread_comm = "ksoftirqd/%u",
}; static __init int spawn_ksoftirqd(void)
{
register_cpu_notifier(&cpu_nfb); BUG_ON(smpboot_register_percpu_thread(&softirq_threads)); return 0;
}
early_initcall(spawn_ksoftirqd);
你可以看到上面struct smp_hotplug_thread的定义,其中注册了两个函数指针:ksoftirqd_should_run和run_softirqd。这两个函数都会在kernel/smpboot.c中被调用,用来构成一个event loop。kernel/smpboot.c中的代码首先会调用ksoftirqd_should_run来确定是否还有pending softirq,如果有的话,就执行run_softirqd。run_ksoftirqd会在调用__do_softirq之前执行一些minor bookkeeping。
__do_softirq
__do_softirq函数主要做以下这些事:
- 确定哪些softirq被挂起了
- 记录时间
- 更新softirq执行次数
- 被挂起的softirq的softirq handler(在open_softirq被注册)被执行
因此,现在你看CPU的使用图,其中的softirq或si代表的就是用于这些deferred work所需的时间。
Linux network device subsysem
既然我们已经大概了解了网卡驱动和softirq是如何工作的,接下来我们来看看Linux network device subsystem是如何初始化的。接着我们将追踪一个包从它到达网卡之后所走过的整条路径。
Initialization of network device system
network device (netdev) subsystem是在函数net_dev_init中初始化的。有许多有趣的事情在这个初始化函数中发生。
Initialization of struct softnet_data structures
net_dev_init会为每个CPU都创建一个struct softnet_data。这个结构会包含很多指针用于处理网络数据:
- 一系列注册到该CPU的NAPI结构
- 一个backlog用于数据处理
- processing weight
- 一个receive offload 结构的列表
- 对于Receive packet steering的设置
- 以及其他
其中的每一部分我们都会在下文中详细叙述。
Initialization of softirq handlers
net_dev_init注册了一个receive softirq handler和transmit softirq handler分布用于处理输入和输出的数据。代码如下:
static int __init net_dev_init(void)
{
/* ... */ open_softirq(NET_TX_SOFTIRQ, net_tx_action);
open_softirq(NET_RX_SOFTIRQ, net_rx_action); /* ... */
}
我们很快就能看到驱动的interrupt handler是如何触发NET_RX_SOFTIRQ的net_rx_action函数的
Data arrives
终于,数据来了!
假设receive queue有足够的descriptors,packet会直接通过DMA写入RAM。之后设备就会产生一个相应的中断(或者在MSI-X中,是packet到达的receive queue对应的中断)
Interrupt handler
一般来说,当一个中断对应的interrupt handler运行时,它应该将尽量多的工作都放到中断上下文之外进行。这非常重要,因为在一个中断执行的过程中,其他中断都阻塞了。让我们来看看MSI-X interrupt handler的源码,它能很好地解释,为什么interrupt handler应该尽可能地少做工作。
static irqreturn_t igb_msix_ring(int irq, void *data)
{
struct igb_q_vector *q_vector = data; /* Write the ITR value calculated from the previous interrupt. */
igb_write_itr(q_vector); napi_schedule(&q_vector->napi); return IRQ_HANDLED;
}
这个interrupt handler非常短,在返回之前仅仅做了两个很快的操作。首先,它调用了igb_write_itr,更新了一下硬件相关的寄存器。在这个例子中,被更新的寄存器是用于追踪hardware interrupt到达速率的。这个寄存器通常和一个叫"Interrupt Throttling"(或者叫"Interrupt Coalescing")的硬件特性相结合,它用来调整中断发往CPU的速率。我们很快可以看到ethtool提供了一种机制,能够调节IRQ发生的速率。
接着调用napi_schedule用来唤醒NAPI processing loop(如果它不在运行的话)。注意的是NAPI processing loop是在softirq运行的,而不是在interrupt handler中。interrupt handler只是简单地让它开始执行,如果它没有准备好的话。
真正的代码会展现这些工作会是多么重要,它会帮助我们理解网络数据是如何在多CPU系统中处理的。
NAPI and napi_schedule
让我们来看看hardware interrupt handler中调用的napi_schedule是如何工作的。
要记住,NAPI存在的目的就是在不需要网卡发送中断,表示已经有数据可以准备处理的情况下也能接收数据。如上文所述,NAPI的poll loop会在收到一个hardware interrupt后生成。换句话说:NAPI是使能的,但是处于关闭状态,直到网卡产生一个IRQ表示第一个packet到达,NAPI才算打开。当然还有其他一些情况,我们很快就能看到,NAPI会被关闭,直到一个hardware interrupt让它重新开启。
NAPI poll loop会在驱动的interrupt handler调用napi_schedule后启动。不过napi_schedule只是一个包装函数,它直接调用了__napi_schedule
/**
* __napi_schedule - schedule for receive
* @n: entry to schedule
*
* The entry's receive function will be scheduled to run
*/
void __napi_schedule(struct napi_struct *n)
{
unsigned long flags; local_irq_save(flags);
____napi_schedule(&__get_cpu_var(softnet_data), n);
local_irq_restore(flags);
}
EXPORT_SYMBOL(__napi_schedule);
该代码调用__get_cpu_var获取当前运行的CPU的softnet_data结构。接着softnet_data结构和struct napi_struct结构会被传输给__napi_schedule。
/* Called with irq disabled */
static inline void ____napi_schedule(struct softnet_data *sd,
struct napi_struct *napi)
{
list_add_tail(&napi->poll_list, &sd->poll_list);
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
}
上面的代码主要做了两件事:
- 从驱动程序的interrupt handler中获取的struct napi_struct会被添加到当前CPU的softnet_data结构的poll_list中
- __raise_softirq_irqoff用于触发一个NET_RX_SOFTIRQ softirq。这会导致在network device subsystem初始化的时候注册的net_rx_action被执行,如果它当前没有在执行的话
我们很快就能看到,softirq的处理函数net_rx_action会调用NAPI的poll函数用于获取数据
A note about CPU and network data processing
需要注意的是到目前为止我们见到的所有把任务从hardware interrupt handler推迟到softirq的代码使用的结构都是和当前CPU相关的。尽管驱动的IRQ handler只做很少的工作,但是softirq handler会和驱动的IRQ handler在同一个CPU上执行。
这就是为什么IRQ会由哪个CPU处理很重要了,因为该CPU不仅会用于执行驱动的interrupt handler,还会通过对应的NAPI在softirq中获取数据。
我们接下去将会看到,像Receive Packet Steering这样的机制会将其中的一些工作分发到其他CPU上去。
Network data processing begins
一旦softirq的代码知道了是哪个softirq被挂起了,它就会开始执行,并且调用net_rx_action,这个时候网络数据的处理就开始了。让我们来看看net_rx_action的processing loop的各个部分,了解一下它是如何工作的。
net_rx_action processing loop
net_rx_action从被DAM写入的packet所在的内存开始处理。该函数会遍历在当前CPU上排队的NAPI结构,依次取下每个结构并进行处理。processing loop指定了NAPI的poll函数所能进行的工作量以及消耗的工作时间。它通过如下两种方式实现:
- 通过追踪budget(可以调整)
- 检查经过的时间
while (!list_empty(&sd->poll_list)) {
struct napi_struct *n;
int work, weight; /* If softirq window is exhausted then punt.
* Allow this to run for 2 jiffies since which will allow
* an average latency of 1.5/HZ.
*/
if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit)))
goto softnet_break;
这就是内核如何防止packet processing一直独占整个CPU。其中的budget是该CPU上每个NAPI结构的预算的总和。这就是为什么multiqueue网卡要小心地调节IRQ affinity的原因。我们直到,处理从设备发出的IRQ的CPU也会被用来处理对应的softirq handler,因此也会成为处理上述循环和budget computation的CPU。
有着multiqueue网卡的系统可能会出现这种情况,多个NAPI结构被注册到了同一个CPU上。所有的NAPI结构的处理都会消耗同一个CPU的budget。
如果你没有足够的CPU去分发网卡的IRQ,你可以考虑增加net_rx_action的budget从而允许每个CPU能处理更多的packet。增加budget会增加CPU的使用率,但是可以减小延时,因为数据处理地更及时(但是CPU的处理时间仍然是2 jiffies,不管budget是多少)。
NAPI poll function and weight
我们已经知道网卡驱动调用netif_napi_add注册poll函数。在上文中我们已经看到,igb驱动中有如下这段代码:
/* initialize NAPI */
netif_napi_add(adapter->netdev, &q_vector->napi, igb_poll, 64);
它给NAPI结构的weight赋值为64。我们现在就会看到它是如何在net_rx_action processing loop中使用的。
weight = n->weight; work = 0;
if (test_bit(NAPI_STATE_SCHED, &n->state)) {
work = n->poll(n, weight);
trace_napi_poll(n);
} WARN_ON_ONCE(work > weight); budget -= work;
首先从NAPI结构中获取weight(此处是64),然后将它传递给同样注册到NAPI结构中的poll函数(此处为igb_poll)。poll函数会返回已经被处理的帧数,并保存在work中,之后它将从budget中减去。因此,假设:
- 你的驱动使用的weight是64
- 你的budget设置的是300
你的系统将会在如下任意一种情况发生时,停止处理数据;
- igb_poll函数最多被调用5次(如果没有数据处理还会更少)
- 消耗了2 jiffies的时间
The NAPI / network device driver contract
NAPI子系统和设备驱动的交互中还未提及的一部分就是关闭NAPI的条件,包含的内容如下:
- 如果驱动的poll函数消耗完了它的weight,它一定不能改变NAPI的状态。net_rx_action的循环会继续进行
- 如果驱动的poll函数没有消耗完它所有的weight,它必须关闭NAPI。NAPI会在下次收到IRQ的时候重新启动并且驱动的IRQ handler会调用napi_schedule函数
我们先来看看net_rx_action如何处理第一种情况
Finishing the net_rx_action loop
/* Drivers must not modify the NAPI state if they
* consume the entire weight. In such cases this code
* still "owns" the NAPI instance and therefore can
* move the instance around on the list at-will.
*/
if (unlikely(work == weight)) {
if (unlikely(napi_disable_pending(n))) {
local_irq_enable();
napi_complete(n);
local_irq_disable();
} else {
if (n->gro_list) {
/* flush too old packets
* If HZ < 1000, flush all packets.
*/
local_irq_enable();
napi_gro_flush(n, HZ >= 1000);
local_irq_disable();
}
list_move_tail(&n->poll_list, &sd->poll_list);
}
}
如果所有的work都被消耗完了,net_rx_action需要处理以下两种情况:
- 网络设备需要被关闭(因为用户运行了ifconfig eth0 down)
- 如果设备没有被关闭,检查是否存在generic receive offload(GRO)list。如果time tick rate 大于1000,所有最近更新的GRO'd network flow都会被清除。下面我们会详细介绍GRO。将NAPI结构移到列表的尾端,并迭代至下一个NAPI运行
这就是packet processing loop如何调用驱动注册的poll函数来处理数据。我们很快将会看到,poll函数将会获取数据并将它传递到协议栈进行处理。
Exiting the loop when limits are reached
当下列情况发生时,net_rx_action的循环将会退出:
- 该CPU的poll list已经没有更多的NAPI结构了(!list_empty(&sd->poll_list))
- 剩下的budget小于等于0
- 2 jiffies的时间限制到了
/* If softirq window is exhausted then punt.
* Allow this to run for 2 jiffies since which will allow
* an average latency of 1.5/HZ.
*/
if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit)))
goto softnet_break;
softnet_break:
sd->time_squeeze++;
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
goto out;
struct softnet_data结构中的某些统计数据增加了并且softirq的NET_RX_SOFTIRQ被关闭了。其中的time_squeeze域是用来测量这样一个数据:net_rx_action还有很多工作要做,但是要么因为budget用完了,要么超时了,此类情况发生的次数。这些统计数据对于了解网络的瓶颈是非常有用的。NET_RX_SOFTIRQ被关闭从而能给其他任务腾出时间。这一小段代码的意义是,尽管还有很多工作要做,但是我们不想再独占CPU了,
执行流接着被传递给了out。当没有更多的NAPI结构需要处理,换句话说,budget比network activity更多,所有的驱动都已经关闭了NAPI,net_rx_action无事可做的时候,也会运行到out。
out段代码在从net_rx_action返回之前做了一件重要的事情:调用net_rps_action_and_irq_enable。它在Receive Packet Steering使能的情况下有着重要的作用;它会唤醒远程的CPU用于处理网络数据。
我们将在之后更多地了解RPS是如何工作的。现在让我们先走进NAPI poll函数的内部,这样我们就能向上进入网络栈了。
NAPI poll
我们已经知道设备驱动程序申请了一块内存用于让设备DMA到达packet。驱动有责任申请这些区域,同样也有责任unmap those regions,获取其中的数据并且将它发往网络栈。让我们通过观察igb driver是如何完成这些工作的,从而了解在实际过程中这些步骤是如何完成的。
igb_poll
/**
* igb_poll - NAPI Rx polling callback
* @napi: napi polling structure
* @budget: count of how many packets we should handle
**/
static int igb_poll(struct napi_struct *napi, int budget)
{
struct igb_q_vector *q_vector = container_of(napi,
struct igb_q_vector,
napi);
bool clean_complete = true; #ifdef CONFIG_IGB_DCA
if (q_vector->adapter->flags & IGB_FLAG_DCA_ENABLED)
igb_update_dca(q_vector);
#endif /* ... */ if (q_vector->rx.ring)
clean_complete &= igb_clean_rx_irq(q_vector, budget); /* If all work not completed, return budget and keep polling */
if (!clean_complete)
return budget; /* If not enough Rx work done, exit the polling mode */
napi_complete(napi);
igb_ring_irq_enable(q_vector); return 0;
}
上述代码干了如下这些有趣的事情:
- 如果内核支持Direct Cache Access(DCA),那么CPU cache就是热的,对于RX ring的访问就会命中CPU cache
- 接着调用igb_clean_rx_queue进行具体的操作
- 检查clean_complete确认是否还有更多工作要做。如果有的话,返回budget。如上文所述,net_rx_action会将该NAPI结构移到poll list的尾端
- 否则,驱动会通过调用napi_complete关闭NAPI并且通过调用igb_ring_irq_enable重新开启中断。下一个中断的到来又会开启NAPI
让我们来看看igb_clean_rx_irq是如何将数据送往协议栈的
igb_clean_rx_irq
igb_clean_rx_irq函数是一个循环,它一次处理一个packet直到到达budget或者没有多余的数据需要处理了。
函数中的循环干了如下这些非常重要的事情:
- 申请额外的缓存来接收数据,因为被使用的缓存已经被清除出去了,每次新加
IGB_RX_BUFFER_WRITE(16)
- 从receive queue中获取缓存并将它存储在skb结构中
- 检查缓存是不是"End of Packet"。如果是的话,接着进行处理。否则接着从receive queue中获取缓存,将它们加入skb。这是必要的,因为接收到的数据帧可能比缓存大
- 确认数据的分布和头部是否正确
- 处理的字节数被保存在skb->len
- 设置skb的hash,checksum,timestamp,VLAN id和protocol field。hash,checksum,timestamp,VLAN id都是由硬件提供的。如果硬件声明了一个checksum error,csum_error就会增加。如果checksum成功了,并且数据是UDP或TCP数据,那么该skb就被标记为CHECKSUM_UNNECESSARY。如果checksum失败了,就交由协议栈进行处理。protocol通过调用eth_type_trans计算并且被存放在skb结构中
- 组织好的skb结构通过调用napi_gro_receive被传递给网络栈
- 已处理包的统计数据增加
- 循环继续,直到处理包的数目达到budget
一旦循环结束,函数会将收到的packet数和字节数加到统计数据中。
接着我们首先来聊一聊Generic Receive Offloading(GRO),之后再进入函数napi_gro_receive
Generic Receive Offloading(GRO)
Generic Receive Offloading(GRO)是硬件层面的优化Large Receive Offloading(LRO)的软件实现。这两种方法的核心思想都是通过将"类似"的包组合起来以减少传输给网络栈的包的数量,从而减少CPU的使用。例如我们要传输一个大文件,其中有许多包都包含的都是文件中的数据块。显然,我们可以不用每次都将一个small packet发往网络栈,而是将这些包组合起来,增大负载,最后让这个组合起来的包发往协议栈。这就可以让协议层只处理一个包的头部,就能传输更多的数据到用户空间。
但是这类优化的最大问题就是,信息丢失。如果一个packet中设置了一些重要的选项或者标志,如果将这个包和其他包合并,这些选项或者标志就会丢失。这也就是为什么很多人都不建议使用LRO的原因。事实上,LRO对于合并包的规则的定义是非常宽松的。
GRO作为LRO的硬件实现被引入,但是对于哪些包可以组合有着更为严格的规则
如果你有使用过tcpdump并且看到了一些大的不可思议的包,那么很有可能你的系统已经打开了GRO。你很快就能看到,抓包工具进行抓包的位置是在GRO发生之后,在协议栈的更上层。
napi_gro_receive
函数napi_gro_receive用于处理网络数据的GRO操作(如果GRO打开的话)并将数据传送到协议栈。而一个叫做dev_gro_receive的函数处理了其中的大部分逻辑
dev_gro_receive
这个函数首先检查GRO是否打开,如果打开的话,则准备进行GRO操作。当GRO打开时,首先会遍历一系列的GRO offload filter从而让上层的协议栈对要进行GRO的数据进行处理。这样协议层就能让设备层知道,该packet是否属于正在处理的network flow以及处理一些对于GRO所需要做的特定于协议的事情。例如,TCP协议需要知道是否或者何时需要给一个已经组合到现有packet的packet发送ACK
list_for_each_entry_rcu(ptype, head, list) {
if (ptype->type != type || !ptype->callbacks.gro_receive)
continue; skb_set_network_header(skb, skb_gro_offset(skb));
skb_reset_mac_len(skb);
NAPI_GRO_CB(skb)->same_flow = 0;
NAPI_GRO_CB(skb)->flush = 0;
NAPI_GRO_CB(skb)->free = 0; pp = ptype->callbacks.gro_receive(&napi->gro_list, skb);
break;
}
如果协议层认为是时候清除GRO packet了,之后就会调用napi_gro_complete进行处理,之后它就会调用协议层对应的gro_complete,最后再调用netif_receive_skb将包传送给网络栈
if (pp) {
struct sk_buff *nskb = *pp; *pp = nskb->next;
nskb->next = NULL;
napi_gro_complete(nskb);
napi->gro_count--;
}
如果协议层将packet合并进existing flow,napi_gro_receive就会直接返回。如果packet没有被合并,并且现在的GRO flow小于MAX_GRO_SKBS,之后就会在该NAPI的gro_list新增一个条目
if (NAPI_GRO_CB(skb)->flush || napi->gro_count >= MAX_GRO_SKBS)
goto normal; napi->gro_count++;
NAPI_GRO_CB(skb)->count = 1;
NAPI_GRO_CB(skb)->age = jiffies;
skb_shinfo(skb)->gso_size = skb_gro_len(skb);
skb->next = napi->gro_list;
napi->gro_list = skb;
ret = GRO_HELD;
Linux网络栈中的GRO系统就是这样工作的
napi_skb_finish
一旦dev_gro_receive运行完成,napi_skb_finish就会被调用,要不就是释放因为包已经被合并就没用了的数据结构,要么调用netif_receive_skb将数据传输给网络栈(因为现在已经有MAX_GRO_SKBS个flow了)。现在是时候看看netif_receive_skb是如何将数据传输给协议层了。但是在此之前,我们先来看看什么是Receive Packet Steering(RPS)
Receive Packet Steering(RPS)
我们已经知道每个网络设备驱动都注册了一个NAPI poll函数。每个NAPI poller实例都执行在每个CPU的softirq上下文中。而处理驱动的IRQ handler的CPU会唤醒它的softirq processing loop去处理包。换句话说:处理硬件中断的CPU也会用于poll相应的输入数据。
有的硬件(例如Intel I350)在硬件层面支持multiple queue。这意味着输入的数据会被分流到不同的receive queue,并被DMA到不同的内存区域,从而会有不同的NAPI结构处理对应的区域。从而能让多个CPU并行地处理来自设备的中断并且对数据进行处理。
这个特性我们就称作Receive Side Scaling(RSS)
而Receive Packet Steering(RPS)是RSS的软件实现。因为它是由软件实现的,因此它可以用于任何网卡,即使是那些只有一个receive queue的网卡。然而,同样因为是软件层面的实现,RPS只能在包从DMA内存区域中取出之后,才能对它进行处理。这意味着,你并不会看到CPU使用在处理IRQ或者NAPI poll loop的时间下降,但是你可以从获取到包之后,对它进行负载均衡,并且从此处开始,到协议层向上减少CPU时间。
RPS通过对输入的数据计算出一个哈希值确定该由哪个CPU对其进行处理。之后,该数据会被排入每个CPU的receive network backlog等待处理。一个Inter-processor Interrupt(IPI)会被发往拥有该backlog的CPU。这会帮助触发backlog的处理,如果它当前仍未进行处理的话。/proc/net/softnet_stat中包含了每一个softnet_data中接收到的IPI的次数
因此,netif_receive_skb要么会接着将数据送往网络栈,要么就会通过RPS将它发往其他CPU进行处理
Receive Flow Steering(RFS)
Receive Flow Steering(RFS)通常会和RPS混合使用。RPS会将输入数据在多个CPU之间进行负载均衡,但是它并不会考虑局部性从而最大化CPU cache的命中率。你可以使用RFS将属于同一个flow的数据送往同一个CPU处理,从而提高cache命中率。
Hardware accelerated Receive Flow Steering(aRFS)
RFS可以使用hardware acceleration来加速。网卡和内核可以联合起来,共同决定哪个flow需要发往哪个CPU进行处理。为了使用这一特性,你的网卡和驱动必须对它支持。如果你的网卡驱动有一个叫做ndo_rx_flow_steer的函数,那么该驱动支持accelerated RFS。
Moving up the network stack with netif_receive_skb
netif_receive_skb会在以下两个地方被调用:
- napi_skb_finish,如果packet没有被合并进已经存在的GRO flow
- napi_gro_complete,如果协议层表示是时候传输这个flow了
需要注意的是netif_receive_skb以及它后续调用的函数都是在softirq processing loop的上下文中进行的。netif_receive_skb首先检查一个sysctl的值用来确认用户是否要求在packet进入backlog queue之前或之后加入receive timestamp。如果有设置的话,现在就对该数据进行timestamp,在进行RPS之前。如果未被设置,则会在它加入队列之后在打timestamp。这可以将timestamp造成的负载在多个CPU间进行均衡,不过同样会引入延迟
netif_receive_skb
当timestamp被处理完之后,netif_receive_skb会根据RPS是否可用进行不同的操作。让我们先从最简单的开始:RPS不可用
Without RPS(default setting)
如果RPS不可用,首先会调用__netif_receive_skb做一些bookkeeping接着再调用__netif_receive_skb_core将数据移往协议栈。我们很快就会看到__netif_receive_skb_core是如何工作的,不过在此之前,我们先来看看RPS可用时的传输路径是怎样的,因为该代码同样会调用__netif_receive_skb_core。
With RPS enabled
如果RPS可用的话,在timestamp选项被处理完之后,netif_receive_skb会进行一些计算用于决定该使用哪个CPU的backlog queue。这是通过函数get_rps_cpu完成的
cpu = get_rps_cpu(skb->dev, skb, &rflow); if (cpu >= 0) {
ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
rcu_read_unlock();
return ret;
}
get_rps_cpu会将上文所述的RFS和aRFS都考虑在内,并调用enqueue_to_backlog将数据加入相应的CPU的backlog queue
enqueue_to_backlog
该函数首先获取远程CPU的softnet_data结构的指针,其中包含了一个指向input_pkt_queue的指针。接着,获取远程CPU的input_pkt_queue的队列长度
qlen = skb_queue_len(&sd->input_pkt_queue);
if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
input_pkt_queue的长度首先和netdev_max_backlog相比较。如果队列的长度大于该值,则数据被丢弃。同样flow limit也会被检查,如果超过了,数据同样会被丢弃。这两种情况下,softnet_data节后中丢弃包的数目都将增加。注意这里的softnet_data是数据将要发往的CPU的。
enqueue_to_backlog并不会在很多地方被调用。它只会在RPS可用的包处理过程中或者netif_rx中。许多驱动不应该使用netif_rx,而应该使用netif_receive_skb。如果你不使用RPS或者你的驱动不使用netif_rx,那么增加backlog不会对你的系统产生任何影响,因为它根本就没被用到。(如果你的驱动使用netif_receive_skb并且未使用RPS,那么增加netdev_max_backlog不会产生任何性能上的提高,因为没有数据会被加入到input_pkt_queue中)
如果input_pkt_queue足够小,而也没有超过flow limit,数据就会被加入队列。大概的逻辑如下:
- 如果队列为空,检查远端CPU的NAPI是否启动。如果没有,检查是否有IPI准备发送。如果没有,则准备一个并且通过调用__napi_schedule启动NAPI processing loop,用于处理数据
- 如果队列不为空,或者上述操作都已做完,则将数据加入队列
if (skb_queue_len(&sd->input_pkt_queue)) {
enqueue:
__skb_queue_tail(&sd->input_pkt_queue, skb);
input_queue_tail_incr_save(sd, qtail);
rps_unlock(sd);
local_irq_restore(flags);
return NET_RX_SUCCESS;
} /* Schedule NAPI for backlog device
* We can use non atomic operation since we own the queue lock
*/
if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
if (!rps_ipi_queued(sd))
____napi_schedule(sd, &sd->backlog);
}
goto enqueue;
Flow limits
RPS会将包分发到多个CPU进行处理,不过一个large flow很可能会占据整个CPU,从而让其他small flow处于饥饿状态。flow limit能够让每个flow添加到backlog中的包的数目有一个最大值。这个特性可以帮助small flow同样能够得到处理,即使有larger flow的包也在入队
backlog queue NAPI poller
每个CPU的backlog queue以和设备驱动程序一样的方式插入NAPI。一个poll函数用于处理来自softirq上下文的包,同样还提供了一个weight。这个NAPI结构在网络系统初始化的时候被处理:
sd->backlog.poll = process_backlog;
sd->backlog.weight = weight_p;
sd->backlog.gro_list = NULL;
sd->backlog.gro_count = 0;
backlog的NAPI结构和驱动的NAPI有所不同,它的weight参数是可以调节的,而驱动程序则会将它们的NAPI weight硬编码为64。
process_backlog
process_backlog函数是一个循环,直到它的weight耗尽或者backlog中没有其他数据需要处理。每一个在backlog中的数据都将从backlog queue传输到__netif_receive_skb。一旦数据到达__netif_receive_skb之后,它的传输路径就和RPS不可用时一样了。__netif_receive_skb只是在调用__netif_receive_skb_core将数据传输到协议栈之前做一些bookkeeping。
process_backlog和驱动程序使用NAPI的方式相同:如果weight没用完,那么关闭NAPI。而poller在enqueue_to_backlog调用__napi_schedule之后被重新启动。
该函数会返回已经完成的工作量,net_rx_action会将它从budget中减去
__netif_receive_skb_core delivers data to packet taps and protocol layer
__netif_receive_skb_core用于完成将数据传往网络栈的工作。在此之前,它先确认是否安装了packet taps用来抓取输入的包。其中一个例子就是libcap使用的AF_PACKET address family。如果有这样的tap存在,则数据先被发往tap,在被发往协议层。
Packet tap delivery
如果安装了packet tap,则包将安装以下代码被发送:
list_for_each_entry_rcu(ptype, &ptype_all, list) {
if (!ptype->dev || ptype->dev == skb->dev) {
if (pt_prev)
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = ptype;
}
}
Protocol layer delivery
一旦tap处理完成之后,__netif_receive_skb_core会将数据发往协议层。先从数据中获取protocol field,然后再遍历一系列该协议类型对应的deliver functions
type = skb->protocol;
list_for_each_entry_rcu(ptype,
&ptype_base[ntohs(type) & PTYPE_HASH_MASK], list) {
if (ptype->type == type &&
(ptype->dev == null_or_dev || ptype->dev == skb->dev ||
ptype->dev == orig_dev)) {
if (pt_prev)
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = ptype;
}
}
上文中的ptype_base是一个如下所示的哈希表:
struct list_head ptype_base[PTYPE_HASH_SIZE] __read_mostly;
每个协议层都会在哈希表给定的slot中加入一个filter,通过如下的ptype_head函数计算:
static inline struct list_head *ptype_head(const struct packet_type *pt)
{
if (pt->type == htons(ETH_P_ALL))
return &ptype_all;
else
return &ptype_base[ntohs(pt->type) & PTYPE_HASH_MASK];
}
将filter加入list的操作是由dev_add_pack完成的。这就是协议层如何注册自己,从而获取发往它们的数据的方法。
现在我们就知道了数据如何从网卡发往协议层
Protocol layer registration
现在我们已经知道了数据如何从网络设备发往协议栈,下面我们就来看看协议层是如何注册自己的。
IP protocol layer
IP协议层会先把自己注册到ptype_base这个哈希表中,从而让数据能够从网络设备发往它
dev_add_pack(&ip_packet_type);
static struct packet_type ip_packet_type __read_mostly = {
.type = cpu_to_be16(ETH_P_IP),
.func = ip_rcv,
};
__netif_receive_skb_core会调用deliver_skb,而它最终会调用func(在这里,即为ip_rcv)
ip_rcv
ip_rcv的操作非常直接,首先对数据进行检查,然后更新一些统计数据。ip_rcv会最终通过netfilter将packet发往ip_rcv_finish,从而让那些iptables中匹配IP协议层的规则能够对数据进行处理
return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING, skb, dev, NULL, ip_rcv_finish);
需要注意的是,如果你有非常多,非常复杂的netfilter或者iptables规则,这些规则都会在softirq上下文中执行,从而导致网络栈的延时,而这往往是不可避免的。
ip_rcv_finish
当netfilter并没有把包丢弃时,就会调用ip_rcv_finish。ip_rcv_finish开始就有一个优化,为了将包传输到合适的地方,首先要从路由系统中获取dst_entry。因此,首先需要调用该数据发往的高层协议的early_demux。early_demux首先会判断是否有dst_entry缓存在socket结构中
if (sysctl_ip_early_demux && !skb_dst(skb) && skb->sk == NULL) {
const struct net_protocol *ipprot;
int protocol = iph->protocol; ipprot = rcu_dereference(inet_protos[protocol]);
if (ipprot && ipprot->early_demux) {
ipprot->early_demux(skb);
/* must reload iph, skb->head might have changed */
iph = ip_hdr(skb);
}
}
我们可以看到这部分代码是由sysctl_ip_early_demux控制的。early_demux默认是打开的。如果该优化是打开的,并且没有cached entry(因为这是第一个到达的packet),则这个packet会被发往路由系统,在那能够获取dst_entry。
一旦路由系统工作完毕之后,就会更新计数器,然后再调用dst_input(skb),它转而会调用刚刚获取的dst_entry结构中的input function pointer。
如果packet的最终目的地是本地,那么路由系统就会将ip_local_deliver赋值给dst_entry中的input function pointer。
ip_local_deliver
/*
* Deliver IP Packets to the higher protocol layers.
*/
int ip_local_deliver(struct sk_buff *skb)
{
/*
* Reassemble IP fragments.
*/ if (ip_is_fragment(ip_hdr(skb))) {
if (ip_defrag(skb, IP_DEFRAG_LOCAL_DELIVER))
return 0;
} return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN, skb, skb->dev, NULL,
ip_local_deliver_finish);
}
和ip_rcv_finish类似,netfilter会先对packet进行检查,若未被丢弃,则调用ip_local_deliver_finish
ip_local_deliver_finish
ip_local_deliver_finish从packet中获取protocol,然后查询该protocol注册的net_protocol结构,接着再调用该net_protocol结构中的handler函数指针。这就将packet发往更高的协议层了。
Higher level protocol registration
本篇文章主要分析UDP,但是TCP protocol handler和UDP protocol handler的注册方式是相同的。在net/ipv4/af_inet.c中的函数定义包含了用于UDP,TCP和ICMP协议和IP协议层进行连接的处理函数
static const struct net_protocol tcp_protocol = {
.early_demux = tcp_v4_early_demux,
.handler = tcp_v4_rcv,
.err_handler = tcp_v4_err,
.no_policy = 1,
.netns_ok = 1,
}; static const struct net_protocol udp_protocol = {
.early_demux = udp_v4_early_demux,
.handler = udp_rcv,
.err_handler = udp_err,
.no_policy = 1,
.netns_ok = 1,
}; static const struct net_protocol icmp_protocol = {
.handler = icmp_rcv,
.err_handler = icmp_err,
.no_policy = 1,
.netns_ok = 1,
};
这些结构都在inet address family的初始化代码中被注册
/*
* Add all the base protocols.
*/ if (inet_add_protocol(&icmp_protocol, IPPROTO_ICMP) < 0)
pr_crit("%s: Cannot add ICMP protocol\n", __func__);
if (inet_add_protocol(&udp_protocol, IPPROTO_UDP) < 0)
pr_crit("%s: Cannot add UDP protocol\n", __func__);
if (inet_add_protocol(&tcp_protocol, IPPROTO_TCP) < 0)
pr_crit("%s: Cannot add TCP protocol\n", __func__);
我们关注的是UDP协议层,因此对应的处理函数是udp_rcv。这就是数据从IP层通往UDP层的入口
UDP protocol layer
udp_rcv
udp_rcv函数只有一行代码用于直接调用__udp4_lib_rcv用于接收数据
__udp4_lib_rcv
__udp4_lib_rcv函数会检查packet是否合法,接着再获取UDP header,UDP数据报长度,源地址,目的地址,然后是一些完整性检查和checksum verification。
之前在IP层的时候,我们已经看到在将包传送到上层协议之前会将dst_entry和packet相绑定。如果socket和对应的dst_entry已经找到了,那么__udp4_lib_rcv会将包存入socket:
sk = skb_steal_sock(skb);
if (sk) {
struct dst_entry *dst = skb_dst(skb);
int ret; if (unlikely(sk->sk_rx_dst != dst))
udp_sk_rx_dst_set(sk, dst); ret = udp_queue_rcv_skb(sk, skb);
sock_put(sk);
/* a return value > 0 means to resubmit the input, but
* it wants the return to be -protocol, or 0
*/
if (ret > 0)
return -ret;
return 0;
} else {
如果在之前的early_demux操作中没有找到socket,那么就会调用__udp4_lib_lookup_skb对receiving socket进行查找。无论上述哪种情况,最终数据将被存入socket:
ret = udp_queue_rcv_skb(sk, skb);
sock_put(sk);
如果没有找到socket,那么数据报将被丢弃:
/* No socket. Drop packet silently, if checksum is wrong */
if (udp_lib_checksum_complete(skb))
goto csum_error; UDP_INC_STATS_BH(net, UDP_MIB_NOPORTS, proto == IPPROTO_UDPLITE);
icmp_send(skb, ICMP_DEST_UNREACH, ICMP_PORT_UNREACH, 0); /*
* Hmm. We got an UDP packet to a port to which we
* don't wanna listen. Ignore it.
*/
kfree_skb(skb);
return 0;
udp_queue_rcv_skb
这个函数的初始部分如下所示:
- 判断该socket是不是一个encapsulation socket,如果是的话,在继续处理前,将packet传送给本层的处理函数。
- 确定该包是不是UPD-Lite数据包并做一些完整性检查
- 检查UDP checksum,如果失败的话则丢弃
最终我们到达了处理receive queue的逻辑,首先检查socket对应的receive queue是不是已经满了:
if (sk_rcvqueues_full(sk, skb, sk->sk_rcvbuf))
goto drop;
sk_rcvqueue_full
sk_rcvqueue_full函数会检查socket的backlog长度以及socket的sk_rmem_alloc来确认它们的和是否大于socket的sk_rcvbuf
/*
* Take into account size of receive queue and backlog queue
* Do not take into account this skb truesize,
* to allow even a single big packet to come.
*/
static inline bool sk_rcvqueues_full(const struct sock *sk, const struct sk_buff *skb,
unsigned int limit)
{
unsigned int qsize = sk->sk_backlog.len + atomic_read(&sk->sk_rmem_alloc); return qsize > limit;
}
udp_queue_rcv_skb
一旦证明队列未满,则会继续将数据加入队列
bh_lock_sock(sk);
if (!sock_owned_by_user(sk))
rc = __udp_queue_rcv_skb(sk, skb);
else if (sk_add_backlog(sk, skb, sk->sk_rcvbuf)) {
bh_unlock_sock(sk);
goto drop;
}
bh_unlock_sock(sk); return rc;
第一步先判断socket当前是否被用户进程占用。如果不是,则调用__udp_queue_rcv_skb将数据加入receive queue。如果是,则通过调用sk_add_backlog将数据加入backlog。backlog中的数据最终都会加入receive queue,socket相关的系统调用通过调用release_sock释放了该socket
__udp_queue_rcv_skb
__udp_queue_rcv_skb通过调用sock_queue_rcv_skb将数据加入receive queue,如果该数据不能被加入receive queue,则更新统计数据
rc = sock_queue_rcv_skb(sk, skb);
if (rc < 0) {
int is_udplite = IS_UDPLITE(sk); /* Note that an ENOMEM error is charged twice */
if (rc == -ENOMEM)
UDP_INC_STATS_BH(sock_net(sk), UDP_MIB_RCVBUFERRORS,is_udplite); UDP_INC_STATS_BH(sock_net(sk), UDP_MIB_INERRORS, is_udplite);
kfree_skb(skb);
trace_udp_fail_queue_rcv_skb(rc, sk);
return -1;
}
Queuing data to a socket
现在数据已经通过调用sock_queue_rcv加入socket的队列了。这个函数在将数据加入队列前做了如下的操作:
- 判断socket申请的内存数量是否超过了receive buffer size,如果是的话,socket丢弃包的计数器将会增加
- sk_filter用于处理任何施加到该socket的Berkeley Packet Filter
- 运行sk_rmem_schedule确保有足够的receive buffer space用于接收数据
- 调用skb_set_owner_r,增加sk->sk_rmem_alloc
- 通过调用__skb_queue_tail将数据加入队列
- 最终,那些监听该socket的进程都会收到通知,从而调用sk_data_ready函数
以上就是数据如何到达系统,并通过整个协议栈到达socket并准备给用户进程使用的过程
原文链接:
https://blog.packagecloud.io/eng/2016/06/22/monitoring-tuning-linux-networking-stack-receiving-data/