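In the SRS source code, threads are wrapped layer upon layer; the class at the bottom of the stack is SrsThread, in srs_app_thread.cpp.
For now, let's set the coroutine concept aside and treat it as an ordinary thread. Its logic is as follows:
1. The start() method creates the thread: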
    int SrsThread::start()
    {
        int ret = ERROR_SUCCESS;

        if(tid) {
            srs_info("thread %s already running.", _name);
            return ret;
        }

        if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){
            ret = ERROR_ST_CREATE_CYCLE_THREAD;
            srs_error("st_thread_create failed. ret=%d", ret);
            return ret;
        }

        disposed = false;
        // we set to loop to true for thread to run.
        loop = true;

        // wait for cid to ready, for parent thread to get the cid.
        while (_cid < 0) {
            st_usleep(10 * 1000);
        }

        // now, cycle thread can run.
        can_run = true;

        return ret;
    }
2. The function the thread runs is thread_fun. Its argument is the object itself (this), and it in turn calls this->thread_cycle():
    void* SrsThread::thread_fun(void* arg)
    {
        SrsThread* obj = (SrsThread*)arg;
        srs_assert(obj);

        obj->thread_cycle();

        // for valgrind to detect.
        SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
        if (ctx) {
            ctx->clear_cid();
        }

        st_thread_exit(NULL);

        return NULL;
    }
3. thread_cycle() is implemented as follows:
    void SrsThread::thread_cycle()
    {
        int ret = ERROR_SUCCESS;

        _srs_context->generate_id();
        srs_info("thread %s cycle start", _name);

        _cid = _srs_context->get_id();

        srs_assert(handler);
        handler->on_thread_start();

        // thread is running now.
        really_terminated = false;

        // wait for cid to ready, for parent thread to get the cid.
        while (!can_run && loop) {
            st_usleep(10 * 1000);
        }

        while (loop) {
            if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
                srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret);
                goto failed;
            }
            srs_info("thread %s on before cycle success", _name);

            if ((ret = handler->cycle()) != ERROR_SUCCESS) {
                if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
                    srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
                }
                goto failed;
            }
            srs_info("thread %s cycle success", _name);

            if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
                srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret);
                goto failed;
            }
            srs_info("thread %s on end cycle success", _name);

        failed:
            if (!loop) {
                break;
            }

            // to improve performance, donot sleep when interval is zero.
            // @see: https://github.com/ossrs/srs/issues/237
            if (cycle_interval_us != 0) {
                st_usleep(cycle_interval_us);
            }
        }

        // readly terminated now.
        really_terminated = true;

        handler->on_thread_stop();
        srs_info("thread %s cycle finished", _name);
    }
At its core, it loops over the following three functions of the member ISrsThreadHandler* handler (assigned from a constructor argument):
    virtual int on_before_cycle();
    virtual int cycle() = 0;
    virtual int on_end_cycle();
4. To stop the thread loop, call stop_loop():
    void SrsThread::stop_loop()
    {
        loop = false;
    }
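The four steps above can be condensed into a small standalone sketch. It is not SRS code: CycleHandler, LoopRunner, RecordingHandler and run_demo are hypothetical names invented for illustration, the loop runs synchronously on the caller instead of going through st_thread_create, and the cid handshake and the sleep interval are left out. It only aims to show the static entry function receiving this, the order of the three hooks, how a failing hook skips the rest of one iteration without ending the loop, and that stop_loop() merely clears a flag that is checked between iterations.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for ISrsThreadHandler: three hooks, 0 means success.
struct CycleHandler {
    virtual ~CycleHandler() {}
    virtual int on_before_cycle() { return 0; }
    virtual int cycle() = 0;
    virtual int on_end_cycle() { return 0; }
};

// Hypothetical stand-in for SrsThread, run synchronously for illustration.
class LoopRunner {
public:
    explicit LoopRunner(CycleHandler* h) : handler(h), loop(false) {}

    // Same shape as thread_fun: a static entry point that receives `this`
    // as void* and forwards to a member function.
    static void* entry(void* arg) {
        LoopRunner* obj = static_cast<LoopRunner*>(arg);
        assert(obj);
        obj->run();
        return nullptr;
    }

    // A real start() would hand `entry` to a thread or coroutine library;
    // this sketch simply calls it directly.
    void start() { loop = true; entry(this); }

    // Only clears the flag; the loop notices it at its next check.
    void stop_loop() { loop = false; }

private:
    void run() {
        assert(handler);
        while (loop) {
            // A failing hook skips the remaining hooks of this iteration
            // (the role of `goto failed`), but the loop itself continues.
            if (handler->on_before_cycle() == 0 && handler->cycle() == 0) {
                handler->on_end_cycle();
            }
        }
    }

    CycleHandler* handler;
    bool loop;
};

// Records which hooks ran; fails on the 2nd iteration, stops on the 3rd.
struct RecordingHandler : CycleHandler {
    LoopRunner* runner = nullptr;
    std::vector<std::string> calls;
    int iterations = 0;

    int on_before_cycle() override { calls.push_back("before"); return 0; }
    int cycle() override {
        ++iterations;
        calls.push_back("cycle");
        if (iterations == 3) runner->stop_loop();
        return iterations == 2 ? -1 : 0;
    }
    int on_end_cycle() override { calls.push_back("end"); return 0; }
};

// Runs the loop once and returns the recorded hook sequence.
std::vector<std::string> run_demo() {
    RecordingHandler handler;
    LoopRunner runner(&handler);
    handler.runner = &runner;
    runner.start();
    return handler.calls;
}
```

In this sketch the second iteration records no "end" entry because cycle() failed, while the third iteration still records one even though stop_loop() was called inside cycle(): the flag is only consulted after the iteration has finished.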
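A closer look at the code that uses the thread, however, shows that the ISrsThreadHandler* handler member is itself wrapped once more.
For example, SrsOneCycleThread itself inherits from ISrsThreadHandler: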
    class SrsOneCycleThread : public internal::ISrsThreadHandler
    {
    private:
        internal::SrsThread* pthread;
        ISrsOneCycleThreadHandler* handler;
        //...
    };
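When it creates the SrsThread with new, it passes its own this pointer in as the SrsThread's handler.
Then, in its own thread callbacks, it forwards each call to its own member, ISrsOneCycleThreadHandler* handler: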
    int SrsOneCycleThread::cycle()
    {
        int ret = handler->cycle();
        pthread->stop_loop();
        return ret;
    }

    void SrsOneCycleThread::on_thread_start()
    {
        handler->on_thread_start();
    }

    int SrsOneCycleThread::on_before_cycle()
    {
        return handler->on_before_cycle();
    }

    int SrsOneCycleThread::on_end_cycle()
    {
        return handler->on_end_cycle();
    }

    void SrsOneCycleThread::on_thread_stop()
    {
        handler->on_thread_stop();
    }
Note that although the two interfaces ISrsOneCycleThreadHandler and ISrsThreadHandler declare the same functions, there is no inheritance relationship between them.
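The wrapping described above can be sketched in isolation. Everything here is a hypothetical stand-in rather than SRS code: InnerHandler plays the role of ISrsThreadHandler, OneShotHandler the role of ISrsOneCycleThreadHandler, Runner the role of SrsThread, and OneShotRunner the role of SrsOneCycleThread. For brevity only cycle() is modelled, the loop runs synchronously, and the Runner is held by value, whereas the excerpt above holds a pointer named pthread.

```cpp
#include <cassert>

// Inner interface, playing the role of ISrsThreadHandler.
struct InnerHandler {
    virtual ~InnerHandler() {}
    virtual int cycle() = 0;
};

// Outer interface, playing the role of ISrsOneCycleThreadHandler.
// Same method shape, but deliberately not derived from InnerHandler.
struct OneShotHandler {
    virtual ~OneShotHandler() {}
    virtual int cycle() = 0;
};

// Minimal loop owner, playing the role of SrsThread.
class Runner {
public:
    explicit Runner(InnerHandler* h) : handler(h), loop(false) {}
    void start() {
        loop = true;
        while (loop) {
            handler->cycle();
        }
    }
    void stop_loop() { loop = false; }

private:
    InnerHandler* handler;
    bool loop;
};

// Plays the role of SrsOneCycleThread: it implements the inner interface,
// registers itself as the Runner's handler, and forwards each call to the
// user's handler, which only knows the outer interface.
class OneShotRunner : public InnerHandler {
public:
    explicit OneShotRunner(OneShotHandler* h) : runner(this), handler(h) {}

    void start() { runner.start(); }

    int cycle() override {
        assert(handler);
        int ret = handler->cycle();
        runner.stop_loop();  // ask the loop to end after this first pass
        return ret;
    }

private:
    Runner runner;
    OneShotHandler* handler;
};

// User code: counts how many times its cycle() was invoked.
struct CountingTask : OneShotHandler {
    int runs = 0;
    int cycle() override { ++runs; return 0; }
};

// Starts the one-shot runner and reports how often the task ran.
int run_once_demo() {
    CountingTask task;
    OneShotRunner one_shot(&task);
    one_shot.start();
    return task.runs;
}
```

Because the two interfaces are unrelated, a CountingTask cannot be handed to Runner directly; the wrapper is the only bridge, which is also what lets it add its own policy (here, stopping after one pass) without the user's handler knowing about the loop.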