RT-Thread版本:4.0.5
MCU型号:STM32F103RCT6(ARM Cortex-M3 内核)
1 同步与互斥
先补充几个概念:
- 执行单元:由当前硬件平台及其运行的操作系统来决定的,对于RT-Thread与STM32的执行单元为:线程与中断
- 临界区:多个执行单元同时操作/访问同一块区域(代码)
- 竞态:指多线程情况下计算的正确性依赖于相对时间顺序或线程的交错
当多个执行单元(线程、中断)同时执行临界区,操作临界资源,会导致竞态产生,此时可以采用同步或互斥的方式解决该问题。
-
同步:指按预定的先后次序运行。(有顺序性)
- 线程同步:指多个线程通过特定的机制来控制线程之间的执行顺序。
-
互斥:即不允许多个执行单元同时操作临界区(无顺序性)
- 线程互斥:指对于临界区资源访问的排它性。当多个线程都要使用临界区资源时,任何时刻最多只允许一个线程去使用,其它要使用该资源的线程必须等待,直到占用资源者释放该资源。
线程互斥可以看成是一种特殊的线程同步。
对于裸机而言,一般采用全局标志实现对临界资源的同步与互斥访问,通常CPU会一直轮询查询该标志是否满足占用条件。在RTOS中,一般采用信号量、互斥量、事件集等方式保护临界区资源,就信号量来说,当信号量实例资源为空时,线程进入阻塞状态等待信号量到来,一但有信号量资源时会立刻唤醒该线程,这样可以减少CPU资源消耗,而且实时响应速度也是最快的。
2 信号量概念与分类
信号量(Semaphore)是一种轻型的用于解决线程间同步问题的内核对象,线程可以获取或释放它,从而达到同步或互斥的目的。
RT-Thread中信号量对象的组成:
- 信号量值:即信号量对象的实例(资源)数目,表示共有多少个信号量实例可以被使用
- 线程等待队列:未成功获取信号量的线程会挂在此队列上等待
通常用一个信号量的计数值(sem->value
)表示可被占用的资源数,其值为只能为0和正整数最大(65535),成功释放信号量会让该值加1,成功获取信号量会让该值减1。当值为0时表示资源为空,想获取该信号量的线程会被阻塞挂在线程等待队列上。
信号量互斥与同步在使用方式上有如下不同:
- 用作互斥:信号量在创建后资源个数应该是满的,线程需要使用临界资源时,先获取信号量,使其变为空,这样实现了任何时刻只允许一个线程去使用该资源。但这会带来线程优先级翻转的问题。(后面会专门写一遍谈谈其危害与解决方法)。
- 用作同步:信号量创建后应被置为空。假设有线程1和线程2,线程1取信号量会被阻塞,线程2在某种条件下释放信号量后,如果线程1的优先级最高则会立马切到该线程,实现两个线程间的同步。同样,中断和线程也可以采用此方式实现同步(在中断中只能释放信号量,不能获取信号量)。
注意:中断与线程间的互斥不能采用信号量的方式,而应采用开关中断的方式。
2.1 二值信号量
二值信号量类似一个标志位,其资源计数值只能为0和1,0表示该资源被获取,1表示被释放。
前面说过使用全局变量做标志位CPU需要一直查询,造成CPU资源浪费。使用二值信号量可以很好解决该问题,当二值信号量为空时,线程进入阻塞态等待即可,当该信号量被标记为1时唤醒该线程。但是使用二值信号量保护临界资源时会导致*优先级翻转。
2.2 计数型信号量
计数型信号量其资源计数值范围为0~65535,即可允许多个线程获取信号量访问资源,但会限制资源的最大数目,当访问的线程达到此数目时,会阻塞其他试图获取该信号量的线程,直至有线程释放了信号量(未获取信号量的线程也可以释放信号量)。一般用于:
- 事件计数:事件产生时释放信号量,事件处理时获取信号量,当值为0表示所有事件处理完毕
- 资源管理:信号量值代表当前资源可以数量,值为0说明没有资源了(比如停车场车位)。
3 信号量控制块
信号量控制块是操作系统用于管理信号量的一个数据结构:
struct rt_ipc_object
{
struct rt_object parent; /**< 继承自 rt_object 类 */
rt_list_t suspend_thread; /**< 挂在此资源上的线程 */
};
struct rt_semaphore
{
struct rt_ipc_object parent; /* 继承自 ipc_object 类 */
rt_uint16_t value; /* 信号量的值 */
rt_uint16_t reserved; /* 保留字段 */
};
typedef struct rt_semaphore *rt_sem_t;
其中sem->value
即为资源计数值,sem->parent.suspend_thread
为因获取该信号量失败而被挂起的线程,利用双向循环链表存储,按线程优先级或FIFO顺序插入到链表中。
4 信号量函数接口
4.1 创建/删除
- 创建信号量
/**
* @brief Creating a semaphore object.
* @param name 信号量名称
* @param 资源计数值
* value 如果用于共享资源,则应该将该值初始化为可用资源的数量。
* 如果用于表示事件的发生,则应该将该值初始化为0。
*
* @param flag 表示当信号量不可用时(value==0),想获取该信号量的线程等待的排队方式:
* RT_IPC_FLAG_PRIO: 等待线程队列将按照线程优先级进行排队,优先级高的等待线程将先获得等待的信号量
* RT_IPC_FLAG_FIFO: 等待线程队列将按照先进先出的方式排队,先进入的线程将先获得等待的信号量
*
* NOTE: RT_IPC_FLAG_FIFO 属于非实时调度方式,通常不建议使用,除非应用程序非常在意先来后到,
* 并且用户清楚地明白所有涉及到该信号量的线程都将会变为非实时线程,
* 否则建议采用 RT_IPC_FLAG_PRIO,即确保线程的实时性。
*
* @return 返回一个指向信号量对象的指针。创建失败返回值为RT_NULL
*/
rt_sem_t rt_sem_create(const char *name, rt_uint32_t value, rt_uint8_t flag)
{
rt_sem_t sem;
RT_DEBUG_NOT_IN_INTERRUPT;
RT_ASSERT(value < 0x10000U); // 值必须小于2^16(65536)
/* 分配内核对象 */
sem = (rt_sem_t)rt_object_allocate(RT_Object_Class_Semaphore, name);
if (sem == RT_NULL)
return sem;
/* 初始化信号量对象 */
_ipc_object_init(&(sem->parent));
/* 设置可用信号量的值 */
sem->value = value; // 如果创建的是二值信号量,其取值范围为[0,1],如果是计数信号量,其取值范围为[0,65535]
/* 设置信号量模式 */
sem->parent.parent.flag = flag;
return sem;
}
}
- 删除信号量
rt_err_t rt_sem_delete(rt_sem_t sem)
{
RT_DEBUG_NOT_IN_INTERRUPT;
/* 参数检查 */
RT_ASSERT(sem != RT_NULL);
RT_ASSERT(rt_object_get_type(&sem->parent.parent) == RT_Object_Class_Semaphore);
RT_ASSERT(rt_object_is_systemobject(&sem->parent.parent) == RT_FALSE);
/* 唤醒所有阻塞挂着此信号量上的线程 */
_ipc_list_resume_all(&(sem->parent.suspend_thread));
/* 删除信号量对象 */
rt_object_delete(&(sem->parent.parent));
return RT_EOK;
}
调用该函数时,系统将删除这个信号量(必须是动态创建的),如果删除该信号量时,有线程正在等待该信号量,那么删除操作会先唤醒等待在该信号量上的线程(等待线程的返回值是 -RT_ERROR),然后再释放信号量的内存资源。
4.2 初始化/脱离
静态初始化与脱离(静态对象不能被删除)对象,功能与上述函数一致,不再赘述。
4.3 获取/无等待获取
- 获取信号量
rt_err_t rt_sem_take(rt_sem_t sem, rt_int32_t time)
{
register rt_base_t temp;
struct rt_thread *thread;
/* 参数检查 */
RT_ASSERT(sem != RT_NULL);
RT_ASSERT(rt_object_get_type(&sem->parent.parent) == RT_Object_Class_Semaphore);
RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(sem->parent.parent)));
/* 关中断 */
temp = rt_hw_interrupt_disable();
if (sem->value > 0)
{
/* 有可用信号量 */
sem->value --;
/* 开中断 */
rt_hw_interrupt_enable(temp);
}
else
{
/* 不等待,返回超时错误 */
if (time == 0)
{
rt_hw_interrupt_enable(temp);
return -RT_ETIMEOUT;
}
else
{
/* 当前上下文检查 */
RT_DEBUG_IN_THREAD_CONTEXT;
/* 信号量不可用,挂起当前线程 */
/* 获取当前线程 */
thread = rt_thread_self();
/* 设置线程错误码 */
thread->error = RT_EOK;
/* 挂起线程 */
_ipc_list_suspend(&(sem->parent.suspend_thread),
thread,
sem->parent.parent.flag);
/* 有等待时间则开始计时 */
if (time > 0)
{
/* 设置线程超时时间,并启动定时器 */
rt_timer_control(&(thread->thread_timer),
RT_TIMER_CTRL_SET_TIME,
&time);
rt_timer_start(&(thread->thread_timer));
}
/* 开中断 */
rt_hw_interrupt_enable(temp);
/* 发起线程调度 */
rt_schedule();
if (thread->error != RT_EOK)
{
return thread->error;
}
}
}
RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(sem->parent.parent)));
return RT_EOK;
}
- 当信号量计数值
sem->vaule
大于0,线程将获取信号量,并将sem->vaule
减1 - 当
sem->vaule
为0,表示当前信号量资源实例为空,申请信号量的线程根据time参数决定调用方式:- time==0:直接返回,返回值为-RT_ETIMEOUT;
- time>0:挂起当前线程,如果有等待时间则设置线程超时回调时间,并启动该线程的定时,然后开始线程调度。
- 无等待获取信号量
rt_err_t rt_sem_trytake(rt_sem_t sem)
{
return rt_sem_take(sem, RT_WAITING_NO);
}
RT_WAITING_NO
宏值为0,即使用没有等待时间的方式获取信号量。
4.4 释放信号量
rt_err_t rt_sem_release(rt_sem_t sem)
{
register rt_base_t temp;
register rt_bool_t need_schedule;
/* parameter check */
RT_ASSERT(sem != RT_NULL);
RT_ASSERT(rt_object_get_type(&sem->parent.parent) == RT_Object_Class_Semaphore);
RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(sem->parent.parent)));
need_schedule = RT_FALSE;
/* disable interrupt */
temp = rt_hw_interrupt_disable();
if (!rt_list_isempty(&sem->parent.suspend_thread))
{
/* 恢复阻塞线程 */
_ipc_list_resume(&(sem->parent.suspend_thread));
need_schedule = RT_TRUE;
}
else
{
if(sem->value < RT_SEM_VALUE_MAX)
{
sem->value ++; /* value值自增1 */
}
else
{
rt_hw_interrupt_enable(temp); /* enable interrupt */
return -RT_EFULL; /* value值溢出 */
}
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* 恢复当前线程,发起任务调度 */
if (need_schedule == RT_TRUE)
rt_schedule();
return RT_EOK;
}
- 若信号量等待队列为空(即没有线程因访问该信号量而进入阻塞态),直接把
sem->value
加1 - 若信号量等待队列不为空,则唤醒队列中第一个线程(将线程从阻塞列表中删除,添加到就绪列表中),由它获取信号量
注:中断中可以调用此函数实现中断与线程的同步(中断中仅可以释放信号量,不能获取信号量)。
5 信号量应用示例
5.1 互斥访问临界资源
/*
* Date Author
* 2022-02-05 issac wan
*/
#include <rtthread.h>
#define my_printf(fmt, ...) rt_kprintf(fmt"\n", ##__VA_ARGS__)
#define THREAD_PRIORITY 20
#define THREAD_TIMESLICE 5
static uint16_t num1 = 0, num2 = 0;
static rt_sem_t sem1;
static rt_thread_t send_thread = RT_NULL;
void send_thread_entry(void * param){
while(1){
rt_sem_take(sem1, RT_WAITING_FOREVER);
num1++;
rt_thread_delay(100);
num2++;
rt_sem_release(sem1);
}
}
ALIGN(RT_ALIGN_SIZE)
static struct rt_thread receive_thread;
static rt_uint8_t receive_thread_stack[512] = {0};
void receive_thread_entry(void * param){
while(1){
rt_sem_take(sem1, RT_WAITING_FOREVER);
if (num1 == num2)
my_printf("[%u]Successful! num1:[%d], num2:[%d]", rt_tick_get(), num1, num2);
else
my_printf("[%u]Fail! num1:[%d], num2:[%d]", rt_tick_get(), num1, num2);
rt_sem_release(sem1);
rt_thread_delay(1000);
}
}
int semaphore_sample(void){
rt_err_t receive_thread_ret = 0;
sem1 = rt_sem_create("sem1", 1, RT_IPC_FLAG_FIFO);
if (sem1 == RT_NULL)
return -RT_ERROR;
send_thread = rt_thread_create("send_th",
send_thread_entry,
RT_NULL,
512,
THREAD_PRIORITY,
THREAD_TIMESLICE);
if (send_thread == RT_NULL)
return -RT_ERROR;
else
rt_thread_startup(send_thread);
receive_thread_ret = rt_thread_init(&receive_thread,
"rec_th",
receive_thread_entry,
RT_NULL,
receive_thread_stack,
sizeof(receive_thread_stack),
THREAD_PRIORITY - 1,
THREAD_TIMESLICE);
if (receive_thread_ret != RT_EOK)
return receive_thread_ret;
else
rt_thread_startup(&receive_thread);
return RT_EOK;
}
INIT_APP_EXPORT(semaphore_sample);
- 创建一个二值信号量,初始资源计数值为1;
- 创建两个线程:发送线程与接收线程,两线程均采用无限等待的方式获取线程;
- 发送线程中在成功获取信号量后,先让
num1
+1,然后用延时函数模拟占用信号量的时间,延时结束后再让num2
+1,最后释放信号量; - 接收线程在成功获取信号量后,如果
num1==num2
则表示同步成功,反之失败,最后释放信号量。
串口打印信息如下:
注:互斥可以看成特殊的线程同步
5.2 资源计数(模拟停车场)
/*
* Date Author
* 2022-02-05 issac wan
*/
#include <rtthread.h>
#define my_printf(fmt, ...) rt_kprintf("[%u]"fmt"\n", rt_tick_get(), ##__VA_ARGS__)
#define THREAD_PRIORITY 20
#define THREAD_TIMESLICE 5
#define MAX_TRUCK_SPACE (5U) // 最大5个车位
static rt_sem_t sem;
static rt_thread_t park_thread = RT_NULL;
static rt_thread_t pick_thread = RT_NULL;
void park_thread_entry(void* param){
rt_err_t park_ret = RT_EOK;
while(1){
park_ret = rt_sem_trytake(sem); // 无等待获取
if(RT_EOK == park_ret){
my_printf("成功获取1个车位, 当前还有%d个空车位", sem->value);
}else{
my_printf("车位已满!");
}
rt_thread_delay(1000);
}
}
void pick_thread_entry(void* param){
while(1){
if(MAX_TRUCK_SPACE == sem->value){
my_printf("车位已全部空出!");
}
else{
rt_sem_release(sem);
my_printf("成功释放1个车位, 当前共有%d个空车位", sem->value);
}
rt_thread_delay(3000);
}
}
int semaphore_sample(void){
sem = rt_sem_create("sem", MAX_TRUCK_SPACE, RT_IPC_FLAG_PRIO);
if (sem == RT_NULL)
return -RT_ERROR;
park_thread = rt_thread_create("park_th",
park_thread_entry,
RT_NULL,
512,
THREAD_PRIORITY - 1,
THREAD_TIMESLICE);
if (park_thread == RT_NULL)
return -RT_ERROR;
else
rt_thread_startup(park_thread);
pick_thread = rt_thread_create("pick_th",
pick_thread_entry,
RT_NULL,
512,
THREAD_PRIORITY,
THREAD_TIMESLICE);
if (pick_thread == RT_NULL)
return -RT_ERROR;
else
rt_thread_startup(pick_thread);
return RT_EOK;
}
INIT_APP_EXPORT(semaphore_sample);
- 初始化两个线程:停车线程与取车线程;
- 初始化1个计数信号量,用于停车场空位计数,最大值为5。
- 停车线程每1秒无等待获取一次信号量,当信号量为空时,停车失败;
- 取车线程每3秒释放一次信号量,当信号量计数值达设定的最大值时,取车失败。
串口打印信息如下:
6 总结
- 同步允许多个线程同时访问临界资源,具体顺序性;互斥仅允许1个线程访问临界资源,无顺序性。
- 信号量主要用于解决线程间同步与互斥问题,分为二值信号量(0-1)与计数型信号量(0-65535)。
- 信号量保护临界资源时会带来优先级翻转的问题,需注意。
- 信号量可实现中断与线程间的同步(仅允许在中断中释放信号量,不允许获取信号量),但无法实现两者间的互斥(可通过关中断来实现)。