实验环境:信号量的实现和应用
实验任务:
- 在 Ubuntu 下编写程序,用信号量解决生产者——消费者问题;
- 在 linux-0.11 中实现信号量,用生产者—消费者程序检验之。
用信号量解决生产者—消费者问题
实验要求:
pc.c
程序需打开一个文件buffer.txt
作为共享缓冲区,缓冲区同时最多只能保存 10 个数;创建一个生产者进程和N个消费者进程,其中生产者进程向缓冲区写入连续的整数,0,1,2,……,M,M>=500;消费者进程从缓冲区依次读取数字,每次读一个,并将读出的数字从缓冲区删除,然后将本进程 ID 和数字输出到标准输出。
- 为什么要有信号量?
- 对于生产者来说,当缓冲区满,也就是空闲缓冲区个数为0时,此时生产者不能继续向缓冲区写数,必须等待,直到有消费者从满缓冲区取走数后,再次有了空闲缓冲区,生产者才能向缓冲区写数。
- 对于消费者来说,当缓冲区空时,此时没有数可以被取走,消费者必须等待,直到有生产者向缓冲区写数后,消费者才能取数。并且如果当缓冲区空时,先后有多个消费者均想从缓冲区取数,那么它们均需要等待,此时需要记录下等待的消费者的个数,以便缓冲区有数可取后,能将所有等待的消费者唤醒,确保请求取数的消费者最终都能取到数。
- 也就是说,当多个进程需要协同合作时,需要根据某个信息,判断当前进程是否需要停下来等待;同时,其他进程需要根据这个信息判断是否有进程在等待,或者有几个进程在等待,以决定是否需要唤醒等待的进程。而这个信息,就是信号量。
- 信号量用来做什么?
设有一整形变量sem,作为一个信号量。此时缓冲区为空,sem=0。注意区分sem_wait和sem_post
- 消费者C1请求从缓冲区取数,不能取到,睡眠等待。sem=-1<0,表示有一个进程因缺资源而等待。
- 消费者C2也请求从缓冲区取数,仍不能取到,睡眠等待。sem=-2<0,表示有两个进程因缺资源而等待。
- 生产者P往缓冲区写入一个数,sem=sem+1=-1<=0,并唤醒等待队列的头进程C1,C1处于就绪态,C2仍处于睡眠等待。
- 生产者P继续往缓冲区写入一个数,sem=0<=0,并唤醒C2,C1、C2都处于就绪状态。
由此可见,通过判断sem的值以及改变sem的值,就保证了多进程合作的合理有序的推进,这就是信号量的作用。
-
信号量实现生产者消费者模型思路:
-
本程序进行循环读写缓冲区的前 10 个数字的空间,生产者在第 10 个位置写入数字后,又回到第 1 个位置继续写入;消费者读取了第 10 个位置的数字后,也回到第 1 个位置。利用对10取余的方法控制读写位置。
-
对缓冲区的读写操作只能允许一个进程,使用信号量为1的mutex(即互斥量)进行控制保护。
-
限制缓冲区的最大保存数量需要两个信号量来共同控制:empty 视为缓冲区剩余的空间资源、full 视为缓冲区已被使用的资源;另外对文件的读写和消费者的整个“消费”过程都是一个原子操作,不可以打断,于是使用两个信号量进行限制。
-
信号量相关函数
-
sem_open()
的功能是创建一个信号量,或打开一个已经存在的信号量。如失败,返回值是 NULLname
是信号量的名字。不同的进程可以通过提供同样的 name 而共享同一个信号量。value
是信号量的初值,仅当新建信号量时,此参数才有效,其余情况下它被忽略。sem_t *sem_open(const char *name, int oflag,mode_t mode, unsigned int value);
-
sem_wait()
就是信号量的 P 原子操作。返回 0 表示成功,返回 -1 表示失败。如果信号量的值比0大,那么进行减一的操作,函数立即返回,往下执行。 如果信号量当前为0值,那么调用就会一直阻塞或者直到信号量变得可以进行减一操作,达到阻塞进程的目的.
int sem_wait(sem_t *sem);
-
sem_post()
就是信号量的 V 原子操作。如果有等待 sem 的进程,它会唤醒其中的一个。返回 0 表示成功,返回 -1 表示失败。sem_post函数的作用是给信号量的值加上一个“1”,即解锁操作; 它是一个原子操作。
int sem_post(sem_t *sem);
-
sem_unlink()
的功能是删除名为 name 的信号量。返回 0 表示成功,返回 -1 表示失败。int sem_unlink(const char *name);
-
lseek()
函数可以改变文件的文件偏移量,所有打开的文件都有一个当前文件偏移量,用于表明文件开始处到文件当前位置的字节数。成功返回新的偏移量,失败返回-1。
SEEK_SET 将读写位置指向文件头后再增加offset个位移量。
SEEK_CUR 以目前的读写位置往后增加offset个位移量。
SEEK_END 将读写位置指向文件尾后再增加offset个位移量。off_t lseek(int filedes, off_t offset, int whence);
用lseek创建一个空洞文件
- 空洞文件就是这个文件有一段是空的;
- 普通文件中间不能有空,write文件时从前往后移动文件指针,依次写入;
- 用lseek往后跳过一段,就形成空洞文件;
- 空洞文件对多线程共同操作文件非常有用。需要创建一个很大文件时,从头开始依次创建时间很长,可以将文件分成多段,多个线程操作每个线程负责其中一段的写入。
ret = lseek(fd, 10, SEEK_SET);
-
信号量实现生产者消费者模型代码 pc.c:
#include <unistd.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <sys/types.h>
#include <sys/wait.h>
#define M 530 /*打出数字总数*/
#define N 5 /*消费者进程数*/
#define BUFSIZE 10 /*缓冲区大小*/
int main()
{
sem_t *empty, *full, *mutex;/*3个信号量*/
int fd; /*共享缓冲区文件描述符*/
int i,j,k,child;
int data;/*写入的数据*/
pid_t pid;
int buf_out = 0; /*从缓冲区读取位置*/
int buf_in = 0; /*写入缓冲区位置*/
/*打开信号量*/
empty = sem_open("empty", O_CREAT|O_EXCL, 0644, BUFSIZE); /*剩余资源,初始化为size*/
full = sem_open("full", O_CREAT|O_EXCL, 0644, 0); /*已使用资源,初始化为0*/
mutex = sem_open("mutex", O_CREAT|O_EXCL, 0644, 1); /*互斥量,初始化为1*/
fd = open("buffer.txt", O_CREAT|O_TRUNC|O_RDWR,0666);
lseek(fd,BUFSIZE*sizeof(int),SEEK_SET);/*刷新了40个字节的缓冲区,存放10个数字*/
write(fd,(char *)&buf_out,sizeof(int));/*将待读取位置存入buffer后,以便子进程之间通信*/
/*生产者进程*/
if((pid=fork())==0)
{
printf("I'm producer. pid = %d\n", getpid());
/*生产多少个产品就循环几次*/
for( i = 0 ; i < M; i++)
{
/*empty大于0,才能生产*/
sem_wait(empty);
sem_wait(mutex);
/*写入一个字符*/
lseek(fd, buf_in*sizeof(int), SEEK_SET);
write(fd,(char *)&i,sizeof(int));
/*更新写入缓冲区位置,保证在0-9之间*/
/*生产完一轮产品(文件缓冲区只能容纳BUFSIZE个产品编号)后*/
/*将缓冲文件的位置指针重新定位到文件首部。*/
buf_in = (buf_in + 1) % BUFSIZE;
sem_post(mutex);
sem_post(full);/*共享区中已使用资源++,唤醒消费者线程*/
}
printf("producer end.\n");
fflush(stdout);/*确保将输出立刻输出到标准输出。*/
return 0;
}
else if(pid < 0)
{
perror("Fail to fork!\n");
return -1;
}
/*消费者进程*/
for( j = 0; j < N ; j++ )
{
if((pid=fork())==0)
{
for( k = 0; k < M/N; k++ )
{
sem_wait(full);/*共享区中已使用资源--,一开始为0会阻塞此处*/
sem_wait(mutex);
/*获得读取位置*/
lseek(fd,BUFSIZE*sizeof(int),SEEK_SET);
read(fd,(char *)&buf_out,sizeof(int));
/*读取数据*/
lseek(fd,buf_out*sizeof(int),SEEK_SET);
read(fd,(char *)&data,sizeof(int));
/*写入读取位置*/
buf_out = (buf_out + 1) % BUFSIZE;
lseek(fd,BUFSIZE*sizeof(int),SEEK_SET);
write(fd,(char *)&buf_out,sizeof(int));
sem_post(mutex);
sem_post(empty);/*共享区中剩余资源++,唤醒生产者进程*/
/*消费资源*/
printf("%d: %d\n",getpid(),data);
fflush(stdout);
}
printf("child-%d: pid = %d end.\n", j, getpid());
return 0;
}
else if(pid<0)
{
perror("Fail to fork!\n");
return -1;
}
}
/*回收线程资源*/
child = N + 1;
while(child--)
wait(NULL);
/*释放信号量*/
sem_unlink("full");
sem_unlink("empty");
sem_unlink("mutex");
/*释放资源*/
close(fd);
return 0;
}
在ubuntu下执行,结果如下:
gcc pc.c -o pc -lpthread
./pc > 1.txt
cat 1.txt | more
。。。。。。
在 linux-0.11 中实现信号量
-
在linux-0.11/include/linux目录下新建sem.h,定义信号量的数据结构,包括保存名字和值两个属性以及一个等待进程的队列。
#ifndef _SEM_H #define _SEM_H #include <linux/sched.h> #define SEMTABLE_LEN 20 #define SEM_NAME_LEN 20 typedef struct semaphore { char name[SEM_NAME_LEN]; int value; struct task_struct *queue; } sem_t; extern sem_t semtable[SEMTABLE_LEN]; #endif
-
在linux-0.11/kernel/sem.c目录下新建sem.c,实现信号量
sem_open
和sem_unlink
函数的实现:由于
sem_open()
的第一个参数name,传入的是应用程序所在地址空间的逻辑地址,在内核中如果直接访问这个地址,访问到的是内核空间中的数据,不会是用户空间的。所以要用get_fs_byte()
函数获取用户空间的数据。get_fs_byte()
函数的功能是获得一个字节的用户空间中的数据。同样,sem_unlink()
函数的参数name也要进行相同的处理。sem_open()
函数的功能是新建一个信号量或打开一个已存在的信号量,首先需要将参数字符串从用户空间复制到内核空间,然后对比已经存在的信号量,如果名字相同则直接返回该信号量,否则创建新的信号量。sem_unlink()
函数的功能是删除名为 name 的信号量,为了保证系统的多个信号量能在有限长度的数组内都正常工作,当删去信号量时数组后面的元素往前移动,腾出空间。
sem_wait()
和sem_post()
函数的实现:可参照实验楼6.4提供的
lock_buffer()
以及unlock_buffer
实现方法。sem_wait()
函数是 P 原子操作,功能是使信号量的值减一,如果信号量值为 0 就阻塞当前进程,在sem_wait()
中使用 while 循环,当信号量的值小于等于 0 时保证其他进程一直阻塞;sem_post()
函数是 V 原子操作,功能是使信号量的值加一,如果有等待该信号量的进程,则唤醒其中一个。阻塞和唤醒进程由kernel/sched.c
中的sleep_on()
、wake_up()
函数实现,它们的参数都是一个结构体指针——struct task_struct *
,即进程都睡眠或唤醒在该参数指向的一个进程 PCB 结构链表上。sleep_on()
函数参数获取的指针*p
是等待队列的头指针,每次执行该函数时,指针*tmp
指向原本的等待队列,*p
则指向当前进程,即将当前进程插入等待队列头部,然后将当前进程设为睡眠状态,执行schedule()
进行调度。wake_up()
函数将唤醒*p
指向的进程。某个进程被唤醒后,回到sleep_on()
继续执行,它将*tmp
指向的进程继续唤醒,即唤醒等待队列的上一个进程。依次执行下去,等待队列的所有进程都会被唤醒。完整
sem.c
代码如下:#include <linux/sem.h> #include <linux/sched.h> #include <unistd.h> #include <asm/segment.h> #include <linux/tty.h> #include <linux/kernel.h> #include <linux/fdreg.h> #include <asm/system.h> #include <asm/io.h> //#include <string.h> sem_t semtable[SEMTABLE_LEN]; int cnt = 0; sem_t *sys_sem_open(const char *name,unsigned int value) { char kernelname[100]; /* 应该足够大了 */ int isExist = 0; int i=0; int name_cnt=0; while( get_fs_byte(name+name_cnt) != '\0') name_cnt++; if(name_cnt>SEM_NAME_LEN) return NULL; for(i=0;i<name_cnt;i++) /* 从用户态复制到内核态 */ kernelname[i]=get_fs_byte(name+i); int name_len = strlen(kernelname); int sem_name_len =0; sem_t *p=NULL; for(i=0;i<cnt;i++) { sem_name_len = strlen(semtable[i].name); if(sem_name_len == name_len) { if( !strcmp(kernelname,semtable[i].name) ) { isExist = 1; break; } } } if(isExist == 1) { p=(sem_t*)(&semtable[i]); //printk("find previous name!\n"); } else { i=0; for(i=0;i<name_len;i++) { semtable[cnt].name[i]=kernelname[i]; } semtable[cnt].value = value; p=(sem_t*)(&semtable[cnt]); //printk("creat name!\n"); cnt++; } return p; } int sys_sem_wait(sem_t *sem) { cli(); while( sem->value <= 0 ) //使得所有小于0的进程阻塞 sleep_on(&(sem->queue)); sem->value--; sti(); return 0; } int sys_sem_post(sem_t *sem) { cli(); sem->value++; if( (sem->value) <= 1) //小于0阻塞,只有为1时才唤醒,保证一次仅唤醒一个进程 wake_up(&(sem->queue)); sti(); return 0; } int sys_sem_unlink(const char *name) { char kernelname[100]; /* 应该足够大了 */ int isExist = 0; int i=0; int name_cnt=0; while( get_fs_byte(name+name_cnt) != '\0') name_cnt++; if(name_cnt>SEM_NAME_LEN) return NULL; for(i=0;i<name_cnt;i++) kernelname[i]=get_fs_byte(name+i); int name_len = strlen(name); int sem_name_len =0; for(i=0;i<cnt;i++) { sem_name_len = strlen(semtable[i].name); if(sem_name_len == name_len) { if( !strcmp(kernelname,semtable[i].name)) { isExist = 1; break; } } } if(isExist == 1) { int tmp=0; for(tmp=i;tmp<=cnt;tmp++) { semtable[tmp]=semtable[tmp+1]; } cnt = cnt-1; return 0; } else return -1; }
-
修改/include/unistd.h,添加新增的系统调用的编号:
#define __NR_setregid 71 /* 添加系统调用号 */ #define __NR_whoami 72 /* 实验2新增 */ #define __NR_iam 73 #define __NR_sem_open 74 /* 实验5新增 */ #define __NR_sem_wait 75 #define __NR_sem_post 76 #define __NR_sem_unlink 77
-
修改/kernel/system_call.s,需要修改总的系统调用的和值:
nr_system_calls = 78
-
修改/include/linux/sys.h,声明新增函数
extern int sys_sem_open(); extern int sys_sem_wait(); extern int sys_sem_post(); extern int sys_sem_unlink(); fn_ptr sys_call_table[] = { //...sys_setreuid,sys_setregid,sys_whoami,sys_iam, sys_sem_open,sys_sem_wait,sys_sem_post,sys_sem_unlink };
-
修改linux-0.11/kernel目录下的Makefile
OBJS = sched.o system_call.o traps.o asm.o fork.o \ panic.o printk.o vsprintf.o sys.o exit.o \ signal.o mktime.o who.o sem.o // ... ### Dependencies: sem.s sem.o: sem.c ../include/linux/sem.h ../include/linux/kernel.h \ ../include/unistd.h
-
重新编译内核:make all
-
修改pc.c,适用于linux-0.11运行:
注意:
(1). 在linux0.11系统的应用程序中,注释不能写
//
,必须要写/* */
(2). 不能在程序中间对变量定义,比如使用循环时的i
要在开始定义,所有变量都必须要在一开始统一定义。#define __LIBRARY__ #include <unistd.h> #include <sys/types.h> #include <fcntl.h> #include <stdio.h> #include <stdlib.h> #include <linux/sem.h> _syscall2(sem_t*,sem_open,const char *,name,unsigned int,value); _syscall1(int,sem_wait,sem_t*,sem); _syscall1(int,sem_post,sem_t*,sem); _syscall1(int,sem_unlink,const char *,name); int main() { ... /*打开信号量*/ if((mutex = sem_open("mutex",1)) == NULL) { perror("sem_open() error!\n"); return -1; } if((empty = sem_open("empty",10)) == NULL) { perror("sem_open() error!\n"); return -1; } if((full = sem_open("full",0)) == NULL) { perror("sem_open() error!\n"); return -1; } ... }
-
将已经修改的/usr/include/unistd.h和sem.h文件以及新修改的pc.c拷贝到linux-0.11系统中,用于测试实现的信号量。
sudo ./mount-hdc cp ./unistd.h ./hdc/usr/include/ cp ./sem.h ./hdc/usr/include/linux/ cp ./pc.c ./hdc/usr/root/ sudo umount hdc/
-
启动新编译的linux-0.11内核,用pc.c测试实现的信号量
./run gcc -o pc pc.c ./pc > 1.txt sync
-
关闭linux-0.11,挂载虚拟磁盘,查看信息输出文件
sudo ./mount-hdc sudo cp ./hdc/usr/root/1.txt ./ sudo chmod 777 1.txt cat 1.txt | more
。。。。。。
实验问题
在 pc.c 中去掉所有与信号量有关的代码,再运行程序,执行效果有变化吗?为什么会这样? 实验的设计者在第一次编写生产者——消费者程序的时候,是这么做的:
Producer()
{
P(Mutex); //互斥信号量
//生产一个产品item;
P(Empty); //空闲缓存资源
//将item放到空闲缓存中;
V(Full); //产品资源
V(Mutex);
}
Consumer()
{
P(Mutex);
P(Full);
//从缓存区取出一个赋值给item;
V(Empty);
//消费产品item;
V(Mutex);
}
这样可行吗?如果可行,那么它和标准解法在执行效果上会有什么不同?如果不可行,那么它有什么问题使它不可行?
答:
去掉所有与信号量有关的代码,再运行程序,执行效果有变化,输出的数字顺序完全混乱。
没有了信号量,进程之间无法同步工作,而且无法保证对文件的读写是原子操作,无法保证顺序,工作异常。
实验楼中提供的题目代码逻辑正确,应该分析的代码如上所示。
分析流程:
- 假设Producer刚生产完一件商品,释放了Mutex,Mutex为1,此时缓存区满了,Empty为0;
- 然后OS执行调度,若又被Producer拿到CPU,它拿到Mutex,使Mutex为0,而Empty为0阻塞,Producer让出CPU,等待Consumer执行V(Empty);
- 而Consumer拿到CPU后,Mutex还为0,要阻塞等待Producer执行V(Mutex);
- 两者相互持有对方需要的资源,造成死锁。
参考链接:
https://blog.csdn.net/weixin_41761478/article/details/100093113
https://blog.csdn.net/laoshuyudaohou/article/details/103842973