(九) 一起学 Unix 环境高级编程 (APUE) 之 线程

.

.

.

.

.

目录

(一) 一起学 Unix 环境高级编程 (APUE) 之 标准IO

(二) 一起学 Unix 环境高级编程 (APUE) 之 文件 IO

(三) 一起学 Unix 环境高级编程 (APUE) 之 文件和目录

(四) 一起学 Unix 环境高级编程 (APUE) 之 系统数据文件和信息

(五) 一起学 Unix 环境高级编程 (APUE) 之 进程环境

(六) 一起学 Unix 环境高级编程 (APUE) 之 进程控制

(七) 一起学 Unix 环境高级编程 (APUE) 之 进程关系 和 守护进程

(八) 一起学 Unix 环境高级编程 (APUE) 之 信号

(九) 一起学 Unix 环境高级编程 (APUE) 之 线程

(十) 一起学 Unix 环境高级编程 (APUE) 之 线程控制

(十一) 一起学 Unix 环境高级编程 (APUE) 之 高级 IO

(十二) 一起学 Unix 环境高级编程 (APUE) 之 进程间通信(IPC)

(十三) [终篇] 一起学 Unix 环境高级编程 (APUE) 之 网络 IPC:套接字

本章主要讲线程的基本操作,线程的创建、取消、终止、同步等等。

实际项目中多线程用得比较多,因为多线程是先有标准后有实现的,所以不会向多进程那样在不同平台上有许多不同的情况。

线程就是一个正在运行的函数。

C 语言线程有很多标准,POSIX 是其中的一种。

POSIX 是一套标准,而不是一种实现。

正因为 POSIX 是一套标准而不是实现,所以 POSIX 只是规定了 pthread_t 作为线程标识符,但是并没有规定它必须是由什么类型组成的,所以在有的平台上它可能是 int,有些平台上它可能是 struct,还有些平台上它可能是 union,所以不要直接操作这个类型,而是要使用 POSIX 规定的各种线程函数来操作它。

有木有觉得像标准 IO 里 FILE 的赶脚?没错,标准制定出来的很多东西都是这种风格的,它为你提供一个数据类型而不让你直接对这个类型操作,要通过它定义的一系列函数来实现对这个类型的操作,这样就在各个平台上实现统一的接口了,所以这样做才能让标准制定出来的东西具有较好的可移植性。

pthread_t 是个很重要的东西,我们所有使用 PSOIX 标准的线程操作都是围绕着它来进行的,通过它配合各种函数就可以对线程进行各种花样作死的玩了。:)

我记得在前面进程的博文中 LZ 介绍过几种 ps(1) 命令的使用方式,用来观察进程的关系和状态。在本篇开始之前 LZ 再补充一个 ps(1) 命令的组合,用来查看线程的情况,方便我们调试程序。

>$ ps ax -L
PID LWP TTY STAT TIME COMMAND
? Ss : /sbin/init
? S : [kthreadd]
? S : [ksoftirqd/]
? Ss : dbus-daemon --system --fork
? Ssl : /usr/sbin/ModemManager
? Ssl : /usr/sbin/ModemManager
? Ssl : /usr/sbin/ModemManager
? Ss : /usr/sbin/bluetoothd
>$

PID 是进程号,LWP 是线程 ID。

这里看到的 PID 为 948 的进程有三个 LWP,它们就是三个线程。

1. pthread_equal(3)

 pthread_equal - compare thread IDs

 #include <pthread.h>

 int pthread_equal(pthread_t t1, pthread_t t2);

 Compile and link with -pthread.

第一个要介绍的函数是 pthread_equal(3),比较两个线程标识符是否相同。为什么不能使用 if (t1 == t2) 的方式比较两个线程标识符呢?就像我们上面说的,因为你不知道 pthread_t 是什么类型的,所以永远不要自己直接操作它。

2. pthread_self(3)

 pthread_self - obtain ID of the calling thread

 #include <pthread.h>

 pthread_t pthread_self(void);

 Compile and link with -pthread.

大家还记得一个进程可以通过 getpid(2) 函数获得当前进程的 ID 号吧?pthread_self(3) 就是获得当前线程 ID 的函数。

3. pthread_create(3)

 pthread_create - create a new thread

 #include <pthread.h>

 int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg); Compile and link with -pthread.

pthread_create(3) 函数的作用就是创建一个新线程。

参数列表:

  thread:由函数回填的线程标识符,它来唯一的标识产生的新线程,后面我们只要需要操作新线程就需要用到它;

  attr:线程属性,在本篇博文(第 11 章)中,所有的属性都是使用 NULL,也就是使用默认属性。

  start_routine:线程的执行函数;入参是 void*,返回值是 void*,恭喜你,这两个值的类型都是百搭的,任何类型你都可以在这使用了。

  arg:传递给 start_routine 的 void* 参数。

返回值:成功返回 0;失败返回 errno。为什么线程函数返回的是 errno 呢?因为在一些平台上 error 是全局变量,如果大家都使用同一个全局变量,在多线程的情况下就可能会出现竞争,所以 POSIX 的线程函数一般在失败的时候都是直接返回 errno 的,这样就避免了某些平台 errno 的缺陷了。

