条件变量是线程间同步的一种机制,本文分析条件变量的实现和使用。我们先看一下条件变量的定义。
typedef struct
{
int c_spinlock; /* Spin lock to protect the queue. */
struct _pthread_queue c_waiting; /* Threads waiting on this condition. */
} pthread_cond_t;
我们看到条件变量的定义很简单,条件变量通常配合互斥变量一起使用,大致流程如下
加锁
if (条件不满足) {
阻塞在条件变量
}
操作加锁的资源
解锁
其实机制也很简单,条件变量就是在条件不满足的时候,把线程插入等待队列,等待条件满足的时候再唤醒队列里的线程。我们看一下具体实现。
// 阻塞等待条件。进入该函数前,已经获得了互斥锁mutex
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
volatile pthread_t self = thread_self();
// 加锁操作队列
acquire(&cond->c_spinlock);
// 插入条件的等待队列
enqueue(&cond->c_waiting, self);
// 操作完释放锁
release(&cond->c_spinlock);
// 释放互斥变量,否则别人无法操作资源,导致条件一直无法满足
pthread_mutex_unlock(mutex);
// 挂起等待条件满足后被唤醒
suspend_with_cancellation(self);
// 被唤醒后重新获取互斥锁
pthread_mutex_lock(mutex);
/* This is a cancellation point */
// 取消点,等待期间被取消了
if (self->p_canceled && self->p_cancelstate == PTHREAD_CANCEL_ENABLE) {
/* Remove ourselves from the waiting queue if we're still on it */
acquire(&cond->c_spinlock);
// 线程准备退出,从条件阻塞队列中移除
remove_from_queue(&cond->c_waiting, self);
release(&cond->c_spinlock);
pthread_exit(PTHREAD_CANCELED);
}
return 0;
}
pthread_cond_wait函数是当条件不能满足时,线程调用的函数。调用完后线程会被挂起,等待被唤醒(如果不希望一直被阻塞可以调用pthread_cond_timedwait,pthread_cond_timedwait支持定时阻塞)。看一下挂起线程的逻辑。
static inline void suspend_with_cancellation(pthread_t self)
{
sigset_t mask;
sigjmp_buf jmpbuf;
// 获取当前的信号屏蔽码
sigprocmask(SIG_SETMASK, NULL, &mask); /* Get current signal mask */
// 清除PTHREAD_SIG_RESTART的信号掩码,即允许处理该信号
sigdelset(&mask, PTHREAD_SIG_RESTART); /* Unblock the restart signal */
/* No need to save the signal mask, we'll restore it ourselves */
/*
直接调用返回0,从siglongjump回来返回非0,这里支持线程挂起时,
收到restart信号被唤醒,或者在取消信号的处理函数中,通过siglongjmp返回这里
*/
if (sigsetjmp(jmpbuf, 0) == 0) {
self->p_cancel_jmp = &jmpbuf;
// 已经被取消并且是可取消的则直接返回,否则挂起等待唤醒
if (! (self->p_canceled && self->p_cancelstate == PTHREAD_CANCEL_ENABLE)) {
do {
// 挂起等待restart信号
sigsuspend(&mask); /* Wait for a signal */
} while (self->p_signal != PTHREAD_SIG_RESTART);
}
self->p_cancel_jmp = NULL;
} else {
// 从cancel信号的处理函数中的siglongjmp返回,重新设置信号掩码,屏蔽restart信号
sigaddset(&mask, PTHREAD_SIG_RESTART); /* Reblock the restart signal */
sigprocmask(SIG_SETMASK, &mask, NULL);
}
}
我们看到最终通过调用sigsuspend挂起线程。等待信号的唤醒,从while循环的条件我们可以看到,当收到PTHREAD_SIG_RESTART信号的时候线程才会真正被“唤醒”。接着我们看看当条件满足后,其他线程是如何唤醒被阻塞的线程的。
// 条件满足,唤醒线程
int pthread_cond_signal(pthread_cond_t *cond)
{
pthread_t th;
acquire(&cond->c_spinlock);
// 取出一个被被阻塞的线程
th = dequeue(&cond->c_waiting);
release(&cond->c_spinlock);
// 发送信号唤醒他
if (th != NULL) restart(th);
return 0;
}
// 给pid进程发送唤醒信号
static inline void restart(pthread_t th)
{
kill(th->p_pid, PTHREAD_SIG_RESTART);
}
我们看到pthread_cond_signal的函数非常简单,从阻塞队列中获取一个线程,然后给他发一个唤醒信号。另外线程库也支持唤醒所有线程。
// 条件满足,唤醒所有线程
int pthread_cond_broadcast(pthread_cond_t *cond)
{
pthread_queue tosignal;
pthread_t th;
acquire(&cond->c_spinlock);
/* Copy the current state of the waiting queue and empty it */
tosignal = cond->c_waiting;
// 重置阻塞队列
queue_init(&cond->c_waiting);
release(&cond->c_spinlock);
/* Now signal each process in the queue */
// 发送信号唤醒所有线程
while ((th = dequeue(&tosignal)) != NULL) restart(th);
return 0;
}
pthread_cond_broadcast就是给每一个等待的线程发送唤醒信号。这就是线程条件变量的原理和实现。最后我们看一下使用例子。
struct prodcons {
int buffer[BUFFER_SIZE]; /* 环形数据缓冲区 */
pthread_mutex_t lock; /* 访问数据区的互斥锁 */
int readpos, writepos; /* 读写指针 */
pthread_cond_t notempty; /* 消费者使用的条件变量,非空即有数据消费 */
pthread_cond_t notfull; /* 生产者使用的条件变量,非满即可以生产数据 */
};
struct prodcons buffer;
void init(struct prodcons * b)
{
pthread_mutex_init(&b->lock, NULL);
pthread_cond_init(&b->notempty, NULL);
pthread_cond_init(&b->notfull, NULL);
b->readpos = 0;
b->writepos = 0;
}
int main()
{
pthread_t th_a, th_b;
void * retval;
// 初始化线程间共享的数据结构
init(&buffer);
// 创建两个线程
pthread_create(&th_a, NULL, producer, 0);
pthread_create(&th_b, NULL, consumer, 0);
pthread_join(th_a, &retval);
pthread_join(th_b, &retval);
return 0;
}
我们分别看看生产者和消费者的逻辑
void * producer(void * data)
{
int n;
for (n = 0; n < 10000; n++) {
printf("%d --->\n", n);
put(&buffer, n);
}
put(&buffer, OVER);
return NULL;
}
void * consumer(void * data)
{
int d;
while (1) {
d = get(&buffer);
if (d == OVER) break;
printf("---> %d\n", d);
}
return NULL;
}
我们看到生产者和消费者的逻辑很简单,就是一个往buffer里写数据,一个从buffer里读数据。问题在于如果没有空间可写或者没有数据可读怎么办?我们看get和put函数的逻辑。
void put(struct prodcons * b, int data)
{
// 操作共享数据需要加锁
pthread_mutex_lock(&b->lock);
/* 写指针+1等于读指针,说明没有空闲可写了,等待空闲空间 */
while ((b->writepos + 1) % BUFFER_SIZE == b->readpos) {
pthread_cond_wait(&b->notfull, &b->lock);
}
// pthread_cond_wait中被唤醒后会重新获得互斥锁,所以这里直接操作就行
b->buffer[b->writepos] = data;
b->writepos++;
// 到尾巴了,修正位置
if (b->writepos >= BUFFER_SIZE) b->writepos = 0;
/* 有数据可消费了,通知等待的消费者 */
pthread_cond_signal(&b->notempty);
pthread_mutex_unlock(&b->lock);
}
接着看看get的实现。
int get(struct prodcons * b)
{
int data;
pthread_mutex_lock(&b->lock);
/* 读写指针相等说明没有数据读了,等待数据 */
while (b->writepos == b->readpos) {
pthread_cond_wait(&b->notempty, &b->lock);
}
data = b->buffer[b->readpos];
b->readpos++;
if (b->readpos >= BUFFER_SIZE) b->readpos = 0;
/* 消费了数据,说明有空闲空间了,唤醒生产者 */
pthread_cond_signal(&b->notfull);
pthread_mutex_unlock(&b->lock);
return data;
}
以上就是线程间同步机制:条件变量的实现和原理。