学习pthreads,使用条件变量进行多线程之间的同步

条件变量提供另一种多线程同步的方法。互斥量通过控制对共享数据的访问来同步任务。条件变量可以根据数据的值来同步任务。条件变量是当一个事件发生时发送信号的信号量。一旦事件发生,可能会有多个线程在等待信号,条件变量通常用于对操作的顺序进行同步。使用条件变量对多线程进行同步时,条件变量和互斥量得同时使用。知道这些大概知识,本文将重点探讨怎么使用条件变量进行同步,结构分为三个部分,第一部分给出代码示例,第二部分对代码进行讲解,第三部分给出运行结果。

一、代码示例

#include "StdAfx.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <Windows.h>

#define NUM_THREADS  3
#define TCOUNT 10
#define COUNT_LIMIT 12

int     count = 0;
pthread_mutex_t count_mutex;
pthread_cond_t count_threshold_cv;

void *inc_count(void *t)
{
  int i;
  long my_id = (long)t;

  for (i=0; i < TCOUNT; i++) {
    pthread_mutex_lock(&count_mutex);
    count++;

    /*
    Check the value of count and signal waiting thread when condition is
    reached.  Note that this occurs while mutex is locked.
    */
    if (count == COUNT_LIMIT) {
      printf("inc_count(): thread %ld, count = %d  Threshold reached. ",
             my_id, count);
      pthread_cond_signal(&count_threshold_cv);
      printf("Just sent signal.\n");
      }
    printf("inc_count(): thread %ld, count = %d, unlocking mutex\n",
	   my_id, count);
    pthread_mutex_unlock(&count_mutex);

    /* Do some work so threads can alternate on mutex lock */
    Sleep(10);
    }
  pthread_exit(NULL);
  return(NULL);
}

void *watch_count(void *t)
{
  long my_id = (long)t;

  printf("Starting watch_count(): thread %ld\n", my_id);

  /*
  Lock mutex and wait for signal.  Note that the pthread_cond_wait routine
  will automatically and atomically unlock mutex while it waits.
  Also, note that if COUNT_LIMIT is reached before this routine is run by
  the waiting thread, the loop will be skipped to prevent pthread_cond_wait
  from never returning.
  */
  pthread_mutex_lock(&count_mutex);
  while (count < COUNT_LIMIT) {
    printf("watch_count(): thread %ld Count= %d. Going into wait...\n", my_id,count);
    pthread_cond_wait(&count_threshold_cv, &count_mutex);
    printf("watch_count(): thread %ld Condition signal received. Count= %d\n", my_id,count);
    printf("watch_count(): thread %ld Updating the value of count...\n", my_id,count);
    count += 125;
    printf("watch_count(): thread %ld count now = %d.\n", my_id, count);
    }
  printf("watch_count(): thread %ld Unlocking mutex.\n", my_id);
  pthread_mutex_unlock(&count_mutex);
  pthread_exit(NULL);
  return(NULL);
}

int main(int argc, char *argv[])
{
  int i, rc;
  long t1=1, t2=2, t3=3;
  pthread_t threads[3];
  pthread_attr_t attr;

  /* Initialize mutex and condition variable objects */
  pthread_mutex_init(&count_mutex, NULL);
  pthread_cond_init (&count_threshold_cv, NULL);

  /* For portability, explicitly create threads in a joinable state */
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  pthread_create(&threads[0], &attr, watch_count, (void *)t1);
  pthread_create(&threads[1], &attr, inc_count, (void *)t2);
  pthread_create(&threads[2], &attr, inc_count, (void *)t3);

  /* Wait for all threads to complete */
  for (i = 0; i < NUM_THREADS; i++) {
    pthread_join(threads[i], NULL);
  }
  printf ("Main(): Waited and joined with %d threads. Final value of count = %d. Done.\n",
          NUM_THREADS, count);

  /* Clean up and exit */
  pthread_attr_destroy(&attr);
  pthread_mutex_destroy(&count_mutex);
  pthread_cond_destroy(&count_threshold_cv);
  pthread_exit (NULL);

}

