【操作系统实验】进程同步控制:基于信号量控制的读-写问题

这是笔者的操作系统实验作业,在完成过程中遇到了很多问题,也收获了很多知识和教训,在此分享自己的见解。

1.实验要求

利用信号量机制,实现读者写者问题的解决方案。

在Linux 环境下,创建一个控制台进程,此进程包含 n 个线程。用这 n 个线程来表示 n 个读者或写者。
每个线程按相应测试数据文件 (后面有介绍)的要求进行读写操作。用信号量机制分别实现读者优先和写者优先的读者-写者问题。

具体的读-写要求:
读者-写者问题的读写操作限制(包括读者优先和写者优先) :

  • 写-写互斥,即不能有两个写者同时进行写操作。
  • 读-写互斥,即不能同时有一个线程在读,而另一个线程在写。
  • 读-读允许,即可以有一个或多个读者在读。

读者优先的附加限制:

  • 如果一个读者申请进行读操作时已有另一个读者正在进行读操作,则该读者可直接开始读操作。

写者优先的附加限制:

  • 如果一个读者申请进行读操作时已有另一写者在等待访问共享资源,则该读者必须等到没有写者处于等待状态才能开始读操作。

具体的实现要求

  • 数据文件:
    要求程序从指定文件中读取具体的读写请求,具体如下:
R 100  //表示读者线程,持续时间100ms
W 200  //表示写者线程,持续时间200ms

//注:这里的要求是我自己设置的,具体形式如何非常轻松

  • 临界区实现
    可以定义一个环形队列,读者每次从队首读取一个数据(并拿出),写者每次从队尾写入一个数据。

现在来解析一下代码实现。

2.数据结构设计

1.信号量设计

typedef struct
{
    int head,front;  //等待队列的队首和队尾指针,即数组下标
    int thread[21];  //等待队列,保存子线程的id
}semaphore_2;        //二进制信号量,即同时可执行的线程只能有1个

void init_sema(semaphore_2 *s)
{
    s->head=0;
    s->front=0;     //初始化指针
}

void wait(semaphore_2 *s,int thread_id)   //要控制的信号量和访问线程id
{
    s->thread[s->front] = thread_id;
    s->front+=1;                          //将线程id加入等待队列
    while(s->thread[s->head]!=thread_id); //等待队首指针指向本线程id,表示可以运行了
}

void signal(semaphore_2 *s,int thread_id)
{
    s->head+=1; //本线程释放信号量,允许等待队列里的下一个线程进行
}

考虑到性能限制以及要求不高,我设置了子线程的创建上限,因此等待队列不需要是循环的,大于子线程上限即可。

2.临界区内数据读取

int quene[30];
int head = 0;
int last = 0;

int read_in_quene()
{
    if(head==last)
        return -1; //空队列,读取错误
    int result=quene[head];
    head=(head+1)%30; //队首元素出队
    return result; //返回读取结果
}

int write_in_quene()
{
    srand(clock());
    int num = rand()%100;
    quene[last]=num;
    last=(last+1)%30;  //入队
    return num;
}

这里将数据队列设为全局变量,每次写入时写入一个随机数。需要注意用srand函数控制种子,避免每次写入同一个随机数。

3.时间相关函数

这里算是一个小工具了,实验要求展示每次在创建线程成功、线程进入临界区、退出临界区时写出当前时间。最开始我是用clock()来读取时间的,但是后来仔细看才知道它返回的是线程在CPU中工作的时钟数,如果用sleep或者usleep函数阻塞线程,这段时间不会计入clock的,因此改用了gettimeofdat()函数,用它可以返回精确到微秒级别的时间,函数定义如下:

#include<sys/time.h>

int gettimeofday(struct  timeval*tv,struct  timezone *tz )

其中第一个参数是当前时间,第二个参数是当前时区信息,我们不使用它,直接传NULL即可。struct timeval定义如下:

struct  timeval{
       long  tv_sec;  //秒数
       long  tv_usec; //微秒数
};

这里两个数并不是相同的意思,具体的时间应该是tv_sec秒+tv_usec微秒,因此不能简单的相减获得时间,为此设计了相减的函数:

