这是笔者的操作系统实验作业,在完成过程中遇到了很多问题,也收获了很多知识和教训,在此分享自己的见解。
1.实验要求
利用信号量机制,实现读者写者问题的解决方案。
在Linux 环境下,创建一个控制台进程,此进程包含 n 个线程。用这 n 个线程来表示 n 个读者或写者。
每个线程按相应测试数据文件 (后面有介绍)的要求进行读写操作。用信号量机制分别实现读者优先和写者优先的读者-写者问题。
具体的读-写要求:
读者-写者问题的读写操作限制(包括读者优先和写者优先) :
- 写-写互斥,即不能有两个写者同时进行写操作。
- 读-写互斥,即不能同时有一个线程在读,而另一个线程在写。
- 读-读允许,即可以有一个或多个读者在读。
读者优先的附加限制:
- 如果一个读者申请进行读操作时已有另一个读者正在进行读操作,则该读者可直接开始读操作。
写者优先的附加限制:
- 如果一个读者申请进行读操作时已有另一写者在等待访问共享资源,则该读者必须等到没有写者处于等待状态才能开始读操作。
具体的实现要求
- 数据文件:
要求程序从指定文件中读取具体的读写请求,具体如下:
R 100 //表示读者线程,持续时间100ms
W 200 //表示写者线程,持续时间200ms
//注:这里的要求是我自己设置的,具体形式如何非常轻松
- 临界区实现
可以定义一个环形队列,读者每次从队首读取一个数据(并拿出),写者每次从队尾写入一个数据。
现在来解析一下代码实现。
2.数据结构设计
1.信号量设计
typedef struct
{
int head,front; //等待队列的队首和队尾指针,即数组下标
int thread[21]; //等待队列,保存子线程的id
}semaphore_2; //二进制信号量,即同时可执行的线程只能有1个
void init_sema(semaphore_2 *s)
{
s->head=0;
s->front=0; //初始化指针
}
void wait(semaphore_2 *s,int thread_id) //要控制的信号量和访问线程id
{
s->thread[s->front] = thread_id;
s->front+=1; //将线程id加入等待队列
while(s->thread[s->head]!=thread_id); //等待队首指针指向本线程id,表示可以运行了
}
void signal(semaphore_2 *s,int thread_id)
{
s->head+=1; //本线程释放信号量,允许等待队列里的下一个线程进行
}
考虑到性能限制以及要求不高,我设置了子线程的创建上限,因此等待队列不需要是循环的,大于子线程上限即可。
2.临界区内数据读取
int quene[30];
int head = 0;
int last = 0;
int read_in_quene()
{
if(head==last)
return -1; //空队列,读取错误
int result=quene[head];
head=(head+1)%30; //队首元素出队
return result; //返回读取结果
}
int write_in_quene()
{
srand(clock());
int num = rand()%100;
quene[last]=num;
last=(last+1)%30; //入队
return num;
}
这里将数据队列设为全局变量,每次写入时写入一个随机数。需要注意用srand函数控制种子,避免每次写入同一个随机数。
3.时间相关函数
这里算是一个小工具了,实验要求展示每次在创建线程成功、线程进入临界区、退出临界区时写出当前时间。最开始我是用clock()来读取时间的,但是后来仔细看才知道它返回的是线程在CPU中工作的时钟数,如果用sleep或者usleep函数阻塞线程,这段时间不会计入clock的,因此改用了gettimeofdat()函数,用它可以返回精确到微秒级别的时间,函数定义如下:
#include<sys/time.h>
int gettimeofday(struct timeval*tv,struct timezone *tz )
其中第一个参数是当前时间,第二个参数是当前时区信息,我们不使用它,直接传NULL即可。struct timeval定义如下:
struct timeval{
long tv_sec; //秒数
long tv_usec; //微秒数
};
这里两个数并不是相同的意思,具体的时间应该是tv_sec秒+tv_usec微秒,因此不能简单的相减获得时间,为此设计了相减的函数:
long difftime_timeval(struct timeval start,struct timeval end) //end时间大于start时间
{
long val_usec = end.tv_usec-start.tv_usec +(end.tv_sec-start.tv_sec)*1000000;
return val_usec; //返回相差的微秒数
}
然后设置一个全局变量prog_start表示程序运行开始的时间,然后每次调用difftime_timeval,传入prog_start和得到的当前时间就可以获得时间差了,这个时间差可以理解为自程序运行起经过的时间。
4.控制线程
控制线程需要用到pthread.h的相关函数。我们需要使用
int pthread_create(pthread_t *newthread, const pthread_attr_t *attr, void *(*__start_routine)(void *), void *arg)
函数,这里第一个参数为线程id,创建完毕后会修改为对应得值,第二个参数为创建线程的模式,也可以传为NULL表示默认方式。
注意第三个参数,这里要求我们传入子线程执行的函数,即子线程执行的内容需要用函数包装起来,这个函数必须为void类型,参数只能有一个参数且必须为void类型。第四个参数即为要传入的参数。如果你的子线程没有参数要传,可以传NULL。
在这里我想传入线程的id和读写的持续时间(其实就是等待时间),因此包装一个struct来让一个指针传入这些参数:
typedef struct
{
int thread_type; //线程类型,0表示读,1表示写
int thread_id; //线程id,表示线程创建的顺序(调用pthread_create的顺序)
int thread_lasttime; //线程读写的持续时间
}thread_arg;
这样在线程内部解析*arg即可得到参数。
3.读者优先方式
所谓读者优先,就是在等待进入临界区的线程里,优先考虑读线程,由于多个线程读是不会产生排斥的(一般来说,读取时不能修改数据,但本实验要求读者拿走队首的数据,可以认为这个操作很快,不会发生冲突),因此只要有读线程在读,后来的读线程就可以跳过前面的写线程来进行。
这里设计了一个计数量和两个信号量:
int read_count = 0;
semaphore_2 mutex;
semaphore_2 RP_write;
其中read_count表示读者的计数,mutex是对read_count修改的互斥,RP_write是写线程的互斥。
对于写线程,只需要关注RP_write即可,而对于读线程则有些不同。因为读线程之间不互斥,因此有如下机制:
- 当读线程是第一个开始读的线程时,挂起RP_write让写线程不能进入临界区,直到没有读线程在临界区;
- 当读线程是最后一个读的线程时,退出临界区后取消RP_write表示写线程可以工作了。
这样就满足了读者优先的需求。具体的读写进程函数如下:
void* RP_ReaderThread(void* arg)
{
//Reader first
struct timeval nowtime;
thread_arg* self_arg = (thread_arg*)arg;
int id = self_arg->thread_id;
int lasttime = self_arg->thread_lasttime; //以上获取参数信息
gettimeofday(&nowtime,NULL); //获取时间
printf("Id:%d Reader was created at %ld us\n",id,difftime_timeval(prog_start,nowtime)); //这里prog_start是全局变量,表示进程开始的时间
wait(&mutex,id);
read_count++;
if(read_count==1)
wait(&RP_write,id); //进行自加后为1,表示本线程为当前唯一的读线程,阻塞其他写线程
signal(&mutex,id);
gettimeofday(&nowtime,NULL);
printf("Id:%d Reader was ready at %ld us\n",id,difftime_timeval(prog_start,nowtime));
//operation
usleep(lasttime);
printf("Id:%d Read :%d\n",id,read_in_quene());
wait(&mutex,id);
read_count--;
if(read_count==0)
signal(&RP_write,id); //再无读线程,可以允许写线程写入了
signal(&mutex,id);
gettimeofday(&nowtime,NULL);
printf("Id:%d exit the critical section at %ld us\n",id,difftime_timeval(prog_start,nowtime));
pthread_exit(NULL);
//exit
}
void* RP_WriterThread(void* arg)
{
struct timeval nowtime;
thread_arg* self_arg = (thread_arg*)arg;
int id = self_arg->thread_id;
int lasttime = self_arg->thread_lasttime;
gettimeofday(&nowtime,NULL);
printf("Id:%d Writer was created at %ld us\n",id,difftime_timeval(prog_start,nowtime));
wait(&RP_write,id);
gettimeofday(&nowtime,NULL);
printf("Id:%d Writer was ready at %ld us\n",id,difftime_timeval(prog_start,nowtime));
//operation
usleep(lasttime);
printf("Id:%d Writer :%d\n",id,write_in_quene());
signal(&RP_write,id);
gettimeofday(&nowtime,NULL);
printf("Id:%d exit the critical section at %ld us\n",id,difftime_timeval(prog_start,nowtime));
pthread_exit(NULL);
//exit
}
这样即可保证满足需求。整个程序代码如下:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/time.h>
#include <unistd.h>
#include <string.h>
/*信号量定义*/
typedef struct
{
int head,front; //等待队列的队首和队尾指针,即数组下标
int thread[21]; //等待队列,保存子线程的id
}semaphore_2; //二进制信号量,即同时可执行的线程只能有1个
int read_count = 0;
semaphore_2 mutex;
semaphore_2 RP_write;
void init_sema(semaphore_2 *s)
{
s->head=0;
s->front=0; //初始化指针
}
void wait(semaphore_2 *s,int thread_id) //要控制的信号量和访问线程id
{
s->thread[s->front] = thread_id;
s->front+=1; //将线程id加入等待队列
while(s->thread[s->head]!=thread_id); //等待队首指针指向本线程id,表示可以运行了
}
void signal(semaphore_2 *s,int thread_id)
{
s->head+=1; //本线程释放信号量,允许等待队列里的下一个线程进行
}
/*数据区*/
int quene[30];
int head = 0;
int last = 0;
int read_in_quene()
{
if(head==last)
return -1; //空队列,读取错误
int result=quene[head];
head=(head+1)%30; //队首元素出队
return result; //返回读取结果
}
int write_in_quene()
{
srand(clock());
int num = rand()%100;
quene[last]=num;
last=(last+1)%30; //入队
return num;
}
/*线程参数结构*/
typedef struct
{
int thread_type; //线程类型,0表示读,1表示写
int thread_id; //线程id,表示线程创建的顺序(调用pthread_create的顺序)
int thread_lasttime; //线程读写的持续时间
}thread_arg;
/*时间相关内容*/
struct timeval prog_start; //程序起始时间,需要在main函数中初始化
long difftime_timeval(struct timeval start,struct timeval end) //end时间大于start时间
{
long val_usec = end.tv_usec-start.tv_usec +(end.tv_sec-start.tv_sec)*1000000;
return val_usec; //返回相差的微秒数
}
/*读、写进程函数*/
void* RP_ReaderThread(void* arg)
{
//Reader first
struct timeval nowtime;
thread_arg* self_arg = (thread_arg*)arg;
int id = self_arg->thread_id;
int lasttime = self_arg->thread_lasttime; //以上获取参数信息
gettimeofday(&nowtime,NULL); //获取时间
printf("Id:%d Reader was created at %ld us\n",id,difftime_timeval(prog_start,nowtime)); //这里prog_start是全局变量,表示进程开始的时间
wait(&mutex,id);
read_count++;
if(read_count==1)
wait(&RP_write,id); //进行自加后为1,表示本线程为当前唯一的读线程,阻塞其他写线程
signal(&mutex,id);
gettimeofday(&nowtime,NULL);
printf("Id:%d Reader was ready at %ld us\n",id,difftime_timeval(prog_start,nowtime));
//operation
usleep(lasttime);
printf("Id:%d Read :%d\n",id,read_in_quene());
wait(&mutex,id);
read_count--;
if(read_count==0)
signal(&RP_write,id); //再无读线程,可以允许写线程写入了
signal(&mutex,id);
gettimeofday(&nowtime,NULL);
printf("Id:%d exit the critical section at %ld us\n",id,difftime_timeval(prog_start,nowtime));
pthread_exit(NULL);
//exit
}
void* RP_WriterThread(void* arg)
{
struct timeval nowtime;
thread_arg* self_arg = (thread_arg*)arg;
int id = self_arg->thread_id;
int lasttime = self_arg->thread_lasttime;
gettimeofday(&nowtime,NULL);
printf("Id:%d Writer was created at %ld us\n",id,difftime_timeval(prog_start,nowtime));
wait(&RP_write,id);
gettimeofday(&nowtime,NULL);
printf("Id:%d Writer was ready at %ld us\n",id,difftime_timeval(prog_start,nowtime));
//operation
usleep(lasttime);
printf("Id:%d Writer :%d\n",id,write_in_quene());
signal(&RP_write,id);
gettimeofday(&nowtime,NULL);
printf("Id:%d exit the critical section at %ld us\n",id,difftime_timeval(prog_start,nowtime));
pthread_exit(NULL);
//exit
}
int main(int argc,char* argv[])
{
FILE* fp = fopen("thread.txt","r");
if(fp==NULL) //读取线程记录文件失败
{
printf("Reading file error!");
fclose(fp);
return 0;
}
if(argc==2 && !strcmp("-log",argv[1])) //如果设定了参数,可以将输出信息重定向到记录文件
{
if(freopen("record.txt","w",stdout)==NULL)
{
printf("Can‘t write into record file!\n");
}
}
thread_arg threadrecord[20];
int threadNum=0; //线程总数
char mode; //线程读写类型
int lasting; //线程持续时间
while(feof(fp)==0&&threadNum<=20)
{
fscanf(fp,"%c %d",&mode,&lasting);
if(mode==‘R‘)
{
threadrecord[threadNum].thread_type=0;
threadrecord[threadNum].thread_id=threadNum+1;
threadrecord[threadNum].thread_lasttime=lasting*1000; //这里是因为记录里是毫秒,而usleep函数的参数单位为微秒
threadNum++;
}
if(mode==‘W‘)
{
threadrecord[threadNum].thread_type=1;
threadrecord[threadNum].thread_id=threadNum+1;
threadrecord[threadNum].thread_lasttime=lasting*1000;
threadNum++;
}
}
fclose(fp);
pthread_t threadId[threadNum]; //这里保存pthread_create需要的参数id
int i;
pthread_attr_t attr; //线程创建属性
pthread_attr_init(&attr);
//pthread_attr_setschedpolicy(&attr,SCHED_FIFO);
//上一行表示创建线程的调度为FIFO,即先入先执行
init_sema(&RP_write);
init_sema(&mutex);
gettimeofday(&prog_start,NULL);
//初始化信号量和起始时间
for(i=0;i<threadNum;i++)
{
if(threadrecord[i].thread_type==0) //读者
{
pthread_create(&threadId[i],&attr,(void*)*RP_ReaderThread,(void*)&threadrecord[i]);
}
else if(threadrecord[i].thread_type==1) //写者
{
pthread_create(&threadId[i],&attr,(void*)*RP_WriterThread,(void*)&threadrecord[i]);
}
}
for(i=0;i<threadNum;i++)
{
pthread_join(threadId[i],NULL);
}
//主线程等待所有子线程完毕后再结束
return 0;
}
最后谨记:在编译时要加入额外参数:-pthread
,链接相关库,否则pthread相关函数都会显示未定义。
4.写者优先方式
待施工。