Linux内核学习之工作队列

Linux内核学习之工作队列

  • Author       : Toney
  • Email         : vip_13031075266@163.com
  • Date          : 2020.12.02
  • Copyright : 未经同意不得转载!!!
  • Version    : Linux-2.6.12
  • Reference:https://www.linux.org/

目录

一、核心数据结构

 1. struct work_struct

 2. struct cpu_workqueue_struct

 3. struct workqueue_struct

 4. 这三个数据结构之间的关系

5. 工作队列实现框架(singlethread)

二、创建并初始化工作队列

三、工作者线程

四、调度一个任务到工作队列中

五、销毁工作队列

六、Linux内核维护的工作队列


Linux内核学习之工作队列

双向链表操作函数

一、核心数据结构

 1. struct work_struct

struct work_struct {

unsigned long pending;

struct list_head entry; /*将工作节点构成链表*/

void (*func)(void *);   /*延时处理函数*/

void *data;

void *wq_data;

struct timer_list timer;

};

 2. struct cpu_workqueue_struct

struct cpu_workqueue_struct {

 

spinlock_t lock;

 

long remove_sequence;        /* Least-recently added (next to run) */

long insert_sequence;        /* Next to add */

 

struct list_head worklist;

wait_queue_head_t more_work;

wait_queue_head_t work_done;

 

struct workqueue_struct *wq;

task_t *thread;

 

int run_depth;                /* Detect run_workqueue() recursion depth */

} ____cacheline_aligned;

 3. struct workqueue_struct

struct workqueue_struct {

struct cpu_workqueue_struct cpu_wq[NR_CPUS];

const char *name;

struct list_head list;         /* Empty if single thread */

};

 

 4. 这三个数据结构之间的关系

 

Linux内核学习之工作队列

 

5. 工作队列实现框架(singlethread)

 

Linux内核学习之工作队列

 

 

二、创建并初始化工作队列

Linux内核学习之工作队列

 

#define create_workqueue(name) __create_workqueue((name), 0)

#define create_singlethread_workqueue(name) __create_workqueue((name), 1)

struct workqueue_struct *__create_workqueue(const char *name,

    int singlethread)

{

int cpu, destroy = 0;

struct workqueue_struct *wq;

struct task_struct *p;

 

BUG_ON(strlen(name) > 10);

 

/*分配一个workqueue结构,其中包括ncore个cpu_workqueue_struct*/

wq = kmalloc(sizeof(*wq), GFP_KERNEL);

if (!wq)

return NULL;

memset(wq, 0, sizeof(*wq));/*初始化workqueue结构*/

 

wq->name = name;

/* We don't need the distraction of CPUs appearing and vanishing. */

lock_cpu_hotplug();

if (singlethread) {/*如果为singlethread模式,则只需要创建一个workqueue*/

INIT_LIST_HEAD(&wq->list);/*自成一派,无需加入全局workqueue链表*/

p = create_workqueue_thread(wq, 0);/*只创建一个workqueue线程*/

if (!p)

destroy = 1;

else

wake_up_process(p);/*启动workqueue线程*/

} else {

spin_lock(&workqueue_lock);

list_add(&wq->list, &workqueues);/*加入全局的workqueue链表中*/

spin_unlock(&workqueue_lock);

for_each_online_cpu(cpu) {

p = create_workqueue_thread(wq, cpu);/*为每一个CPU都创建一个workqueue线程*/

if (p) {

kthread_bind(p, cpu);/*为每一个线程绑定特定cpu*/

wake_up_process(p);/*启动线程*/

} else

destroy = 1;

}

}

unlock_cpu_hotplug();

 

/*

 * Was there any error during startup? If yes then clean up:

 */

if (destroy) {/*出现错误,则销毁workqueue*/

destroy_workqueue(wq);

wq = NULL;

}

return wq;

}

工作者线程相关参见下一小节。

三、工作者线程

