生产者消费者问题
生产者消费者问题模型
生产者消费者问题是一个经典的、多线程同步问题。
有两个线程: 一个生产者线程和一个消费者线程。两个线程共享一个初始为空、固定大小为n的缓存区。
生产着的工作是“生产”一段数据,只有缓冲区没满时,生产者才能把消息放入到缓冲区,否则必须等待,如此反复;
同时,只有缓冲区非空时,消费者才能从中取出数据,一次“消费”一段数据,否则必须等待,如此反复。
问题的核心是:
- 要保证不让生产者在缓存还是满的时候仍然要向内写数据;
- 不让消费者试图从空的缓存中取出数据。
生产者消费者问题本质
解决生产者消费者问题实际上是要解决线程间互斥关系问题和同步关系问题
由于缓冲区是临界资源,它一个时刻只允许一个生产者放入消息,或者一个消费者从中取出消息,所以这里我们需要解决一个互斥访问的问题。同时生产者和消费者又是一个相互协作的关系,只有生产者生产之后,消费者才能消费,所以我们还需要解决一个同步的问题。
生产者消费者问题解决模型
- 互斥访问:定义一个二值信号量,初始值为value=1。首先生产者线程获取二值信号量,value减一,使得消费者线程无法获取改信号量,处于挂起状态,之后进行一系列操作并释放该信号量,value此时变为1,被消费者线程获取;此时消费者线程获取信号量,value减一,使得生产者线程无法获取改信号量,处于挂起状态,消费者线程执行相应操作,并释放信号量,value此时变为1,又被生产者线程获取;如此反复,任一时刻,生产者线程和消费者线程都不会同时执行。
- 同步问题:定义两个信号量,缓冲区空信号量(初始值value1等于缓冲区大小)和缓冲区满信号量(初始值value2=0)。生产者线程运行时,首先获取缓冲区空信号量的value1,若value1非0,代表缓冲区未满,则value1减一,从而释放一个缓冲区满信号量,使得value2加一;消费者线程运行时,首先获取缓冲区满信号量的value2,若value2非0,则代表缓冲区未空,则value2减一,从而释放一个缓冲区空信号量,使得value1加一。
小例
代码
/*
* 程序清单:生产者消费者例子
*
* 这个例子中将创建两个线程用于实现生产者消费者问题
*(1)生产者线程将cnt值每次加1并循环存入array数组的5个成员内;
*(2)消费者线程将生产者中生产的数值打印出来,并累加求和
*/
#include <rtthread.h>
#define THREAD_PRIORITY 6
#define THREAD_STACK_SIZE 512
#define THREAD_TIMESLICE 5
/* 定义最大5个元素能够被产生 */
#define MAXSEM 5
/* 用于放置生产的整数数组 */
rt_uint32_t array[MAXSEM];
/* 指向生产者、消费者在array数组中的读写位置 */
static rt_uint32_t set, get;
/* 指向线程控制块的指针 */
static rt_thread_t producer_tid = RT_NULL;
static rt_thread_t consumer_tid = RT_NULL;
/* 定义二值信号量 缓冲区空信号量 缓冲区满信号量 */
struct rt_semaphore sem_lock;
struct rt_semaphore sem_empty, sem_full;
/* 生产者线程入口 */
void producer_thread_entry(void *parameter)
{
int cnt = 0;
/* 运行10次 */
while (cnt < 10)
{
/* 获取一个空位 */
rt_sem_take(&sem_empty, RT_WAITING_FOREVER);
/* 修改array内容,上锁 */
rt_sem_take(&sem_lock, RT_WAITING_FOREVER);
array[set % MAXSEM] = cnt + 1;
rt_kprintf("the producer generates a number: %d\n", array[set % MAXSEM]);
set++;
rt_sem_release(&sem_lock);
/* 释放一个满位 */
rt_sem_release(&sem_full);
cnt++;
/* 暂停一段时间 */
rt_thread_mdelay(20);
}
rt_kprintf("the producer exit!\n");
}
/* 消费者线程入口 */
void consumer_thread_entry(void *parameter)
{
rt_uint32_t sum = 0;
while (1)
{
/* 获取一个满位 */
rt_sem_take(&sem_full, RT_WAITING_FOREVER);
/* 临界区,上锁进行操作 */
rt_sem_take(&sem_lock, RT_WAITING_FOREVER);
sum += array[get % MAXSEM];
rt_kprintf("the consumer[%d] get a number: %d\n", (get % MAXSEM), array[get % MAXSEM]);
get++;
rt_sem_release(&sem_lock);
/* 释放一个空位 */
rt_sem_release(&sem_empty);
/* 生产者生产到10个数目,停止,消费者线程相应停止 */
if (get == 10) break;
/* 暂停一段时间 生产速度大于消费速度 */
rt_thread_mdelay(50);
}
rt_kprintf("the consumer sum is: %d\n", sum);
rt_kprintf("the consumer exit!\n");
}
int producer_consumer(void)
{
set = 0;
get = 0;
/* 初始化3个信号量 */
rt_sem_init(&sem_lock, "lock", 1, RT_IPC_FLAG_FIFO);
rt_sem_init(&sem_empty, "empty", MAXSEM, RT_IPC_FLAG_FIFO);
rt_sem_init(&sem_full, "full", 0, RT_IPC_FLAG_FIFO);
/* 创建生产者线程 */
producer_tid = rt_thread_create("producer",
producer_thread_entry, RT_NULL,
THREAD_STACK_SIZE,
THREAD_PRIORITY - 1, THREAD_TIMESLICE);
if (producer_tid != RT_NULL)
{
rt_thread_startup(producer_tid);
}
else
{
rt_kprintf("create thread producer failed");
return -1;
}
/* 创建消费者线程 */
consumer_tid = rt_thread_create("consumer",
consumer_thread_entry, RT_NULL,
THREAD_STACK_SIZE,
THREAD_PRIORITY + 1, THREAD_TIMESLICE);
if (consumer_tid != RT_NULL)
{
rt_thread_startup(consumer_tid);
}
else
{
rt_kprintf("create thread consumer failed");
return -1;
}
return 0;
}
/* 导出到 msh 命令列表中 */
MSH_CMD_EXPORT(producer_consumer, producer_consumer sample);