线程同步的几种方式

目录

1、前言

几年的编程生涯中,线程的使用可以说是非常常见的,从工作第一年把GUI和后台工作放在同一个线程中导致界面卡死(想想以前还装专业地给生产的同事写SOP,让他们在操作的时候别点击界面,真可笑),到现在能随随便便就能封装一个简易的线程池,这中间这么些年却从来没有系统地整理过线程的一些重要的知识点,今天翻了翻旧书,想起来,顺便整理一下线程间同步的几种方式。

关于线程的一些基础知识和使用方式,就不多说了,直接上正文。

以下内容,细节和代码处参考 后台开发核心技术与应用实践 一书

2、线程同步

我们都知道,在不同的线程针对同一个资源的时候,如果只是单纯的读取,不需要加以同步处理,而如果修改,不进行处理的话,会导致资源的冲突。

因为在并发的情况下,指令执行的先后顺序由内核决定。同一个线程的内部,指令按照先后顺序执行,但不同线程之间的指令很难说清楚哪一个会先执行。如果运行的结果依赖于不同线程执行的先后的话,那么就会造成竞争条件,在这样的状况下,计算机的结果很难预知,所以应该尽量避免竞争条件的形成。最常见的解决竞争条件的方法是将原先分离的两个指令构成不可分割的一个原子操作,而其他任务不能插入到原子操作中。

对于多线程的程序来说,同步指的是在一定的时间内只允许某一个线程访问某个资源,具体可以使用一下四种方式实现:

  1. 互斥锁(mutex)
  2. 条件变量(condition)
  3. 读写锁(reader-writer lock)
  4. 信号量(semphore)

以下代码使用 pthread库 示例,不同的库有不同的实现和使用方式,该篇只做演示,不做详细讨论

以下面代码示例,这是一个会造成资源竞争的代码,运行会出现不可预知的错误

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

int gTotalTickerNum = 20;

void *sellTicket(void *arg) {
  for (int i = 0; i < 20; i++) {
    if (gTotalTickerNum > 0) {
      sleep(1);
      printf("sell the %dth ticket\n", 20 - gTotalTickerNum + 1);
      gTotalTickerNum--;
    }
  }
  return nullptr;
}

int main() {
  pthread_t pthread[4];
  for (int index = 0; index < 4; index++) {
    int res = pthread_create(&pthread[index], nullptr, &sellTicket, nullptr);
    if (res) {
      printf("pthread create error, res=%d\n", res);
      return res;
    }
  }
  sleep(20);
  void *retval;
  for (int index = 0; index < 4; index++) {
    int res = pthread_join(pthread[index], &retval);
    if (res) {
      printf("tif=%d join error, res = %d\n", index, res);
      return res;
    }
    printf("retval=%d\n", retval);
  }
  return 0;
}

2.1、互斥锁

互斥锁 是最常见的线程同步方式,它是一种特殊的变量,它有 lockunlock 两种状态,一旦获取,就会上锁,且只能由该线程解锁,期间,其他线程无法获取

以上代码使用互斥锁可以修改如下:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int gTotalTickerNum = 20;

void *sellTicket(void *arg) {
  for (int i = 0; i < 20; i++) {
    pthread_mutex_lock(&mutex);
    if (gTotalTickerNum > 0) {
      sleep(1);
      printf("sell the %dth ticket\n", 20 - gTotalTickerNum + 1);
      gTotalTickerNum--;
    }
    pthread_mutex_unlock(&mutex);
  }
  return nullptr;
}

如上,在使用同一个资源前加锁,使用后解锁,即可实现线程同步,需要注意的是,如果加锁后不解锁,会造成死锁

优点:

  1. 使用简单;

缺点:

  1. 重复锁定和解锁,每次都会检查共享数据结构,浪费时间和资源;
  2. 繁忙查询的效率非常低;

2.2、条件变量

针对互斥锁浪费资源且效率低的缺点,可以使用条件变量。

条件变量的方法是,当线程在等待某些满足条件时使线程进入睡眠状态,一旦条件满足,就唤醒,这样不会占用宝贵的互斥对象锁,实现高效

条件变量允许线程阻塞并等待另一个线程发送信号,一般和互斥锁一起使用。

条件变量被用来阻塞一个线程,当条件不满足时,线程会解开互斥锁,并等待条件发生变化。一旦其他线程改变了条件变量,将通知相应的阻塞线程,这些线程重新锁定互斥锁,然后执行后续代码,最后再解开互斥锁。

代码如下:

#include <iostream>
#include <unistd.h>

using namespace std;

pthread_cond_t qready = PTHREAD_COND_INITIALIZER;//初始构造条件变量
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;//初始构造锁

int x = 10, y = 20;

void *func1(void *arg) {
  cout << "func1" << endl;
  pthread_mutex_lock(&qlock);
  while (x < y) {
    pthread_cond_wait(&qready, &qlock);
  }
  pthread_mutex_unlock(&qlock);
  cout << "func1 end" << endl;
}

void *func2(void *arg) {
  cout << "func2" << endl;
  pthread_mutex_lock(&qlock);
  x = 20, y = 10;
  cout << "x & y changed" << endl;
  pthread_mutex_unlock(&qlock);
  if (x > y) {
    pthread_cond_signal(&qready);
  }
  cout << "func2 end" << endl;
}