long difftime_timeval(struct timeval start,struct timeval end)  //end时间大于start时间
{
    long val_usec = end.tv_usec-start.tv_usec +(end.tv_sec-start.tv_sec)*1000000;

    return val_usec; //返回相差的微秒数
}

然后设置一个全局变量prog_start表示程序运行开始的时间,然后每次调用difftime_timeval,传入prog_start和得到的当前时间就可以获得时间差了,这个时间差可以理解为自程序运行起经过的时间。

4.控制线程

控制线程需要用到pthread.h的相关函数。我们需要使用

int pthread_create(pthread_t *newthread, const pthread_attr_t *attr, void *(*__start_routine)(void *), void *arg)

函数,这里第一个参数为线程id,创建完毕后会修改为对应得值,第二个参数为创建线程的模式,也可以传为NULL表示默认方式。
注意第三个参数,这里要求我们传入子线程执行的函数,即子线程执行的内容需要用函数包装起来,这个函数必须为void类型,参数只能有一个参数且必须为void类型。第四个参数即为要传入的参数。如果你的子线程没有参数要传,可以传NULL。

在这里我想传入线程的id和读写的持续时间(其实就是等待时间),因此包装一个struct来让一个指针传入这些参数:

typedef struct
{
    int thread_type;      //线程类型,0表示读,1表示写
    int thread_id;        //线程id,表示线程创建的顺序(调用pthread_create的顺序)
    int thread_lasttime;  //线程读写的持续时间
}thread_arg;

这样在线程内部解析*arg即可得到参数。

3.读者优先方式

所谓读者优先,就是在等待进入临界区的线程里,优先考虑读线程,由于多个线程读是不会产生排斥的(一般来说,读取时不能修改数据,但本实验要求读者拿走队首的数据,可以认为这个操作很快,不会发生冲突),因此只要有读线程在读,后来的读线程就可以跳过前面的写线程来进行。

这里设计了一个计数量和两个信号量:

int read_count = 0;
semaphore_2 mutex;
semaphore_2 RP_write;

其中read_count表示读者的计数,mutex是对read_count修改的互斥,RP_write是写线程的互斥。

对于写线程,只需要关注RP_write即可,而对于读线程则有些不同。因为读线程之间不互斥,因此有如下机制:

  • 当读线程是第一个开始读的线程时,挂起RP_write让写线程不能进入临界区,直到没有读线程在临界区;
  • 当读线程是最后一个读的线程时,退出临界区后取消RP_write表示写线程可以工作了。

这样就满足了读者优先的需求。具体的读写进程函数如下:

void* RP_ReaderThread(void* arg)
{
    //Reader first
    struct timeval nowtime;
    thread_arg* self_arg = (thread_arg*)arg;
    int id = self_arg->thread_id;
    int lasttime = self_arg->thread_lasttime;  //以上获取参数信息
    gettimeofday(&nowtime,NULL); //获取时间
    printf("Id:%d Reader was created at %ld us\n",id,difftime_timeval(prog_start,nowtime)); //这里prog_start是全局变量,表示进程开始的时间
    wait(&mutex,id);
    read_count++;
    if(read_count==1)
        wait(&RP_write,id);  //进行自加后为1,表示本线程为当前唯一的读线程,阻塞其他写线程
    signal(&mutex,id);
    
    gettimeofday(&nowtime,NULL);
    printf("Id:%d Reader was ready at %ld us\n",id,difftime_timeval(prog_start,nowtime));
    //operation
    usleep(lasttime);
    printf("Id:%d Read :%d\n",id,read_in_quene());

    wait(&mutex,id);
    read_count--;
    if(read_count==0)
        signal(&RP_write,id);  //再无读线程,可以允许写线程写入了
    signal(&mutex,id);

    gettimeofday(&nowtime,NULL);
    printf("Id:%d exit the critical section at %ld us\n",id,difftime_timeval(prog_start,nowtime));
    pthread_exit(NULL);
    //exit
}

