4、SRS4.0源代码分析之RTMP推流处理

学习目标:

    本章我们将分析SRS4.0 RTMP服务模块和推流相关的代码处理逻辑。


学习内容:

    根据上节内容可知,SRS4.0针对RTMP推流客户端的处理逻辑,主要在协程SrsRtmpConn::do_cycle()中通过调用SrsRtmpConn::publishing()函数进行处理。

// 为了方便理解,下面函数使用了简化后的伪代码,但不影响理解函数的主流程
srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
{
    http_hooks_on_publish();   // 此回调函数向外通知客户端推流开始
    
    // 检测是否可以向当前的source对象推流,如果source的状态是已经有人在推流了,则acquire_publish函数返回失败
    if ((err = acquire_publish(source)) == srs_success) 
    {
        // 创建推流端接收协程,这个协程用于实际接收客户端向服务器发送的推流数据
        // 接收协程将在do_publishing()函数内部被启动,并在do_publishing()函数退出后主动结束
        SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, );  
        
        // 进入推流处理函数do_publishing()
        // SrsRtmpConn协程将在do_publishing函数内部循环处理,
        // 同时do_publishing函数内部还会启动SrsPublishRecvThread协程
        err = do_publishing(source, &rtrd);  
                                         
        rtrd.stop();  // 执行到这里,表示SrsRtmpConn协程退出循环,此时需要结束之前创建的接收协程
    }
    
    release_publish(source); // 推流结束,释放source中的推流信息
    
    http_hooks_on_unpublish();   // 此回调函数向外通知客户端推流结束
}

// SrsRtmpConn协程在推流端的处理,主要工作是统计接收的数据,并检测如果没有接收到新数据,则结束协程
srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThread* rtrd)
{
    rtrd->start();  // 启动接收协程
    while (true) {  // conn协程好像主要做统计工作,以及判断接收数据超时后退出do_publishing
        
        if ((err = trd->pull()) != srs_success)  // 协程运行出错,则结束协程
            return srs_error_wrap();
        
        if (nb_msgs == 0) {  
            rtrd->wait(20秒);  // 如果是第一次接收报文,conn协程在此阻塞20秒
        } 
        else {   
            rtrd->wait(5秒);   // 如果是普通接收报文,conn协程在此阻塞5秒
        }    

        if ((rtrd->error_code()) != srs_success)  
            return srs_error_wrap(); // 如果接收协程错误则退出循环,结束协程
            
        if (rtrd->nb_msgs() <= nb_msgs)  
            return srs_error_wrap(); // 超时检测发现没有接收到新数据则退出循环,结束协程

        nb_msgs = rtrd->nb_msgs(); // 更新数据,???这里要是变量溢出翻转了怎么办
        stat->on_video_frames();  // 通过单件类SrsStatistic,刷新每条流的接收视频帧计数
    }
}

推流端接收协程的创建和处理逻辑

// 在创建SrsPublishRecvThread对象时,它的构造函数会同时创建一个SrsRecvThread协程对象
// 所以,真正的推流端接收协程是SrsRecvThread::do_cycle()
SrsPublishRecvThread::SrsPublishRecvThread()
    : trd(this, rtmp_sdk, tm, parent_cid)
{

}

srs_error_t SrsRecvThread::cycle() {
    rtmp->set_recv_timeout(SRS_UTIME_NO_TIMEOUT); // 设置为阻塞式读取
    do_cycle();  // 
    rtmp->set_recv_timeout(timeout); // 读取连接结束,恢复默认timeout
}

srs_error_t SrsRecvThread::do_cycle()
{
    rtmp->recv_message(&msg); // 此函数用于得到一个完整RTMP Message消息
                              // 到此我们大概知道,具体的RTMP协议都封装在SrsRtmpServer类中
                              // 对于初学者,可以先不关心SrsRtmpServer类的实现细节
    pumper->consume(msg);  // 对于推流端,这里的pumper对象类型为SrsPublishRecvThread
                           // 对于拉流端,这里的pumper对象类型为SrsQueueRecvThread
}