在创建工作队列时,需要创建对应的工作者线程。那么为什么要创建工作者线程呢?原因是这样的:工作队列主要的目的是:为了简化在内核中创建线程而设计的。此外为了实现延迟任务,需要通过异步手段将此任务交由其他进程处理,而当前进程可以继续处理其他事物。通过工作队列相关的基础设施,人们不必再关心内核中如何创建、维护、销毁内核线程,如何调度任务等,而只需要关心与特定功能相关的事务。下面简单介绍下工作者线程:

/*工作队列管理结构初始化,并创建对应的线程*/

static struct task_struct

*create_workqueue_thread(struct workqueue_struct *wq, int cpu)

{

struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;

struct task_struct *p;

 

spin_lock_init(&cwq->lock);

cwq->wq = wq;

cwq->thread = NULL;

cwq->insert_sequence = 0;

cwq->remove_sequence = 0;

INIT_LIST_HEAD(&cwq->worklist);/*清空工作列表*/

init_waitqueue_head(&cwq->more_work);

init_waitqueue_head(&cwq->work_done);

 

/*创建工作者线程并初始化为相应的name: name/cpu

*工作者线程的执行体为:worker_thread

*/

if (is_single_threaded(wq))

p = kthread_create(worker_thread, cwq, "%s", wq->name);

else

p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);

if (IS_ERR(p))

return NULL;

cwq->thread = p;/*工作队列绑定线程*/

return p;

}

static int worker_thread(void *__cwq)

{

struct cpu_workqueue_struct *cwq = __cwq;

DECLARE_WAITQUEUE(wait, current);

struct k_sigaction sa;

sigset_t blocked;

 

current->flags |= PF_NOFREEZE;

 

set_user_nice(current, -5);

 

/* Block and flush all signals */

sigfillset(&blocked);

sigprocmask(SIG_BLOCK, &blocked, NULL);

flush_signals(current);

 

/* SIG_IGN makes children autoreap: see do_notify_parent(). */

sa.sa.sa_handler = SIG_IGN;

sa.sa.sa_flags = 0;

siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));

do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

 

set_current_state(TASK_INTERRUPTIBLE);

while (!kthread_should_stop()) {/*没有其他线程调用kthread_stop关闭此线程*/

add_wait_queue(&cwq->more_work, &wait);

if (list_empty(&cwq->worklist))/*如果任务列表中没有工作任务,则让出CPU*/

schedule();

else

__set_current_state(TASK_RUNNING);

remove_wait_queue(&cwq->more_work, &wait);

 

if (!list_empty(&cwq->worklist))/*任务列表中有工作任务,则执行任务*/

run_workqueue(cwq);

set_current_state(TASK_INTERRUPTIBLE);

}

__set_current_state(TASK_RUNNING);

return 0;

}

/*工作者线程执行工作链表上的延时任务*/

static inline void run_workqueue(struct cpu_workqueue_struct *cwq)

{

unsigned long flags;

 

/*

 * Keep taking off work from the queue until

 * done.

 */

spin_lock_irqsave(&cwq->lock, flags);

cwq->run_depth++;

if (cwq->run_depth > 3) {

/* morton gets to eat his hat */

printk("%s: recursion depth exceeded: %d\n",

__FUNCTION__, cwq->run_depth);

dump_stack();

}

while (!list_empty(&cwq->worklist)) {

struct work_struct *work = list_entry(cwq->worklist.next,

struct work_struct, entry);

void (*f) (void *) = work->func;/*取出任务操作以及参数*/

void *data = work->data;

list_del_init(cwq->worklist.next);/*从worklist中删除任务*/

 

spin_unlock_irqrestore(&cwq->lock, flags);

 

BUG_ON(work->wq_data != cwq);

clear_bit(0, &work->pending);/*标识此任务已经被调度执行*/

f(data);/*执行此任务*/

 

spin_lock_irqsave(&cwq->lock, flags);

cwq->remove_sequence++;

wake_up(&cwq->work_done);

}

cwq->run_depth--;

spin_unlock_irqrestore(&cwq->lock, flags);

}