void* RP_WriterThread(void* arg)
{
    struct timeval nowtime;
    thread_arg* self_arg = (thread_arg*)arg;
    int id = self_arg->thread_id;
    int lasttime = self_arg->thread_lasttime;
    gettimeofday(&nowtime,NULL);
    printf("Id:%d Writer was created at %ld us\n",id,difftime_timeval(prog_start,nowtime));
    
    wait(&RP_write,id);
    
    gettimeofday(&nowtime,NULL);
    printf("Id:%d Writer was ready at %ld us\n",id,difftime_timeval(prog_start,nowtime));

    //operation
    usleep(lasttime);
    printf("Id:%d Writer :%d\n",id,write_in_quene());
    signal(&RP_write,id);

    gettimeofday(&nowtime,NULL);
    printf("Id:%d exit the critical section at %ld us\n",id,difftime_timeval(prog_start,nowtime));
    pthread_exit(NULL);
    //exit
}

这样即可保证满足需求。整个程序代码如下:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/time.h>
#include <unistd.h>
#include <string.h>

/*信号量定义*/
typedef struct
{
    int head,front;  //等待队列的队首和队尾指针,即数组下标
    int thread[21];  //等待队列,保存子线程的id
}semaphore_2;        //二进制信号量,即同时可执行的线程只能有1个

int read_count = 0;
semaphore_2 mutex;
semaphore_2 RP_write;

void init_sema(semaphore_2 *s)
{
    s->head=0;
    s->front=0;     //初始化指针
}

void wait(semaphore_2 *s,int thread_id)   //要控制的信号量和访问线程id
{
    s->thread[s->front] = thread_id;
    s->front+=1;                          //将线程id加入等待队列
    while(s->thread[s->head]!=thread_id); //等待队首指针指向本线程id,表示可以运行了
}

void signal(semaphore_2 *s,int thread_id)
{
    s->head+=1; //本线程释放信号量,允许等待队列里的下一个线程进行
}

/*数据区*/
int quene[30];
int head = 0;
int last = 0;

int read_in_quene()
{
    if(head==last)
        return -1; //空队列,读取错误
    int result=quene[head];
    head=(head+1)%30; //队首元素出队
    return result; //返回读取结果
}

int write_in_quene()
{
    srand(clock());
    int num = rand()%100;
    quene[last]=num;
    last=(last+1)%30;  //入队
    return num;
}

/*线程参数结构*/
typedef struct
{
    int thread_type;      //线程类型,0表示读,1表示写
    int thread_id;        //线程id,表示线程创建的顺序(调用pthread_create的顺序)
    int thread_lasttime;  //线程读写的持续时间
}thread_arg;

/*时间相关内容*/
struct timeval prog_start;  //程序起始时间,需要在main函数中初始化

long difftime_timeval(struct timeval start,struct timeval end)  //end时间大于start时间
{
    long val_usec = end.tv_usec-start.tv_usec +(end.tv_sec-start.tv_sec)*1000000;

    return val_usec; //返回相差的微秒数
}

/*读、写进程函数*/

void* RP_ReaderThread(void* arg)
{
    //Reader first
    struct timeval nowtime;
    thread_arg* self_arg = (thread_arg*)arg;
    int id = self_arg->thread_id;
    int lasttime = self_arg->thread_lasttime;  //以上获取参数信息
    gettimeofday(&nowtime,NULL); //获取时间
    printf("Id:%d Reader was created at %ld us\n",id,difftime_timeval(prog_start,nowtime)); //这里prog_start是全局变量,表示进程开始的时间
    wait(&mutex,id);
    read_count++;
    if(read_count==1)
        wait(&RP_write,id);  //进行自加后为1,表示本线程为当前唯一的读线程,阻塞其他写线程
    signal(&mutex,id);
    
    gettimeofday(&nowtime,NULL);
    printf("Id:%d Reader was ready at %ld us\n",id,difftime_timeval(prog_start,nowtime));
    //operation
    usleep(lasttime);
    printf("Id:%d Read :%d\n",id,read_in_quene());

    wait(&mutex,id);
    read_count--;
    if(read_count==0)
        signal(&RP_write,id);  //再无读线程,可以允许写线程写入了
    signal(&mutex,id);

    gettimeofday(&nowtime,NULL);
    printf("Id:%d exit the critical section at %ld us\n",id,difftime_timeval(prog_start,nowtime));
    pthread_exit(NULL);
    //exit
}