新线程和当前的线程是两个兄弟线程,他们是平等的,没有父子关系。

新线程被创建之后,这两个线程哪个先执行是不确定的,由调度器来决定。如果你希望哪个线程一定先执行,那么就在其它线程中使用类似 sleep(3) 的函数让它们等一会儿再运行。

4. pthread_exit(3)

 pthread_exit - terminate calling thread

 #include <pthread.h>

 void pthread_exit(void *retval);

 Compile and link with -pthread.

在线程执行函数中调用,作用是退出当前线程,并将返回值通过 retval 参数返回给调用 pthread_join(3) 函数的地方,如果不需要返回值可以传入 NULL。

pthread_join(3) 是为线程收尸的函数,我们会在下面详细介绍。

我们先来看个小栗子,直观的了解下线程是如何被创建的,以及它是如何工作的。

 #include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h> static void *func(void *p)
{
puts("Thread is working."); sleep(); // 延时是为了方便我们使用 ps(1) 命令验证线程是否被创建了 pthread_exit(NULL);
// return NULL;
} int main()
{
pthread_t tid;
int err; puts("Begin!"); // 创建线程
err = pthread_create(&tid,NULL,func,NULL);
if(err)
{
fprintf(stderr,"pthread_create():%s\n",strerror(err));
exit();
} // 为线程收尸
pthread_join(tid,NULL); puts("End!"); exit();
}

编译并运行,同时使用 LZ 上面介绍过的 ps(1) 命令验证线程是否被创建成功了。

>$ gcc -Wall create.c -o create -pthread
>$ ./create
Begin!
Thread is working.
End!
>$
# 在线程结束之前打开另一个终端,验证线程的状态
>$ ps ax -L
PID LWP TTY STAT TIME COMMAND
pts/ Sl+ : ./create
pts/ Sl+ : ./create
>$

通过 ps(1) 命令的验证,可以看到这两个线程拥有同一个 PID 不同的 LWP,所以可以直观的看出来我们的线程创建成功了!

大家注意,编译 POSIX 线程程序的时候需要使用 -pthread 参数,这个其实在 man 手册里已经说得很清楚了,但是还是总有童鞋在编译的时候没有加这个参数然后还问我为什么编译不通过,鄙视你们这帮粗心的家伙。。

5. pthread_cancel(3)

 pthread_cancel - send a cancellation request to a thread

 #include <pthread.h>

 int pthread_cancel(pthread_t thread);

 Compile and link with -pthread.

pthread_cancel(3) 函数的作用是取消同一个进程中的其它线程线程。

为什么要取消线程呢?当一个线程没有必要继续执行下去时,我们又没法为它收尸,所以就需要先取消这个线程,然后再为它收尸。

比如在使用多线程遍历一个很大的二叉树查找一个数据时,其中某一个线程找到了要查找的数据,那么其它线程就没有必要继续执行了,所以就可以取消它们了。

注意 pthread_cancel(3) 并不等待线程终止,它仅仅提出请求。

而线程收到这个请求也不会立即终止,线程要执行到取消点才能被取消,关于取消点在下一篇博文中会介绍。

6. pthread_join(3)

 pthread_join - join with a terminated thread

 #include <pthread.h>

 int pthread_join(pthread_t thread, void **retval);

 Compile and link with -pthread.

为线程收尸,在上面的栗子中大家已经见到了。不像 wait(2) 函数,线程之间谁都可以为别人收尸,它们之间是没有父子关系的。而 wait(2) 函数只能是由父进程对子进程收尸。

参数列表:

  thread:指定为哪个线程收尸;

  retval:这个二级指针是什么呢?它就是线程在退出的时候的返回值(pthread_exit(3) 的参数),它会把线程的返回值的地址回填到这个参数中。

7.线程清理处理程序(thread cleanup handler)

 pthread_cleanup_push, pthread_cleanup_pop - push and pop thread cancellation clean-up handlers

 #include <pthread.h>

 void pthread_cleanup_push(void (*routine)(void *), void *arg);
void pthread_cleanup_pop(int execute); Compile and link with -pthread.

就像在进程级别使用 atexit(3) 函数挂钩子函数一样,线程可能也需要在结束时执行一些清理工作,这时候就需要派出线程清理处理程序上场了。钩子函数的调用顺序也是逆序的,也就是执行顺序与注册顺序相反。

这两个是带参的宏而不是函数,所以必须成对使用,而且必须先使用 pthread_cleanup_push 再使用  pthread_cleanup_pop,否则会报语法错误,括号不匹配。

参数列表:

  routine:钩子函数。

  arg:传递给钩子函数的参数。

  execute:0 不调用该钩子函数;1 调用该钩子函数。