// 接收推流端数据并处理
srs_error_t SrsPublishRecvThread::consume(SrsCommonMessage* msg)
{
    _conn->handle_publish_message(_source, msg); // 最终,SrsRtmpConn接收推流报文
    srs_thread_yield();  // 当前协程主动让出CPU
}

处理一个完整的RTMP报文

srs_error_t SrsRtmpConn::handle_publish_message()
{
    // 如果收到的报文是RTMP协议相关的控制命令,则此分支处理,然后就直接返回
    if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) 
    {
        rtmp->decode_message(msg, &pkt);
        rtmp->fmle_unpublish(); // for fmle, drop others except the fmle start packet.
        return err;
    }  
 
    // 处理RTMP相关的video, audio, data报文
    process_publish_message(source, msg);   // video, audio, data message
}

srs_error_t SrsRtmpConn::process_publish_message()
{
    if (info->edge) {  
        // 如果sever工作在edge模式,则调用此接口向源站转发报文,然后直接返回
        // 此时可以看到,edge模式下,SRS属于纯转发,所以不会在本地做HLS(ts、m3u8文件)处理
        // 同样,对于edge模式下的,推流处理逻辑主要在SrsPublishEdge和SrsEdgeForwarder类实现
        // 对于初学者,可暂时不关注这两个类,只要知道SRS对edge模式的设计规格是:
        // 推流到edge上时,edge会直接将流转发给源站;播放edge上的流时,edge会回源拉流。
        return  source->on_edge_proxy_publish(msg);  
    }   
    
    // 非edge模式,报文分类传入source对象
    if (msg->header.is_audio()) {  // 音频报文处理分支
        return source->on_audio(msg);  
    }     
    
    if (msg->header.is_video()) {  // 视频报文处理分支
        return source->on_video(msg);  
    }     
    
    if (msg->header.is_aggregate()) {  // 聚合消息处理分支
        return source->on_aggregate(msg);  
    }  
    
    // onMetaData音视频元数据处理分支
    if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
        rtmp->decode_message(msg, &pkt);
        source->on_meta_data(msg, metadata); 
        return;
    }
}

// 在继续分析后面的函数处理之前,我们大概要了解一下RTMP对H264数据的封装格式
// RTMP推送的音视频流的封装形式和FLV格式相似
// 首先需要发送"AVC sequence header"和"AAC sequence header",
// 这两项数据包含的是重要的编码信息,没有它们,解码器将无法解码。
// 1)RTMP对SPS+PPS数据的封装格式如下(SPS和PPS全局关键编码信息,也属于关键帧)
+----------------------------------------------------------------------------+
|FrameType(1 byte) 高4位表示是否是关键帧(1:关键帧),低4位表示编码类型(7表示H264编码)  |
+----------------------------------------------------------------------------+
|0x00 0x00 0x00 0x00 (4 byte)                                                |
+----------------------------------------------------------------------------+
|configurationVersion (1 byte)    0x01                                       |
+----------------------------------------------------------------------------+
|AVCProfileIndication (1 byte)    SPS[1]                                     |
+----------------------------------------------------------------------------+
|profile_compatibility (1 byte)   SPS[2]                                     |
+----------------------------------------------------------------------------+
|AVCLevelIndication (1 byte)      SPS[3]                                     |
+----------------------------------------------------------------------------+
|lengthSi*usOne (1 byte)      0xff                                       |
+----------------------------------------------------------------------------+
|sps number (1 byte)              SPS的序号0xE1                               |
+----------------------------------------------------------------------------+
|sps data length (2 byte)         去掉分隔符00000001之后的实际长度               |
+----------------------------------------------------------------------------+
|sps data                                                                    |
+----------------------------------------------------------------------------+
|pps number (1 byte)              PPS的序号0x01                               |
+----------------------------------------------------------------------------+
|pps data length (2 byte)         去掉分隔符00000001之后的实际长度               |
+----------------------------------------------------------------------------+
|pps data                                                                    |
+----------------------------------------------------------------------------+

