posix thread互斥量

互斥量

互斥量(Mutex)是“mutual exclusion”的缩写。互斥量是实现线程同步,和保护同时写共享数据的主要方法。
使用互斥量的典型顺序如下:
1. 创建和初始一个互斥量
2. 多个线程尝试去锁定该互斥量
3. 仅有一个线程可以成功锁定改互斥量
4. 锁定成功的线程做一些处理
5. 线程解锁该互斥量
6. 另外一个线程获得互斥量,重复上述过程
7. 最后销毁互斥量

创建和销毁互斥量

pthread_mutex_t_numtex = PTHREAD_MUTEX_INITIALIZER;
int pthread_mutex_init(pthread_mutex_t *mutex, pthread_mutexattr_t *attr);
int pthread_mutex_destory(pthread_mutex_t *mutex);

互斥量必须用类型pthread_mutex_t类型声明,在使用前必须初始化,这里有两种方法可以初始化互斥量:

  1. 声明时静态地,如:pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
  2. 动态地用pthread_mutex_init()函数,这种方法允许设定互斥量的属性对象attr。

互斥量初始化后是解锁的。

attr对象用于设置互斥量对象的属性,使用时必须声明为pthread_mutextattr_t类型,默认值可以是NULL。

Pthreads标准定义了三种可选的互斥量属性:

  1. 协议(Protocol): 指定了协议用于阻止互斥量的优先级改变
  2. 优先级上限(Prioceiling):指定互斥量的优先级上限
  3. 进程共享(Process-shared):指定进程共享互斥量

注意所有实现都提供了这三个可先的互斥量属性。

pthread_mutexattr_init()和pthread_mutexattr_destroy()函数分别用于创建和销毁互斥量属性对象。

pthread_mutex_destroy()应该用于释放不需要再使用的互斥量对象。

将互斥量和它要保护的数据明显的关联起来是个不错的选择。如下例:

 #include<pthread.h>
#include "errors.h" typedef struct my_struct_tag {
pthread_mutex_t mutex;
int value;
} my_struct_t; int main(int argc, char* argv[])
{
my_struct_t *data;
int status; data = malloc(sizeof(my_struct_t));
if( data == NULL )
errno_abort("malloc");
status = pthread_mutex_init(&data->mutex, NULL);
if( status != )
err_abort(status, "init mutex");
status = pthread_mutex_destroy(&data->mutex);
if( status != )
err_abort(status, "destroy mutex");
(void)free(data);
return status; }

mutex_dynamic.c

加锁和解锁互斥量

int pthread_mutex_lock(pthread_mutex_t *mutex);
int phtread_mutex_trylock(pthread_mutex_t *mutex); //非阻塞加锁
int pthrad_mutex_unlock(pthread_mutex_t *mutex);

线程用pthread_mutex_lock()函数去锁定指定的mutex变量,若该mutex已经被另外一个线程锁定了,该调用将会阻塞线程直到mutex被解锁。

pthread_mutex_trylock()尝试着去锁定一个互斥量,然而,若互斥量已被锁定,程序会立刻返回并返回一个忙错误(EBUSY)值。该函数在优先级改变情况下阻止死锁是非常有用的。

线程可以用pthread_mutex_unlock()解锁自己占用的互斥量。在一个线程完成对保护数据的使用,而其它线程要获得互斥量在保护数据上工作时,可以调用该函数。

若有一下情形则会发生错误:

  1. 互斥量已经被解锁
  2. 互斥量被另一个线程占用
 #include<pthread.h>
#include<time.h>
#include "errors.h" typedef struct alarm_tag {
struct alarm_tag *link;
int seconds;
time_t time;
char message[];
} alarm_t; pthread_mutex_t alarm_mutex = PTHREAD_MUTEX_INITIALIZER;
alarm_t *alarm_list = NULL; void *alarm_thread(void *arg)
{
alarm_t *alarm;
int sleep_time;
time_t now;
int status; while()
{
status = pthread_mutex_lock(&alarm_mutex);
if( status != )
err_abort(status, "Lock mutex");
alarm = alarm_list;
if( alarm == NULL )
sleep_time = ;
else{
alarm_list = alarm->link;
now = time(NULL);
if( alarm->time <= now)
sleep_time = ;
else
sleep_time = alarm->time - now;
#ifdef DEBUG
printf("[waiting: %d(%d)\"%s\"]\n", alarm->time,
sleep_time, alarm->message);
#endif
}
status = pthread_mutex_unlock(&alarm_mutex);
if(status != )
err_abort(status, "Unlock mutex");
if( sleep_time > )
sleep(sleep_time);
else
sched_yield();
if( alarm != NULL)
{
printf("(%d) %s\n", alarm->seconds, alarm->message);
free(alarm);
} }
} int main(int argc, char *argv[])
{
int status;
char line[];
alarm_t *alarm, **current, *next;
pthread_t thread; status = pthread_create(&thread, NULL, alarm_thread, NULL);
if(status != )
err_abort(status, "Create alarm thread");
while()
{
printf("Alarm> ");
if(fgets(line, sizeof(line), stdin) == NULL ) exit();
if(strlen(line) <= ) continue;
alarm = (alarm_t*)malloc(sizeof(alarm_t));
if(alarm == NULL)
errno_abort("malloc alarm"); if(sscanf(line, "%d %64[^\n]", &alarm->seconds, alarm->message) < )
{
fprintf(stderr, "Bad command\n");
free(alarm);
}
else
{
status = pthread_mutex_lock(&alarm_mutex);
if(status != )
err_abort(status, "mutex lock");
alarm->time = time(NULL) + alarm->seconds; current = &alarm_list;
next = *current;
while(next != NULL)
{
if(next->time >= alarm->time)
{
alarm->link = next;
*current = alarm;
break;
}
current = &next->link;
next = next->link;
}
if(next == NULL)
{
*current = alarm;
alarm->link = NULL;
}
#ifdef DEBUG
printf("[list:");
for(next = alarm_list;next != NULL; next = next->link)
printf("%d(%d)[\"%s\"] ", next->time,
next->time - time(NULL), next->message);
printf("]\n");
#endif
status = pthread_mutex_unlock(&alarm_mutex);
if(status != )
err_abort(status, "Unlock mutex");
}
}
}