pthread_cleanup_pop 写到哪都行,只要写了让语法不报错就行,就算你把它写到 pthread_exit(3) 下面也没问题,但是 execute 参数就看不到了,所以无论 pthread_cleanup_pop 的参数是什么,所有注册过的钩子函数都会被执行。

 #include <pthread.h>

 void routine (void *p) {}

 void* fun (void *p)
{
pthread_cleanup_push(routine, NULL);
这里是其它代码
pthread_cleanup_pop();
}

预编译,查看宏替换的结果:

>$ gcc -E cleanup.c
void routine (void *p) {} void* fun (void *p)
{
do { __pthread_unwind_buf_t __cancel_buf; void (*__cancel_routine) (void *) = (routine); void *__cancel_arg = (((void *))); int not_first_call = __sigsetjmp ((struct __jmp_buf_tag *) (void *) __cancel_buf.__cancel_jmp_buf, ); if (__builtin_expect (not_first_call, )) { __cancel_routine (__cancel_arg); __pthread_unwind_next (&__cancel_buf); } __pthread_register_cancel (&__cancel_buf); do {;
这里是其它代码
do { } while (); } while (); __pthread_unregister_cancel (&__cancel_buf); if () __cancel_routine (__cancel_arg); } while ();
}

通过预编译可以看出来 pthread_cleanup_push 和 pthread_cleanup_pop 两个宏被替换了,并且每个宏仅定义了一半,如果不成对写另一个宏编译的时候就会报括号不匹配的错误。

8. pthread_detach(3)

 pthread_detach - detach a thread

 #include <pthread.h>

 int pthread_detach(pthread_t thread);

 Compile and link with -pthread.

pthread_detach(3) 函数用于分离线程,被分离的线程是不能被收尸的。

9.互斥量(pthead_mutex_t)

多线程就是为了充分利用硬件资源,使程序可以并发的运行,但是只要是并发就会遇到竞争的问题,互斥量就是解决竞争的多种手段之一。

在介绍互斥量之前我们先思考一个问题:如何让 20 个线程同时从一个文件中读取数字,累加 1 然后再写入回去,并且保证程序运行之后文件中的数值比运行程序之前大 20?

 #include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h> #include <sys/types.h>
#include <sys/stat.h> #define BUFSIZE 32 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; static void *fun (void *p)
{
int fd = -;
long long n = ;
char buf[BUFSIZE] = ""; fd = open(p, O_RDWR | O_CREAT, );
/* if err */ pthread_mutex_lock(&mutex); read(fd, buf, BUFSIZE);
lseek(fd, , SEEK_SET);
n = atoll(buf);
snprintf(buf, BUFSIZE, "%lld\n", ++n);
write(fd, buf, strlen(buf)); close(fd); pthread_mutex_unlock(&mutex); pthread_exit(NULL);
} int main (int argc, char **argv)
{
pthread_t tids[];
int i = ; if (argc < ) {
fprintf(stderr, "Usage %s <filename>\n", argv[]);
return -;
} for (i = ; i < ; i++) {
pthread_create(&tids[i], NULL, fun, argv[]);
/* if err */
} for (i = ; i < ; i++) {
pthread_join(tids[i], NULL);
} pthread_mutex_destroy(&mutex); return ;
}

程序中每一个线程都要做:读取文件 --- 累加 1 --- 写入文件 的动作,如果 20 个线程同时做这件事,那么就很有可能多个线程读到的数据是相同的,这样累加的结果也就是相同的了,就没办法保证 20 个线程每个人读到的数据都是独一无二的了。

怎么样才能让 20 个线程读到独一无二的数值呢?很简单,让 读取文件 --- 累加 1 --- 写入文件 的这个动作同一时刻只能有一个线程来做,这样每个线程读取到的数值都是上一个线程写入的数值了。那么 读取文件 --- 累加 1 --- 写入文件 这段代码(也就是发生竞争的这段区域)就叫做“临界区”。

互斥量正如它的名字描述的一般,可以使各个线程实现互斥的效果。由它来保护临界区每次只能由一个线程进入,当一个线程想要进入临界区之前需要先抢锁(加锁),如果能抢到锁就进入临界区工作,并且要在离开的时候解锁以便让其它线程可以抢到锁进入临界区;如果没有抢到锁则进入阻塞状态等待锁被释放然后再抢锁。

要在进入临界区之前加锁,在退出临界区的时候解锁。

与 ptread_t 一样,互斥量也使用一种数据类型来表示,它使用 pthread_mutex_t 类型来表示。

初始化互斥量有两种方式:

1)用宏初始化:如同使用默认属性;

2)使用 pthread_mutex_init(3) 函数初始化,可以为互斥量指定属性。

pthread_mutex_t 使用完成之后需要使用 pthread_mutex_destroy(3) 函数销毁,否则会导致内存泄漏。

一般什么情况使用宏初始化,什么情况使用函数初始化互斥量呢?请看下面的伪代码:

 /* 定义并赋值 */
type name = value; // 定义并赋值。使用 PTHREAD_MUTEX_INITIALIZER 宏初始化互斥量必须在这种情况时。 /* 先定义再赋值 */
type name; // 定义
name = valu; // 赋值。这种情况不允许使用 PTHREAD_MUTEX_INITIALIZER 宏初始化互斥量,只能使用 pthread_mutex_init(3) 函数初始化互斥量。