// 2)RTMP对普通H264视频数据的封装
+----------------------------------------------------------------------------+
| FrameType[1 byte] 高4位表示是否是关键帧(1:关键帧),低4位表示编码类型(7表示H264编码) |
+----------------------------------------------------------------------------+
| 0x01 0x00 0x00 0x00 (4 byte)                                               |
+----------------------------------------------------------------------------+
| NALU size (4 byte)  去掉分隔符00000001之后的长度                              |
+----------------------------------------------------------------------------+
| NALU data                                                                  |
+----------------------------------------------------------------------------+

// 音视频报文的处理流程基本一致,其中有一个mix_queue用于解决时间戳乱序问题
srs_error_t SrsLiveSource::on_audio(SrsCommonMessage* shared_audio)
srs_error_t SrsLiveSource::on_video(SrsCommonMessage* shared_video) 
{
    last_packet_time = shared_video->header.timestamp; // 更新最新接收报文的时间
    
    // 检查视频报文的封装格式是否正确,错误报文直接丢弃,判断依据就是RTMP封装的第一个字节frame_type
    if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) {
        return err;
    }
    
    SrsSharedPtrMessage msg;
    msg.create(shared_video);// 将shared_video中的视频数据转移到msg对象内
    
    // 如果不需要mix_correct算法,则直接交给on_video_imp处理
    if (!mix_correct) { return on_video_imp(&msg); }   
    
    // 否则将音视频报文放入算法队列,执行mix_correct
    mix_queue->push(msg.copy());                 
    SrsSharedPtrMessage* m = mix_queue->pop();   // 从算法队列取报文并处理
    if (m->is_audio()) { 
        err = on_audio_imp(m); 
    } else {   
        err = on_video_imp(m); 
    }
}

srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage* msg) 
{
    // AVC sequence header就是SPS+PPS的RTMP包,具体根据报文的第1、2字节进行判断:
    // 第一个字节是FrameType,其中:高4位表示是否是关键帧(1:关键帧,2、3、4、5表示其它帧类型),
    //                          低4位表示编码类型(7表示H264编码,12表示H265编码)
    // 第二个字节:0表示SPS+PPS数据,1表示H264视频帧
    // 这里就是根据上面的原则判断报文是否是AVC sequence header(SPS+PPS)
    bool is_sequence_header = SrsFlvAudio::sh(msg->payload, msg->size); 
    
    // 如果是AVC sequence header数据,需要和meta对象中保存的数据比较,是否为重复数据
    
    // 将最新的SPS+PPS数据保存到meta对象,
    if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) {
        return srs_error_wrap(err, "meta update video");
    }
    
    // 服务器只有工作在源站模式才能走到这里,此函数内部执行源站的全部工作,内容多且独立,后面单独分析
    // 1) hls->on_video()  HLS处理
    // 2) dash->on_video() DASH处理
    // 3) dvr->on_video()  DVR处理
    // 4) hds->on_video()  HDS处理
    // 5) forwarder->on_video()  直接向指定站点转发视频数据
    if ((err = hub->on_video(msg, is_sequence_header)) != srs_success) {
        return srs_error_wrap(err, "hub consume video");
    }
    
    // 如果是类似RTMP推流-->WebRTC拉流场景,则有相应的bridger_->on_video()处理
    if (bridger_ && (err = bridger_->on_video(msg)) != srs_success) {
        return srs_error_wrap(err, "bridger consume video");
    }
 
    // 典型的RTMP直播场景,一个推流端可能同时对应多个拉流端,
    // 每个拉流端都需要创建一个属于自己的SrsLiveConsumer消费者对象
    // SRS接收到推流客户端的数据后,在这里将数据复制到每个拉流端的SrsLiveConsumer缓存队列中
    for (int i = 0; i < (int)consumers.size(); i++) {
        SrsLiveConsumer* consumer = consumers.at(i);  // 遍历拉流端队列并复制消息
        if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
            return srs_error_wrap(err, "consume video");
        }
    }
    
    // AVC sequence header(SPS+PPS)数据不需要GOP缓存,在这里直接返回
    if (is_sequence_header) { return err; } 
    
    gop_cache->cache(msg);  // 普通视频帧放入GOP缓存
    
    // 关于ATC,大概描述如下
    // SRS默认ATC是关闭,即给客户端的RTMP流永远从0开始。
    // 开启ATC之后,RTMP流的时间戳就是ATC时间(即绝对时间),这样的目的大概是为了实现HLS的主备时间一致
    if (atc) {
        if (meta->vsh()) { meta->vsh()->timestamp = msg->timestamp; }
        if (meta->data()) { meta->data()->timestamp = msg->timestamp; }
    }
}

srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg)
{
    // 判断报文是否是AAC sequence header数据
    bool is_aac_sequence_header = SrsFlvAudio::sh(msg->payload, msg->size);
    bool is_sequence_header = is_aac_sequence_header;

    // 如果是AAC sequence header数据,需要和meta对象中保存的数据比较,是否为重复数据
    
    // 服务器只有工作在源站模式才能走到这里,此函数内部执行源站的全部工作,内容多且独立,后面单独分析
    // 1) hls->on_audio()   HLS处理
    // 2) dash->on_audio()  DASH处理
    // 3)dvr->on_audio()   DVR处理
    // 4)hds->on_audio()   HDS处理
    // 5) forwarder->on_audio()   直接向指定站点转发视频数据
    hub->on_audio(msg);
    
    // 如果是类似RTMP推流-->WebRTC拉流场景,则有相应的bridger_->on_audio()处理
    if (bridger_ && (err = bridger_->on_audio(msg)) != srs_success) {
        return srs_error_wrap(err, "bridger consume audio");
    }
    
    // SRS接收到推流客户端的数据后,在这里将数据复制到每个拉流端的SrsLiveConsumer缓存队列中
    if (!drop_for_reduce) {
        for (int i = 0; i < (int)consumers.size(); i++) {
            SrsLiveConsumer* consumer = consumers.at(i);
            if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
                return srs_error_wrap(err, "consume message");
            }
        }
    }

    // AVC sequence header(SPS+PPS)数据不需要GOP缓存,在这里直接返回
    if (is_sequence_header) { return err; } 
    
    gop_cache->cache(msg);  // 普通视频帧放入GOP缓存
    
    // 关于ATC,大概描述如下
    // SRS默认ATC是关闭,即给客户端的RTMP流永远从0开始。
    // 开启ATC之后,RTMP流的时间戳就是ATC时间(即绝对时间),这样的目的大概是为了实现HLS的主备时间一致
    if (atc) {
        if (meta->vsh()) { meta->vsh()->timestamp = msg->timestamp; }
        if (meta->data()) { meta->data()->timestamp = msg->timestamp; }
    }
}

srs_error_t SrsLiveConsumer::enqueue(msg) 
{
    // ATC默认不开启时,音视频数据先进入SrsRtmpJitter对象根据算法检查甚至修改报文的时间戳
    //  full算法:保证报文的时间戳从0开始,且单调递增
    //  zero算法:只保证时间戳从0开始
    //  off算法:不做jitter处理
    if (!atc) {
        if ((err = jitter->correct(msg, ag)) != srs_success) {
            return srs_error_wrap(err, "consume message");
        }
    }
    
    // 将最新接收到的报文入队列,在队列内部会判断如果缓存的报文过多,则丢弃一部分旧报文
    queue->enqueue(msg, NULL);

    if (mw_waiting) { // 为了提供读写效率,此标志表示采用批量写方式
        
        srs_utime_t duration = queue->duration(); // 计算队列总缓存报文的总时间
        bool match_min_msgs = queue->size() > mw_min_msgs; // 计算队列总缓存报文的总数量
        
        // For ATC, maybe the SH timestamp bigger than A/V packet,
        // when encoder republish or overflow.
        // @see https://github.com/ossrs/srs/pull/749
        if (atc && duration < 0) {
            srs_cond_signal(mw_wait);
            mw_waiting = false;
            return err;
        }
        
        // 如果队列缓存报文的总时间超过mw_duration门限且缓存报文的总数量超过mw_min_msgs
        // 则调用srs_cond_signal(mw_wait)通知拉流(play)协程从队列中取数据
        if (match_min_msgs && duration > mw_duration) {
            srs_cond_signal(mw_wait);
            mw_waiting = false;
            return err;
        }
    }
}

