SRS Source Code: Thread Notes

 

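Thread in the SRS source code is wrapped layer upon layer; the class at the bottom of the stack is SrsThread, in srs_app_thread.cpp.

For now, let's set the coroutine concept aside and treat it as an ordinary thread. Its logic is as follows: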

 

1. The thread is created in the start() method:

 int SrsThread::start()
    {
        int ret = ERROR_SUCCESS;
        
        if(tid) {
            srs_info("thread %s already running.", _name);
            return ret;
        }
        
        if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){
            ret = ERROR_ST_CREATE_CYCLE_THREAD;
            srs_error("st_thread_create failed. ret=%d", ret);
            return ret;
        }
        
        disposed = false;
        // we set to loop to true for thread to run.
        loop = true;
        
        // wait for cid to ready, for parent thread to get the cid.
        while (_cid < 0) {
            st_usleep(10 * 1000);
        }
        
        // now, cycle thread can run.
        can_run = true;
        
        return ret;
    }

 

2. The function it runs is thread_fun, which receives the object itself (this) as its argument and then calls this->thread_cycle():

void* SrsThread::thread_fun(void* arg)
    {
        SrsThread* obj = (SrsThread*)arg;
        srs_assert(obj);
        
        obj->thread_cycle();
        
        // for valgrind to detect.
        SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
        if (ctx) {
            ctx->clear_cid();
        }
        
        st_thread_exit(NULL);
        
        return NULL;
    }
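
The reason thread_fun takes a void* and casts it back is that a C-style creation API such as st_thread_create only accepts a plain function pointer plus one opaque argument. Below is a minimal sketch of that trampoline pattern. Note that demo_spawn, Worker and run_trampoline_demo are hypothetical names made up for illustration, and demo_spawn simply calls the function immediately instead of scheduling a coroutine the way ST would.

```cpp
#include <cassert>

// Hypothetical stand-in for a C-style thread API: it accepts only a plain
// function pointer and one opaque argument. It runs the function right away.
typedef void* (*entry_fn)(void* arg);

void* demo_spawn(entry_fn fn, void* arg) {
    return fn(arg);
}

// Hypothetical class that mirrors the start() / thread_fun / thread_cycle split.
class Worker {
public:
    Worker() : ran(0) {}

    // Pass the static entry point and this as the opaque argument.
    void start() { demo_spawn(thread_fun, this); }

    int ran;

private:
    // A static member function has no implicit this, so it matches entry_fn.
    static void* thread_fun(void* arg) {
        Worker* obj = static_cast<Worker*>(arg);
        assert(obj);
        obj->thread_cycle();  // hop back into a normal member function
        return nullptr;
    }

    void thread_cycle() { ++ran; }
};

// Returns how many times thread_cycle() ran for one start() call.
int run_trampoline_demo() {
    Worker w;
    w.start();
    return w.ran;
}
```

Calling run_trampoline_demo() drives one Worker through start(), the static entry point, and the member function, in that order.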

 

3. thread_cycle() is implemented as follows:

void SrsThread::thread_cycle()
    {
        int ret = ERROR_SUCCESS;
        
        _srs_context->generate_id();
        srs_info("thread %s cycle start", _name);
        
        _cid = _srs_context->get_id();
        
        srs_assert(handler);
        handler->on_thread_start();
        
        // thread is running now.
        really_terminated = false;
        
        // wait for cid to ready, for parent thread to get the cid.
        while (!can_run && loop) {
            st_usleep(10 * 1000);
        }
        
        while (loop) {
            if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
                srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret);
                goto failed;
            }
            srs_info("thread %s on before cycle success", _name);
            
            if ((ret = handler->cycle()) != ERROR_SUCCESS) {
                if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
                    srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
                }
                goto failed;
            }
            srs_info("thread %s cycle success", _name);
            
            if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
                srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", _name, ret);
                goto failed;
            }
            srs_info("thread %s on end cycle success", _name);
            
        failed:
            if (!loop) {
                break;
            }
            
            // to improve performance, do not sleep when interval is zero.
            // @see: https://github.com/ossrs/srs/issues/237
            if (cycle_interval_us != 0) {
                st_usleep(cycle_interval_us);
            }
        }
        
        // really terminated now.
        really_terminated = true;
        
        handler->on_thread_stop();
        srs_info("thread %s cycle finished", _name);
    }

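Its core is a loop that keeps calling the following three functions on the member ISrsThreadHandler* handler (assigned from a constructor argument). A non-success return skips the rest of that iteration (goto failed) but does not by itself end the loop: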

virtual int on_before_cycle();
virtual int cycle() = 0;
virtual int on_end_cycle();
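
To make the loop shape concrete, here is a minimal single-threaded sketch of it. DemoHandler, DemoLoop, CountingHandler and run_demo are hypothetical names, not SRS classes; 0 stands in for ERROR_SUCCESS, and the sleep between iterations is left out. The handler fails on its second cycle() and asks the loop to stop on its third, so it shows both the "skip the rest and retry" path and the stop path.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for ISrsThreadHandler, trimmed to the three
// per-iteration callbacks. 0 plays the role of ERROR_SUCCESS.
class DemoHandler {
public:
    virtual ~DemoHandler() {}
    virtual int on_before_cycle() { return 0; }
    virtual int cycle() = 0;
    virtual int on_end_cycle() { return 0; }
};

// Hypothetical driver with the same shape as the while (loop) body:
// a failing callback skips the remaining callbacks of that iteration,
// and the loop only ends once the loop flag has been cleared.
class DemoLoop {
public:
    explicit DemoLoop(DemoHandler* h) : handler(h), loop(true) {}
    void stop_loop() { loop = false; }
    void run() {
        assert(handler != nullptr);
        while (loop) {
            if (handler->on_before_cycle() != 0) continue;
            if (handler->cycle() != 0) continue;
            handler->on_end_cycle();
        }
    }
private:
    DemoHandler* handler;
    bool loop;
};

// Records every callback; fails on the 2nd cycle, stops the loop on the 3rd.
class CountingHandler : public DemoHandler {
public:
    CountingHandler() : owner(nullptr), cycles(0) {}
    DemoLoop* owner;
    int cycles;
    std::vector<std::string> log;

    int on_before_cycle() override { log.push_back("before"); return 0; }
    int cycle() override {
        log.push_back("cycle");
        ++cycles;
        if (cycles == 3) owner->stop_loop();
        return cycles == 2 ? -1 : 0;
    }
    int on_end_cycle() override { log.push_back("end"); return 0; }
};

// Runs the loop to completion and returns the recorded callback order.
std::vector<std::string> run_demo() {
    CountingHandler h;
    DemoLoop t(&h);
    h.owner = &t;
    t.run();
    return h.log;
}
```

The recorded order is before, cycle, end, before, cycle, before, cycle, end: the failed second iteration has no "end" entry, yet the loop carries on until stop_loop() is called.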

 

4. To stop the thread loop, call stop_loop():

void SrsThread::stop_loop()
{
    loop = false;
}

 

 

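However, a closer look at the code that uses these threads shows that the ISrsThreadHandler* handler member is wrapped once more.

For example, SrsOneCycleThread itself inherits from ISrsThreadHandler: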

class SrsOneCycleThread : public internal::ISrsThreadHandler
{
private:
    internal::SrsThread* pthread;
    ISrsOneCycleThreadHandler* handler;

//...
};

 

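When it creates the SrsThread with new, it passes its own this pointer in as the SrsThread's handler,

and then, in its own thread callbacks, forwards each call to its own member ISrsOneCycleThreadHandler* handler: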

int SrsOneCycleThread::cycle()
{
    int ret = handler->cycle();
    pthread->stop_loop();
    return ret;
}

void SrsOneCycleThread::on_thread_start()
{
    handler->on_thread_start();
}

int SrsOneCycleThread::on_before_cycle()
{
    return handler->on_before_cycle();
}

int SrsOneCycleThread::on_end_cycle()
{
    return handler->on_end_cycle();
}

void SrsOneCycleThread::on_thread_stop()
{
    handler->on_thread_stop();
}

 

Note that although the two interfaces ISrsOneCycleThreadHandler and ISrsThreadHandler declare the same functions, there is no inheritance relationship between them.
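
The wrapping described above can be sketched in a self-contained form. All names below (InnerHandler, OneCycleHandler, LoopThread, OneCycleThread, CountingTask, run_once_demo) are hypothetical stand-ins, the loop runs synchronously rather than on an ST coroutine, and the inner thread is held by value instead of being created with new. Only cycle() is modelled.

```cpp
#include <cassert>

// Hypothetical stand-in for internal::ISrsThreadHandler (cycle() only).
class InnerHandler {
public:
    virtual ~InnerHandler() {}
    virtual int cycle() = 0;
};

// Hypothetical stand-in for ISrsOneCycleThreadHandler: same method shape,
// but not related to InnerHandler by inheritance.
class OneCycleHandler {
public:
    virtual ~OneCycleHandler() {}
    virtual int cycle() = 0;
};

// Hypothetical stand-in for internal::SrsThread, run synchronously here.
class LoopThread {
public:
    explicit LoopThread(InnerHandler* h) : handler(h), loop(false) {}
    void start() {
        assert(handler != nullptr);
        loop = true;
        while (loop) handler->cycle();
    }
    void stop_loop() { loop = false; }
private:
    InnerHandler* handler;
    bool loop;
};

// The wrapper is an InnerHandler for the thread it owns, and holds a
// OneCycleHandler supplied by the user.
class OneCycleThread : public InnerHandler {
public:
    explicit OneCycleThread(OneCycleHandler* h) : pthread(this), handler(h) {}
    void start() { pthread.start(); }
    int cycle() override {
        int ret = handler->cycle();  // forward to the user's handler
        pthread.stop_loop();         // then make sure the loop ends after one pass
        return ret;
    }
private:
    LoopThread pthread;
    OneCycleHandler* handler;
};

// User code only needs to implement the outer interface.
class CountingTask : public OneCycleHandler {
public:
    int calls = 0;
    int cycle() override { ++calls; return 0; }
};

// Returns how many times the user's cycle() ran for one start() call.
int run_once_demo() {
    CountingTask task;
    OneCycleThread t(&task);
    t.start();
    return task.calls;
}
```

Because the two interfaces are unrelated, user code never sees the inner handler type; the wrapper is the only class that touches both sides.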

 