前面说了,要在进入临界区之前加锁,在退出临界区的时候解锁。我们来了解一下加锁和解锁的函数。

 pthread_mutex_lock, pthread_mutex_trylock, pthread_mutex_unlock -  lock and unlock a mutex

 #include <pthread.h>

 int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

_lock() 是阻塞加锁,当抢锁的时候被抢不到就死等,直到别人通过 _unlock() 把这把锁解锁再抢。

_trylock() 是尝试加锁,无论能否抢到锁都返回。

临界区是每个线程要单独执行的,所以临界区中的代码执行时间越短越好。

了解了互斥量之后,我们再来看一道经典的面试题:用 4 个线程疯狂的打印 abcd 持续 5 秒钟,但是要按照顺序打印,不能是乱序的。

 #include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h> #define THRNUM 4 static pthread_mutex_t mut[THRNUM]; static int next(int a)
{
if(a+ == THRNUM)
return ;
return a+;
} static void *thr_func(void *p)
{
int n = (int)p;
int ch = n + 'a'; while()
{
pthread_mutex_lock(mut+n);
write(,&ch,);
pthread_mutex_unlock(mut+next(n));
}
pthread_exit(NULL);
} int main()
{
int i,err;
pthread_t tid[THRNUM]; for(i = ; i < THRNUM ; i++)
{
pthread_mutex_init(mut+i,NULL); pthread_mutex_lock(mut+i); err = pthread_create(tid+i,NULL,thr_func,(void *)i);
if(err)
{
fprintf(stderr,"pthread_create():%s\n",strerror(err));
exit();
} } pthread_mutex_unlock(mut+); alarm(); for(i = ; i < THRNUM ; i++)
pthread_join(tid[i],NULL); exit();
}

上面这段代码是通过多个互斥量实现了一个锁链的结构巧妙的实现了要求的效果。

首先定义 4 个互斥量,然后创建 4 个线程,每个互斥量对应一个线程,每个线程负责打印一个字母。4 个线程刚刚被创建好时,4 把锁都处于锁定状态,4 个线程全部都阻塞在临界区之外,等 4 个线程全部都创建好之后解锁其中一把锁。被解锁的线程首先将自己的互斥量上锁,然后打印字符再解锁下一个线程对应的互斥量,然后再次等待自己被解锁。如此往复,使 4 个线程有条不紊的循环执行 锁定自己 --- 打印字符 -- 解锁下一个线程 的步骤,这样打印到控制台上的 abcd 就是有序的了。

从上面的栗子可以看出来:互斥量限制的是一段代码能否执行,而不是一个变量或一个资源。

上面的代码虽然使用锁链巧妙的完成了任务,但是它的实现方式并不是最漂亮的,更好的办法我们下面介绍条件变量(pthread_cond_t)的时候会讨论。

大家还记得我们在上一篇博文中提到过令牌桶吗?当时只是实现了一个简单的令牌桶,这次我们来写一个通用的多线程并发版的令牌桶。

 /* mytbf.h */
#ifndef MYTBF_H__
#define MYTBF_H__ #define MYTBF_MAX 1024 typedef void mytbf_t; mytbf_t *mytbf_init(int cps,int burst); int mytbf_fetchtoken(mytbf_t *,int); int mytbf_returntoken(mytbf_t *,int ); void mytbf_destroy(mytbf_t *); #endif
 /* mytbf.c */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