void* RP_WriterThread(void* arg)
{
    struct timeval nowtime;
    thread_arg* self_arg = (thread_arg*)arg;
    int id = self_arg->thread_id;
    int lasttime = self_arg->thread_lasttime;
    gettimeofday(&nowtime,NULL);
    printf("Id:%d Writer was created at %ld us\n",id,difftime_timeval(prog_start,nowtime));
    
    wait(&RP_write,id);
    
    gettimeofday(&nowtime,NULL);
    printf("Id:%d Writer was ready at %ld us\n",id,difftime_timeval(prog_start,nowtime));

    //operation
    usleep(lasttime);
    printf("Id:%d Writer :%d\n",id,write_in_quene());
    signal(&RP_write,id);

    gettimeofday(&nowtime,NULL);
    printf("Id:%d exit the critical section at %ld us\n",id,difftime_timeval(prog_start,nowtime));
    pthread_exit(NULL);
    //exit
}

int main(int argc,char* argv[])
{
    FILE* fp = fopen("thread.txt","r");

    if(fp==NULL)  //读取线程记录文件失败
    {
        printf("Reading file error!");
        fclose(fp);
        return 0;
    }

    if(argc==2 && !strcmp("-log",argv[1])) //如果设定了参数,可以将输出信息重定向到记录文件
    {
        if(freopen("record.txt","w",stdout)==NULL)
        {
            printf("Can‘t write into record file!\n");
        }
    }

    thread_arg threadrecord[20];
    int threadNum=0;  //线程总数
    char mode;        //线程读写类型
    int lasting;      //线程持续时间
    while(feof(fp)==0&&threadNum<=20)
    {
        fscanf(fp,"%c %d",&mode,&lasting);
        if(mode==‘R‘)
        {
            threadrecord[threadNum].thread_type=0;
            threadrecord[threadNum].thread_id=threadNum+1;
            threadrecord[threadNum].thread_lasttime=lasting*1000; //这里是因为记录里是毫秒,而usleep函数的参数单位为微秒
            threadNum++;
        }
        if(mode==‘W‘)
        {
            threadrecord[threadNum].thread_type=1;
            threadrecord[threadNum].thread_id=threadNum+1;
            threadrecord[threadNum].thread_lasttime=lasting*1000;
            threadNum++;
        }
    }
    fclose(fp);
    pthread_t threadId[threadNum]; //这里保存pthread_create需要的参数id
    int i;
    pthread_attr_t attr;   //线程创建属性
    pthread_attr_init(&attr);
    //pthread_attr_setschedpolicy(&attr,SCHED_FIFO);
    //上一行表示创建线程的调度为FIFO,即先入先执行

    init_sema(&RP_write);
    init_sema(&mutex);
    gettimeofday(&prog_start,NULL);
    //初始化信号量和起始时间

    for(i=0;i<threadNum;i++)
    {
        if(threadrecord[i].thread_type==0)  //读者
        {
            pthread_create(&threadId[i],&attr,(void*)*RP_ReaderThread,(void*)&threadrecord[i]);
        }
        else if(threadrecord[i].thread_type==1) //写者
        {
            pthread_create(&threadId[i],&attr,(void*)*RP_WriterThread,(void*)&threadrecord[i]);
        }
    }

    for(i=0;i<threadNum;i++)
    {
        pthread_join(threadId[i],NULL);
    }
    //主线程等待所有子线程完毕后再结束
    return 0;
}

最后谨记:在编译时要加入额外参数:-pthread,链接相关库,否则pthread相关函数都会显示未定义。

4.写者优先方式

待施工。

【操作系统实验】进程同步控制:基于信号量控制的读-写问题

上一篇:Linux网络编程


下一篇:Shell与文本处理