alarm_mutex.c

在试锁和回退算法中,总是应该以相反的顺序解锁互斥量:

  • 尝试加锁互斥量1;如果成功,再加锁互斥量2;如果成功,再加锁互斥量3。如果某一个互斥量加锁失败,则全部回退。
  • 解锁互斥量3/2/1

按照相反顺序解锁,如果第二个线程需要加锁这三个互斥量,则会由于加锁互斥量1失败而回退;而如果先解锁1-2-3这样的顺序,可能会到加锁互斥量3时候才失败,回退代价更大。

 #include<pthread.h>
#include "errors.h" #define ITERATIONS 10 pthread_mutex_t mutex[] = {
PTHREAD_MUTEX_INITIALIZER,
PTHREAD_MUTEX_INITIALIZER,
PTHREAD_MUTEX_INITIALIZER
}; int backoff = ;
int yield_flag = ; void * lock_forward(void* arg)
{
int i, iterate, backoffs, status;
for( iterate = ; iterate < ITERATIONS; iterate++ ){
backoffs = ;
for( i=; i< ; i++ ){
if( i == ){
status = pthread_mutex_lock(&mutex[i]);
if(status != )
err_abort(status, "First lock");
}else{
if(backoff)
status = pthread_mutex_trylock(&mutex[i]);
else
status = pthread_mutex_lock(&mutex[i]);
if( status == EBUSY) {
backoffs++;
DPRINTF((
" [forward locker"
"backing off at %d]\n",
i));
for(; i>= ;i--) {
status = pthread_mutex_unlock(&mutex[i]);
if(status != )
err_abort(status, "Backoff");
}
}else {
if( status != )
err_abort(status, "Lock mutex");
DPRINTF((" forward locker got %d \n", i));
}
}
if(yield_flag){
if(yield_flag > )
sched_yield();
else
sleep();
}
}
printf("lock forward got all locks, %d backoffs\n", backoffs);
pthread_mutex_unlock(&mutex[]);
pthread_mutex_unlock(&mutex[]);
pthread_mutex_unlock(&mutex[]);
sched_yield();
}
return NULL;
} void *lock_backward(void *arg)
{
int i, iterate, backoffs;
int status; for ( iterate = ; iterate < ITERATIONS; iterate++ ) {
backoffs = ;
for ( i = ; i >= ; i-- ) {
if (i == ) {
status = pthread_mutex_lock (&mutex[i]);
if (status != )
err_abort(status, "First lock");
} else {
if (backoff)
status = pthread_mutex_trylock(&mutex[i]);
else
status = pthread_mutex_lock(&mutex[i]);
if (status == EBUSY ) {
backoffs++;
DPRINTF(("[backward locker backing off at %d]\n",i));
for (; i < ; i++) {
status = pthread_mutex_unlock(&mutex[i]);
if (status != )
err_abort(status, "Backoff");
}
} else {
if (status != )
err_abort(status, "Lock mutex");
DPRINTF(("backward locker got %d\n", i));
}
}
if (yield_flag) {
if (yield_flag > )
sched_yield();
else
sleep();
}
}
printf("Lock backward got all locks, %d backoffs\n", backoffs);
pthread_mutex_unlock(&mutex[]);
pthread_mutex_unlock(&mutex[]);
pthread_mutex_unlock(&mutex[]);
sched_yield();
}
return NULL;
} int main(int argc, char* argv[])
{
pthread_t forward, backward;
int status; if (argc > )
backoff = atoi(argv[]);
if (argc > )
yield_flag = atoi(argv[]);
status = pthread_create(&forward, NULL, lock_forward, NULL);
if (status != )
err_abort(status, "Create forward");
status = pthread_create(&backward, NULL, lock_backward, NULL);
if (status != )
err_abort(status, "Create backward");
pthread_exit(NULL); }

backoff.c

  

上一篇:Java中ArrayDeque,栈与队列


下一篇:[NOIP2004] 提高组 洛谷P1089 津津的储蓄计划