线程源码分析之条件变量(基于linuxthreads2.0.1)

条件变量是线程间同步的一种机制,本文分析条件变量的实现和使用。我们先看一下条件变量的定义。

typedef struct
{
  int c_spinlock;                  /* Spin lock to protect the queue.  */
  struct _pthread_queue c_waiting; /* Threads waiting on this condition.  */
} pthread_cond_t;

我们看到条件变量的定义很简单,条件变量通常配合互斥变量一起使用,大致流程如下

加锁
if (条件不满足) {
	阻塞在条件变量
}
操作加锁的资源
解锁

其实机制也很简单,条件变量就是在条件不满足的时候,把线程插入等待队列,等待条件满足的时候再唤醒队列里的线程。我们看一下具体实现。

// 阻塞等待条件。进入该函数前,已经获得了互斥锁mutex
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
  volatile pthread_t self = thread_self();
  // 加锁操作队列
  acquire(&cond->c_spinlock);
  // 插入条件的等待队列
  enqueue(&cond->c_waiting, self);
  // 操作完释放锁
  release(&cond->c_spinlock);
  // 释放互斥变量,否则别人无法操作资源,导致条件一直无法满足
  pthread_mutex_unlock(mutex);
  // 挂起等待条件满足后被唤醒
  suspend_with_cancellation(self);
  // 被唤醒后重新获取互斥锁
  pthread_mutex_lock(mutex);
  /* This is a cancellation point */
  // 取消点,等待期间被取消了
  if (self->p_canceled && self->p_cancelstate == PTHREAD_CANCEL_ENABLE) {
    /* Remove ourselves from the waiting queue if we're still on it */
    acquire(&cond->c_spinlock);
    // 线程准备退出,从条件阻塞队列中移除
    remove_from_queue(&cond->c_waiting, self);
    release(&cond->c_spinlock);
    pthread_exit(PTHREAD_CANCELED);
  }
  return 0;
}

pthread_cond_wait函数是当条件不能满足时,线程调用的函数。调用完后线程会被挂起,等待被唤醒(如果不希望一直被阻塞可以调用pthread_cond_timedwait,pthread_cond_timedwait支持定时阻塞)。看一下挂起线程的逻辑。

static inline void suspend_with_cancellation(pthread_t self)
{
  sigset_t mask;
  sigjmp_buf jmpbuf;
  // 获取当前的信号屏蔽码
  sigprocmask(SIG_SETMASK, NULL, &mask); /* Get current signal mask */
  // 清除PTHREAD_SIG_RESTART的信号掩码,即允许处理该信号
  sigdelset(&mask, PTHREAD_SIG_RESTART); /* Unblock the restart signal */
  /* No need to save the signal mask, we'll restore it ourselves */
  /*
    直接调用返回0,从siglongjump回来返回非0,这里支持线程挂起时,
    收到restart信号被唤醒,或者在取消信号的处理函数中,通过siglongjmp返回这里
  */
  if (sigsetjmp(jmpbuf, 0) == 0) {
    self->p_cancel_jmp = &jmpbuf;
    // 已经被取消并且是可取消的则直接返回,否则挂起等待唤醒
    if (! (self->p_canceled && self->p_cancelstate == PTHREAD_CANCEL_ENABLE)) {
      do {
        // 挂起等待restart信号
        sigsuspend(&mask);               /* Wait for a signal */
      } while (self->p_signal != PTHREAD_SIG_RESTART);
    }
    self->p_cancel_jmp = NULL;
  } else {
    // 从cancel信号的处理函数中的siglongjmp返回,重新设置信号掩码,屏蔽restart信号
    sigaddset(&mask, PTHREAD_SIG_RESTART); /* Reblock the restart signal */
    sigprocmask(SIG_SETMASK, &mask, NULL);
  }
}

我们看到最终通过调用sigsuspend挂起线程。等待信号的唤醒,从while循环的条件我们可以看到,当收到PTHREAD_SIG_RESTART信号的时候线程才会真正被“唤醒”。接着我们看看当条件满足后,其他线程是如何唤醒被阻塞的线程的。

// 条件满足,唤醒线程
int pthread_cond_signal(pthread_cond_t *cond)
{
  pthread_t th;

  acquire(&cond->c_spinlock);
  // 取出一个被被阻塞的线程
  th = dequeue(&cond->c_waiting);
  release(&cond->c_spinlock);
  // 发送信号唤醒他
  if (th != NULL) restart(th);
  return 0;
}