srs_error_t SrsMessageQueue::enqueue() 
{
    msgs.push_back(msg);   // Message数据包最终进入vector队列
    
    // 设置队列缓存报文的av_start_time 和 av_end_time 
    if (msg->is_av()) {
        if (av_start_time == -1) {
            av_start_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS);
        }
        av_end_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS);
    }

    // 默认的最大缓存时间(max_queue_size )一般是30秒
    // 如果被修改为<= 0,则表示不做判断,一直缓存
    if (max_queue_size <= 0) { return err; }
    
    // 如果队列中缓存报文超过max_queue_size,则通过shrink()函数丢弃旧报文
    while (av_end_time - av_start_time > max_queue_size) {
        shrink();
    }
    
    return err;
}

void SrsMessageQueue::shrink()
{
    SrsSharedPtrMessage* video_sh = NULL;
    SrsSharedPtrMessage* audio_sh = NULL;
    int msgs_size = (int)msgs.size();
    
    // remove all msg
    // igone the sequence header
    for (int i = 0; i < (int)msgs.size(); i++) {
        SrsSharedPtrMessage* msg = msgs.at(i);
        
        if (msg->is_video() && SrsFlvVideo::sh(msg->payload, msg->size)) {
            srs_freep(video_sh);
            video_sh = msg;
            continue;
        }
        else if (msg->is_audio() && SrsFlvAudio::sh(msg->payload, msg->size)) {
            srs_freep(audio_sh);
            audio_sh = msg;
            continue;
        }
        
        srs_freep(msg);
    }
    msgs.clear();
    
    // update av_start_time
    av_start_time = av_end_time;
    //push_back secquence header and update timestamp
    if (video_sh) {
        video_sh->timestamp = srsu2ms(av_end_time);
        msgs.push_back(video_sh);
    }
    if (audio_sh) {
        audio_sh->timestamp = srsu2ms(av_end_time);
        msgs.push_back(audio_sh);
    }
    
    if (!_ignore_shrink) {
        srs_trace("shrinking, size=%d, removed=%d, max=%dms", (int)msgs.size(), msgs_size - (int)msgs.size(), srsu2msi(max_queue_size));
    }
}

综上,推流端处理逻辑大致处理完成,下一章将继续分析拉流端处理逻辑。


学习总结:

通过分析,我们了解了SRS4.0 RTMP服务模块推流端处理的整体逻辑:
1)推流端一共有两个主要的协程,其中SrsRtmpConn::do_publishing()协程主要工作是统计接收的数据,并检测如果没有接收到新数据,则断开推流端连接。SrsRecvThread::cycle()协程真正接收客户端的推流数据。
2)每个推流客户端对应一个SrsLiveSource对象,每个拉流客户端对应一个SrsLiveConsumer对象。SrsRecvThread::cycle()协程将接收到的RTMP数据包通过SrsLiveSource对象最终复制到每个SrsLiveConsumer对象,最终实现从推流端到拉流端的报文转发。
3)如果SRS服务器工作在edge模式,相关的边缘处理逻辑都封装在SrsPublishEdge类中。
如果服务器工作在orgin模式,相关的处理逻辑(HLS / DVR / Forward)都封装在SrsOriginHub类中。
如果要实现SRS内部的RTMP与WebRTC数据互通,则需要进一步分析ISrsLiveSourceBridger类。

整个过程,可以参考下面的数据流程图。
4、SRS4.0源代码分析之RTMP推流处理

上一篇:web调取摄像头


下一篇:python-进程&线程