基于NtyCo的协程使用和API

NtyCo的使用和API


NtyCo的源码路径:
NtyCo源码

NtyCo的工作流程

1. 创建协程

当我们需要异步调用的时候,我们会创建一个协程。比如 accept 返回一个新的sockfd,创建一个客户端处理的子过程。 再比如需要监听多个端口的时候,创建一个 server的子过程,这样多个端口同时工作的,是符合微服务的架构的。创建协程的时候, 进行了如何的工作? 创建 API 如下:

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg) {

	assert(pthread_once(&sched_key_once, nty_coroutine_sched_key_creator) == 0);
	nty_schedule *sched = nty_coroutine_get_sched();

	if (sched == NULL) {
		nty_schedule_create(0);
		
		sched = nty_coroutine_get_sched();
		if (sched == NULL) {
			printf("Failed to create scheduler\n");
			return -1;
		}
	}

	nty_coroutine *co = calloc(1, sizeof(nty_coroutine));
	if (co == NULL) {
		printf("Failed to allocate memory for new coroutine\n");
		return -2;
	}

	int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);
	if (ret) {
		printf("Failed to allocate stack for new coroutine\n");
		free(co);
		return -3;
	}

	co->sched = sched;
	co->stack_size = sched->stack_size;
	co->status = BIT(NTY_COROUTINE_STATUS_NEW); //
	co->id = sched->spawned_coroutines ++;
	co->func = func;
#if CANCEL_FD_WAIT_UINT64
	co->fd = -1;
	co->events = 0;
#else
	co->fd_wait = -1;
#endif
	co->arg = arg;
	co->birth = nty_coroutine_usec_now();
	*new_co = co;

	TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next);

	return 0;
}

参数1: nty_coroutine **new_co,传入一个空的协程对象,这个对象是由内部函数创建的,并且在函数返回的时候,会返回一个内部创建的协程对象。为什么需要一个指向指针的指针呢,因为传回来的是一个malloc的指针对象,如果使用单指针,传到函数内部的是一个参数的副本,而函数一结束,副本就会被回收。

参数2: proc_coroutine func, 协程的子过程。 当协程被调度的时候, 就会执行该函数。

参数3: void *arg, 需要传入到新协程中的参数。

协程不存在亲属关系, 都是一致的调度关系, 接受调度器的调度。 调用 create API就会创建一个新协程,新协程就会加入到调度器的就绪队列中。

nty_coroutine_create:创建一个协程,主要流程如下:

  1. 调度器是否存在,pthread_once()在多线程的环境下nty_coroutine_sched_key_creator 调度器也只初始化一次。如果不存在就创建一个。调度器作为全局的单例,将调度器的实例存储在线程的私有空间pthread_setspecific。
  2. 分配一个nty_coroutine的内存空间,分别设置nty_coroutine的数据项,栈空间,栈大小,初始状态,创建时间,子过程回调函数,子过程的调用参数。
  3. 将新分配协程添加到就绪队列ready_queue中。

对ntyco来说,协程一旦创建就不能有用户自己销毁,必须得以子过程执行结束,就会自动销毁协程的上下文数据。以_exec执行入口函数返回而销毁协程的上下文与相关信息。co->func(co->arg) 是子过程,若用户需要长久运行协程,就必须要在func函数里面写入循环等操作。所以NtyCo里面没有实现exit的原语操作。

2. 协程让出CPU(yield)

API:

void nty_coroutine_yield(nty_coroutine *co) {
	co->ops = 0;
	_switch(&co->sched->ctx, &co->ctx);
}

参数: 当前运行的协程实例。调用后该函数不会立即返回,而是切换到最近执行resume的上下文。该函数返回是咋执行resume的时候,会有调度器统一选择resume的,然后再次调用yield的。resume和yield是两个可逆过程的原子操作。

1. 协程的上文切换