#include <string.h> #include "mytbf.h" /* 每一个令牌桶 */
struct mytbf_st
{
int cps; // 速率
int burst; // 令牌上限
int token; // 可用令牌数量
int pos; // 当前令牌桶在 job 数组中的下标
pthread_mutex_t mut; // 用来保护令牌竞争的互斥量
}; /* 所有的令牌桶 */
static struct mytbf_st *job[MYTBF_MAX];
/* 用来保护令牌桶数组竞争的互斥量 */
static pthread_mutex_t mut_job = PTHREAD_MUTEX_INITIALIZER;
/* 添加令牌的线程 ID */
static pthread_t tid;
/* 初始化添加令牌的线程 */
static pthread_once_t init_once = PTHREAD_ONCE_INIT; /* 线程处理函数:负责定时向令牌桶中添加令牌 */
static void *thr_alrm(void *p)
{
int i; while()
{
pthread_mutex_lock(&mut_job);
// 遍历所有的桶
for(i = ; i < MYTBF_MAX; i++)
{
// 为可用的桶添加令牌
if(job[i] != NULL)
{
pthread_mutex_lock(&job[i]->mut);
job[i]->token += job[i]->cps;
// 桶中可用的令牌不能超过上限
if(job[i]->token > job[i]->burst)
job[i]->token = job[i]->burst;
pthread_mutex_unlock(&job[i]->mut);
}
}
pthread_mutex_unlock(&mut_job); // 等待一秒钟后继续添加令牌
sleep();
} pthread_exit(NULL);
} static void module_unload(void)
{
int i; pthread_cancel(tid);
pthread_join(tid,NULL); pthread_mutex_lock(&mut_job);
for(i = ; i < MYTBF_MAX ; i++)
{
if(job[i] != NULL)
{
// 互斥量使用完毕不要忘记释放资源
pthread_mutex_destroy(&job[i]->mut);
free(job[i]);
}
} pthread_mutex_unlock(&mut_job); pthread_mutex_destroy(&mut_job); } static void module_load(void)
{
int err; err = pthread_create(&tid,NULL,thr_alrm,NULL);
if(err)
{
fprintf(stderr,"pthread_create():%s\n",strerror(err));
exit();
} atexit(module_unload);
} /*
* 为了不破坏调用者对令牌桶操作的原子性,
* 在该函数内加锁可能会导致死锁,
* 所以该函数内部无法加锁,
* 必须在调用该函数之前先加锁。
*/
static int get_free_pos_unlocked(void)
{
int i; for(i = ; i < MYTBF_MAX; i++)
if(job[i] == NULL)
return i;
return -;
} mytbf_t *mytbf_init(int cps,int burst)
{
struct mytbf_st *me;
int pos; pthread_once(&init_once,module_load); me = malloc(sizeof(*me));
if(me == NULL)
return NULL; me->cps = cps;
me->burst = burst;
me->token = ;
pthread_mutex_init(&me->mut,NULL); pthread_mutex_lock(&mut_job); pos = get_free_pos_unlocked();
if(pos < )
{
// 带锁跳转不要忘记先解锁再跳转
pthread_mutex_unlock(&mut_job);
free(me);
return NULL;
} me->pos = pos; job[pos] = me; pthread_mutex_unlock(&mut_job); return me;
} static inline int min(int a,int b)
{
return (a < b) ? a : b;
} int mytbf_fetchtoken(mytbf_t *ptr,int size)
{
int n;
struct mytbf_st *me = ptr; if(size < )
return -EINVAL; pthread_mutex_lock(&me->mut);
// 令牌数量不足,等待令牌被添加进来
while(me->token <= )
{
// 先解锁,出让调度器让别人先跑起来,然后再抢锁检查令牌是否够用
pthread_mutex_unlock(&me->mut);
sched_yield();
pthread_mutex_lock(&me->mut);
} n = min(me->token,size); me->token -= n; pthread_mutex_unlock(&me->mut); return n;
} /* 令牌用不完要归还哟,可不能浪费了 */
int mytbf_returntoken(mytbf_t *ptr,int size)
{
struct mytbf_st *me = ptr; // 逗我玩呢?
if(size < )
return -EINVAL; pthread_mutex_lock(&me->mut); me->token += size;
if(me->token > me->burst)
me->token = me->burst; pthread_mutex_unlock(&me->mut); return size;
} void mytbf_destroy(mytbf_t *ptr)
{
struct mytbf_st *me = ptr; pthread_mutex_lock(&mut_job);
job[me->pos] = NULL;
pthread_mutex_unlock(&mut_job); pthread_mutex_destroy(&me->mut);
free(ptr);
}

上面这个令牌桶库可以支持最多 1024 个桶,也就是可以使用多线程同时操作这 1024 个桶来获得不同的速率,每个桶的速率是固定的。

这 1024 个桶保存在一个数组中,所以每次访问桶的时候都需要对它进行加锁,避免多个线程同时访问发生竞争。

同样每个桶也允许使用多个线程同时访问,所以每个桶中也需要一个互斥量来保障处理令牌的时候不会发生竞争。

写互斥量的代码一定要注意临界区内的所有的跳转,通常在跳转之前需要解锁,避免产生死锁。常见的跳转包括 continue; break; return; goto; longjmp(3); 等等,甚至函数调用也是一种跳转。

当某个函数内包含临界区,也就是需要加锁再进入临界区,但是从程序的布局来看该函数无法加锁,那么根据 POSIX 标准的约定,这种函数的命名规则是必须以 _unlocked 作为后缀,所以大家在看到这样的函数时在调用之前一定要先加锁。总结起来说就是以这个后缀命名的函数表示函数内需要加锁但是没有加锁,所以调用者需要先加锁再调用,例如上面代码中的 get_free_pos_unlocked() 函数。

LZ 来解释一下上面这个令牌桶中用过的几个没见过的函数。

 sched_yield — yield the processor

 #include <sched.h>

 int sched_yield(void);

sched_yield(2) 这个函数的作用是出让调度器。在用户态无法模拟它的实现,它会让出当前线程所占用的调度器给其它线程使用,而不必等待时间片耗尽才切换调度器,大家暂时可以把它理解成一个很短暂的 sleep(3) 。一般用于在使用一个资源时需要同时获得多把锁但是却没法一次性获得全部的锁的场景下,只要有任何一把锁没有抢到,那么就立即释放已抢到的锁,并让出自己的调度器让其它线程有机会获得被自己释放的锁。当再次调度到自己时再重新抢锁,直到能一次性抢到所有的锁时再进入临界区,这样就避免了出现死锁的情况。

 pthread_once - dynamic package initialization

 #include <pthread.h>

 int pthread_once(pthread_once_t *once_control,
void (*init_routine)(void));
pthread_once_t once_control = PTHREAD_ONCE_INIT;

