引言
生产者消费者模型
1 #include <unistd.h>
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <queue>
5 #include "co_routine.h"
6 using namespace std;
7 struct stTask_t
8 {
9 int id;
10 };
11 struct stEnv_t
12 {
13 stCoCond_t* cond;
14 queue<stTask_t*> task_queue;
15 };
16 void* Producer(void* args)
17 {
18 co_enable_hook_sys();
19 stEnv_t* env= (stEnv_t*)args;
20 int id = 0;
21 while (true)
22 {
23 stTask_t* task = (stTask_t*)calloc(1, sizeof(stTask_t));
24 task->id = id++;
25 env->task_queue.push(task);
26 printf("%s:%d produce task %d\n", __func__, __LINE__, task->id);
27 co_cond_signal(env->cond);
28 // poll中数字的调整可以调整交替速度,因为这个数字代表了在epoll中的超时时间,也就是什么时候生产者执行
29 // 可以简单的理解为生产者的生产速度,timeout越大,生产速度越慢
30 poll(NULL, 0, 1000);
31 }
32 return NULL;
33 }
34 void* Consumer(void* args)
35 {
36 printf("进入consumer\n");
37 co_enable_hook_sys();
38 stEnv_t* env = (stEnv_t*)args;
39 while (true)
40 {
41 if (env->task_queue.empty())
42 {
43 co_cond_timedwait(env->cond, -1);
44 continue;
45 }
46 // 操作队列的时候没有加锁
47 stTask_t* task = env->task_queue.front();
48 env->task_queue.pop();
49 printf("%s:%d consume task %d\n", __func__, __LINE__, task->id);
50 free(task);
51 }
52 return NULL;
53 }
54
55 /*
56 * 主协程是跟 stCoRoutineEnv_t 一起创建的。主协程也无需调用 resume 来启动,
57 * 它就是程序本身,就是 main 函数。主协程是一个特殊的存在
58 *
59 * 在程序首次调用 co_create() 时,此函数内部会判断当前进程(线程)的 stCoRoutineEnv_t 结构是否已分配,
60 * 如果未分配则分配一个,同时分配一个 stCoRoutine_t 结构,并将 pCallStack[0] 指向主协程。
61 * 此后如果用 co_resume() 启动协程,又会将 resume 的协程压入 pCallStack 栈
62 */
63
64 int main()
65 {
66 stEnv_t* env = new stEnv_t;
67 env->cond = co_cond_alloc();
68
69 stCoRoutine_t* consumer_routine; // 一个协程的结构
70 // 协程的创建函数于pthread_create很相似
71 //1.指向线程表示符的指针,设置线程的属性(栈大小和指向共享栈的指针,使用共享栈模式),线程运行函数的其实地址,运行是函数的参数
72 co_create(&consumer_routine, NULL, Consumer, env);// 创建一个协程
73 // 协程在创建以后并没有运行 使用resume运行
74 co_resume(consumer_routine);
75 stCoRoutine_t* producer_routine;
76 co_create(&producer_routine, NULL, Producer, env);
77 co_resume(producer_routine);
78
79 // 没有使用pthread_join 而是使用co_eventloop
80 co_eventloop(co_get_epoll_ct(), NULL, NULL);
81 return 0;
82 }
首先这个代码展示了一个协程的生产者消费者模型,与线程的实现不同,协程实现的生产者消费者模型不需要加锁,因为究其本质两个协程不过是串行的执行而已。首先libco的协程为了使得程序员能够更好的接收,使用了和posix标准的线程创建几乎一样的方法,即co_create,这与pthread_create的参数是基本一致的。我们来看看如何创建一个协程:
1 stCoRoutine_t* consumer_routine; // 一个协程的结构
2
3 co_create(&consumer_routine, NULL, Consumer, env);// 创建一个协程,即初始化协程结构
4 // 协程在创建以后并没有运行 使用resume运行
5 co_resume(consumer_routine);
可以看到这段代码中有一个stCoRoutine_t结构,这个结构实际上就是就是协程的主体结构,存储着一个协程相关的数据。在co_create创建一个协程以后协程是没有运行的,这点和线程并不一样,如果想要使协程运行的话,还需要执行co_resume才可以。
stCoRoutine_t结构的内容
1 // libco的协程一旦创建之后便和创建它的线程绑定在一起 不支持线程之间的迁移
2 struct stCoRoutine_t
3 {
4 stCoRoutineEnv_t *env; // 协程的执行环境,运行在同一个线程上的各协程是共享该结构
5 pfn_co_routine_t pfn; // 结构为一个函数指针 实际待执行的协程函数
6
7
8 void *arg; // 参数
9 // 用于协程切换时保存 CPU 上下文(context)的,即 esp、ebp、eip 和其他通用寄存器的值
10 coctx_t ctx;
11
12 // 一些状态和标志变量
13 char cStart; // 协程是否执行过resume
14 char cEnd;
15 char cIsMain; //是否为主协程 在co_init_curr_thread_env修改
16 char cEnableSysHook; //此协程是否hook库函数
17 char cIsShareStack; // 是否开启共享栈模式
18
19 // 保存程序系统环境变量的指针
20 void *pvEnv;
21
22
23 //这里也可以看出libco协程是stackful的,也有一些库的实现是stackless,即无栈协程
24 // char sRunStack[ 1024 * 128 ];
25 // 协程运行时的栈内存
26 stStackMem_t* stack_mem;
27
28 /**
29 * 一个协程实际占用的(从 esp 到栈底)栈空间,相比预分配的这个栈大小
30 * (比如 libco 的 128KB)会小得多;这样一来, copying stack
31 * 的实现方案所占用的内存便会少很多。当然,协程切换时拷贝内存的开销
32 * 有些场景下也是很大的。因此两种方案各有利弊,而 libco 则同时实现
33 * 了两种方案,默认使用前者
34 */
35
36 // save satck buffer while confilct on same stack_buffer;
37 // 当使用共享栈的时候需要用到的一些数据结构
38 char* stack_sp;
39 unsigned int save_size;
40 char* save_buffer;
41
42 stCoSpec_t aSpec[1024];
43
44 };
内容分析
- env是一个非常关键的结构,这个结构是所有数据中最特殊的一个,因为它是一个线程内共享的结构,也就是说同一个线程创建的所有协程的此结构指针指向同一个数据。其中存放了一些协程调度相关的数据,当然叫调度有些勉强,因为libco实现的非对称式协程实际上没有什么调度策略,完全就是协程切换会调用这个协程的协程或者线程。
- pfn是一个函数指针,类型为function<void*(void*)>,当然libco虽然是用C++写的,但是整体风格偏向于C语言,所以实际结构是一个函数指针。值得一提的是实际存储的函数指针并不是我们传入的函数指针,而是一个使用我们传入的函数指针的一个函数,原因是当协程执行完毕的时候需要切换CPU执行权,这样可以做到最小化入侵用户代码。
- arg没什么说的,传入的指针的参数。
- ctx保存协程的上下文,实际就是寄存器的值,不管是C还是C++都没有函数可以直接接触寄存器,所以操作这个参数的时候需要嵌入一点汇编代码。
- 紧接着是五个标记位,功能注释中写的很清楚啦。
- pvEnv保存着环境变量相关,这个环境变量其实是与hook后的setenv,getenv类函数有关。和上面说的env没有什么关系。
- stack_mem是运行时栈的结构,libco提供了两种方式,一个是每个协程拥有一个独立的栈,默认分配128KB空间,缺点是每个协程可能只用到了1KB不到,碎片较多。还有一种是共享栈模式,需要我们在创建协程的时候在Co_create中指定第二个参数,这种方法是多个协程共用一个栈,但是在协程切换的时候需要拷贝已使用的栈空间。
- 剩下的就是一些在共享栈时要用到的参数了。
coctx_t结构:
1 struct coctx_t
2 {
3 #if defined(__i386__)
4 void *regs[ 8 ];
5 #else
6 void *regs[ 14 ];
7 #endif // 保存上下文
8 size_t ss_size; // 栈的大小
9 char *ss_sp; // 栈顶指针esp
10
11 };
其中regs就是保存寄存器的值。在32位机器下保存八个寄存器,在64位下保存14个寄存器。我们知道X86架构下有8个通用寄存器,X64则有16个寄存器,那么为什么64位只使用保存14个寄存器呢?我们可以在coctx_swap.S中看到64位下缺少了对%r10, %r11寄存器的备份,
x86-64的16个64位寄存器分别是:%rax, %rbx, %rcx, %rdx, %esi, %edi, %rbp, %rsp, %r8-%r15。其中:
- %rax 作为函数返回值使用
- %rsp栈指针寄存器,指向栈顶
- %rdi,%rsi,%rdx,%rcx,%r8,%r9 用作函数参数,依次对应第1参数,第2参数
- %rbx,%rbp,%r12,%r13,%14,%15 用作数据存储,遵循被调用者保护规则,简单说就是随便用,调用子函数之前要备份它,以防被修改
- %r10,%r11 用作数据存储,遵循调用者保护规则,简单说就是使用之前要先保存原值
我们来看看两个陌生的名词调用者保护&被调用者保护:
- 调用者保护:表示这些寄存器上存储的值,需要调用者(父函数)自己想办法先备份好,否则过会子函数直接使用这些寄存器将无情的覆盖。如何备份?当然是实现压栈(pushl),等子函数调用完成,再通过栈恢复(popl)
- 被调用者保护:即表示需要由被调用者(子函数)想办法帮调用者(父函数)进行备份
stCoRoutineEnv_t结构:
1 /*
2 1. 每当启动(resume)一个协程时,就将它的协程控制块 stCoRoutine_t 结构指针保存在 pCallStack 的“栈顶”,
3 2. 然后“栈指针” iCallStackSize 加 1,最后切换 context 到待启动协程运行。当协程要让出(yield)CPU 时,
4 3. 就将它的 stCoRoutine_t从pCallStack 弹出,“栈指针” iCallStackSize 减 1,
5 4. 然后切换 context 到当前栈顶的协程(原来被挂起的调用者)恢复执
6 */
7 // stCoRoutineEnv_t结构一个线程只有一个
8 struct stCoRoutineEnv_t
9 {
10 // 如果将协程看成一种特殊的函数,那么这个 pCallStack 就时保存这些函数的调用链的栈。
11 // 非对称协程最大特点就是协程间存在明确的调用关系;甚至在有些文献中,启动协程被称作 call,
12 // 挂起协程叫 return。非对称协程机制下的被调协程只能返回到调用者协程,这种调用关系不能乱,
13 // 因此必须将调用链保存下来
14 stCoRoutine_t *pCallStack[ 128 ];
15 int iCallStackSize; // 上面那个调用栈的栈顶指针
16 // epoll的一个封装结构
17 stCoEpoll_t *pEpoll;
18
19 // for copy stack log lastco and nextco
20 // 对上次切换挂起的协程和嵌套调用的协程栈的拷贝,为了减少共享栈上数据的拷贝
21 // 在不使用共享栈模式时 pending_co 和 ocupy_co 都是空指针
22 // pengding是目前占用共享栈的协程
23 // 想想看,如果不加的话,我们需要O(N)的时间复杂度分清楚Callback中current上一个共享栈的协程实体(可能共享栈与默认模式混合)
24 stCoRoutine_t* pending_co;
25 // 与pending在同一个共享栈上的上一个协程
26 stCoRoutine_t* occupy_co;
27 };
内容分析
- pCallStack结构是一个非常重要的结构,这个名字起的非常有意思,很贴切,因为这就是一个调用栈,它存储着协程的调用栈,举个例子,主协程A调用协程B,协程B的函数中又调用协程C,这个时候pCallStack中存储的数据就是[A,B,C],拿我们前面举过的生产者消费者模型距离,把生产者当做B,消费者当做C,主协程当做A,pCallStack的结构就在[A,B],[A,C],间切换。简单来说每一项前面存储着调用这个协程的协程,最少有一个元素,即主协程。
- pEpoll,一个封装的epoll。
- 剩下两个结构与共享栈相关,存储着与当前运行线程使用同一个栈的线程,因为共享栈可能有多个,参见sharestack结构中栈结构其实是个数组。
转载:
https://blog.csdn.net/weixin_43705457/article/details/106863859