二、代码讲解

int     count = 0;
pthread_mutex_t count_mutex;
pthread_cond_t count_threshold_cv;

定义全局变量计数器,互斥量和条件变量

void *inc_count(void *t)
{
  int i;
  long my_id = (long)t;

  for (i=0; i < TCOUNT; i++) {
    pthread_mutex_lock(&count_mutex);
    count++;

    if (count == COUNT_LIMIT) {
      printf("inc_count(): thread %ld, count = %d  Threshold reached. ",
             my_id, count);
      pthread_cond_signal(&count_threshold_cv);
      printf("Just sent signal.\n");
      }
    printf("inc_count(): thread %ld, count = %d, unlocking mutex\n",
	   my_id, count);
    pthread_mutex_unlock(&count_mutex);

    Sleep(10);
    }
  pthread_exit(NULL);
  return(NULL);
}

inc_count()函数是线程2和3执行的任务,每次for循环调用pthread_mutex_lock锁住互斥量,从而对全局变量进行计数,当计数达到一定条件,调用pthread_cond_signal函数发送条件变量,然后对互斥量进行解锁,为了另一个线程有充足时间锁住互斥量,让本线程休眠一段时间,最后退出线程。

void *watch_count(void *t)
{
  long my_id = (long)t;

  printf("Starting watch_count(): thread %ld\n", my_id);

  pthread_mutex_lock(&count_mutex);
  while (count < COUNT_LIMIT) {
    printf("watch_count(): thread %ld Count= %d. Going into wait...\n", my_id,count);
    pthread_cond_wait(&count_threshold_cv, &count_mutex);
    printf("watch_count(): thread %ld Condition signal received. Count= %d\n", my_id,count);
    printf("watch_count(): thread %ld Updating the value of count...\n", my_id,count);
    count += 125;
    printf("watch_count(): thread %ld count now = %d.\n", my_id, count);
    }
  printf("watch_count(): thread %ld Unlocking mutex.\n", my_id);
  pthread_mutex_unlock(&count_mutex);
  pthread_exit(NULL);
  return(NULL);
}

watch_count函数是线程1执行的函数,该线程主要是完成接受发送的条件变量。首先,调用pthread_mutex_lock函数锁住互斥量我;为了使等待时间在有限范围内,使用满足条件的while循环,因为假如count大于条件,那么等待的时间将是无穷无尽的;调用pthread_cond_wait函数接受条件变量,直到接受到条件变量,才执行下一步程序;接受完条件变量后,对互斥量进行解锁,最后退出线程。

 int i, rc;
 long t1=1, t2=2, t3=3;
 pthread_t threads[3];
 pthread_attr_t attr;

 pthread_mutex_init(&count_mutex, NULL);
 pthread_cond_init (&count_threshold_cv, NULL);

定义程序所需变量,pthread_t变量以及属性对象attr;初始化全局互斥量,初始化全局条件变量。

  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  pthread_create(&threads[0], &attr, watch_count, (void *)t1);
  pthread_create(&threads[1], &attr, inc_count, (void *)t2);
  pthread_create(&threads[2], &attr, inc_count, (void *)t3);

  for (i = 0; i < NUM_THREADS; i++) {
    pthread_join(threads[i], NULL);
  }
  printf ("Main(): Waited and joined with %d threads. Final value of count = %d. Done.\n",
          NUM_THREADS, count);

初始化属性对象,并将属性对象设置为可结合的,然后使用该属性对象创建三个线程,分别执行watch_count和inc_count函数,最后结合所有的线程。

  pthread_attr_destroy(&attr);
  pthread_mutex_destroy(&count_mutex);
  pthread_cond_destroy(&count_threshold_cv);
  pthread_exit (NULL);

销毁属性对象、互斥量和条件变量,退出线程。

三、运行结果

学习pthreads,使用条件变量进行多线程之间的同步

上一篇:Java基础-零拷贝技术应用案例


下一篇:java缓存技术(转)