本文分析的state-threads的版本是1.9
。
以下正在写作中。。。
srs源码分析7-create stream
srs源码分析8-推流-publish
srs源码分析9-推流-unpublish
srs源码分析10-拉流-play
srs源码分析11-拉流-pause
srs源码分析12-转发-forward
srs是基于协程开发的,底层使用了state_threads协程库。为了更好的理解srs,所以需要先熟悉state_threads。这里并不会介绍协程的相关概念,只是简单的介绍一下state_threads的核心逻辑。
以下state_thread会被简称为st。
使用示例-echo server
使用st实现了一个简单的echo服务器,以下代码写的很简单,重点是理解st的使用。
#include <arpa/inet.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <st.h>
#define LISTEN_PORT 9000
#define ERR_EXIT(m) \
do { \
perror(m); \
exit(-1); \
} while (0)
void *client_thread(void *arg) {
st_netfd_t client_st_fd = (st_netfd_t)arg;
int client_fd = st_netfd_fileno(client_st_fd);
sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int ret = getpeername(client_fd, (sockaddr *)&client_addr, &client_addr_len);
if (ret == -1) {
printf("[WARN] Failed to get client ip: %s\n", strerror(ret));
}
char ip_buf[INET_ADDRSTRLEN];
bzero(ip_buf, sizeof(ip_buf));
inet_ntop(client_addr.sin_family, &client_addr.sin_addr, ip_buf,
sizeof(ip_buf));
while (1) {
char buf[1024] = {0};
ssize_t ret = st_read(client_st_fd, buf, sizeof(buf), ST_UTIME_NO_TIMEOUT);
if (ret == -1) {
printf("client st_read error\n");
break;
} else if (ret == 0) {
printf("client quit, ip = %s\n", ip_buf);
break;
}
printf("recv from %s, data = %s", ip_buf, buf);
ret = st_write(client_st_fd, buf, ret, ST_UTIME_NO_TIMEOUT);
if (ret == -1) {
printf("client st_write error\n");
}
}
}
void *listen_thread(void *arg) {
while (1) {
st_netfd_t client_st_fd =
st_accept((st_netfd_t)arg, NULL, NULL, ST_UTIME_NO_TIMEOUT);
if (client_st_fd == NULL) {
continue;
}
printf("get a new client, fd = %d\n", st_netfd_fileno(client_st_fd));
st_thread_t client_tid =
st_thread_create(client_thread, (void *)client_st_fd, 0, 0);
if (client_tid == NULL) {
printf("Failed to st create client thread\n");
}
}
}
int main() {
int ret = st_set_eventsys(ST_EVENTSYS_ALT);
if (ret == -1) {
printf("st_set_eventsys use linux epoll failed\n");
}
ret = st_init();
if (ret != 0) {
printf("st_init failed. ret = %d\n", ret);
return -1;
}
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd == -1) {
ERR_EXIT("socket");
}
int reuse_socket = 1;
ret = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket,
sizeof(int));
if (ret == -1) {
ERR_EXIT("setsockopt");
}
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(LISTEN_PORT);
server_addr.sin_addr.s_addr = INADDR_ANY;
ret =
bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr));
if (ret == -1) {
ERR_EXIT("bind");
}
ret = listen(listen_fd, 128);
if (ret == -1) {
ERR_EXIT("listen");
}
st_netfd_t st_listen_fd = st_netfd_open_socket(listen_fd);
if (!st_listen_fd) {
printf("st_netfd_open_socket open socket failed.\n");
return -1;
}
st_thread_t listen_tid =
st_thread_create(listen_thread, (void *)st_listen_fd, 1, 0);
if (listen_tid == NULL) {
printf("Failed to st create listen thread\n");
}
while (1) {
st_sleep(1); /*用于让出CPU执行权,重新调度就绪的协程。*/
}
return 0;
}
root@learner:~/tmp/st# gcc main.cpp -lst
root@learner-Lenovo:~/tmp/st# ./a.out
get a new client, fd = 4
recv from 192.168.30.17, data = hello world
client quit, ip = 192.168.30.17
^C
root@learner:~# nc 192.168.30.17 9000
hello world
hello world
^C
创建一个listen协程,用于监听客户端的连接,当客户端连接服务后,会为此客户端创建一个client协程,用于处理此客户端的所有请求。
协程的切换
st中协程的切换提供了两种方式:一种是使用系统提供的setjmp
和longjmp
接口,另一种是使用汇编实现的_st_md_cxt_save
和_st_md_cxt_restore
接口,这两个函数从用法上同setjmp和longjmp。
这两种方式的切换本质上都是栈帧的切换。
setjmp和longjmp
C语言中的goto语句只能在当前函数内跳转,而不能在函数间跳转。setjmp()和longjmp()可以执行非局部跳转,即跳转的目标为当前执行函数之外的某个位置。
setjmp()函数为后续由longjmp()调用执行的跳转确立了跳转目标,该目标正是程序发起setjmp()调用的位置。从编程角度看来,调用longjmp()函数后,看起来就和从第二次调用setjmp()返回时完全一样。通过setjmp()的返回值,可以区分setjmp()调用是初始返回还是第二次返回。初始调用返回值为0,后续“伪返回”的返回值为longjmp()调用中val参数所指定的任意值。通过对val参数使用不同值,能够区分程序中跳转至同一目标的不同起跳位置。更多相关setjmp()、longjmp()的介绍,可以参考《Linux/UNIX系统编程手册》上册第106页。
以下是从《Linux/UNIX系统编程手册》摘抄的示例:
#include <stdio.h>
#include <stdlib.h>
#include <setjmp.h>
jmp_buf env;
void f2(int num)
{
longjmp(env, num);
}
void f1(int num)
{
if(num == 1){
longjmp(env, num);
}
f2(num);
}
int main(int argc, char** argv)
{
if(argc != 2){
printf("Usage: %s [1|2]\n", argv[0]);
return -1;
}
switch(setjmp(env)){
case 0:
printf("Calling f1() after initial setjmp()\n");
f1(atoi(argv[1]));
break;
case 1:
printf("We jumped back from f1()\n");
break;
case 2:
printf("We jumped back from f2()\n");
break;
}
return 0;
}
这个示例我稍微做了一些修改,运行结果及分析如下:
root@learner:~/tmp# ./a.out 1
Calling f1() after initial setjmp()
We jumped back from f1()
root@learner:~/tmp# ./a.out 2
Calling f1() after initial setjmp()
We jumped back from f2()
_st_md_cxt_save和_st_md_cxt_restore
这两个函数是通过汇编实现的,代码如下:
#define JB_BX 0
#define JB_SI 1
#define JB_DI 2
#define JB_BP 3
#define JB_SP 4
#define JB_PC 5
.file "md.S"
.text
/* _st_md_cxt_save(__jmp_buf env) 存储函数栈帧 */
.globl _st_md_cxt_save
.type _st_md_cxt_save, @function
.align 16
_st_md_cxt_save:
movl 4(%esp), %eax /*取得参数env的地址,保存到eax中。*/
movl %ebx, (JB_BX*4)(%eax) /*保存ebx*/
movl %esi, (JB_SI*4)(%eax) /*保存esi*/
movl %edi, (JB_DI*4)(%eax) /*保存edi*/
/*保存esp,即栈顶,保存的栈顶是没有调用_st_md_cxt_save()函数之前的栈顶*/
leal 4(%esp), %ecx /
movl %ecx, (JB_SP*4)(%eax) /*保存ecx*/
movl 0(%esp), %ecx
movl %ecx, (JB_PC*4)(%eax) /*保存引用计数器pc*/
movl %ebp, (JB_BP*4)(%eax) /*保存ebp 即调用_st_md_cxt_save()的函数的ebp*/
xorl %eax, %eax /*清空eax 作为_st_md_cxt_save()的返回值*/
ret
.size _st_md_cxt_save, .-_st_md_cxt_save
/* _st_md_cxt_restore(__jmp_buf env, int val) 恢复函数栈帧 */
.globl _st_md_cxt_restore
.type _st_md_cxt_restore, @function
.align 16
_st_md_cxt_restore:
movl 4(%esp), %ecx /*获取第一个参数的地址,即env的地址。*/
movl 8(%esp), %eax /*获取第二个参数的地址,即val的地址。*/
movl (JB_PC*4)(%ecx), %edx /*将原pc寄存器的值保存到edx中*/
movl (JB_BX*4)(%ecx), %ebx /*恢复ebx*/
movl (JB_SI*4)(%ecx), %esi /*恢复esi*/
movl (JB_DI*4)(%ecx), %edi /*恢复edi*/
movl (JB_BP*4)(%ecx), %ebp /*恢复ebp*/
movl (JB_SP*4)(%ecx), %esp /*恢复esp*/
testl %eax, %eax /*测试eax的值是否为0,也就是第二个参数是否为0。*/
jnz 1f /*如果第二个参数不为0,则直接跳转到1:执行。*/
incl %eax /*将返回值置为1*/
1: jmp *%edx /*跳转到之前pc处*/
.size _st_md_cxt_restore, .-_st_md_cxt_restore
_st_md_cxt_save(__jmp_buf env)用于保存栈帧,_st_md_cxt_restore(__jmp_buf env, int val)用于恢复栈帧。
st中协程的切换宏
#if defined(MD_USE_BUILTIN_SETJMP) && !defined(USE_LIBC_SETJMP)
#define MD_SETJMP(env) _st_md_cxt_save(env)
#define MD_LONGJMP(env, val) _st_md_cxt_restore(env, val)
extern int _st_md_cxt_save(jmp_buf env);
extern void _st_md_cxt_restore(jmp_buf env, int val);
#else
#define MD_SETJMP(env) setjmp(env)
#define MD_LONGJMP(env, val) longjmp(env, val)
#endif
如果定义了MD_USE_BUILTIN_SETJMP
宏,且没有定义USE_LIBC_SETJMP
宏,则使用自定义的栈帧存取函数。否则使用系统提供的setjmp和longjmp切换栈帧。
#define _ST_SWITCH_CONTEXT(_thread) \
ST_BEGIN_MACRO \
ST_SWITCH_OUT_CB(_thread); \
if (!MD_SETJMP((_thread)->context)) { \ /*调出协程返回0,调入协程返回1。*/
_st_vp_schedule(); \ /*选择下一个需要调度的协程*/
} \
ST_DEBUG_ITERATE_THREADS(); \
ST_SWITCH_IN_CB(_thread); \
ST_END_MACRO
_ST_SWITCH_CONTEXT
用于将协程的CPU执行权让出去,重新调度一个新的协程。
当协程调用_ST_SWITCH_CONTEXT
时,此时MD_SETJMP会返回0,则进入协程调度函数_st_vp_schedule(),CPU的执行权转移到其他协程。此时相当于在本协程中打上了一个切换点。当本协程将再次获得CPU执行权时,在_st_vp_schedule()中调用_ST_RESTORE_CONTEXT宏函数,会通过MD_SETJMP再次返回,此时返回值为1,跳过if语句返回到本协程调用_ST_SWITCH_CONTEXT的位置,继续往下执行。
#define _ST_RESTORE_CONTEXT(_thread) \
ST_BEGIN_MACRO \
_ST_SET_CURRENT_THREAD(_thread); \ /*标记此协程为当前运行的协程*/
MD_LONGJMP((_thread)->context, 1); \ /*执行协程切换 恢复之前挂起的协程*/
ST_END_MACRO
_ST_RESTORE_CONTEXT
用于恢复指定的协程,通过MD_LONGJMP宏,返回到MD_SETJMP打的断点处,从MD_SETJMP再次返回,从而再次获取到CPU的执行权。
void _st_vp_schedule(void)
{
_st_thread_t *thread;
/*从就绪的协程队列中取出一个协程*/
if (_ST_RUNQ.next != &_ST_RUNQ) {
thread = _ST_THREAD_PTR(_ST_RUNQ.next);
_ST_DEL_RUNQ(thread); /*从就绪协程队列删除*/
} else { /*如果就绪的协程队列为空,说明所有的就绪协程都处理完毕了。*/
thread = _st_this_vp.idle_thread; /*现在切换至idle协程*/
}
ST_ASSERT(thread->state == _ST_ST_RUNNABLE); /*该协程必须处于可运行状态*/
thread->state = _ST_ST_RUNNING; /*将即将运行协程的状态标记为正在运行*/
_ST_RESTORE_CONTEXT(thread); /*切换至新的协程*/
}
在切换协程时,会从就绪的协程队列中取出一个协程,然后切换至该协程。如果就绪队列中没有可切换的协程,则说明没有协程需要处理,此时会切换至idle协程。返回idle协程后,会重新进入epoll_wait,重新开始监听待发生的事件和处理定时事件。
调度器
所有的协程都是在一个单线程中执行的,所以需要有一个调度器来调度所有的协程,以便需要执行权限的协程能够获取到CPU。通常协程在发生读事件
、写事件
、定时器事件
时才需要执行权限,也就是发生这些事件后,需要将协程调度到CPU上,让其获得CPU的执行权,处理对应的事情。
st中对读写事件的监控是通过epoll实现的,而定时器事件通过最小堆配合epoll的超时实现的。
typedef struct _st_eventsys_ops {
const char *name; /* Name of this event system */
int val; /* Type of this event system */
int (*init)(void); /* Initialization */
void (*dispatch)(void); /* Dispatch function */
int (*pollset_add)(struct pollfd *, int); /* Add descriptor set */
void (*pollset_del)(struct pollfd *, int); /* Delete descriptor set */
int (*fd_new)(int); /* New descriptor allocated */
int (*fd_close)(int); /* Descriptor closed */
int (*fd_getlimit)(void); /* Descriptor hard limit */
} _st_eventsys_t;
这是调度器的接口,可以使用epoll实现,也可以使用select和poll实现。
static _st_eventsys_t _st_epoll_eventsys = {
"epoll",
ST_EVENTSYS_ALT,
_st_epoll_init,
_st_epoll_dispatch,
_st_epoll_pollset_add,
_st_epoll_pollset_del,
_st_epoll_fd_new,
_st_epoll_fd_close,
_st_epoll_fd_getlimit
};
st中通过epoll实现了调度器,实现的这些函数作为回调函数封装到了结构体中。
ST_HIDDEN void _st_epoll_dispatch(void)
{
...
if (_ST_SLEEPQ == NULL) {
/* 定时队列为空,说明没有定时器事件,则epoll_wait的超时时间为-1,
即没有事件触发时,epoll_wait一直阻塞。*/
timeout = -1;
} else {
/*从定时队列获取最小定时器*/
min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 : (_ST_SLEEPQ->due - _ST_LAST_CLOCK);
/*换算epoll_wait的超时时间 单位:us */
timeout = (int) (min_timeout / 1000);
...
}
/*进入epoll等待事件的触发,也可能因为超时而退出。*/
nfd = epoll_wait(..., ..., ..., timeout);
...
pq->thread->state = _ST_ST_RUNNABLE; /*把协程的状态设置为可运行状态*/
_ST_ADD_RUNQ(pq->thread); /*将协程添加到运行队列,等待新一轮的调度。*/
...
}
在进入epoll_wait之前,先从最小堆中获取最近一个定时器触发的时间,将此时间作为epoll_wait的超时时间,如果在这个超时时间之内发生了读写事件,则epoll_wait返回处理读写事件;如果段超时时间之内没有发生读写事件,epoll_wait会因为超时而退出,此时返回正好处理定时事件。
若不是因为超时而从epoll_wait返回,说明有的协程读写事件触发了,此时需要将触发事件的协程保存到可运行队列中,等待新一轮的调度。
创建协程
_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size)
{
_st_thread_t *thread;
_st_stack_t *stack;
void **ptds;
char *sp;
/* Adjust stack size 调整栈大小*/
if (stk_size == 0)
stk_size = ST_DEFAULT_STACK_SIZE; /*默认栈大小是128KB*/
/*页大小对齐*/
stk_size = ((stk_size + _ST_PAGE_SIZE - 1) / _ST_PAGE_SIZE) * _ST_PAGE_SIZE;
/*申请栈空间*/
stack = _st_stack_new(stk_size);
if (!stack)
return NULL;
sp = stack->stk_top; /*栈顶*/
sp = sp - (ST_KEYS_MAX * sizeof(void *));/*栈顶空出一块区域,用于存放私有的数据。*/
ptds = (void **) sp;
sp = sp - sizeof(_st_thread_t); /*再空出一个_st_thread_t大小*/
thread = (_st_thread_t *) sp;
if ((unsigned long)sp & 0x3f)
sp = sp - ((unsigned long)sp & 0x3f);
stack->sp = sp - _ST_STACK_PAD_SIZE; /*栈顶再空出128字节的填充区域*/
memset(thread, 0, sizeof(_st_thread_t));
memset(ptds, 0, ST_KEYS_MAX * sizeof(void *));
thread->private_data = ptds; /*指向协程私有数据*/
thread->stack = stack; /*指向协程栈*/
thread->start = start; /*协程入口函数*/
thread->arg = arg; /*入口函数参数*/
/*保存切换上下文,打上还原点,当本协程下次获取到执行权限时,从这个还原点接着执行。*/
_ST_INIT_CONTEXT(thread, stack->sp, _st_thread_main);
/*如果需要主动回收协程,则需要协程创建一个条件变量,用于阻塞等待回收协程。*/
if (joinable) {
thread->term = st_cond_new();
if (thread->term == NULL) {
_st_stack_free(thread->stack);
return NULL;
}
}
thread->state = _ST_ST_RUNNABLE; /*标记协程为可运行状态*/
_st_active_count++; /*增加活跃协程的个数*/
_ST_ADD_RUNQ(thread); /*将协程插入到运行队列*/
return thread;
}
创建一个新的协程,在创建的过程中,会将这个协程放到可运行队列,等待着调度。在调度到这个新的协程时,就可以获得CPU的执行权。
除了主协程外,其他协程的栈都是在堆上申请的空间,默认大小时128KB。
#define _ST_INIT_CONTEXT MD_INIT_CONTEXT
#define MD_INIT_CONTEXT(_thread, _sp, _main) \
ST_BEGIN_MACRO \
if (MD_SETJMP((_thread)->context)) \ /*设置还原点,或从还原点返回。*/
_main(); \
MD_GET_SP(_thread) = (long) (_sp); \ /*设置ctx中sp寄存器的值,设置新的栈帧*/
ST_END_MACRO
在创建新协程时,会通过上面的宏函数设置还原点,当执行到MD_SETJMP时,会返回0,此时_main()函数不会被执行。当协程再次获取执行权时,会再次从MD_SETJMP返回,此时返回值为1,则进入_main()函数,也就是_st_thread_main()
函数。
void _st_thread_main(void)
{
_st_thread_t *thread = _ST_CURRENT_THREAD(); /*获取当前协程的句柄*/
MD_CAP_STACK(&thread);
thread->retval = (*thread->start)(thread->arg); /*执行协程入口函数*/
st_thread_exit(thread->retval); /*协程退出*/
}
新的协程创建后,并不会立即被执行,需要先打上还原点,然后放入可执行队列中。当调度器调度到这个新线程后才会真正获取到CPU的执行权,在MD_SETJMP返回后,进入这个函数,在此函数中才会进入协程的入口函数。协程入口函数处理完毕后,会进入协程退出函数,这个稍后分析。
st的初始化
int st_init(void)
{
_st_thread_t *thread;
if (_st_active_count) { /*如果已经初始化,则直接返回。*/
return 0;
}
st_set_eventsys(ST_EVENTSYS_DEFAULT); /*设置epoll封装的接口 */
if (_st_io_init() < 0)
return -1;
memset(&_st_this_vp, 0, sizeof(_st_vp_t));
/*三个队列的初始化*/
ST_INIT_CLIST(&_ST_RUNQ); /*可运行队列*/
ST_INIT_CLIST(&_ST_IOQ); /*io队列*/
ST_INIT_CLIST(&_ST_ZOMBIEQ); /*僵尸态队列*/
if ((*_st_eventsys->init)() < 0) /*epoll的初始化*/
return -1;
_st_this_vp.pagesize = getpagesize(); /*页大小*/
_st_this_vp.last_clock = st_utime(); /*时钟时间*/
/* 创建一个idle协程 */
_st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start, NULL, 0, 0);
if (!_st_this_vp.idle_thread)
return -1;
_st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD; /*标识为idle协程*/
_st_active_count--;
_ST_DEL_RUNQ(_st_this_vp.idle_thread); /*从可运行队列中删除idle协程*/
/*为主协程封装一个_st_thread_t */
thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) + (ST_KEYS_MAX * sizeof(void *)));
if (!thread)
return -1;
thread->private_data = (void **) (thread + 1); /*指向协程私有数据*/
thread->state = _ST_ST_RUNNING; /*设置协程为可运行状态*/
thread->flags = _ST_FL_PRIMORDIAL; /*标识为主协程*/
_ST_SET_CURRENT_THREAD(thread); /*设置当前工作的协程*/
_st_active_count++; /*增加活跃协程个数*/
return 0;
}
在使用st时,首先需要调用st_init()函数对st进行初始化。这个函数有三个作用:1、做一些初始化工作 2、创建idle协程 3、将主线程封装为主协程
主线程也是一条可执行流,需要将主线程封装成主协程,以便能够在调度器中进行调度。
idle协程是非常核心的,当就绪队列中没有可运行的协程时,会将CPU的执行权限调度到idle协程。在idle协程中重新开始监听读、写、定时器事件。
void *_st_idle_thread_start(void *arg)
{
_st_thread_t *me = _ST_CURRENT_THREAD();
while (_st_active_count > 0) {
_ST_VP_IDLE(); /*进入epoll_wait,监听读写事件*/
_st_vp_check_clock(); /*处理定时器事件*/
me->state = _ST_ST_RUNNABLE; /*将idle线程标记为可运行状态*/
_ST_SWITCH_CONTEXT(me); /*让出CPU执行权,重新开始调度。*/
}
exit(0);
return NULL;
}
当就绪队列为空时,调度会进入idle线程,在idle线程中,会进入epoll_wait监听读写事件,有读写事件触发时,会将协程保存到就绪队列中;从epoll_wait返回后,查看是否有定时器触发,若有定时器触发,则将协程保存到就绪队列中。处理完读写事件和定时器事件后,idle协程让出CPU执行权,开始依次调度所有的就绪协程,所有的就绪协程处理完毕后,会再次进入idle协程,之后都是这样循环往复。
#define _ST_VP_IDLE() (*_st_eventsys->dispatch)()
_st_eventsys->dispatch是回调函数,这个函数指针实际指向_st_epoll_dispatch。
void _st_vp_check_clock(void)
{
_st_thread_t *thread;
st_utime_t now;
now = st_utime(); /*获取当前时间*/
_ST_LAST_CLOCK = now;
if (_st_curr_time && now - _st_last_tset > 999000) {
_st_curr_time = time(NULL);
_st_last_tset = now;
}
while (_ST_SLEEPQ != NULL) { /*睡眠队列不为空*/
thread = _ST_SLEEPQ; /*获取最小堆上的最小的定时器*/
ST_ASSERT(thread->flags & _ST_FL_ON_SLEEPQ);
if (thread->due > now)
break; /*协程的定时器还没有到,立即返回。*/
/*协程的定时器触发了*/
_ST_DEL_SLEEPQ(thread); /*从睡眠队列中删除*/
/*协程是因为条件变量而睡眠的,现在条件变量超时了。*/
if (thread->state == _ST_ST_COND_WAIT)
thread->flags |= _ST_FL_TIMEDOUT;
ST_ASSERT(!(thread->flags & _ST_FL_IDLE_THREAD));
thread->state = _ST_ST_RUNNABLE; /*标记协程为可运行状态*/
_ST_INSERT_RUNQ(thread); /*将协程送至就绪队列,等待调度。*/
}
}
从epoll_wait返回后,检查睡眠队列中的协程,当其定时器到了,则将协程送至就绪队列,等待新一轮的调度。
所有的定时器都放在最小堆中,从最小堆中获取到的是所有定时器的最小值。如果当前时间超过了最小堆中的定时器,说明定时器触发了。通过while循环将最小堆中的所有该触发的定时器全部都保存到就绪队列中。
协程的exit、join和yield
_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size)
{
...
if (joinable) { /*如果协程需要主动回收,则为协程创建一个条件变量。*/
thread->term = st_cond_new(); /*创建条件变量*/
if (thread->term == NULL) {
_st_stack_free(thread->stack);
return NULL;
}
}
...
}
在创建协程的时候,需要指明是否会主动回收协程。如果需要主动回收协程,则需要为协程创建一个条件变量,以便其他协程阻塞的回收该协程。
void _st_thread_main(void)
{
_st_thread_t *thread = _ST_CURRENT_THREAD(); /*获取当前协程的句柄*/
MD_CAP_STACK(&thread);
thread->retval = (*thread->start)(thread->arg); /*执行协程入口函数*/
st_thread_exit(thread->retval); /*退出协程*/
}
当协程的主体函数执行完毕后,会进入st_thread_exit函数,用于退出协程。
void st_thread_exit(void *retval)
{
_st_thread_t *thread = _ST_CURRENT_THREAD(); /*获取当前协程句柄*/
thread->retval = retval; /*保存返回值*/
_st_thread_cleanup(thread); /*释放协程的私有数据*/
_st_active_count--; /*活跃协程数减一*/
if (thread->term) { /*如果需要主动回收此协程*/
thread->state = _ST_ST_ZOMBIE; /*设置协程为僵尸态*/
_ST_ADD_ZOMBIEQ(thread); /*添加到僵尸态队列*/
st_cond_signal(thread->term); /*通知阻塞等待回收的协程*/
_ST_SWITCH_CONTEXT(thread); /*让出执行权*/
st_cond_destroy(thread->term); /*销毁条件变量*/
thread->term = NULL;
}
/*如果是主协程,则无需释放其对应的栈,否则释放在堆上申请的栈空间。*/
if (!(thread->flags & _ST_FL_PRIMORDIAL))
_st_stack_free(thread->stack);
_ST_SWITCH_CONTEXT(thread); /*销毁完毕让出CPU执行权*/
}
若协程是主协程,则无需释放堆空间,否则需要释放在堆上申请的用于栈的空间。thread->term不为NULL,说明这个协程需要主动的回收,此时需要将协程设置为僵尸态,并加入僵尸态队列。同时通知阻塞等待回收的协程。
int st_thread_join(_st_thread_t *thread, void **retvalp)
{
_st_cond_t *term = thread->term; /*获取协程的条件变量*/
if (term == NULL) {
errno = EINVAL;
return -1;
}
if (_ST_CURRENT_THREAD() == thread) { /*不能是当前协程*/
errno = EDEADLK;
return -1;
}
/*不能多个线程回收同时回收同一个协程*/
if (term->wait_q.next != &term->wait_q) {
errno = EINVAL;
return -1;
}
/*如果协程的状态不是僵尸态,则用于回收的线程将进入条件变量等待。*/
while (thread->state != _ST_ST_ZOMBIE) {
if (st_cond_timedwait(term, ST_UTIME_NO_TIMEOUT) != 0)
return -1;
}
if (retvalp)
*retvalp = thread->retval; /*获取待回收协程的返回值*/
thread->state = _ST_ST_RUNNABLE; /*将待回收的协程设置为可运行状态*/
_ST_DEL_ZOMBIEQ(thread); /*从僵尸态队列删除*/
_ST_ADD_RUNQ(thread); /*加入就绪运行队列*/
return 0;
}
协程在回收其他协程,此时待回收的协程还没有退出,主动回收的协程将进入条件变量等待。当待回收的协程退出时,会激活条件变量上的协程。
主动回收的协程从条件变量返回后,此时待回收的协程处于僵尸态,获取返回值后,此时需要再次将待回收的协程置为可运行状态,并加入就绪运行队列。待回收协程会再次进入st_thread_exit()
函数,从_ST_SWITCH_CONTEXT返回,主动销毁条件变量和栈空间,最后通过_ST_SWITCH_CONTEXT让出执行权,这时协程才算退出。
void st_thread_yield()
{
_st_thread_t *me = _ST_CURRENT_THREAD(); /*获取当前协程句柄*/
/*检查是否有定时器事件触发*/
_st_vp_check_clock();
/*就绪队列为空,则直接返回。*/
if (_ST_RUNQ.next == &_ST_RUNQ) {
return;
}
me->state = _ST_ST_RUNNABLE; /*将本协程标记为可运行状态*/
_ST_ADD_RUNQ(me); /*把本协程添加到就绪队列中*/
/*将执行权切换给就绪队列中的其他协程*/
_ST_SWITCH_CONTEXT(me);
}
协程在运行的过程中,可以主动的让出执行权。在让出执行权的时候,需要将自己主动加入到就绪队列中,等待再次被调度。
socket的处理
int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
{
struct pollfd *pd;
struct pollfd *epd = pds + npds; /*指向数组末尾*/
_st_pollq_t pq;
_st_thread_t *me = _ST_CURRENT_THREAD();
int n;
if (me->flags & _ST_FL_INTERRUPT) {
me->flags &= ~_ST_FL_INTERRUPT;
errno = EINTR;
return -1;
}
if ((*_st_eventsys->pollset_add)(pds, npds) < 0)
return -1;
pq.pds = pds;
pq.npds = npds;
pq.thread = me;
pq.on_ioq = 1;
_ST_ADD_IOQ(pq);
if (timeout != ST_UTIME_NO_TIMEOUT)
_ST_ADD_SLEEPQ(me, timeout);
me->state = _ST_ST_IO_WAIT;
/*主动切出协程,交出执行权。*/
_ST_SWITCH_CONTEXT(me);
n = 0;
if (pq.on_ioq) {
_ST_DEL_IOQ(pq);
(*_st_eventsys->pollset_del)(pds, npds);
} else {
for (pd = pds; pd < epd; pd++) {
if (pd->revents)
n++;
}
}
if (me->flags & _ST_FL_INTERRUPT) {
me->flags &= ~_ST_FL_INTERRUPT;
errno = EINTR;
return -1;
}
return n;
}
注册需要监听的事件,然后让出CPU执行权,当事件触发后再次从_ST_SWITCH_CONTEXT返回继续处理。
int st_netfd_poll(_st_netfd_t *fd, int how, st_utime_t timeout)
{
struct pollfd pd;
int n;
pd.fd = fd->osfd;
pd.events = (short) how;
pd.revents = 0;
if ((n = st_poll(&pd, 1, timeout)) < 0) /*单一fd*/
return -1;
if (n == 0) {
errno = ETIME;
return -1;
}
if (pd.revents & POLLNVAL) {
errno = EBADF;
return -1;
}
return 0;
}
对监听一个文件描述符的封装
_st_netfd_t *st_accept(_st_netfd_t *fd, struct sockaddr *addr, int *addrlen, st_utime_t timeout)
{
int osfd, err;
_st_netfd_t *newfd;
/*执行accept函数,如果没有client连接,则accept立即返回。*/
while ((osfd = accept(fd->osfd, addr, (socklen_t *)addrlen)) < 0) {
if (errno == EINTR)
continue;
if (!_IO_NOT_READY_ERROR)
return NULL;
/*进入poll函数,注册读事件,同时让出CPU的执行权,等待读事件触发。*/
if (st_netfd_poll(fd, POLLIN, timeout) < 0)
return NULL;
}
/*accept返回的client socket fd,将其进行封装。*/
newfd = _st_netfd_new(osfd, 1, 1);
if (!newfd) {
err = errno;
close(osfd);
errno = err;
}
return newfd;
}
fd
被设置为了非阻塞,调用accept()函数后,若没有客户端请求连接,则立即从accept返回,若errno为EAGAIN或EWOULDBLOCK,说明没有客户端连接,然后执行st_netfd_poll()函数,在此函数内会为fd
注册读事件,同时会让出CPU的执行权。当fd
的读事件触发后,本协程会再次被调度从而获得CPU执行权,接着往下执行。
ssize_t st_read(_st_netfd_t *fd, void *buf, size_t nbyte, st_utime_t timeout)
{
ssize_t n;
while ((n = read(fd->osfd, buf, nbyte)) < 0) { /*非阻塞的读取*/
if (errno == EINTR) /*被信号中断了*/
continue;
if (!_IO_NOT_READY_ERROR) /*不是EAGAIN或EWOULDBLOCK错误*/
return -1;
/*执行到这里说明发生了EAGAIN或EWOULDBLOCK错误,此时没有数据可读,让出执行权。*/
if (st_netfd_poll(fd, POLLIN, timeout) < 0)
return -1;
}
return n;
}
read的原理同accept,不再赘述。