NtyCo的使用和API
NtyCo的源码路径:
NtyCo源码
NtyCo的工作流程
1. 创建协程
当我们需要异步调用的时候,我们会创建一个协程。比如 accept 返回一个新的sockfd,创建一个客户端处理的子过程。 再比如需要监听多个端口的时候,创建一个 server的子过程,这样多个端口同时工作的,是符合微服务的架构的。创建协程的时候, 进行了如何的工作? 创建 API 如下:
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg) {
assert(pthread_once(&sched_key_once, nty_coroutine_sched_key_creator) == 0);
nty_schedule *sched = nty_coroutine_get_sched();
if (sched == NULL) {
nty_schedule_create(0);
sched = nty_coroutine_get_sched();
if (sched == NULL) {
printf("Failed to create scheduler\n");
return -1;
}
}
nty_coroutine *co = calloc(1, sizeof(nty_coroutine));
if (co == NULL) {
printf("Failed to allocate memory for new coroutine\n");
return -2;
}
int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);
if (ret) {
printf("Failed to allocate stack for new coroutine\n");
free(co);
return -3;
}
co->sched = sched;
co->stack_size = sched->stack_size;
co->status = BIT(NTY_COROUTINE_STATUS_NEW); //
co->id = sched->spawned_coroutines ++;
co->func = func;
#if CANCEL_FD_WAIT_UINT64
co->fd = -1;
co->events = 0;
#else
co->fd_wait = -1;
#endif
co->arg = arg;
co->birth = nty_coroutine_usec_now();
*new_co = co;
TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next);
return 0;
}
参数1: nty_coroutine **new_co,传入一个空的协程对象,这个对象是由内部函数创建的,并且在函数返回的时候,会返回一个内部创建的协程对象。为什么需要一个指向指针的指针呢,因为传回来的是一个malloc的指针对象,如果使用单指针,传到函数内部的是一个参数的副本,而函数一结束,副本就会被回收。
参数2: proc_coroutine func, 协程的子过程。 当协程被调度的时候, 就会执行该函数。
参数3: void *arg, 需要传入到新协程中的参数。
协程不存在亲属关系, 都是一致的调度关系, 接受调度器的调度。 调用 create API就会创建一个新协程,新协程就会加入到调度器的就绪队列中。
nty_coroutine_create:创建一个协程,主要流程如下:
- 调度器是否存在,pthread_once()在多线程的环境下nty_coroutine_sched_key_creator 调度器也只初始化一次。如果不存在就创建一个。调度器作为全局的单例,将调度器的实例存储在线程的私有空间pthread_setspecific。
- 分配一个nty_coroutine的内存空间,分别设置nty_coroutine的数据项,栈空间,栈大小,初始状态,创建时间,子过程回调函数,子过程的调用参数。
- 将新分配协程添加到就绪队列ready_queue中。
对ntyco来说,协程一旦创建就不能有用户自己销毁,必须得以子过程执行结束,就会自动销毁协程的上下文数据。以_exec执行入口函数返回而销毁协程的上下文与相关信息。co->func(co->arg) 是子过程,若用户需要长久运行协程,就必须要在func函数里面写入循环等操作。所以NtyCo里面没有实现exit的原语操作。
2. 协程让出CPU(yield)
API:
void nty_coroutine_yield(nty_coroutine *co) {
co->ops = 0;
_switch(&co->sched->ctx, &co->ctx);
}
参数: 当前运行的协程实例。调用后该函数不会立即返回,而是切换到最近执行resume的上下文。该函数返回是咋执行resume的时候,会有调度器统一选择resume的,然后再次调用yield的。resume和yield是两个可逆过程的原子操作。
1. 协程的上文切换
首先回顾一下x86_64寄存器的相关知识。x86_64 的寄存器有16个64位寄存器,分别是:%rax, %rbx, %rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12,
%r13, %r14, %r15。
%rax 作为函数返回值使用的。
%rsp 栈指针寄存器,指向栈顶
%rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函数参数,依次对应第1参数,第2参数。。。
%rbx, %rbp, %r12, %r13, %r14, %r15 用作数据存储,遵循调用者使用规则,换句话说,就是随便用。调用子函数之前要备份它,以防它被修改
%r10, %r11 用作数据存储,就是使用前要先保存原值。
上下文切换,就是将CPU的寄存器暂时保存,再将即将运行的协程的上下文寄存器,分别mov到相对应的寄存器上。此时上下文完成切换。如下图所示:
切换_switch函数定义:
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
参数1:即将运行协程的上下文,寄存器列表
参数2:正在运行协程的上下文,寄存器列表
我们nty_cpu_ctx结构体的定义,为了兼容x86,结构体项命令采用的是x86的寄存器名字命名。
typedef struct _nty_cpu_ctx {
void *esp; //
void *ebp;
void *eip;
void *edi;
void *esi;
void *ebx;
void *r1;
void *r2;
void *r3;
void *r4;
void *r5;
} nty_cpu_ctx;
_switch返回后,执行即将运行协程的上下文。是实现上下文的切换
__asm__ (
" .text \n"
" .p2align 4,,15 \n"
".globl _switch \n"
".globl __switch \n"
"_switch: \n"
"__switch: \n"
" movq %rsp, 0(%rsi) # save stack_pointer \n"
" movq %rbp, 8(%rsi) # save frame_pointer \n"
" movq (%rsp), %rax # save insn_pointer \n"
" movq %rax, 16(%rsi) \n"
" movq %rbx, 24(%rsi) # save rbx,r12-r15 \n"
" movq %r12, 32(%rsi) \n"
" movq %r13, 40(%rsi) \n"
" movq %r14, 48(%rsi) \n"
" movq %r15, 56(%rsi) \n"
" movq 56(%rdi), %r15 \n"
" movq 48(%rdi), %r14 \n"
" movq 40(%rdi), %r13 # restore rbx,r12-r15 \n"
" movq 32(%rdi), %r12 \n"
" movq 24(%rdi), %rbx \n"
" movq 8(%rdi), %rbp # restore frame_pointer \n"
" movq 0(%rdi), %rsp # restore stack_pointer \n"
" movq 16(%rdi), %rax # restore insn_pointer \n"
" movq %rax, (%rsp) \n"
" ret \n"
);
按照x86_64的寄存器定义,%rdi保存第一个参数的值,即new_ctx的值,%rsi保存第二个参数的值,即保存cur_ctx的值。X86_64每个寄存器是64bit,8byte。
Movq %rsp, 0(%rsi) 保存在栈指针到cur_ctx实例的rsp项
Movq %rbp, 8(%rsi)
Movq (%rsp), %rax #将栈顶地址里面的值存储到rax寄存器中。Ret后出栈,执行栈顶
Movq %rbp, 8(%rsi) #后续的指令都是用来保存CPU的寄存器到new_ctx的每一项中
Movq 8(%rdi), %rbp #将new_ctx的值
Movq 16(%rdi), %rax #将指令指针rip的值存储到rax中
Movq %rax, (%rsp) # 将存储的rip值的rax寄存器赋值给栈指针的地址的值。
Ret # 出栈,回到栈指针,执行rip指向的指令。
上下文环境的切换完成。
3. 恢复协程的运行权(resume)
我们知道,协程让出CPU是通过调度器来实现的,但是调度器是通过什么知道需要resume哪个协程呢?
int nty_coroutine_resume(nty_coroutine *co) {
if (co->status & BIT(NTY_COROUTINE_STATUS_NEW)) {
nty_coroutine_init(co);
}
nty_schedule *sched = nty_coroutine_get_sched();
sched->curr_thread = co;
_switch(&co->ctx, &co->sched->ctx);
sched->curr_thread = NULL;
nty_coroutine_madvise(co);
#if 1
if (co->status & BIT(NTY_COROUTINE_STATUS_EXITED)) {
if (co->status & BIT(NTY_COROUTINE_STATUS_DETACH)) {
printf("nty_coroutine_resume --> \n");
nty_coroutine_free(co);
}
return -1;
}
#endif
return 0;
}
参数: 需要恢复运行的协程实例
调用该函数后也不会立即返回,而是切换到运行协程实例的yield的位置。返回是在等协程相应事务处理完成后,主动yield会返回到resume的位置。
这里我们要说明一下协程的启动是怎么实现的:
主要有协程的创建,也就是create; 协程加入就绪队列;协程的入口函数(eip–func, esp–>void*stack)。
- 先判断要resume协程的状态是不是为NEW,如果是先设置初始状态。
static void nty_coroutine_init(nty_coroutine *co) {
void **stack = (void **)(co->stack + co->stack_size);
stack[-3] = NULL;
stack[-2] = (void *)co;
co->ctx.esp = (void*)stack - (4 * sizeof(void*));
co->ctx.ebp = (void*)stack - (3 * sizeof(void*));
co->ctx.eip = (void*)_exec;
co->status = BIT(NTY_COROUTINE_STATUS_READY);
}
这里主要说下esp, ebp, eip三个寄存器:
esp:寄存器存放当前线程的栈顶指针
ebp:寄存器存放当前线程的栈底指针
eip:寄存器存放下一个CPU指令存放的内存地址,当CPU执行完当前的指令后,从EIP寄存器中读取下一条指令的内存地址,然后继续执行。
void **stack = (void **)(co->stack + co->stack_size);
我们都知道栈的增长是从高地址到低地址,stack 指向栈底。下面这两句是为了_exec函数压栈使用的。
stack[-3] = NULL;
stack[-2] = (void *)co;
分别保存协程的栈顶,栈底,和入口函数。
co->ctx.esp = (void*)stack - (4 * sizeof(void*));
co->ctx.ebp = (void*)stack - (3 * sizeof(void*));
co->ctx.eip = (void*)_exec;
_exec函数并不是真正的协程入口函数:
static void _exec(void *lt) {
#if defined(__lvm__) && defined(__x86_64__)
__asm__("movq 16(%%rbp), %[lt]" : [lt] "=r" (lt));
#endif
nty_coroutine *co = (nty_coroutine*)lt;
co->func(co->arg);
co->status |= (BIT(NTY_COROUTINE_STATUS_EXITED) | BIT(NTY_COROUTINE_STATUS_FDEOF) | BIT(NTY_COROUTINE_STATUS_DETACH));
#if 1
nty_coroutine_yield(co);
#else
co->ops = 0;
_switch(&co->sched->ctx, &co->ctx);
#endif
}
真正执行协程的入口函数是:
co->func(co->arg);
我们接着看resume函数,之后得到当前线程的属性,switch到刚才resume的位置。
调用:
static inline void nty_coroutine_madvise(nty_coroutine *co) {
size_t current_stack = (co->stack + co->stack_size) - co->ctx.esp;
assert(current_stack <= co->stack_size);
if (current_stack < co->last_stack_size &&
co->last_stack_size > co->sched->page_size) {
size_t tmp = current_stack + (-current_stack & (co->sched->page_size - 1));
assert(madvise(co->stack, co->stack_size-tmp, MADV_DONTNEED) == 0);
}
co->last_stack_size = current_stack;
}
判断当前栈的大小,当前栈不能超过预定的值,在ntyco的协程中,我们设置的最大栈大小为16k, 但是可以自己去调整。
#define NTY_CO_MAX_STACKSIZE (16*1024)
最后,协程的yield和resume可以有三种实现 方式:
- longjmp 和 setjmp可以实现
- ucontext
- 汇编实现