pthread_once(3) 函数一般用于动态单次初始化,它能保证 init_routine 函数仅被调用一次。

pthread_once_t 只能使用 PTHREAD_ONCE_INIT 宏初始化,没有提供其它初始化方式。这个与我们前面见到的初始化 pthread_t 和 pthread_nutex_t 不一样。

上面的代码中,向令牌桶添加令牌的线程只需要启动一次,而初始化令牌桶的函数却在开启每个令牌桶的时候都需要调用。为了在初始化令牌桶的函数中仅启动一次添加令牌的线程,采用 pthread_once(3) 函数来创建线程就可以了。这样之后在第一次调用 mytbf_init() 函数的时候会启动新线程添加令牌,而后续再调用 mytbf_init() 的时候就不会启动添加令牌的线程了。

上面代码中调用 pthread_once(3) 相当于下面的伪代码:

 lock();

 if (init_flag)

 {
init_flag = ;
// do sth
} unlock();

10.条件变量(pthread_cond_t)

上面的程序经过测试,发现 CPU 正在满负荷工作,说明程序中出现了忙等, 是哪里出现了忙等呢?其实就是 mytbf_fetchtoken() 函数获得锁的时候采用了忙等的方式。前面我们提到过,异步程序有两种处理方式,一种是通知法,一种是查询法,我们这里用的就是查询法,下面我们把它修改成个通知法来实现。

 /* mytbf.c */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
#include <string.h> #include "mytbf.h" /* 每一个令牌桶 */
struct mytbf_st
{
int cps; // 速率
int burst; // 令牌上限
int token; // 可用令牌数量
int pos; // 当前令牌桶在 job 数组中的下标
pthread_mutex_t mut; // 用来保护令牌竞争的互斥量
pthread_cond_t cond; // 用于在令牌互斥量状态改变时发送通知
}; /* 所有的令牌桶 */
static struct mytbf_st *job[MYTBF_MAX];
/* 用来保护令牌桶数组竞争的互斥量 */
static pthread_mutex_t mut_job = PTHREAD_MUTEX_INITIALIZER;
/* 添加令牌的线程 ID */
static pthread_t tid;
/* 初始化添加令牌的线程 */
static pthread_once_t init_once = PTHREAD_ONCE_INIT; /* 线程处理函数:负责定时向令牌桶中添加令牌 */
static void *thr_alrm(void *p)
{
int i; while()
{
pthread_mutex_lock(&mut_job);
// 遍历所有的桶
for(i = ; i < MYTBF_MAX; i++)
{
// 为可用的桶添加令牌
if(job[i] != NULL)
{
pthread_mutex_lock(&job[i]->mut);
job[i]->token += job[i]->cps;
// 桶中可用的令牌不能超过上限
if(job[i]->token > job[i]->burst)
job[i]->token = job[i]->burst;
// 令牌添加完毕之后,通知所有等待使用令牌的线程准备抢锁
pthread_cond_broadcast(&job[i]->cond);
pthread_mutex_unlock(&job[i]->mut);
}
}
pthread_mutex_unlock(&mut_job); // 等待一秒钟后继续添加令牌
sleep();
} pthread_exit(NULL);
} static void module_unload(void)
{
int i; pthread_cancel(tid);
pthread_join(tid,NULL); pthread_mutex_lock(&mut_job);
for(i = ; i < MYTBF_MAX ; i++)
{
if(job[i] != NULL)
{
// 互斥量和条件变量使用完之后不要忘记释放资源
pthread_mutex_destroy(&job[i]->mut);
pthread_cond_destroy(&job[i]->cond);
free(job[i]);
}
} pthread_mutex_unlock(&mut_job); pthread_mutex_destroy(&mut_job); } static void module_load(void)
{
int err; err = pthread_create(&tid,NULL,thr_alrm,NULL);
if(err)
{
fprintf(stderr,"pthread_create():%s\n",strerror(err));
exit();
} atexit(module_unload);
} /*
* 为了不破坏调用者对令牌桶操作的原子性,
* 在该函数内加锁可能会导致死锁,
* 所以该函数内部无法加锁,
* 必须在调用该函数之前先加锁。
*/
static int get_free_pos_unlocked(void)
{
int i; for(i = ; i < MYTBF_MAX; i++)
if(job[i] == NULL)
return i;
return -;
} mytbf_t *mytbf_init(int cps,int burst)
{
struct mytbf_st *me;
int pos; pthread_once(&init_once,module_load); me = malloc(sizeof(*me));
if(me == NULL)
return NULL; me->cps = cps;
me->burst = burst;
me->token = ;
pthread_mutex_init(&me->mut,NULL);
pthread_cond_init(&me->cond,NULL); pthread_mutex_lock(&mut_job); pos = get_free_pos_unlocked();
if(pos < )
{
pthread_mutex_unlock(&mut_job);
free(me);
return NULL;
} me->pos = pos; job[pos] = me; pthread_mutex_unlock(&mut_job); return me;
} static inline int min(int a,int b)
{
return (a < b) ? a : b;
} int mytbf_fetchtoken(mytbf_t *ptr,int size)
{
int n;
struct mytbf_st *me = ptr; if(size < )
return -EINVAL; pthread_mutex_lock(&me->mut);
// 令牌数量不足,等待令牌被添加进来
while(me->token <= )
{
/*
* 原子化的解锁、出让调度器再抢锁以便工作或等待
* 它会等待其它线程发送通知再唤醒
* 放在循环中是因为可能同时有多个线程再使用同一个桶,
* 被唤醒时未必就能拿得到令牌,所以要直到能拿到令牌再出去工作
*/
pthread_cond_wait(&me->cond,&me->mut); // pthread_mutex_unlock(&me->mut);
// sched_yield();
// pthread_mutex_lock(&me->mut);
} n = min(me->token,size); me->token -= n; pthread_mutex_unlock(&me->mut); return n;
} /* 令牌用不完要归还哟,可不能浪费了 */
int mytbf_returntoken(mytbf_t *ptr,int size)
{
struct mytbf_st *me = ptr; // 逗我玩呢?
if(size < )
return -EINVAL; pthread_mutex_lock(&me->mut); me->token += size;
if(me->token > me->burst)
me->token = me->burst; /*
* 令牌归还完毕,通知其它正在等待令牌的线程赶紧起床,准备抢锁
* 这两行谁在上面谁在后面都无所谓
* 如果先发通知再解锁,收到通知的线程发现锁没有释放会等待锁释放再抢;
* 如果先解锁再发通知,反正已经出了临界区了,
* 就算有线程在通知发出之前抢到了锁也不会发生竞争,
* 大不了其它被唤醒的线程起床之后发现没有锁可以抢,那就继续睡呗。
*/
pthread_cond_broadcast(&me->cond);
pthread_mutex_unlock(&me->mut); return size;
} void mytbf_destroy(mytbf_t *ptr)
{
struct mytbf_st *me = ptr; pthread_mutex_lock(&mut_job);
job[me->pos] = NULL;
pthread_mutex_unlock(&mut_job); pthread_mutex_destroy(&me->mut);
pthread_cond_destroy(&me->cond);
free(ptr);
}