首先回顾一下x86_64寄存器的相关知识。x86_64 的寄存器有16个64位寄存器,分别是:%rax, %rbx, %rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12,
  %r13, %r14, %r15。
  %rax 作为函数返回值使用的。
  %rsp 栈指针寄存器,指向栈顶
  %rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函数参数,依次对应第1参数,第2参数。。。
  %rbx, %rbp, %r12, %r13, %r14, %r15 用作数据存储,遵循调用者使用规则,换句话说,就是随便用。调用子函数之前要备份它,以防它被修改
  %r10, %r11 用作数据存储,就是使用前要先保存原值。
  上下文切换,就是将CPU的寄存器暂时保存,再将即将运行的协程的上下文寄存器,分别mov到相对应的寄存器上。此时上下文完成切换。如下图所示:
  基于NtyCo的协程使用和API
切换_switch函数定义:

  int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);

参数1:即将运行协程的上下文,寄存器列表
参数2:正在运行协程的上下文,寄存器列表
我们nty_cpu_ctx结构体的定义,为了兼容x86,结构体项命令采用的是x86的寄存器名字命名。

typedef struct _nty_cpu_ctx {
	void *esp; //
	void *ebp;
	void *eip;
	void *edi;
	void *esi;
	void *ebx;
	void *r1;
	void *r2;
	void *r3;
	void *r4;
	void *r5;
} nty_cpu_ctx;

_switch返回后,执行即将运行协程的上下文。是实现上下文的切换

__asm__ (
"    .text                                  \n"
"       .p2align 4,,15                                   \n"
".globl _switch                                          \n"
".globl __switch                                         \n"
"_switch:                                                \n"
"__switch:                                               \n"
"       movq %rsp, 0(%rsi)      # save stack_pointer     \n"
"       movq %rbp, 8(%rsi)      # save frame_pointer     \n"
"       movq (%rsp), %rax       # save insn_pointer      \n"
"       movq %rax, 16(%rsi)                              \n"
"       movq %rbx, 24(%rsi)     # save rbx,r12-r15       \n"
"       movq %r12, 32(%rsi)                              \n"
"       movq %r13, 40(%rsi)                              \n"
"       movq %r14, 48(%rsi)                              \n"
"       movq %r15, 56(%rsi)                              \n"
"       movq 56(%rdi), %r15                              \n"
"       movq 48(%rdi), %r14                              \n"
"       movq 40(%rdi), %r13     # restore rbx,r12-r15    \n"
"       movq 32(%rdi), %r12                              \n"
"       movq 24(%rdi), %rbx                              \n"
"       movq 8(%rdi), %rbp      # restore frame_pointer  \n"
"       movq 0(%rdi), %rsp      # restore stack_pointer  \n"
"       movq 16(%rdi), %rax     # restore insn_pointer   \n"
"       movq %rax, (%rsp)                                \n"
"       ret                                              \n"
);

按照x86_64的寄存器定义,%rdi保存第一个参数的值,即new_ctx的值,%rsi保存第二个参数的值,即保存cur_ctx的值。X86_64每个寄存器是64bit,8byte。
  Movq %rsp, 0(%rsi) 保存在栈指针到cur_ctx实例的rsp项
  Movq %rbp, 8(%rsi)
  Movq (%rsp), %rax #将栈顶地址里面的值存储到rax寄存器中。Ret后出栈,执行栈顶
  Movq %rbp, 8(%rsi) #后续的指令都是用来保存CPU的寄存器到new_ctx的每一项中
  Movq 8(%rdi), %rbp #将new_ctx的值
  Movq 16(%rdi), %rax #将指令指针rip的值存储到rax中
  Movq %rax, (%rsp) # 将存储的rip值的rax寄存器赋值给栈指针的地址的值。
  Ret # 出栈,回到栈指针,执行rip指向的指令。
  上下文环境的切换完成。

3. 恢复协程的运行权(resume)

我们知道,协程让出CPU是通过调度器来实现的,但是调度器是通过什么知道需要resume哪个协程呢?

int nty_coroutine_resume(nty_coroutine *co) {
	
	if (co->status & BIT(NTY_COROUTINE_STATUS_NEW)) {
		nty_coroutine_init(co);
	}

	nty_schedule *sched = nty_coroutine_get_sched();
	sched->curr_thread = co;
	_switch(&co->ctx, &co->sched->ctx);
	sched->curr_thread = NULL;

	nty_coroutine_madvise(co);
#if 1
	if (co->status & BIT(NTY_COROUTINE_STATUS_EXITED)) {
		if (co->status & BIT(NTY_COROUTINE_STATUS_DETACH)) {
			printf("nty_coroutine_resume --> \n");
			nty_coroutine_free(co);
		}
		return -1;
	} 
#endif
	return 0;
}