int main() {
  pthread_t tid1, tid2;
  int res = pthread_create(&tid1, nullptr, func1, nullptr);
  if (res) {
    cout << "pthread 1 create error" << endl;
    return res;
  }
  sleep(2);
  res = pthread_create(&tid2, nullptr, func2, nullptr);
  if (res) {
    cout << "pthread 2 create error" << endl;
    return res;
  }
  sleep(10);
  return 0;
}

如上,条件变量需要和互斥锁一起使用

代码中,如果没有条件变量,则需要等 func1() 执行完成之后,fun2() 才能获取互斥锁,继续执行。

而使用了条件变量之后,在 fun1 中的 pthread_cond_wait(&qready, &qlock); 代码,会先解开互斥锁,让其他线程能够得到这个互斥锁。在 fun2 中完成条件后执行代码 pthread_cond_signal(&qready); 后激活等待的线程,此时若有多个线程,则会按照入队的顺序激活一个,当然也可以使用 pthread_cond_broadcast() 方法激活所有等待线程

pthread_cond_signal 函数的作用是发送一个信号给另外一个正在处于阻塞等待状态的线程,使其脱离阻塞状态,继续执行

pthread_cond_signal 不会有 惊群现象,它最多给一个线程发送信号。当有多个线程阻塞时,会根据优先级高低和入队顺序决定取消阻塞线程

2.3、读写锁

读写锁 也称之为 共享-独占锁,一般用在读和写的次数有很大不同的场合。即对某些资源的访问会出现两种情况,一种是访问的排他性,需要独占,称之为写操作;还有就是访问可以共享,称之为读操作。

读写所 相比于不管三七二十一,通通独占的模式,有着很大的适用性和并行性。其有以下几种状态:

  1. 读写锁处于写锁定的状态,则在解锁之前,所有试图加锁的线程都会阻塞;
  2. 读写锁处于读锁定的状态,则所有试图以读模式加锁的线程都可得到访问权,但是以写模式加锁的线程则会阻塞;
  3. 读写锁处于读模式的锁(未加锁)状态时,有另外的线程试图以写模式加锁,则读写锁会阻塞读模式加锁的请求,这样避免了读模式锁长期占用,导致的写模式锁长期阻塞的情况;

适用场景:

读写锁最适用于对数据结构的毒操作次数多于写操作次数的场合。

处理这种问题一般有两种常见的策略:

  1. 强读者同步

    总是给读者更高的优先权,只要没有写操作,读者就可以获取访问权,比如图书馆查询系统采用强读者同步策略;

  2. 强写者同步

    写者有更高的优先级,读者只能等到写者结束之后才能执行,比如航班订票系统,要求看到最新的信息记录,会使用强写者同步策略;

2.4、信号量

信号量 和互斥锁的区别在于:互斥锁只允许一个线程进入临界区,信号量允许多个线程同时进入临界区

可以这样理解,互斥锁使用对同一个资源的互斥的方式达到线程同步的目的,信号量可以同步多个资源以达到线程同步

代码示例:

以下代码模拟某个营业厅两个窗口处理业务的场景,有10个客户进入营业厅,当发现窗口已满,则等待,当有可用的窗口时,就接受服务

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>

#define CUSTOMER_NUM 10

sem_t sem;

void *get_server(void *thread_id) {
    int cusotmer_id = *((int *) thread_id);
    if (sem_wait(&sem) == 0) {
        printf("customer %d receive server ...\n", cusotmer_id);
        sem_post(&sem);
    }
}

int main() {
    sem_init(&sem, 0, 2);
    pthread_t customers[CUSTOMER_NUM];
    int customer_id[CUSTOMER_NUM];//用户id

    for (int index = 0; index < CUSTOMER_NUM; index++) {
        customer_id[index] = index;
        printf("customer %d arrived\n", customer_id[index]);
        int res = pthread_create(&customers[index], nullptr, get_server, &customer_id[index]);
        if (res) {
            printf("create thread error\n");
            return res;
        }
    }
    for (int index = 0; index < CUSTOMER_NUM; index++) {
        pthread_join(customers[index], nullptr);
    }
    sem_destroy(&sem);

    return 0;
}

执行结果

customer 0 arrived
customer 1 arrived
customer 2 arrived
customer 1 receive server ...
customer 2 receive server ...
customer 3 arrived
customer 4 arrived
customer 0 receive server ...
customer 3 receive server ...
customer 4 receive server ...
customer 5 arrived
customer 6 arrived
customer 5 receive server ...
customer 6 receive server ...
customer 7 arrived
customer 8 arrived
customer 9 arrived
customer 8 receive server ...
customer 7 receive server ...
customer 9 receive server ...

if (sem_wait(&sem) == 0) 表示当前信号量大于0,即有空闲的窗口,可以为该顾客服务,并将信号量-1,服务完成 sem_post 方法把信号量+1,以便继续服务

void *get_server(void *thread_id) {
  int cusotmer_id = *((int *) thread_id);
  if (sem_wait(&sem) == 0) {
    usleep(100);
    printf("customer %d receive server ...\n", cusotmer_id);
    sem_post(&sem);
  }
}

3、总结

几种线程同步的方式各有利弊,实际开发中需要根据场景选择不同的方式使用开发

上一篇:介绍Dynamics 365的OrgDBOrgSettings工具


下一篇:Ribbon负载均衡