四、调度一个任务到工作队列中

 

Linux内核学习之工作队列

将任务work_struct添加到工作队列中相对比较简单:只需要将work_struct结构通过struct_listhead链到workqueue的worklist上即可。代码实现详见下表:

int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)

{

int ret = 0, cpu = get_cpu();

/*如果work->pending第0位为1,则说明当前任务已经被提交,但尚未执行

* 如果为work->pending第0位为0,表示该任务尚未提交,可以进行提交

*/

if (!test_and_set_bit(0, &work->pending)) {/*返回pending的第0位,并将其置1*/

if (unlikely(is_single_threaded(wq)))

cpu = 0;

BUG_ON(!list_empty(&work->entry));

__queue_work(wq->cpu_wq + cpu, work);/*调度到特定CPU的worklist上*/

ret = 1;

}

put_cpu();

return ret;

}

/* Preempt must be disabled. */

static void __queue_work(struct cpu_workqueue_struct *cwq,

 struct work_struct *work)

{

unsigned long flags;

 

spin_lock_irqsave(&cwq->lock, flags);

work->wq_data = cwq;

list_add_tail(&work->entry, &cwq->worklist);/*将任务添加到工作列表中*/

cwq->insert_sequence++;

wake_up(&cwq->more_work);/*唤醒可能正在睡眠的工作者线程*/

spin_unlock_irqrestore(&cwq->lock, flags);

}

五、销毁工作队列

在销毁工作队列时,如果工作链表worklist中仍然有等待执行的任务,可以有两种操作:①全部丢弃;②全部执行完毕后再销毁工作队列。Linux内核中不同版本处理方式不一致,参考Linux-2.6.12源码,是讲工作链表中的任务全部执行完毕后再销毁工作队列。

void destroy_workqueue(struct workqueue_struct *wq)

{

int cpu;

 

flush_workqueue(wq);/*确保提交到worklist的任务都全部执行完成*/

 

/* We don't need the distraction of CPUs appearing and vanishing. */

lock_cpu_hotplug();

if (is_single_threaded(wq))

cleanup_workqueue_thread(wq, 0);/*清除工作者线程*/

else {

for_each_online_cpu(cpu)

cleanup_workqueue_thread(wq, cpu);/*清除工作者线程*/

spin_lock(&workqueue_lock);

list_del(&wq->list);/*从全局工作队列链表上摘除*/

spin_unlock(&workqueue_lock);

}

unlock_cpu_hotplug();

kfree(wq);

}

static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)

{

struct cpu_workqueue_struct *cwq;

unsigned long flags;

struct task_struct *p;

 

cwq = wq->cpu_wq + cpu;

spin_lock_irqsave(&cwq->lock, flags);

p = cwq->thread;

cwq->thread = NULL;

spin_unlock_irqrestore(&cwq->lock, flags);

if (p)

kthread_stop(p);/*停止工作者线程,此函数会阻塞*/

}

 

六、Linux内核维护的工作队列

内核在启动过程中初始化了一个(可能有多个)工作队列keventd_wq(name为events.)

这个工作队列我们都可以使用,如果不想自己创建单独的工作队列,我们完全可以使用events队列来完成我们的任务。需要说明的是:events可能有很多任务需要处理,因此效率上可能不是很高,因此需要视具体情况而定。events调用接口在上面提供的接口基础上又重新做了封装。

/*内核启动过程中定义的工作队列:kevenetd_wq*/

static struct workqueue_struct *keventd_wq;

void init_workqueues(void)

{

hotcpu_notifier(workqueue_cpu_callback, 0);

keventd_wq = create_workqueue("events");/*创建keventd_wq*/

BUG_ON(!keventd_wq);

}

int fastcall schedule_work(struct work_struct *work)

{

return queue_work(keventd_wq, work);

}

int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)

{

return queue_delayed_work(keventd_wq, work, delay);

}

 

上一篇:Linux服务器配置(jdk安装及环境变量配置)


下一篇:CentOS下启动oracle数据库