一、前言
前一节我们知道了可以用下面几个函数来创建workqueue。
#define alloc_ordered_workqueue(fmt, flags, args...) \
alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | \
__WQ_ORDERED_EXPLICIT | (flags), 1, ##args)
#define create_workqueue(name) \
alloc_workqueue("%s", __WQ_LEGACY | WQ_MEM_RECLAIM, 1, (name))
#define create_freezable_workqueue(name) \
alloc_workqueue("%s", __WQ_LEGACY | WQ_FREEZABLE | WQ_UNBOUND | \
WQ_MEM_RECLAIM, 1, (name))
#define create_singlethread_workqueue(name) \
alloc_ordered_workqueue("%s", __WQ_LEGACY | WQ_MEM_RECLAIM, name)
见得分析上面函数,可以看到调用都是alloc_workqueue,它的代码如下:
#define alloc_workqueue(fmt, flags, max_active, args...) \
__alloc_workqueue_key((fmt), (flags), (max_active), \
NULL, NULL, ##args)
可以看到根本的调用都是__alloc_workqueue_key来实现的
本文主要以__alloc_workqueue_key函数为主线,描述CMWQ中的创建一个workqueue实例的代码过程。
二、工作、工作队列、工作线程池、工作线程数据结构
workqueue机制最小的调度单元是work_struct ,即工作任务
struct work_struct {
atomic_long_t data; //低比特位部分是work的标志位,剩余比特位通常用于存放上一次运行的worker_pool ID或pool_workqueue的指针。存放的内容由WORK_STRUCT_PWQ标志位来决定
struct list_head entry; //用于把work挂到工作队列上
work_func_t func; //作任务的处理函数
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
};
工作队列由struct workqueue_struct数据结构描述:
/*
* The externally visible workqueue. It relays the issued work items to
* the appropriate worker_pool through its pool_workqueues.
*/
struct workqueue_struct {
struct list_head pwqs; /* WR: all pwqs of this wq 该workqueue所在的所有pool_workqueue链表 */
struct list_head list; /* PR: list of all workqueues 系统所有workqueue_struct的全局链表*/
struct mutex mutex; /* protects this wq */
int work_color; /* WQ: current work color */
int flush_color; /* WQ: current flush color */
atomic_t nr_pwqs_to_flush; /* flush in progress */
struct wq_flusher *first_flusher; /* WQ: first flusher */
struct list_head flusher_queue; /* WQ: flush waiters */
struct list_head flusher_overflow; /* WQ: flush overflow list */
struct list_head maydays; /* MD: pwqs requesting rescue 所有rescue状态下的pool_workqueue数据结构链表 */
struct worker *rescuer; /* I: rescue workerrescue内核线程,内存紧张时创建新的工作线程可能会失败,如果创建workqueue是设置了WQ_MEM_RECLAIM,那么rescuer线程会接管这种情况。 */
int nr_drainers; /* WQ: drain in progress */
int saved_max_active; /* WQ: saved pwq max_active */
struct workqueue_attrs *unbound_attrs; /* PW: only for unbound wqs UNBOUND类型属性 */
struct pool_workqueue *dfl_pwq; /* PW: only for unbound wqs unbound类型的pool_workqueue */
#ifdef CONFIG_SYSFS
struct wq_device *wq_dev; /* I: for sysfs interface */
#endif
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
#endif
char name[WQ_NAME_LEN]; /* I: workqueue name 该workqueue的名字 */
/*
* Destruction of workqueue_struct is sched-RCU protected to allow
* walking the workqueues list without grabbing wq_pool_mutex.
* This is used to dump all workqueues from sysrq.
*/
struct rcu_head rcu;
/* hot fields used during command issue, aligned to cacheline */
unsigned int flags ____cacheline_aligned; /* WQ: WQ_* flags 经常被不同CUP访问,因此要和cache line对齐 */
struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs 指向per cpu的pool workqueue */
struct pool_workqueue __rcu *numa_pwq_tbl[]; /* PWR: unbound pwqs indexed by node 指向per node的pool workqueue */
};
运行work_struct的内核线程被称为worker,即工作线程。
/*
* The poor guys doing the actual heavy lifting. All on-duty workers are
* either serving the manager role, on idle list or on busy hash. For
* details on the locking annotation (L, I, X...), refer to workqueue.c.
*
* Only to be used in workqueue and async.
*/
struct worker {
/* on idle list while idle, on busy hash table while busy */
union {
struct list_head entry; /* L: while idle */
struct hlist_node hentry; /* L: while busy */
};
struct work_struct *current_work; /* L: work being processed 当前正在处理的work */
work_func_t current_func; /* L: current_work's fn 当前正在执行的work回调函数 */
struct pool_workqueue *current_pwq; /* L: current_work's pwq 当前work所属的pool_workqueue */
struct list_head scheduled; /* L: scheduled works 所有被调度并正准备执行的work_struct都挂入该链表中 */
/* 64 bytes boundary on 64bit, 32 on 32bit */
struct task_struct *task; /* I: worker task 该工作线程的task_struct数据结构 */
struct worker_pool *pool; /* A: the associated pool 该工作线程所属的worker_pool */
/* L: for rescuers */
struct list_head node; /* A: anchored at pool->workers 可以把该worker挂入到worker_pool->workers链表中 */
/* A: runs through worker->node */
unsigned long last_active; /* L: last active timestamp */
unsigned int flags; /* X: flags */
int id; /* I: worker id */
/*
* Opaque string set with work_set_desc(). Printed out with task
* dump for debugging - WARN, BUG, panic or sysrq.
*/
char desc[WORKER_DESC_LEN];
/* used only by rescuers to point to the target workqueue */
struct workqueue_struct *rescue_wq; /* I: the workqueue to rescue */
};
CMWQ提出了工作线程池的概念,struct worker_pool数据结构用于描述工作线程池。
worker_pool是per-cpu变量,每个CPU都有worker_pool,而且有两个worker_pool。
一个用于普通优先级工作线程,另一个用于高优先级工作线程。
/*
* Structure fields follow one of the following exclusion rules.
*
* I: Modifiable by initialization/destruction paths and read-only for
* everyone else.
*
* P: Preemption protected. Disabling preemption is enough and should
* only be modified and accessed from the local cpu.
*
* L: pool->lock protected. Access with pool->lock held.
*
* X: During normal operation, modification requires pool->lock and should
* be done only from local cpu. Either disabling preemption on local
* cpu or grabbing pool->lock is enough for read access. If
* POOL_DISASSOCIATED is set, it's identical to L.
*
* A: wq_pool_attach_mutex protected.
*
* PL: wq_pool_mutex protected.
*
* PR: wq_pool_mutex protected for writes. Sched-RCU protected for reads.
*
* PW: wq_pool_mutex and wq->mutex protected for writes. Either for reads.
*
* PWR: wq_pool_mutex and wq->mutex protected for writes. Either or
* sched-RCU for reads.
*
* WQ: wq->mutex protected.
*
* WR: wq->mutex protected for writes. Sched-RCU protected for reads.
*
* MD: wq_mayday_lock protected.
*/
/* struct worker is defined in workqueue_internal.h */
struct worker_pool {
spinlock_t lock; /* the pool lock 用于保护worker_pool的自旋锁 */
int cpu; /* I: the associated cpu对于unbound类型为-1;对于bound类型workqueue表示绑定的CPU ID */
int node; /* I: the associated node ID */
int id; /* I: pool ID 该worker_pool的ID号 */
unsigned int flags; /* X: flags */
unsigned long watchdog_ts; /* L: watchdog timestamp */
struct list_head worklist; /* L: list of pending works 挂入pending状态的work_struct */
int nr_workers; /* L: total number of workers 工作线程的数量 */
int nr_idle; /* L: currently idle workers 处于idle状态的工作线程的数量 */
struct list_head idle_list; /* X: list of idle workers 处于idle状态的工作线程链表 */
struct timer_list idle_timer; /* L: worker idle timeout */
struct timer_list mayday_timer; /* L: SOS timer for workers */
/* a workers is either on busy_hash or idle_list, or the manager */
DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);
/* L: hash of busy workers */
struct worker *manager; /* L: purely informational */
struct list_head workers; /* A: attached workers 该worker_pool管理的工作线程链表 */
struct completion *detach_completion; /* all workers detached */
struct ida worker_ida; /* worker IDs for task name */
struct workqueue_attrs *attrs; /* I: worker attributes 工作线程属性 */
struct hlist_node hash_node; /* PL: unbound_pool_hash node */
int refcnt; /* PL: refcnt for unbound pools */
/*
* The current concurrency level. As it's likely to be accessed
* from other CPUs during try_to_wake_up(), put it in a separate
* cacheline.
* 用于管理worker的创建和销毁的统计计数,表示运行中的worker数量。该变量可能被多CPU同时访问,因此独占一个缓存行
*/
atomic_t nr_running ____cacheline_aligned_in_smp;
/*
* Destruction of pool is sched-RCU protected to allow dereferences
* from get_work_pool().
*/
struct rcu_head rcu;
} ____cacheline_aligned_in_smp;
struct pool_workqueue用于链接workqueue和worker_pool。
/*
* The per-pool workqueue. While queued, the lower WORK_STRUCT_FLAG_BITS
* of work_struct->data are used for flags and the remaining high bits
* point to the pwq; thus, pwqs need to be aligned at two's power of the
* number of flag bits.
*/
struct pool_workqueue {
struct worker_pool *pool; /* I: the associated pool 指向worker_pool结构 */
struct workqueue_struct *wq; /* I: the owning workqueue 指向workqueue_struct结构 */
int work_color; /* L: current color */
int flush_color; /* L: flushing color */
int refcnt; /* L: reference count */
int nr_in_flight[WORK_NR_COLORS];
/* L: nr of in_flight works */
int nr_active; /* L: nr of active works 活跃的work_strcut数量 */
int max_active; /* L: max active works 最大活跃work_struct数量 */
struct list_head delayed_works; /* L: delayed works 延迟执行work_struct链表 */
struct list_head pwqs_node; /* WR: node on wq->pwqs */
struct list_head mayday_node; /* MD: node on wq->maydays */
/*
* Release of unbound pwq is punted to system_wq. See put_pwq()
* and pwq_unbound_release_workfn() for details. pool_workqueue
* itself is also sched-RCU protected so that the first pwq can be
* determined without grabbing wq->mutex.
*/
struct work_struct unbound_release_work;
struct rcu_head rcu;
} __aligned(1 << WORK_STRUCT_FLAG_BITS);
三、代码
首先列出这个函数的代码
struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
unsigned int flags,
int max_active,
struct lock_class_key *key,
const char *lock_name, ...)
{
size_t tbl_size = 0;
va_list args;
struct workqueue_struct *wq;
struct pool_workqueue *pwq;
/*
* Unbound && max_active == 1 used to imply ordered, which is no
* longer the case on NUMA machines due to per-node pools. While
* alloc_ordered_workqueue() is the right way to create an ordered
* workqueue, keep the previous behavior to avoid subtle breakages
* on NUMA.
*/
if ((flags & WQ_UNBOUND) && max_active == 1) //见下面分析 1
flags |= __WQ_ORDERED;
/* see the comment above the definition of WQ_POWER_EFFICIENT */
if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient) //见下面分析2
flags |= WQ_UNBOUND;
/* allocate wq and format name */
if (flags & WQ_UNBOUND) //见下面分析3
tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]); //计算numa_pwq_tbl要占用的大小
wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL); //分配workqueue_struct
if (!wq)
return NULL;
if (flags & WQ_UNBOUND) {
wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL); //unbound类型的wq要有sttribute
if (!wq->unbound_attrs)
goto err_free_wq;
}
//分析4
va_start(args, lock_name);
vsnprintf(wq->name, sizeof(wq->name), fmt, args);
va_end(args);
max_active = max_active ?: WQ_DFL_ACTIVE;
max_active = wq_clamp_max_active(max_active, flags, wq->name);
/* init wq */
wq->flags = flags;
wq->saved_max_active = max_active;
mutex_init(&wq->mutex);
atomic_set(&wq->nr_pwqs_to_flush, 0);
INIT_LIST_HEAD(&wq->pwqs);
INIT_LIST_HEAD(&wq->flusher_queue);
INIT_LIST_HEAD(&wq->flusher_overflow);
INIT_LIST_HEAD(&wq->maydays);
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
INIT_LIST_HEAD(&wq->list);
//分析 5
if (alloc_and_link_pwqs(wq) < 0)
goto err_free_wq;
if (wq_online && init_rescuer(wq) < 0)
goto err_destroy;
if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
goto err_destroy;
/*
* wq_pool_mutex protects global freeze state and workqueues list.
* Grab it, adjust max_active and add the new @wq to workqueues
* list.
*/
mutex_lock(&wq_pool_mutex);
mutex_lock(&wq->mutex);
for_each_pwq(pwq, wq)
pwq_adjust_max_active(pwq);
mutex_unlock(&wq->mutex);
list_add_tail_rcu(&wq->list, &workqueues);
mutex_unlock(&wq_pool_mutex);
return wq;
err_free_wq:
free_workqueue_attrs(wq->unbound_attrs);
kfree(wq);
return NULL;
err_destroy:
destroy_workqueue(wq);
return NULL;
}
分析1:
if ((flags & WQ_UNBOUND) && max_active == 1)
flags |= __WQ_ORDERED;
对于最大worker为1且没绑定具体cpu的workqueue,系统也是默认整个workqueue是有序执行的。
虽然正确的使用有序工作队列应该使用下面的这个宏,但不能保证那个奇葩自己直接调用__alloc_workqueue_key函数,所以还是要在开始再判断一次。还有一种就是非统一内存访问的cpu也要强制加上这个标志,保证统一性。
#define alloc_ordered_workqueue(fmt, flags, args...) \
alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | \
__WQ_ORDERED_EXPLICIT | (flags), 1, ##args)
分析2:
/* see the comment above the definition of WQ_POWER_EFFICIENT */
if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
flags |= WQ_UNBOUND;
在kernel中,有两种线程池,一种是线程池是per cpu的,也就是说,系统中有多少个cpu,就会创建多少个线程池,cpu x上的线程池创建的worker线程也只会运行在cpu x上。另外一种是unbound thread pool,该线程池创建的worker线程可以调度到任意的cpu上去。由于cache locality的原因,per cpu的线程池的性能会好一些,但是对power saving有一些影响。设计往往如此,workqueue需要在performance和power saving之间平衡,想要更好的性能,那么最好让一个cpu上的worker thread来处理work,这样的话,cache命中率会比较高,性能会更好。但是,从电源管理的角度来看,最好的策略是让idle状态的cpu尽可能的保持idle,而不是反复idle,working,idle again。
我们来一个例子辅助理解上面的内容。在t1时刻,work被调度到CPU A上执行,t2时刻work执行完毕,CPU A进入idle,t3时刻有一个新的work需要处理,这时候调度work到那个CPU会好些呢?是处于working状态的CPU B还是处于idle状态的CPU A呢?如果调度到CPU A上运行,那么,由于之前处理过work,其cache内容新鲜热辣,处理起work当然是得心应手,速度很快,但是,这需要将CPU A从idle状态中唤醒。选择CPU B呢就不存在将CPU 从idle状态唤醒,从而获取power saving方面的好处。
了解了上面的基础内容之后,我们再来检视per cpu thread pool和unbound thread pool。当workqueue收到一个要处理的work,如果该workqueue是unbound类型的话,那么该work由unbound thread pool处理并把调度该work去哪一个CPU执行这样的策略交给系统的调度器模块来完成,对于scheduler而言,它会考虑CPU core的idle状态,从而尽可能的让CPU保持在idle状态,从而节省了功耗。因此,如果一个workqueue有WQ_UNBOUND这样的flag,则说明该workqueue上挂入的work处理是考虑到power saving的。如果workqueue没有WQ_UNBOUND flag,则说明该workqueue是per cpu的,这时候,调度哪一个CPU core运行worker thread来处理work已经不是scheduler可以控制的了,这样,也就间接影响了功耗。
有两个参数可以控制workqueue在performance和power saving之间的平衡:
1、各个workqueue需要通过WQ_POWER_EFFICIENT来标记自己在功耗方面的属性
2、系统级别的内核参数workqueue.power_efficient。
使用workqueue的用户知道自己在电源管理方面的特点,如果该workqueue在unbound的时候会极大的降低功耗,那么就需要加上WQ_POWER_EFFICIENT的标记。这时候,如果没有标记WQ_UNBOUND,那么缺省workqueue会创建per cpu thread pool来处理work。不过,也可以通过workqueue.power_efficient这个内核参数来修改workqueue的行为:
/* see the comment above the definition of WQ_POWER_EFFICIENT */
static bool wq_power_efficient = IS_ENABLED(CONFIG_WQ_POWER_EFFICIENT_DEFAULT);
module_param_named(power_efficient, wq_power_efficient, bool, 0444);
如果wq_power_efficient设定为true,那么WQ_POWER_EFFICIENT的标记的workqueue就会强制按照unbound workqueue来处理,即使没有标记WQ_UNBOUND。
分析3:
/* allocate wq and format name */
if (flags & WQ_UNBOUND)
tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
if (!wq)
return NULL;
if (flags & WQ_UNBOUND) {
wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);
if (!wq->unbound_attrs)
goto err_free_wq;
}
代码很简单,与其要解释代码,不如来解释一些基本概念。
这里涉及2个数据结构:workqueue_struct和pool_workqueue,为何如此处理呢?我们知道,在CMWQ中,workqueue和thread pool没有严格的一一对应关系了,因此,系统中的workqueue们共享一组thread pool,因此,workqueue中的成员包括两个类别:global类型和per thread pool类型的,我们把那些per thread pool类型的数据集合起来就形成了pool_workqueue的定义。
挂入workqueue的work终究需要worker pool中的某个worker thread来处理,也就是说,workqueue要和系统中那些共享的worker thread pool进行连接,这是通过pool_workqueue(该数据结构会包含一个指向worker pool的指针)的数据结构来管理的。和这个workqueue相关的pool_workqueue被挂入一个链表,链表头就是workqueue_struct中的pwqs成员。
和旧的workqueue机制一样,系统维护了一个所有workqueue的list,list head定义如下:
static LIST_HEAD(workqueues); /* PR: list of all workqueues */
workqueue_struct中的list成员就是挂入这个链表的节点。
workqueue有两种:unbound workqueue和per cpu workqueue。对于per cpu类型,cpu_pwqs指向了一组per cpu的pool_workqueue数据结构,用来维护workqueue和per cpu thread pool之间的关系。每个cpu都有两个thread pool,normal和高优先级的线程池,到底cpu_pwqs指向哪一个pool_workqueue(worker thread)是和workqueue的flag相关,如果标有WQ_HIGHPRI,那么cpu_pwqs指向高优先级的线程池。unbound workqueue对应的pool_workqueue和workqueue属性相关,我们在下一节描述。
2、workqueue attribute
挂入workqueue的work终究是需要worker线程来处理,针对worker线程有下面几个考量点(我们称之attribute):
(1)该worker线程的优先级
(2)该worker线程运行在哪一个CPU上
(3)如果worker线程可以运行在多个CPU上,且这些CPU属于不同的NUMA(Non Uniform Memory Access Architecture 非统一内存访问) node,那么是否在所有的NUMA node中都可以获取良好的性能。
对于per-CPU的workqueue,2和3不存在问题,哪个cpu上queue的work就在哪个cpu上执行,由于只能在一个确定的cpu上执行,因此起NUMA的node也是确定的(一个CPU不可能属于两个NUMA node)。置于优先级,per-CPU的workqueue使用WQ_HIGHPRI来标记。综上所述,per-CPU的workqueue不需要单独定义一个workqueue attribute,这也是为何在workqueue_struct中只有unbound_attrs这个成员来记录unbound workqueue的属性。
unbound workqueue由于不绑定在具体的cpu上,可以运行在系统中的任何一个cpu,直觉上似乎系统中有一个unbound thread pool就OK了,不过让一个thread pool创建多种属性的worker线程是一个好的设计吗?本质上,thread pool应该创建属性一样的worker thread。因此,我们通过workqueue属性来对unbound workqueue进行分类,workqueue属性定义如下:
/**
* struct workqueue_attrs - A struct for workqueue attributes.
*
* This can be used to change attributes of an unbound workqueue.
*/
struct workqueue_attrs {
/**
* @nice: nice level
*/
int nice;
/**
* @cpumask: allowed CPUs
*/
cpumask_var_t cpumask;
/**
* @no_numa: disable NUMA affinity
*
* Unlike other fields, ``no_numa`` isn't a property of a worker_pool. It
* only modifies how :c:func:`apply_workqueue_attrs` select pools and thus
* doesn't participate in pool hash calculations or equality comparisons.
*/
bool no_numa;
};
- nice是一个和thread优先级相关的属性,nice越低则优先级越高。
- cpumask是该workqueue挂入的work允许在哪些cpu上运行。
- no_numa是一个和NUMA affinity相关的设定。
3、unbound workqueue和NUMA之间的联系
UMA(Uniform Memory Access Architecture 统一内存访问)系统中,所有的processor看到的内存都是一样的,访问速度也是一样,无所谓local or remote,因此,内核线程如果要分配内存,那么也是无所谓,统一安排即可。在NUMA系统中,不同的一个或者一组cpu看到的memory是不一样的,我们假设node 0中有CPU A和B,node 1中有CPU C和D,如果运行在CPU A上内核线程现在要迁移到CPU C上的时候,悲剧发生了:该线程在A CPU创建并运行的时候,分配的内存是node 0中的memory,这些memory是local的访问速度很快,当迁移到CPU C上的时候,原来local memory变成remote,性能大大降低。因此,unbound workqueue需要引入NUMA的考量点。
NUMA是内存管理的范畴,本文不会深入描述,我们暂且放开NUMA,先思考这样的一个问题:一个确定属性的unbound workqueue需要几个线程池?看起来一个就够了,毕竟workqueue的属性已经确定了,一个线程池创建相同属性的worker thread就行了。但是我们来看一个例子:假设workqueue的work是可以在node 0中的CPU A和B,以及node 1中CPU C和D上处理,如果只有一个thread pool,那么就会存在worker thread在不同node之间的迁移问题。为了解决这个问题,实际上unbound workqueue实际上是创建了per node的pool_workqueue(thread pool)
当然,是否使用per node的pool workqueue用户是可以通过下面的参数进行设定的:
(1)workqueue attribute中的no_numa成员
(2)通过workqueue.disable_numa这个参数,disable所有workqueue的numa affinity的支持。
static bool wq_disable_numa;
module_param_named(disable_numa, wq_disable_numa, bool, 0444);
分析4、初始化workqueue的成员
除了max active,没有什么要说的,代码都简单而且直观。如果用户没有设定max active(或者说max active等于0),那么系统会给出一个缺省的设定。系统定义了两个最大值WQ_MAX_ACTIVE(512)和WQ_UNBOUND_MAX_ACTIVE(和cpu数目有关,最大值是cpu数目乘以4,当然也不能大于WQ_MAX_ACTIVE),分别限定per cpu workqueue和unbound workqueue的最大可以创建的worker thread的数目。wq_clamp_max_active可以将max active限制在一个确定的范围内。
分析5
static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
bool highpri = wq->flags & WQ_HIGHPRI; //获取normal or high priority
int cpu, ret;
if (!(wq->flags & WQ_UNBOUND)) { //per cpu workqueue的处理
wq->cpu_pwqs = alloc_percpu(struct pool_workqueue); //为每个cpu分配一个pool_workqueue数据结构
if (!wq->cpu_pwqs)
return -ENOMEM;
for_each_possible_cpu(cpu) { //逐个cpu进行设定
struct pool_workqueue *pwq =
per_cpu_ptr(wq->cpu_pwqs, cpu); //获取本cpu的pool_workqueue
struct worker_pool *cpu_pools =
per_cpu(cpu_worker_pools, cpu); //获取本cpu的worker_pool
//将动态分配的cpu_pwqs和静态定义的cpu_worker_pools关联起来
init_pwq(pwq, wq, &cpu_pools[highpri]);
mutex_lock(&wq->mutex);
link_pwq(pwq); //把pool_workqueue添加到workqueue_struct->pwqs链表中
mutex_unlock(&wq->mutex);
}
return 0;
} else if (wq->flags & __WQ_ORDERED) { //wq加入到ordered_wq_attrs的处理 (有序未绑定cpu)
ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
/* there should only be single pwq for ordering guarantee */
WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
"ordering guarantee broken for workqueue %s\n", wq->name);
return ret;
} else { //wq加入到unbound_std_wq_attrs的处理 (无序未绑定cpu)
return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
}
}
通过alloc_percpu可以为每一个cpu分配一个pool_workqueue的memory。每个pool_workqueue都有一个对应的worker thread pool,对于per-CPU workqueue,它是静态定义的,如下:
/* the per-cpu worker pools */
static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
//NR_STD_WORKER_POOLS = 2, 表示每个cpu定义两个标准的worker pool
//上面这个宏也就表示,每个cpu定义了两个struct worker_pool, 结构体的名字为cpu_worker_pools
对于未绑定cpu的wq,系统也定义了相关属性的指针(也是分为normal和high两种)
/* I: attributes used when instantiating standard unbound pools on demand */
static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS];
/* I: attributes used when instantiating ordered pools on demand */
static struct workqueue_attrs *ordered_wq_attrs[NR_STD_WORKER_POOLS];
当然这连个真真的实例化是在系统初始化阶段,给动态申请内存的。
将动态分配的cpu_pwqs和静态定义的cpu_worker_pools关联起来
/* initialize newly alloced @pwq which is associated with @wq and @pool */
static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
struct worker_pool *pool)
{
BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK);
memset(pwq, 0, sizeof(*pwq));
pwq->pool = pool; //连接本cpu上的wq的线程池
pwq->wq = wq; //链接工作队列
pwq->flush_color = -1;
pwq->refcnt = 1;
INIT_LIST_HEAD(&pwq->delayed_works);
INIT_LIST_HEAD(&pwq->pwqs_node);
INIT_LIST_HEAD(&pwq->mayday_node);
INIT_WORK(&pwq->unbound_release_work, pwq_unbound_release_workfn);
}
pool_workqueue添加到workqueue_struct->pwqs链表中
/* sync @pwq with the current state of its associated wq and link it */
static void link_pwq(struct pool_workqueue *pwq)
{
struct workqueue_struct *wq = pwq->wq;
lockdep_assert_held(&wq->mutex);
/* may be called multiple times, ignore if already linked */
if (!list_empty(&pwq->pwqs_node))
return;
/* set the matching work_color */
pwq->work_color = wq->work_color;
/* sync max_active to the current setting */
pwq_adjust_max_active(pwq);
/* link in @pwq */
list_add_rcu(&pwq->pwqs_node, &wq->pwqs);
}
unbound workqueue有两种,一种是normal type。
另外一种是ordered type,这种workqueue上的work是严格按照顺序执行的,不存在并发问题。ordered unbound workqueue的行为类似过去的single thread workqueue。
但是,无论那种类型的unbound workqueue都使用apply_workqueue_attrs来建立workqueue、pool wq和thread pool之间的关系。
/**
* apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
* @wq: the target workqueue
* @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
*
* Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA
* machines, this function maps a separate pwq to each NUMA node with
* possibles CPUs in @attrs->cpumask so that work items are affine to the
* NUMA node it was issued on. Older pwqs are released as in-flight work
* items finish. Note that a work item which repeatedly requeues itself
* back-to-back will stay on its current pwq.
*
* Performs GFP_KERNEL allocations.
*
* Return: 0 on success and -errno on failure.
*/
int apply_workqueue_attrs(struct workqueue_struct *wq,
const struct workqueue_attrs *attrs)
{
int ret;
apply_wqattrs_lock();
ret = apply_workqueue_attrs_locked(wq, attrs);
apply_wqattrs_unlock();
return ret;
}
static int apply_workqueue_attrs_locked(struct workqueue_struct *wq,
const struct workqueue_attrs *attrs)
{
struct apply_wqattrs_ctx *ctx;
/* only unbound workqueues can change attributes */
if (WARN_ON(!(wq->flags & WQ_UNBOUND))) //参数检查,这里是只处理未绑定cpu的
return -EINVAL;
/* creating multiple pwqs breaks ordering guarantee */
if (!list_empty(&wq->pwqs)) { //参数检查,__WQ_ORDERED_EXPLICIT标志的属于他的pwq只能有一个
if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
return -EINVAL;
wq->flags &= ~__WQ_ORDERED;
}
ctx = apply_wqattrs_prepare(wq, attrs); //申请特定属性的wq
if (!ctx)
return -ENOMEM;
/* the ctx has been prepared successfully, let's commit it */
apply_wqattrs_commit(ctx); //提交安装,见最后面的分析 7
apply_wqattrs_cleanup(ctx);
return 0;
}
/* allocate the attrs and pwqs for later installation */
static struct apply_wqattrs_ctx *
apply_wqattrs_prepare(struct workqueue_struct *wq,
const struct workqueue_attrs *attrs)
{
struct apply_wqattrs_ctx *ctx;
struct workqueue_attrs *new_attrs, *tmp_attrs;
int node;
lockdep_assert_held(&wq_pool_mutex);
ctx = kzalloc(struct_size(ctx, pwq_tbl, nr_node_ids), GFP_KERNEL);
new_attrs = alloc_workqueue_attrs(GFP_KERNEL); //申请一个workqueue_attrs
tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL); //申请临时sttr
if (!ctx || !new_attrs || !tmp_attrs)
goto out_free;
/*
* Calculate the attrs of the default pwq.
* If the user configured cpumask doesn't overlap with the
* wq_unbound_cpumask, we fallback to the wq_unbound_cpumask.
*/
copy_workqueue_attrs(new_attrs, attrs); //系统默认的拷贝的新的
cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_cpumask); //对新的设置属性
if (unlikely(cpumask_empty(new_attrs->cpumask))) //检视是不是对所有cpu都无效
cpumask_copy(new_attrs->cpumask, wq_unbound_cpumask);
/*
* We may create multiple pwqs with differing cpumasks. Make a
* copy of @new_attrs which will be modified and used to obtain
* pools.
*/
copy_workqueue_attrs(tmp_attrs, new_attrs); //拷贝一份副本
/*
* If something goes wrong during CPU up/down, we'll fall back to
* the default pwq covering whole @attrs->cpumask. Always create
* it even if we don't use it immediately.
*/
ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs); //分配default pool workqueue
if (!ctx->dfl_pwq)
goto out_free;
//遍历node
for_each_node(node) {
if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) { //是否使用default pool wq
ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs); 该node使用自己的pool wq
if (!ctx->pwq_tbl[node])
goto out_free;
} else {
ctx->dfl_pwq->refcnt++; //默认的使用计数
ctx->pwq_tbl[node] = ctx->dfl_pwq; //该node使用default pool wq
}
}
/* save the user configured attrs and sanitize it. */
copy_workqueue_attrs(new_attrs, attrs);
cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
ctx->attrs = new_attrs;
ctx->wq = wq;
free_workqueue_attrs(tmp_attrs); //释放掉临时的workqueue_attrs
return ctx;
out_free:
free_workqueue_attrs(tmp_attrs);
free_workqueue_attrs(new_attrs);
apply_wqattrs_cleanup(ctx);
return NULL;
}
/* obtain a pool matching @attr and create a pwq associating the pool and @wq */
static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
const struct workqueue_attrs *attrs)
{
struct worker_pool *pool;
struct pool_workqueue *pwq;
lockdep_assert_held(&wq_pool_mutex);
pool = get_unbound_pool(attrs); //获取一个worker_pool
if (!pool)
return NULL;
//指定内存节点份分配pool_workqueue ,后面 分析6
pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
if (!pwq) {
put_unbound_pool(pool);
return NULL;
}
init_pwq(pwq, wq, pool); //未绑定cpu的也要对应的worker_pool和wq绑定到pool_workqueue上
return pwq;
}
pwq_tbl数组用来保存unbound workqueue各个node的pool workqueue的指针,new_attrs和tmp_attrs都是一些计算workqueue attribute的中间变量,开始的时候设定为用户传入的workqueue的attribute。
如何为unbound workqueue的pool workqueue寻找对应的线程池?
具体的代码在get_unbound_pool函数中。
/**
* get_unbound_pool - get a worker_pool with the specified attributes
* @attrs: the attributes of the worker_pool to get
*
* Obtain a worker_pool which has the same attributes as @attrs, bump the
* reference count and return it. If there already is a matching
* worker_pool, it will be used; otherwise, this function attempts to
* create a new one.
*
* Should be called with wq_pool_mutex held.
*
* Return: On success, a worker_pool with the same attributes as @attrs.
* On failure, %NULL.
*/
static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
{
u32 hash = wqattrs_hash(attrs);
struct worker_pool *pool;
int node;
int target_node = NUMA_NO_NODE;
lockdep_assert_held(&wq_pool_mutex);
/* do we already have a matching pool? 有相同属相的,则不需要再创建了新的线程池 */
hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
if (wqattrs_equal(pool->attrs, attrs)) {
pool->refcnt++;
return pool;
}
}
/* if cpumask is contained inside a NUMA node, we belong to that node */
if (wq_numa_enabled) {
for_each_node(node) {
if (cpumask_subset(attrs->cpumask,
wq_numa_possible_cpumask[node])) {
target_node = node;
break;
}
}
}
/* nope, create a new one,没有相同的,创建一个这个属性的线程池 */
pool = kzalloc_node(sizeof(*pool), GFP_KERNEL, target_node);
if (!pool || init_worker_pool(pool) < 0)
goto fail;
lockdep_set_subclass(&pool->lock, 1); /* see put_pwq() */
copy_workqueue_attrs(pool->attrs, attrs);
pool->node = target_node;
/*
* no_numa isn't a worker_pool attribute, always clear it. See
* 'struct workqueue_attrs' comments for detail.
*/
pool->attrs->no_numa = false;
if (worker_pool_assign_id(pool) < 0)
goto fail;
/* create and start the initial worker */
if (wq_online && !create_worker(pool))
goto fail;
/* install */
hash_add(unbound_pool_hash, &pool->hash_node, hash); //把新的这个属性的线程池也挂到unbound_pool_hash的hash表上
return pool;
fail:
if (pool)
put_unbound_pool(pool);
return NULL;
}
per cpu的workqueue的pool workqueue对应的线程池也是per cpu的,每个cpu有两个线程池(normal和high priority),因此将pool workqueue和thread pool对应起来是非常简单的事情。对于unbound workqueue,对应关系没有那么直接,如果属性相同,多个unbound workqueue的pool workqueue可能对应一个thread pool。
系统使用哈希表来保存所有的unbound worker thread pool,定义如下:
/* PL: hash of all unbound pools keyed by pool->attrs */
static DEFINE_HASHTABLE(unbound_pool_hash, UNBOUND_POOL_HASH_ORDER);
在创建unbound workqueue的时候,pool workqueue对应的worker thread pool需要在这个哈希表中搜索,如果有相同属性的worker thread pool的话,那么就不需要创建新的线程池,代码如下:
/* do we already have a matching pool? 有相同属相的,则不需要再创建了新的线程池 */
hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
if (wqattrs_equal(pool->attrs, attrs)) {
pool->refcnt++;
return pool;
}
}
分析6 各个node分配pool workqueue并初始化
在进入代码之前,先了解一些基础知识。缺省情况下,挂入unbound workqueue的works最好是考虑NUMA Affinity,这样可以获取更好的性能。当然,实际上用户可以通过workqueue.disable_numa这个内核参数来关闭这个特性,这时候,系统需要一个default pool workqueue(workqueue_struct的dfl_pwq成员),所有的per node的pool workqueue指针都是执行default pool workqueue。
workqueue.disable_numa是enable的情况下是否不需要default pool workqueue了呢?也不是,我们举一个简单的例子,一个系统的构成是这样的:node 0中有CPU A和B,node 1中有CPU C和D,node 2中有CPU E和F,假设workqueue的attribute规定work只能在CPU A 和C上运行,那么在node 0和node 1中创建自己的pool workqueue是ok的,毕竟node 0中有CPU A,node 1中有CPU C,该node创建的worker thread可以在A或者C上运行。但是对于node 2节点,没有任何的CPU允许处理该workqueue的work,在这种情况下,没有必要为node 2建立自己的pool workqueue,而是使用default pool workqueue。
OK,我们来看代码:
/*
* If something goes wrong during CPU up/down, we'll fall back to
* the default pwq covering whole @attrs->cpumask. Always create
* it even if we don't use it immediately.
*/
ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs); //分配default pool workqueue
if (!ctx->dfl_pwq)
goto out_free;
//遍历node
for_each_node(node) {
if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) { //是否使用default pool wq
ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs); 该node使用自己的pool wq
if (!ctx->pwq_tbl[node])
goto out_free;
} else {
ctx->dfl_pwq->refcnt++; //默认的使用计数
ctx->pwq_tbl[node] = ctx->dfl_pwq; //该node使用default pool wq
}
}
分析7:
所有的node的pool workqueue及其worker thread pool已经ready,需要安装到workqueue中了
/* set attrs and install prepared pwqs, @ctx points to old pwqs on return */
static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
{
int node;
/* all pwqs have been created successfully, let's install'em */
mutex_lock(&ctx->wq->mutex);
copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);
/* save the previous pwq and install the new one */
for_each_node(node)
ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
ctx->pwq_tbl[node]);
/* @dfl_pwq might not have been used, ensure it's linked */
link_pwq(ctx->dfl_pwq);
swap(ctx->wq->dfl_pwq, ctx->dfl_pwq);
mutex_unlock(&ctx->wq->mutex);
}
本文参考:
http://www.wowotech.net/sort/irq_subsystem
https://www.cnblogs.com/arnoldlu/p/8659988.html