大家不难看出这两段代码的差别,把查询法(忙等)修改为通知法(非忙等)仅仅加一个条件变量(pthread_cond_t) 就行了。

条件变量的作用是什么?其实就是让线程以无竞争的形式等待某个条件的发生,当条件发生时通知等待的线程醒来去做某件事。

通知进程醒来有两种方式,一种是仅通知一个线程醒来,如果有多个线程都在等待,那么不一定是哪个线程被唤醒;另一种方式是把所有等待同一个条件的线程都唤醒。

在下面我们会介绍这两种方式,先从条件变量的初始化和销毁开始讨论。

 pthread_cond_destroy, pthread_cond_init - destroy and initialize condition variables

 #include <pthread.h>

 int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

与互斥量一样,条件变量也有两种方式初始化,一种是使用 pthread_cond_init(3) 函数,另一种是使用 PTHREAD_COND_INITIALIZER 宏。这两种方式的使用场景也与互斥量相同,这里就不再赘述了。

条件变量在使用完毕之后不要忘记用 pthread_cond_destroy(3) 函数释放资源,否则会导致内存泄漏!

 pthread_cond_broadcast, pthread_cond_signal -  broadcast  or  signal  a condition

 #include <pthread.h>

 int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

这两个函数就是条件变量的关键操作了,大家注意看。

pthread_cond_signal(3) 函数用于唤醒当前多个等待的线程中的任何一个。虽然名字上有 signal,但是跟系统中的信号没有任何关系。

pthread_cond_broadcast(3) 惊群,将现在正在等待的线程全部唤醒。

 pthread_cond_timedwait, pthread_cond_wait - wait on a condition

 #include <pthread.h>

 int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);

这几个函数与上面的两个函数的作用是成对的,上面的两个函数用于唤醒线程,唤醒什么线程呢?当然是唤醒 _wait() 等待条件满足的线程啦。

当一个线程做某件事之前发现条件不满足,那就使用这几个 _wait() 函数进入等待状态,当某个线程使条件满足了就要用上面的两个函数唤醒等待的线程继续工作了。

pthread_cond_wait(3) 在临界区外阻塞等待某一个条件发生变化,直到有一个通知到来打断它的等待。这种方式是死等。

pthread_cond_timedwait(3) 增加了超时功能的等待,超时之后无论能否拿到锁都返回。这种方式是尝试等。

pthread_cond_wait(3) 相当于下面三行代码的原子操作:

 pthread_mutex_unlock(mutex);

 sched_yield();

 pthread_mutex_lock(mutex);