参数: 需要恢复运行的协程实例
调用该函数后也不会立即返回,而是切换到运行协程实例的yield的位置。返回是在等协程相应事务处理完成后,主动yield会返回到resume的位置。

这里我们要说明一下协程的启动是怎么实现的:
主要有协程的创建,也就是create; 协程加入就绪队列;协程的入口函数(eip–func, esp–>void*stack)。

  1. 先判断要resume协程的状态是不是为NEW,如果是先设置初始状态。
static void nty_coroutine_init(nty_coroutine *co) {

	void **stack = (void **)(co->stack + co->stack_size);

	stack[-3] = NULL;
	stack[-2] = (void *)co;

	co->ctx.esp = (void*)stack - (4 * sizeof(void*));
	co->ctx.ebp = (void*)stack - (3 * sizeof(void*));
	co->ctx.eip = (void*)_exec;
	co->status = BIT(NTY_COROUTINE_STATUS_READY);
	
}

这里主要说下esp, ebp, eip三个寄存器:
esp:寄存器存放当前线程的栈顶指针
ebp:寄存器存放当前线程的栈底指针
eip:寄存器存放下一个CPU指令存放的内存地址,当CPU执行完当前的指令后,从EIP寄存器中读取下一条指令的内存地址,然后继续执行。

void **stack = (void **)(co->stack + co->stack_size);

我们都知道栈的增长是从高地址到低地址,stack 指向栈底。下面这两句是为了_exec函数压栈使用的。

	stack[-3] = NULL;
	stack[-2] = (void *)co;

分别保存协程的栈顶,栈底,和入口函数。

	co->ctx.esp = (void*)stack - (4 * sizeof(void*));
	co->ctx.ebp = (void*)stack - (3 * sizeof(void*));
	co->ctx.eip = (void*)_exec;

基于NtyCo的协程使用和API
_exec函数并不是真正的协程入口函数:

static void _exec(void *lt) {
#if defined(__lvm__) && defined(__x86_64__)
	__asm__("movq 16(%%rbp), %[lt]" : [lt] "=r" (lt));
#endif

	nty_coroutine *co = (nty_coroutine*)lt;
	co->func(co->arg);
	co->status |= (BIT(NTY_COROUTINE_STATUS_EXITED) | BIT(NTY_COROUTINE_STATUS_FDEOF) | BIT(NTY_COROUTINE_STATUS_DETACH));
#if 1
	nty_coroutine_yield(co);
#else
	co->ops = 0;
	_switch(&co->sched->ctx, &co->ctx);
#endif
}

真正执行协程的入口函数是:

co->func(co->arg);

我们接着看resume函数,之后得到当前线程的属性,switch到刚才resume的位置。
调用:

static inline void nty_coroutine_madvise(nty_coroutine *co) {

	size_t current_stack = (co->stack + co->stack_size) - co->ctx.esp;
	assert(current_stack <= co->stack_size);

	if (current_stack < co->last_stack_size &&
		co->last_stack_size > co->sched->page_size) {
		size_t tmp = current_stack + (-current_stack & (co->sched->page_size - 1));
		assert(madvise(co->stack, co->stack_size-tmp, MADV_DONTNEED) == 0);
	}
	co->last_stack_size = current_stack;
}

判断当前栈的大小,当前栈不能超过预定的值,在ntyco的协程中,我们设置的最大栈大小为16k, 但是可以自己去调整。

#define NTY_CO_MAX_STACKSIZE	(16*1024)

最后,协程的yield和resume可以有三种实现 方式:

  1. longjmp 和 setjmp可以实现
  2. ucontext
  3. 汇编实现
上一篇:【unity2D】API-学习记录8-协程Coroutine


下一篇:比物理线程都好用的C++20的协程,你会用吗?