1. 工作队列是什么
工作队列:我们可以将不是很紧急处理的事情放到 workqueue
中处理,等待系统空闲时就会去执行 workqueue
里的事情。工作队列的本质就是开一个线程去处理排列好的任务,所以工作队列中不能有死循环,尽可能的不要使用使用会导致线程阻塞的API
。
workqueue
的作用就是将工作延迟处理, workqueue
是中断处理的后半程。
2. 工作队列怎么使用
- 初始化 work
rt_inline void rt_work_init(struct rt_work *work, void (*work_func)(struct rt_work *work, void *work_data),
void *work_data)
work : 工作队列的控制块
*work_func : 指定任务回调函数的指针
*work_data : 待处理的数据指针
-
创建工作队列
struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_size, rt_uint8_t priority)
name : 创建工作队列线程的名字
stack_size : 创建工作队列线程的栈大小
priority :创建工作队列线程的优先级
-
删除工作队列
rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue)
queue :工作队列线程块的删除
-
增加工作队列任务
rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work)
queue : 工作队列控制块
work : work 控制块
-
提交工作队列任务
rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t time)
queue : 工作队列控制块
work : work 控制块
time : 超时时间
-
提交临界工作队列
rt_err_t rt_workqueue_critical_work(struct rt_workqueue *queue, struct rt_work *work)
queue : 工作队列控制块
work : work 控制块
-
取消工作队列任务
rt_err_t rt_workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
queue : 工作队列控制块
work : work 控制块
-
同步取消工作队列任务
rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_work *work)
queue : 工作队列控制块
work : work 控制块
-
取消工作队列所有任务
rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue *queue)
queue : 工作队列控制块
-
初始化延迟队列
void rt_delayed_work_init(struct rt_delayed_work *work, void (*work_func)(struct rt_work *work,
void *work_data), void *work_data)
3. 工作队列的实现原理
先看一下工作队列相关的结构体
/* workqueue implementation */
struct rt_workqueue
{
rt_list_t work_list;
struct rt_work *work_current; /* current work */
struct rt_semaphore sem;
rt_thread_t work_thread;
};
struct rt_work
{
rt_list_t list;
void (*work_func)(struct rt_work *work, void *work_data);
void *work_data;
rt_uint16_t flags;
rt_uint16_t type;
struct rt_timer timer;
struct rt_workqueue *workqueue;
};
struct rt_delayed_work
{
struct rt_work work;
};
通过结构体可以知道, 每一个 work
都通过链表挂载 work_queue
-
初始化 work
rt_inline void rt_work_init(struct rt_work *work, void (*work_func)(struct rt_work *work, void *work_data), void *work_data) { rt_list_init(&(work->list)); // 初始化 work 链表 work->work_func = work_func; // work 回调函数 work->work_data = work_data; // 带处理的数据缓冲区指针 work->workqueue = RT_NULL; // 初始化的时候不挂载到任何 workqueue 上 work->flags = 0; work->type = 0; }
-
创建工作队列
struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_size, rt_uint8_t priority) { struct rt_workqueue *queue = RT_NULL; /* 为 queue 申请内存 */ queue = (struct rt_workqueue *)RT_KERNEL_MALLOC(sizeof(struct rt_workqueue)); if (queue != RT_NULL) { /* initialize work list */ rt_list_init(&(queue->work_list));// 初始化链表 &(queue->work_list) queue->work_current = RT_NULL; // 当前还没有 work rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO); // 初始化一个信号量 /* create the work thread */ queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10); // 创建一个线程 if (queue->work_thread == RT_NULL) { RT_KERNEL_FREE(queue); return RT_NULL; } rt_thread_startup(queue->work_thread); // 启动这个线程 } return queue; }
-
删除工作队列
rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue) { RT_ASSERT(queue != RT_NULL); rt_thread_delete(queue->work_thread);// 删除创建的线程 RT_KERNEL_FREE(queue);// 释放 queue 申请的内存 return RT_EOK; }
-
增加任务到工作队列
rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work) { RT_ASSERT(queue != RT_NULL); RT_ASSERT(work != RT_NULL); return _workqueue_submit_work(queue, work); // 提交到工作队列 }
static rt_err_t _workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work) { rt_base_t level; level = rt_hw_interrupt_disable(); // 关中断 if (work->flags & RT_WORK_STATE_PENDING) // 确保工作队列不是在挂起状态 { rt_hw_interrupt_enable(level); // 开中断 return -RT_EBUSY; // 返回 忙 } if (queue->work_current == work) // 如果插入的 work 是当前的 work { rt_hw_interrupt_enable(level); return -RT_EBUSY; // 返回忙 } /* NOTE: the work MUST be initialized firstly */ rt_list_remove(&(work->list));// 把 work 的 list 移除,确保链表的首尾都指向自己 rt_list_insert_after(queue->work_list.prev, &(work->list));//把work链表插入到queue work->flags |= RT_WORK_STATE_PENDING;//设置工作队列为挂起状态 /* whether the workqueue is doing work */ if (queue->work_current == RT_NULL) // 如果当前工作队列中没有work { rt_hw_interrupt_enable(level);// 开启中断 /* resume work thread */ rt_thread_resume(queue->work_thread);// 恢复当前 queue 上的线程 rt_schedule();// 开调度 } else { rt_hw_interrupt_enable(level); } return RT_EOK; }
-
提交工作到工作队列
rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t time) { RT_ASSERT(queue != RT_NULL); RT_ASSERT(work != RT_NULL); if (time > 0) //如果时间设置大于0 { work->type |= RT_WORK_TYPE_DELAYED; //设置 WORK 的状态为等待状态 } if (work->type & RT_WORK_TYPE_DELAYED) // 如果是等待状态 { return _workqueue_submit_delayed_work(queue, work, time); } else { return _workqueue_submit_work(queue, work); // 参考前面的分析 } }
static rt_err_t _workqueue_submit_delayed_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t ticks) { rt_base_t level; rt_err_t ret = RT_EOK; /* Work cannot be active in multiple queues */ if (work->workqueue && work->workqueue != queue) // 确保工作不在多个工作队列中 { ret = -RT_EINVAL; goto __exit; } /* Cancel if work has been submitted */ if (work->workqueue == queue) //如果已经提交了 { ret = _workqueue_cancel_delayed_work(work); // 那么就取消这工作 if (ret < 0) { goto __exit; } } level = rt_hw_interrupt_disable(); // 关中断 /* Attach workqueue so the timeout callback can submit it */ work->workqueue = queue; // work 在 queue 上 rt_hw_interrupt_enable(level); // 开中断 if (!ticks) // 如果ticks 是 0 { /* Submit work if no ticks is 0 */ ret = _workqueue_submit_work(work->workqueue, work); } else { level = rt_hw_interrupt_disable(); // 关中断 /* Add timeout */ work->flags |= RT_WORK_STATE_SUBMITTING; // 设置状态为 提交状态 rt_timer_init(&(work->timer), "work", _delayed_work_timeout_handler, work, ticks, RT_TIMER_FLAG_ONE_SHOT | RT_TIMER_FLAG_SOFT_TIMER); // 初始化定时器 rt_hw_interrupt_enable(level); // 开中断 rt_timer_start(&(work->timer)); // 启动定时器 } __exit: return ret; }
static void _delayed_work_timeout_handler(void *parameter)
{
struct rt_work *delayed_work;
rt_base_t level;
delayed_work = (struct rt_work *)parameter; // 获取参数
level = rt_hw_interrupt_disable(); // 关中断
rt_timer_stop(&(delayed_work->timer)); // 停止定时器
rt_timer_detach(&(delayed_work->timer)); // 脱离定时器
delayed_work->flags &= ~RT_WORK_STATE_SUBMITTING; // 取消标志位
delayed_work->type &= ~RT_WORK_TYPE_DELAYED; // 取消标志位
rt_hw_interrupt_enable(level); // 开中断
_workqueue_submit_work(delayed_work->workqueue, delayed_work); //把work加入到queue
}
-
提交临界工作队列
rt_err_t rt_workqueue_critical_work(struct rt_workqueue *queue, struct rt_work *work) { rt_base_t level; RT_ASSERT(queue != RT_NULL); RT_ASSERT(work != RT_NULL); level = rt_hw_interrupt_disable(); // 关中断 if (queue->work_current == work) // 确保当前队列上的工作不是当前工作 { rt_hw_interrupt_enable(level); return -RT_EBUSY; } /* NOTE: the work MUST be initialized firstly */ rt_list_remove(&(work->list)); // 确保链表是空的 rt_list_insert_after(queue->work_list.prev, &(work->list));// 把work 插入到 queue if (queue->work_current == RT_NULL) // 如果当前的work 是空 { rt_hw_interrupt_enable(level); // 使能中断 /* resume work thread */ rt_thread_resume(queue->work_thread); // 恢复这个线程 rt_schedule(); // 开启调度 } else rt_hw_interrupt_enable(level); return RT_EOK; }
这里没有设置 工作状态为
work->flags |= RT_WORK_STATE_PENDING;
所以WORK
会立即得到执行. -
取消队列中的工作
rt_err_t rt_workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work) { RT_ASSERT(queue != RT_NULL); RT_ASSERT(work != RT_NULL); if (work->type & RT_WORK_TYPE_DELAYED) // 如果在等待状态 { return _workqueue_cancel_delayed_work(work); // 延迟取消 } else { return _workqueue_cancel_work(queue, work); // 取消工作 } }
static rt_err_t _workqueue_cancel_delayed_work(struct rt_work *work) { rt_base_t level; int ret = RT_EOK; if (!work->workqueue) // 如果 work 不在 队列上 { ret = -EINVAL; goto __exit; } if (work->flags & RT_WORK_STATE_PENDING) // work 的状态为 挂起状态 { /* Remove from the queue if already submitted */ ret = _workqueue_cancel_work(work->workqueue, work);// 取消任务 if (ret) { goto __exit; } } else { if (work->flags & RT_WORK_STATE_SUBMITTING) // 如果状态是提交状态 { level = rt_hw_interrupt_disable(); // 关中断 rt_timer_stop(&(work->timer)); // 停止定时器 rt_timer_detach(&(work->timer));// 脱离定时器 work->flags &= ~RT_WORK_STATE_SUBMITTING; // 取消标志位 rt_hw_interrupt_enable(level);// 开中断 } } level = rt_hw_interrupt_disable(); // 关中断 /* Detach from workqueue */ work->workqueue = RT_NULL; work->flags &= ~(RT_WORK_STATE_PENDING); rt_hw_interrupt_enable(level); __exit: return ret; }
static rt_err_t _workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work) { rt_base_t level; level = rt_hw_interrupt_disable(); if (queue->work_current == work) //如果 work 的工作还没做完 { rt_hw_interrupt_enable(level); return -RT_EBUSY; } rt_list_remove(&(work->list)); //从链表上移除 work->flags &= ~RT_WORK_STATE_PENDING; // 设置为 pending 状态 rt_hw_interrupt_enable(level); // 开启使能 return RT_EOK; }
-
取消同步工作
rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_work *work) { rt_base_t level; RT_ASSERT(queue != RT_NULL); RT_ASSERT(work != RT_NULL); level = rt_hw_interrupt_disable(); if (queue->work_current == work) /* it's current work in the queue */ { /* wait for work completion */ rt_sem_take(&(queue->sem), RT_WAITING_FOREVER); // 等待获取信号量 } else { rt_list_remove(&(work->list)); // 移除节点 } work->flags &= ~RT_WORK_STATE_PENDING; rt_hw_interrupt_enable(level); return RT_EOK; }
-
取消所有工作
rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue *queue) { struct rt_list_node *node, *next; RT_ASSERT(queue != RT_NULL); rt_enter_critical(); // 进入临界区 for (node = queue->work_list.next; node != &(queue->work_list); node = next) { next = node->next; rt_list_remove(node);// 遍历移除 } rt_exit_critical(); // 退出临界区 return RT_EOK; }
-
创建延迟工作队列
void rt_delayed_work_init(struct rt_delayed_work *work, void (*work_func)(struct rt_work *work, void *work_data), void *work_data) { rt_work_init(&work->work, work_func, work_data); // 初始化 work }
4. 总结
- 工作队列的本质就一个线程完成多个工作,每个工作都是一个函数,切记函数里面不能有死循环
- 工作队列的同步的原理是使用了信号量
- 延时工作是等待设定的时间后才把
work
加入到工作队列