// 给pid进程发送唤醒信号
static inline void restart(pthread_t th)
{
  kill(th->p_pid, PTHREAD_SIG_RESTART);
}

我们看到pthread_cond_signal的函数非常简单,从阻塞队列中获取一个线程,然后给他发一个唤醒信号。另外线程库也支持唤醒所有线程。

// 条件满足,唤醒所有线程
int pthread_cond_broadcast(pthread_cond_t *cond)
{
  pthread_queue tosignal;
  pthread_t th;

  acquire(&cond->c_spinlock);
  /* Copy the current state of the waiting queue and empty it */
  tosignal = cond->c_waiting;
  // 重置阻塞队列
  queue_init(&cond->c_waiting);
  release(&cond->c_spinlock);
  /* Now signal each process in the queue */
  // 发送信号唤醒所有线程
  while ((th = dequeue(&tosignal)) != NULL) restart(th);
  return 0;
}

pthread_cond_broadcast就是给每一个等待的线程发送唤醒信号。这就是线程条件变量的原理和实现。最后我们看一下使用例子。

struct prodcons {
  int buffer[BUFFER_SIZE];      /* 环形数据缓冲区 */
  pthread_mutex_t lock;         /* 访问数据区的互斥锁 */
  int readpos, writepos;        /* 读写指针 */
  pthread_cond_t notempty;      /* 消费者使用的条件变量,非空即有数据消费 */
  pthread_cond_t notfull;       /* 生产者使用的条件变量,非满即可以生产数据 */
};

struct prodcons buffer;

void init(struct prodcons * b)
{
  pthread_mutex_init(&b->lock, NULL);
  pthread_cond_init(&b->notempty, NULL);
  pthread_cond_init(&b->notfull, NULL);
  b->readpos = 0;
  b->writepos = 0;
}

int main()
{
  pthread_t th_a, th_b;
  void * retval;
  // 初始化线程间共享的数据结构
  init(&buffer);
  // 创建两个线程
  pthread_create(&th_a, NULL, producer, 0);
  pthread_create(&th_b, NULL, consumer, 0);
  pthread_join(th_a, &retval);
  pthread_join(th_b, &retval);
  return 0;
}

我们分别看看生产者和消费者的逻辑

void * producer(void * data)
{
  int n;
  for (n = 0; n < 10000; n++) {
    printf("%d --->\n", n);
    put(&buffer, n);
  }
  put(&buffer, OVER);
  return NULL;
}

void * consumer(void * data)
{
  int d;
  while (1) {
    d = get(&buffer);
    if (d == OVER) break;
    printf("---> %d\n", d);
  }
  return NULL;
}

我们看到生产者和消费者的逻辑很简单,就是一个往buffer里写数据,一个从buffer里读数据。问题在于如果没有空间可写或者没有数据可读怎么办?我们看get和put函数的逻辑。

void put(struct prodcons * b, int data)
{
  // 操作共享数据需要加锁
  pthread_mutex_lock(&b->lock);
  /* 写指针+1等于读指针,说明没有空闲可写了,等待空闲空间 */
  while ((b->writepos + 1) % BUFFER_SIZE == b->readpos) {
    pthread_cond_wait(&b->notfull, &b->lock);
  }
  // pthread_cond_wait中被唤醒后会重新获得互斥锁,所以这里直接操作就行
  b->buffer[b->writepos] = data;
  b->writepos++;
  // 到尾巴了,修正位置
  if (b->writepos >= BUFFER_SIZE) b->writepos = 0;
  /* 有数据可消费了,通知等待的消费者 */
  pthread_cond_signal(&b->notempty);
  pthread_mutex_unlock(&b->lock);
}

接着看看get的实现。

int get(struct prodcons * b)
{
  int data;
  pthread_mutex_lock(&b->lock);
  /* 读写指针相等说明没有数据读了,等待数据 */
  while (b->writepos == b->readpos) {
    pthread_cond_wait(&b->notempty, &b->lock);
  }
  data = b->buffer[b->readpos];
  b->readpos++;
  if (b->readpos >= BUFFER_SIZE) b->readpos = 0;
  /* 消费了数据,说明有空闲空间了,唤醒生产者 */
  pthread_cond_signal(&b->notfull);
  pthread_mutex_unlock(&b->lock);
  return data;
}

以上就是线程间同步机制:条件变量的实现和原理。

上一篇:线程的创建、加锁


下一篇:C/C++基于线程的并发编程(二):线程安全和互斥锁