摘要: 鸿蒙轻内核的任务排序链表,用于任务延迟到期/超时唤醒等业务场景,是一个非常重要、非常基础的数据结构。
本文会继续给读者介绍鸿蒙轻内核源码中重要的数据结构:任务排序链表 TaskSortLinkAttr。鸿蒙轻内核的任务排序链表,用于任务延迟到期/超时唤醒等业务场景,是一个非常重要、非常基础的数据结构。本文中所涉及的源码,以 OpenHarmony LiteOS-M 内核为例,均可以在开源站点gitee.com/openharmony… 获取。
1、任务排序链表
我们先看下任务排序链接的数据结构。任务排序链表是一个环状的双向链表数组,任务排序链表属性结构体 TaskSortLinkAttr 作为双向链表的头结点,指向双向链表数组的第一个元素,还维护游标信息,记录当前的位置信息。我们先看下排序链表属性的结构体的定义。
1.1 任务排序链表属性结构体定义
在 kernel\include\los_task.h 头文件中定义了排序链表属性的结构体 TaskSortLinkAttr。该结构体定义了排序链表的头节点 LOS_DL_LIST*sortLink,游标 UINT16 cursor,还有一个保留字段,暂时没有使用。
源码如下:
typedef struct { LOS_DL_LIST *sortLink; UINT16 cursor; UINT16 reserved; } TaskSortLinkAttr;
在文件 kernel\src\los_task.c 中定义了排序链表属性结构体 TaskSortLinkAttr 类型的全局变量 g_taskSortLink,该全局变量的成员变量 sortLink 作为排序链表的头结点,指向一个长度为 32 的环状的双向链表数组,成员变量 cursor 作为游标记录环状数组的当前游标位置。源代码如下。
LITE_OS_SEC_BSS TaskSortLinkAttr g_taskSortLink;
我们使用示意图来讲述一下。任务排序链表是环状双向链表数组,长度为 32,每一个元素是一个双向链表,挂载任务 LosTaskCB 的链表节点 timerList。任务 LosTaskCB 的成员变量 idxRollNum 记录数组的索引和滚动数。全局变量 g_taskSortLink 的成员变量 cursor 记录当前游标位置,每过一个 Tick,游标指向下一个位置,转一轮需要 32 ticks。当运行到的数组位置,双向链表不为空,则把第一个节点维护的滚动数减 1。这样的数据结构类似钟表表盘,也称为时间轮。
我们举个例子来说明,基于时间轮实现的任务排序链表是如何管理任务延迟超时的。假如当前游标 cursor 为 1,当一个任务需要延时 72 ticks,72=2*32+8,表示排序索引 sortIndex 为 8,滚动数 rollNum 为 2。会把任务插入数组索引为 sortIndex+cursor=9 的双向链表位置,索要 9 处的双向链表维护节点的滚动为 2。随着 Tick 时间的进行,从当前游标位置运行到数组索引位置 9,历时 8 ticks。运行到 9 时,如果滚动数大于 0,则把滚动数减 1。等再运行 2 轮,共需要 72 ticks,任务就会延迟到期,可以从排序链表移除。每个数组元素对应的双向链表的第一个链表节点的滚动数表示需要转多少轮,节点任务才到期。第二个链表节点的滚动数需要加上第一个节点的滚动数,表示第二个节点需要转的轮数。依次类推。
示意图如下:
1.2 任务排序链表宏定义
在 OS_TSK_SORTLINK_LEN 头文件中定义了一些和任务排序链表相关的宏定义。延迟任务双向链表数组的长度定义为 32,高阶 bit 位位数为 5,低阶 bit 位位数为 27。对于任务的超时时间,取其高 27 位作为滚动数,低 5 位作为数组索引。
源码如下:
/** * 延迟任务双向链表数组的数量(桶的数量):32 */ #define OS_TSK_SORTLINK_LEN 32 /** * 高阶bit位数目:5 */ #define OS_TSK_HIGH_BITS 5U /** * 低阶bit位数目:27 */ #define OS_TSK_LOW_BITS (32U - OS_TSK_HIGH_BITS) /** * 滚动数最大值:0xFFFF FFDF,1111 0111 1111 1111 1111 1111 1101 1111 */ #define OS_TSK_MAX_ROLLNUM (0xFFFFFFFFU - OS_TSK_SORTLINK_LEN) /** * 任务延迟时间数的位宽:5 */ #define OS_TSK_SORTLINK_LOGLEN 5 /** * 延迟任务的桶编号的掩码:31、0001 1111 */ #define OS_TSK_SORTLINK_MASK (OS_TSK_SORTLINK_LEN - 1U) /** * 滚动数的高阶掩码:1111 1000 0000 0000 0000 0000 0000 0000 */ #define OS_TSK_HIGH_BITS_MASK (OS_TSK_SORTLINK_MASK << OS_TSK_LOW_BITS) /** * 滚动数的低阶掩码:0000 0111 1111 1111 1111 1111 1111 1111 */ #define OS_TSK_LOW_BITS_MASK (~OS_TSK_HIGH_BITS_MASK)
2、任务排序链表操作
我们分析下任务排序链表的操作,包含初始化,插入,删除,滚动数更新,获取下一个到期时间等。
2.1 初始化排序链表
在系系统内核初始化启动阶段,在函数 UINT32OsTaskInit(VOID)中初始化任务排序链表。该函数的调用关系如下,main.c:main() --> kernel\src\los_init.c:LOS_KernelInit()--> kernel\src\los_task.c:OsTaskInit()。
初始化排序链表函数的源码如下:
LITE_OS_SEC_TEXT_INIT UINT32 OsTaskInit(VOID) { UINT32 size; UINT32 index; LOS_DL_LIST *listObject = NULL; ...... ⑴ size = sizeof(LOS_DL_LIST) * OS_TSK_SORTLINK_LEN; listObject = (LOS_DL_LIST *)LOS_MemAlloc(m_aucSysMem0, size); ⑵ if (listObject == NULL) { (VOID)LOS_MemFree(m_aucSysMem0, g_taskCBArray); return LOS_ERRNO_TSK_NO_MEMORY; } ⑶ (VOID)memset_s((VOID *)listObject, size, 0, size); ⑷ g_taskSortLink.sortLink = listObject; g_taskSortLink.cursor = 0; for (index = 0; index < OS_TSK_SORTLINK_LEN; index++, listObject++) { ⑸ LOS_ListInit(listObject); } return LOS_OK; }
⑴处代码计算需要申请的双向链表的内存大小,OS_TSK_SORTLINK_LEN 为 32,即需要为 32 个双向链表节点申请内存空间。然后申请内存,⑵处申请内存失败时返回相应错误码。⑶处初始化申请的内存区域为 0 等。⑷处把申请的双向链表节点赋值给 g_taskSortLink 的链表节点.sortLink,作为排序链表的头节点,游标.cursor 初始化为 0。然后⑸处的循环,调用 LOS_ListInit()函数把双向链表数组每个元素都初始化为双向循环链表。
2.2 插入排序链表
插入排序链表的函数为 OsTaskAdd2TimerList()。在任务等待互斥锁/信号量等资源时,都需要调用该函数将任务加入到对应的排序链表中。该函数包含两个入参,第一个参数 LosTaskCB *taskCB 用于指定要延迟的任务,第二个参数 UINT32 timeout 指定超时等待时间。
源码如下:
LITE_OS_SEC_TEXT VOID OsTaskAdd2TimerList(LosTaskCB *taskCB, UINT32 timeout) { LosTaskCB *taskDelay = NULL; LOS_DL_LIST *listObject = NULL; UINT32 sortIndex; UINT32 rollNum; ⑴ sortIndex = timeout & OS_TSK_SORTLINK_MASK; rollNum = (timeout >> OS_TSK_SORTLINK_LOGLEN); ⑵ (sortIndex > 0) ? 0 : (rollNum--); ⑶ EVALUATE_L(taskCB->idxRollNum, rollNum); ⑷ sortIndex = (sortIndex + g_taskSortLink.cursor); sortIndex = sortIndex & OS_TSK_SORTLINK_MASK; ⑸ EVALUATE_H(taskCB->idxRollNum, sortIndex); ⑹ listObject = g_taskSortLink.sortLink + sortIndex; ⑺ if (listObject->pstNext == listObject) { LOS_ListTailInsert(listObject, &taskCB->timerList); } else { ⑻ taskDelay = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList); do { ⑼ if (UWROLLNUM(taskDelay->idxRollNum) <= UWROLLNUM(taskCB->idxRollNum)) { UWROLLNUMSUB(taskCB->idxRollNum, taskDelay->idxRollNum); } else { ⑽ UWROLLNUMSUB(taskDelay->idxRollNum, taskCB->idxRollNum); break; } ⑾ taskDelay = LOS_DL_LIST_ENTRY(taskDelay->timerList.pstNext, LosTaskCB, timerList); } while (&taskDelay->timerList != (listObject)); ⑿ LOS_ListTailInsert(&taskDelay->timerList, &taskCB->timerList); } }
⑴处代码计算等待时间 timeout 的低 5 位作为数组索引,高 27 位作为滚动数 rollNum。这 2 行代码数学上的意义,就是把等待时间处于 32 得到的商作为滚动数,余数作为数组索引。⑵处代码,如果余数为 0,可以整除时,滚动数减 1。减 1 设计的原因是,在函数 VOID OsTaskScan(VOID)中,每一个 tick 到来时,如果滚动数大于 0,滚动数减 1,并继续滚动一圈。后文会分析该函数 VOID OsTaskScan(VOID)。
⑶处代码把滚动数赋值给任务 taskCB->idxRollNum 的低 27 位。⑷处把数组索引加上游标,然后执行⑸赋值给任务 taskCB->idxRollNum 的高 5 位。⑹根据数组索引获取双向链表头结点,⑺如果此处双向链表为空,直接插入链表里。如果链表不为空,执行⑻获取第一个链表节点对应的任务 taskDelay,然后遍历循环双向链表,把任务插入到合适的位置。⑼处如果待插入任务 taskCB 的滚动数大于等于当前链表节点对应任务的滚动数,则从待插入任务 taskCB 的滚动数中减去当前链表节点对应任务的滚动数,然后执行⑾获取下一个节点继续遍历。⑽处如果待插入任务 taskCB 的滚动数小于当前链表节点对应任务的滚动数,则从当前链表节点对应任务的滚动数中减去待插入任务 taskCB 的滚动数,然后跳出循环。执行⑿,完成任务插入。插入过程,可以结合上文的示意图进行理解。
2.3 从排序链表中删除
从排序链表中删除的函数为 VOIDOsTimerListDelete(LosTaskCB *taskCB)。在任务恢复/删除等场景中,需要调用该函数将任务从任务排序链表中删除。该函数包含一个参数 LosTaskCB *taskCB,用于指定要从排序链表中删除的任务。
源码如下:
LITE_OS_SEC_TEXT VOID OsTimerListDelete(LosTaskCB *taskCB) { LOS_DL_LIST *listObject = NULL; LosTaskCB *nextTask = NULL; UINT32 sortIndex; ⑴ sortIndex = UWSORTINDEX(taskCB->idxRollNum); ⑵ listObject = g_taskSortLink.sortLink + sortIndex; ⑶ if (listObject != taskCB->timerList.pstNext) { ⑷ nextTask = LOS_DL_LIST_ENTRY(taskCB->timerList.pstNext, LosTaskCB, timerList); UWROLLNUMADD(nextTask->idxRollNum, taskCB->idxRollNum); } ⑸ LOS_ListDelete(&taskCB->timerList); }
⑴处代码获取待从排序链表中删除的任务对应的数字索引。⑵处代码获取排序链表的头节点 listObject。⑶处代码判断待删除节点是否是最后一个节点,如果不是最后一个节点,执行执行⑷处代码获取待删除节点的下一个节点对应的任务 nextTask,在下一个节点的滚动数中增加待删除节点的滚动数,然后执行⑸处代码执行删除操作。如果是最后一个节点,直接执行⑸处代码删除该节点即可。
2.4 获取下一个超时到期时间
获取下一个超时到期时间的函数为 OsTaskNextSwitchTimeGet(),我们分析下其代码。
源码如下:
UINT32 OsTaskNextSwitchTimeGet(VOID) { LosTaskCB *taskCB = NULL; UINT32 taskSortLinkTick = LOS_WAIT_FOREVER; LOS_DL_LIST *listObject = NULL; UINT32 tempTicks; UINT32 index; ⑴ for (index = 0; index < OS_TSK_SORTLINK_LEN; index++) { ⑵ listObject = g_taskSortLink.sortLink + ((g_taskSortLink.cursor + index) % OS_TSK_SORTLINK_LEN); ⑶ if (!LOS_ListEmpty(listObject)) { ⑷ taskCB = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList); ⑸ tempTicks = (index == 0) ? OS_TSK_SORTLINK_LEN : index; ⑹ tempTicks += (UINT32)(UWROLLNUM((UINT32)taskCB->idxRollNum) * OS_TSK_SORTLINK_LEN); ⑺ if (taskSortLinkTick > tempTicks) { taskSortLinkTick = tempTicks; } } } return taskSortLinkTick; }
⑴处代码循环遍历双向链表数组,⑵处代码从当前游标位置开始获取排序链表的头节点 listObject。⑶处代码判断排序链表是否为空,如果排序链表为空,则继续遍历下一个数组。如果链表不为空,⑷处代码获取排序链表的第一个链表节点对应的任务。⑸处如果遍历的数字索引为 0,tick 数目使用 32,否则使用具体的数字索引。⑹处获取任务的滚动数,计算出需要的等待时间,加上⑸处计算出的不足滚动一圈的时间。⑺处计算出需要等待的最小时间,即下一个最快到期的时间。
3、排序链表和 Tick 时间的关系
任务加入到排序链表后,时间一个 tick 一个 tick 的逝去,排序链表中的滚动数该如何更新呢?
时间每走过一个 tick,系统就会调用 Tick 中断的处理函数 OsTickHandler(),该函数在 kernel\src\los_tick.c 文件中实现。下面是该函数的代码片段,⑴处代码分别任务的超时到期情况。
LITE_OS_SEC_TEXT VOID OsTickHandler(VOID) { #if (LOSCFG_BASE_CORE_TICK_HW_TIME == 1) platform_tick_handler(); #endif g_ullTickCount++; #if (LOSCFG_BASE_CORE_TIMESLICE == 1) OsTimesliceCheck(); #endif ⑴ OsTaskScan(); // task timeout scan #if (LOSCFG_BASE_CORE_SWTMR == 1) (VOID)OsSwtmrScan(); #endif }
详细分析下函数 OsTaskScan(),来了解排序链表和 tick 时间的关系。函数在 kernel\base\los_task.c 文件中实现,代码片段如下:
LITE_OS_SEC_TEXT VOID OsTaskScan(VOID) { LosTaskCB *taskCB = NULL; BOOL needSchedule = FALSE; LOS_DL_LIST *listObject = NULL; UINT16 tempStatus; UINTPTR intSave; intSave = LOS_IntLock(); ⑴ g_taskSortLink.cursor = (g_taskSortLink.cursor + 1) % OS_TSK_SORTLINK_LEN; listObject = g_taskSortLink.sortLink + g_taskSortLink.cursor; ⑵ if (listObject->pstNext == listObject) { LOS_IntRestore(intSave); return; } ⑶ for (taskCB = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList); &taskCB->timerList != (listObject);) { tempStatus = taskCB->taskStatus; ⑷ if (UWROLLNUM(taskCB->idxRollNum) > 0) { UWROLLNUMDEC(taskCB->idxRollNum); break; } ⑸ LOS_ListDelete(&taskCB->timerList); ⑹ if (tempStatus & OS_TASK_STATUS_PEND) { taskCB->taskStatus &= ~(OS_TASK_STATUS_PEND); LOS_ListDelete(&taskCB->pendList); taskCB->taskSem = NULL; taskCB->taskMux = NULL; } ⑺ else if (tempStatus & OS_TASK_STATUS_EVENT) { taskCB->taskStatus &= ~(OS_TASK_STATUS_EVENT); } ⑻ else if (tempStatus & OS_TASK_STATUS_PEND_QUEUE) { LOS_ListDelete(&taskCB->pendList); taskCB->taskStatus &= ~(OS_TASK_STATUS_PEND_QUEUE); } else { taskCB->taskStatus &= ~(OS_TASK_STATUS_DELAY); } ⑼ if (!(tempStatus & OS_TASK_STATUS_SUSPEND)) { taskCB->taskStatus |= OS_TASK_STATUS_READY; OsHookCall(LOS_HOOK_TYPE_MOVEDTASKTOREADYSTATE, taskCB); OsPriqueueEnqueue(&taskCB->pendList, taskCB->priority); needSchedule = TRUE; } if (listObject->pstNext == listObject) { break; } taskCB = LOS_DL_LIST_ENTRY(listObject->pstNext, LosTaskCB, timerList); } LOS_IntRestore(intSave); ⑽ if (needSchedule) { LOS_Schedule(); } }
⑴处代码更新全局变量 g_taskSortLink 的游标,指向双向链表数组下一个位置,然后获取该位置的双向链表头结点 listObject。⑵如果链表为空,则返回。如果双向链表不为空,则执行⑶循环遍历每一个链表节点。⑷处如果链表节点的滚动数大于 0,则滚动数减 1,说明任务还需要继续等待一轮。如果链表节点的滚动数等于 0,说明任务超时到期,执行⑸从排序链表中删除。接下来需要根据任务状态分别处理,⑹处如果代码是阻塞状态,取消阻塞状态,并从阻塞链表中删除。⑺处如果任务阻塞在事件中,取消阻塞状态。⑻如果任务阻塞在队列,从阻塞链表中删除,取消阻塞状态,如果不是上述状态,取消延迟状态 OS_TASK_STATUS_DELAY。⑼处如果代码是挂起状态,设置任务为就绪状态,加入任务就绪队列,设置需要重新调度标记。⑽如果设置需要重新调度,调用调度函数触发任务调度。
小结
掌握鸿蒙轻内核的排序链表 TaskSortLinkAttr 这一重要的数据结构,会给进一步学习、分析鸿蒙轻内核源代码打下了基础,让后续的学习更加容易。后续也会陆续推出更多的分享文章,敬请期待,也欢迎大家分享学习、使用鸿蒙轻内核的心得,有任何问题、建议,都可以留言给我们:gitee.com/openharmony… 。为了更容易找到鸿蒙轻内核代码仓,建议访问 gitee.com/openharmony… ,关注 Watch、点赞 Star、并 Fork 到自己账户下,谢谢。