通常等待会放在一个循环中,就像上面的令牌桶栗子一样,因为可能有多个线程都在等待条件满足,当前的线程被唤醒时不代表执行条件一定满足,可能先被唤醒的线程发现条件满足已经去工作了,等轮到当前线程调度的时候条件可能就又不满足了,所以如果条件不满足需要继续进入等待。

还记得我们上面提到的面试题吗?用锁链实现的疯狂的有序的打印 abcd 5 秒钟。

上面我们说了,锁链的办法并不是这道题的考点,这道题真正的考点其实是使用互斥量 + 条件变量的方式来实现。

你想出来如何实现了吗?自己先写写,写完了再看 LZ 给出的实现。

 #include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h> #define THRNUM 4 static pthread_mutex_t mut_num = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond_num = PTHREAD_COND_INITIALIZER;
static int num = ; static int next(int a)
{
if(a+ == THRNUM)
return ;
return a+;
} static void *thr_func(void *p)
{
int n = (int)p;
int ch = n + 'a'; while()
{
// 先抢锁,能抢到锁就可以获得打印的机会
pthread_mutex_lock(&mut_num);
while(num != n)
{
// 抢到锁但是发现不应该自己打印,那就释放锁再出让调度器,让别人尝试抢锁
pthread_cond_wait(&cond_num,&mut_num);
}
write(,&ch,);
num = next(num);
/*
* 自己打印完了,通知别人你们抢锁吧
* 因为不知道下一个应该运行的线程是谁,
* 所以采用惊群的方式把它们全都唤醒,
* 让它们自己检查是不是该自己运行了。
*/
pthread_cond_broadcast(&cond_num);
pthread_mutex_unlock(&mut_num);
}
pthread_exit(NULL);
} int main()
{
int i,err;
pthread_t tid[THRNUM]; for(i = ; i < THRNUM ; i++)
{
// 直接启动 4 个线程,让它们自己判断自己是否应该运行,而不用提前锁住它们
err = pthread_create(tid+i,NULL,thr_func,(void *)i);
if(err)
{
fprintf(stderr,"pthread_create():%s\n",strerror(err));
exit();
} } alarm(); for(i = ; i < THRNUM ; i++)
pthread_join(tid[i],NULL); exit();
}

代码中注释写得已经比较清晰了,所以 LZ 就不解释了,小伙伴们要是有什么看不懂的就在评论中留言讨论。

那么应该在什么场景中选择使用 pthread_cond_signal(3) 还是使用 pthread_cond_broadcast(3) 呢?

这个其实没有固定的套路,要根据具体的场景来选择。一般只有一个线程在等待或者明确知道哪个线程应该被唤醒的时候使用 _signal() 函数,如果有多个线程在等待并且不确定应该由谁起来工作的时候使用惊群。

LZ 说的不确定是指业务上不能确定哪个线程应该工作,而不是你作为程序猿稀里糊涂的不知道哪个线程该工作。程序猿应该保证了解你的每一行代码在做什么,而不要写出一坨自己都不知道它在做什么的代码。

这段应该补充一个栗子,但是 LZ 忘记案例是什么了,所以先划掉,等 LZ 想起来了再补充上来。

A -> B

1 5c

2 15c

3 5c

至于应该先发通知再解锁还是先解锁再发通知,效果上没有太大的区别,这一点在上面令牌桶的栗子中已经阐述了。

11.下面纯属吐槽:

如果面试的时候问你:处理常规任务时,是采用多线程比较快还是采用多进程比较快?

如果只回答多线程比较快,那么你工资一定多不了。

应该回答常规情况下是多线程较快,因为多进程需要重新布置进程的执行空间,还需要进行数据拷贝以及部分配置,所以会比创建线程慢xx倍。

不要只回答一个大方向就完事了,而是要量化你的答案,这样才能体现出来你在平时学习工作中很注重这些细节问题。

吐槽完毕。。

12.一个进程最多能创建多少个线程

一个进程能够创建多少个线程呢?主要受两个因素影响,一个是 PID 耗尽,一个是我们在之前的博文中画 C 程序地址空间布局时的阴影区域被栈空间占满了 。(不记得那副图了,去前面的博文里找找。)

PID 看上去是进程 ID,但是在之前讨论进程的博文中我们讨论过,内核的最小执行单元其实是线程,实际上是线程在消耗 PID。一个系统中的线程可以有很多,所以 PID 被耗尽也是有可能的。

使用 ulimit(1) 命令可以查看栈空间的大小,阴影区剩余空间的大小 / 栈空间的大小 == 就是我们能创建的线程数量。

大家可以自己写个程序测试一下一个进程最多能够创建多少个线程,然后使用 ulimit(1) 命令修改栈的大小再测试几次,看看能有什么发现。代码很简单,LZ 就不贴出来了。

13.管道的特点

1)管道的同义词是队列;

2)管道是单工的;

3)管道必须凑齐读写双方,如果只有一方,则阻塞等待。

关于管道的详细内容,我们在后面讨论进程间通信(IPC)的时候还会再详细讨论。

上一篇:java反射机制深入详解


下一篇:Spark菜鸟学习营Day5 分布式程序开发