第四章 并发编程
1.摘要
- 本章论述了并发编程,介绍了并行计算的概念,指出了并行计算的重要性;比较了顺序算法与并行算法,以及并行性与并发性;解释了线程的原理及其相对于进程的优势;介绍了Pthread 中的线程操作,包括线程管理函数,互斥量、连接、条件变量和屏障等线程同步工具;演示了如何使用线程进行并发编程;解释了死锁问题,并说明了如何防止并发程序中的死锁问题;讨论了信号量,并论证了它们相对于条件变量的优点;解释了支持Linux中线程的独特方式。
2.线程
2.1线程优点
- 线程创建和切换速度更快:若要在某个进程中创建线程,操作系统不必为新的线程分配内存和创建页表,因为线程与进程共用同一个地址空间。所以,创建线程比创建进程更快。
- 线程的响应速度更快:一个进程只有一个执行路径。当某个进程被挂起时,帮个进程都将停止执行。相反,当某个线程被挂起时,同一进程中的其他线程可以继续执行。
- 线程更适合井行计算:并行计算的目标是使用多个执行路径更快地解决间题。基于分治原则(如二叉树查找和快速排序等)的算法经常表现出高度的并行性,可通过使用并行或并发执行来提高计算速度。
2.2线程缺点
- 由于地址空间共享,线程需要来自用户的明确同步。
- 许多库函数可能对线程不安全
- 在单CPU系统上,使用线程解决间题实际上要比使用顺序程序慢,这是由在运行时创建线程和切换上下文的系统开销造成的。
2.3线程操作
- 线程的执行轨迹与进程类似。线程可在内核模式或用户模式下执行。在用户模式下,线程在进程的相同地址空间中执行,但每个线程都有自己的执行堆栈。线程是独立的执行单元,可根据操作系统内核 的调度策略,对内核进行系统调用,变为桂起激活以继续执行等。为了利用线程的共享地址空间,操作系统内核的调度策略可能会优先选择同一进程中的线程,而不是不同进程中的线程。
2.4Pthread并发编程
- Pthread库提供了用于线程管理的以下APT:
pthread_create(thread, attr, function, arg): create thread
pthread_exit(status):terminate thread
pthread_cancel(thread) : cancel thread
pthread_attr_init(attr) : initialize thread attributes
pthread_attr_destroy(attr): destroy thread attribute
2.5线程操作
- 创建线程:使用pthread_create()函数创建线程。
int pthread_create (pthread_t *pthread_id,pthread_attr_t•attr,void * (*func) (void *), void *arg);
其中,attr最复杂,其使用步骤为:- 1.定义一个pthread展性变址pt:hread_attr_tattr。
- 2.用pthread_attr_init(&attr)初始化屈性变掀。
- 3.设置属性变垃并在pthread_ create()调用中使用。
- 4.必要时,通过pthread_attr_destroy(&attr)释放attr资源。
- 线程终止:线程函数结束后,线程即终止,或者,线程可以调用函数
int pthraad_exit {void *status)
- 线程连接:一个线程可以等待另一个线程的终止, 通过:
int pthread_join (pthread_t thread, void **status__ptr);
终止线程的退出状态以status_ptr返回。
3.线程同步
- 当多个线程试图修改同一共享变量或数据结构时,如果修改结果取决于线程的执行顺序,则称之为竞态条件
- 互斥量:在 Pthread中,锁被称为互斥量,意思是相互排斥。互斥变呈是用 ptbread_mutex_t 类型声明的在使,用之前必须对它们进行初始化。有两种方法可以初始化互斥址:
静态方法:pthreaa—mutex_t m = PTHREAD_MUTEX_INITIALIZER;
定义互斥量 m, 并使用默认属性对其进行初始化。
动态方法:使用 pthread_ mutex _init()
函数
线程通过互斥量来保护共享数据对象
4.死锁预防
-
预防死锁
通过破坏产生死锁的四个必要条件来预防死锁,
但因互斥条件是必须的,所以不能破坏该条件。 -
预防死锁-破坏“请求和保持”条件
为了破坏该条件,OS需要保证:
当进程请求资源时,不可持有不可抢占资源。
可通过两个协议实现:
- 第一种协议
进程在开始运行前,一次性申请整个运行过程中所需的全部资源。
缺点:
资源严重浪费。
进程进程出现饥饿现象。
- 第二种协议
进程在开始运行前,仅请求初期所需资源,运行时逐步释放不再需要的资源,并请求新的所需资源。
- 预防死锁-破坏“不可抢占”条件
保持了不可抢占资源的进程,提出新资源申请,但不被满足时,释放所有已得到的资源。
实现复杂,且释放已有资源很可能付出很大代价。
预防死锁-破坏“循环等待”条件
对系统所有资源类型进行排序,并赋予不同序号,进程在请求资源时,必须按序号递增顺序请求资源。如果一个拥有高序号资源的进程,请求低序号资源,则需要先释放高序号资源,再请求低序号资源。
资源利用率与吞吐量相较前两策略改善不少。
缺点:
新设备的增加被限制。
资源标号困难。
限制用户编程思路。
- 避免死锁
在资源动态分配过程中,防止系统进入不安全状态,限制弱,但成本低。
避免死锁-系统安全状态
死锁避免中,系统状态被划分成安全状态与不安全状态,处于不安全状态时,系统可能进入死锁。
允许进程动态申请资源,但OS进行分配资源前,应先评估资源分配安全性,仅安全状态下可分配。
- 银行家算法的数据结构
Available[]:可利用资源向量,包含m个元素的数组,表示系统中该类资源的数目。
Max[][]:nm矩阵,n个进程对m个资源的最大需求。
Allocation[][]:nm矩阵,n个进程已获得m个资源的数量。
Need[][]:n*m矩阵,n个进程所需的m个资源的数量。
关系:Need[i, j] = Max[i, j] - Allocation[i, j]
书本中的Need等二维向量,仅用一对[],但为了区分,变量名中我使用两对[]。
- 银行家算法
Request是进程p的请求向量,
Request[j]=k表示当前p进程对j资源需要k个数量。
如无特别说明,带序号的步骤为顺序步骤。
Request[][]<=Need[][],如果大于,则认为出错,因为超过了它原先宣布的最大值。
Request[][]<=Available[],如果大于,则等待。
将数据结构中各数据修改成分配资源后的值。
执行安全性算法,若安全,则分配数据,若不安全,则本次分配作废,进程等待。
- 安全性算法
设置两个临时变量Work[]=Available[]、Finish[]=false,分别表示当前可用资源数目、是否已经安全分配资源。
从进程集合中找到一进程i满足:Finish[i]false,并且Need[i, j]<Work[j]的进程,若找到,进入3,否则进入4.
设置Work[j]+=Available[i, j],Finish[i]=true,即分配完资源给进程,进程结束后释放资源,Work增加,安全分配,Finish置真。
步骤3结束后,跳回步骤2.
所有进程Finish[]true,是则表示处于安全状态,不是则不安全。
5.实践
1. 用并发线程快速排序
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
typedef struct{
int upperbound;
int lowerbound;
}PARM;
#define N 10
int a[N]={5,1,6,4,7,2,9,8,0,3};// unsorted data
int print(){//print current a[] contents
int i;
printf("[");
for(i=0;i<N;i++)
printf("%d ",a[i]);
printf("]\n");
}
void *Qsort(void *aptr){
PARM *ap, aleft, aright;
int pivot, pivotIndex,left, right,temp;
int upperbound,lowerbound;
pthread_t me,leftThread,rightThread;
me = pthread_self();
ap =(PARM *)aptr;
upperbound = ap->upperbound;
lowerbound = ap->lowerbound;
pivot = a[upperbound];//pick low pivot value
left = lowerbound - 1;//scan index from left side
right = upperbound;//scan index from right side
if(lowerbound >= upperbound)
pthread_exit (NULL);
while(left < right){//partition loop
do{left++;} while (a[left] < pivot);
do{right--;}while(a[right]>pivot);
if (left < right ) {
temp = a[left];a[left]=a[right];a[right] = temp;
}
}
print();
pivotIndex = left;//put pivot back
temp = a[pivotIndex] ;
a[pivotIndex] = pivot;
a[upperbound] = temp;
//start the "recursive threads"
aleft.upperbound = pivotIndex - 1;
aleft.lowerbound = lowerbound;
aright.upperbound = upperbound;
aright.lowerbound = pivotIndex + 1;
printf("%lu: create left and right threadsln", me) ;
pthread_create(&leftThread,NULL,Qsort,(void * )&aleft);
pthread_create(&rightThread,NULL,Qsort,(void *)&aright);
//wait for left and right threads to finish
pthread_join(leftThread,NULL);
pthread_join(rightThread, NULL);
printf("%lu: joined with left & right threads\n",me);
}
int main(int argc, char *argv[]){
PARM arg;
int i, *array;
pthread_t me,thread;
me = pthread_self( );
printf("main %lu: unsorted array = ", me);
print( ) ;
arg.upperbound = N-1;
arg. lowerbound = 0 ;
printf("main %lu create a thread to do QS\n" , me);
pthread_create(&thread,NULL,Qsort,(void * ) &arg);//wait for Qs thread to finish
pthread_join(thread,NULL);
printf ("main %lu sorted array = ", me);
print () ;
}
2.消费者-消费者问题
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NBUF 5
#define N 10
//shared g1obal variab1es
int buf [NBUF];//circular buffers
int head, tail;//indices
int data;//number of full buffers
pthread_mutex_t mutex;//mutex lock
pthread_cond_t empty,full;//condition variables
int init(){
head = tail = data = 0;
pthread_mutex_init(&mutex,NULL);
pthread_cond_init(&full,NULL);
pthread_cond_init(&empty,NULL);
}
void *producer (){
int i;
pthread_t me = pthread_self() ;
for (i=0; i<N; i++){ //try to put N items into buf[ ]
pthread_mutex_lock(&mutex);//lock mutex
if(data == NBUF) {
printf("producer %lu: all bufs FULL: wait\n",me);
pthread_cond_wait(&empty, &mutex);//wait
}
buf[head++] = i+1;//item = 1,2锛?.锛孨
head %=NBUF;//circular bufs
data++;//inc data by 1
printf("producer %lu: data=%d value=%d\n",me,data,i+1);
pthread_mutex_unlock(&mutex);//unlock mutex
pthread_cond_signal(&full);//unblock a consumer锛?if any
}
printf("producer %lu: exit \n",me);
}
void *consumer(){
int i, c;
pthread_t me = pthread_self();
for(i=0;i<N;i++){
pthread_mutex_lock(&mutex);//1ock mutex
if(data == 0){
printf ("consumer %lu: all bufs EMPTY : wait\n",me);
pthread_cond_wait(&full,&mutex);//wait
}
c=buf[tail++];//get an item
tail%=NBUF;
data--;//dec data by 1
printf("consumer %lu: value=%d\n",me,c);
pthread_mutex_unlock(&mutex);//unlock mutex
pthread_cond_signal(&empty);//unblock a producer锛宨f any
}
printf("consumer %lu: exit\n",me);
}
int main(){
pthread_t pro, con;
init();
printf("main: create producer and consumer threads \n");
pthread_create(&pro,NULL, producer,NULL);
pthread_create (&con,NULL,consumer,NULL);
printf("main: join with threads\n");
pthread_join(pro,NULL);
pthread_join(con,NULL);
printf("main: exit\n");
}