学习目标:
本章我们将分析SRS4.0 RTMP服务模块和推流相关的代码处理逻辑。
学习内容:
根据上节内容可知,SRS4.0针对RTMP推流客户端的处理逻辑,主要在协程SrsRtmpConn::do_cycle()中通过调用SrsRtmpConn::publishing()函数进行处理。
// 为了方便理解,下面函数使用了简化后的伪代码,但不影响理解函数的主流程
srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
{
http_hooks_on_publish(); // 此回调函数向外通知客户端推流开始
// 检测是否可以向当前的source对象推流,如果source的状态是已经有人在推流了,则acquire_publish函数返回失败
if ((err = acquire_publish(source)) == srs_success)
{
// 创建推流端接收协程,这个协程用于实际接收客户端向服务器发送的推流数据
// 接收协程将在do_publishing()函数内部被启动,并在do_publishing()函数退出后主动结束
SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, );
// 进入推流处理函数do_publishing()
// SrsRtmpConn协程将在do_publishing函数内部循环处理,
// 同时do_publishing函数内部还会启动SrsPublishRecvThread协程
err = do_publishing(source, &rtrd);
rtrd.stop(); // 执行到这里,表示SrsRtmpConn协程退出循环,此时需要结束之前创建的接收协程
}
release_publish(source); // 推流结束,释放source中的推流信息
http_hooks_on_unpublish(); // 此回调函数向外通知客户端推流结束
}
// SrsRtmpConn协程在推流端的处理,主要工作是统计接收的数据,并检测如果没有接收到新数据,则结束协程
srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThread* rtrd)
{
rtrd->start(); // 启动接收协程
while (true) { // conn协程好像主要做统计工作,以及判断接收数据超时后退出do_publishing
if ((err = trd->pull()) != srs_success) // 协程运行出错,则结束协程
return srs_error_wrap();
if (nb_msgs == 0) {
rtrd->wait(20秒); // 如果是第一次接收报文,conn协程在此阻塞20秒
}
else {
rtrd->wait(5秒); // 如果是普通接收报文,conn协程在此阻塞5秒
}
if ((rtrd->error_code()) != srs_success)
return srs_error_wrap(); // 如果接收协程错误则退出循环,结束协程
if (rtrd->nb_msgs() <= nb_msgs)
return srs_error_wrap(); // 超时检测发现没有接收到新数据则退出循环,结束协程
nb_msgs = rtrd->nb_msgs(); // 更新数据,???这里要是变量溢出翻转了怎么办
stat->on_video_frames(); // 通过单件类SrsStatistic,刷新每条流的接收视频帧计数
}
}
推流端接收协程的创建和处理逻辑
// 在创建SrsPublishRecvThread对象时,它的构造函数会同时创建一个SrsRecvThread协程对象
// 所以,真正的推流端接收协程是SrsRecvThread::do_cycle()
SrsPublishRecvThread::SrsPublishRecvThread()
: trd(this, rtmp_sdk, tm, parent_cid)
{
}
srs_error_t SrsRecvThread::cycle() {
rtmp->set_recv_timeout(SRS_UTIME_NO_TIMEOUT); // 设置为阻塞式读取
do_cycle(); //
rtmp->set_recv_timeout(timeout); // 读取连接结束,恢复默认timeout
}
srs_error_t SrsRecvThread::do_cycle()
{
rtmp->recv_message(&msg); // 此函数用于得到一个完整RTMP Message消息
// 到此我们大概知道,具体的RTMP协议都封装在SrsRtmpServer类中
// 对于初学者,可以先不关心SrsRtmpServer类的实现细节
pumper->consume(msg); // 对于推流端,这里的pumper对象类型为SrsPublishRecvThread
// 对于拉流端,这里的pumper对象类型为SrsQueueRecvThread
}
// 接收推流端数据并处理
srs_error_t SrsPublishRecvThread::consume(SrsCommonMessage* msg)
{
_conn->handle_publish_message(_source, msg); // 最终,SrsRtmpConn接收推流报文
srs_thread_yield(); // 当前协程主动让出CPU
}
处理一个完整的RTMP报文
srs_error_t SrsRtmpConn::handle_publish_message()
{
// 如果收到的报文是RTMP协议相关的控制命令,则此分支处理,然后就直接返回
if (msg->header.is_amf0_command() || msg->header.is_amf3_command())
{
rtmp->decode_message(msg, &pkt);
rtmp->fmle_unpublish(); // for fmle, drop others except the fmle start packet.
return err;
}
// 处理RTMP相关的video, audio, data报文
process_publish_message(source, msg); // video, audio, data message
}
srs_error_t SrsRtmpConn::process_publish_message()
{
if (info->edge) {
// 如果sever工作在edge模式,则调用此接口向源站转发报文,然后直接返回
// 此时可以看到,edge模式下,SRS属于纯转发,所以不会在本地做HLS(ts、m3u8文件)处理
// 同样,对于edge模式下的,推流处理逻辑主要在SrsPublishEdge和SrsEdgeForwarder类实现
// 对于初学者,可暂时不关注这两个类,只要知道SRS对edge模式的设计规格是:
// 推流到edge上时,edge会直接将流转发给源站;播放edge上的流时,edge会回源拉流。
return source->on_edge_proxy_publish(msg);
}
// 非edge模式,报文分类传入source对象
if (msg->header.is_audio()) { // 音频报文处理分支
return source->on_audio(msg);
}
if (msg->header.is_video()) { // 视频报文处理分支
return source->on_video(msg);
}
if (msg->header.is_aggregate()) { // 聚合消息处理分支
return source->on_aggregate(msg);
}
// onMetaData音视频元数据处理分支
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
rtmp->decode_message(msg, &pkt);
source->on_meta_data(msg, metadata);
return;
}
}
// 在继续分析后面的函数处理之前,我们大概要了解一下RTMP对H264数据的封装格式
// RTMP推送的音视频流的封装形式和FLV格式相似
// 首先需要发送"AVC sequence header"和"AAC sequence header",
// 这两项数据包含的是重要的编码信息,没有它们,解码器将无法解码。
// 1)RTMP对SPS+PPS数据的封装格式如下(SPS和PPS全局关键编码信息,也属于关键帧)
+----------------------------------------------------------------------------+
|FrameType(1 byte) 高4位表示是否是关键帧(1:关键帧),低4位表示编码类型(7表示H264编码) |
+----------------------------------------------------------------------------+
|0x00 0x00 0x00 0x00 (4 byte) |
+----------------------------------------------------------------------------+
|configurationVersion (1 byte) 0x01 |
+----------------------------------------------------------------------------+
|AVCProfileIndication (1 byte) SPS[1] |
+----------------------------------------------------------------------------+
|profile_compatibility (1 byte) SPS[2] |
+----------------------------------------------------------------------------+
|AVCLevelIndication (1 byte) SPS[3] |
+----------------------------------------------------------------------------+
|lengthSi*usOne (1 byte) 0xff |
+----------------------------------------------------------------------------+
|sps number (1 byte) SPS的序号0xE1 |
+----------------------------------------------------------------------------+
|sps data length (2 byte) 去掉分隔符00000001之后的实际长度 |
+----------------------------------------------------------------------------+
|sps data |
+----------------------------------------------------------------------------+
|pps number (1 byte) PPS的序号0x01 |
+----------------------------------------------------------------------------+
|pps data length (2 byte) 去掉分隔符00000001之后的实际长度 |
+----------------------------------------------------------------------------+
|pps data |
+----------------------------------------------------------------------------+
// 2)RTMP对普通H264视频数据的封装
+----------------------------------------------------------------------------+
| FrameType[1 byte] 高4位表示是否是关键帧(1:关键帧),低4位表示编码类型(7表示H264编码) |
+----------------------------------------------------------------------------+
| 0x01 0x00 0x00 0x00 (4 byte) |
+----------------------------------------------------------------------------+
| NALU size (4 byte) 去掉分隔符00000001之后的长度 |
+----------------------------------------------------------------------------+
| NALU data |
+----------------------------------------------------------------------------+
// 音视频报文的处理流程基本一致,其中有一个mix_queue用于解决时间戳乱序问题
srs_error_t SrsLiveSource::on_audio(SrsCommonMessage* shared_audio)
srs_error_t SrsLiveSource::on_video(SrsCommonMessage* shared_video)
{
last_packet_time = shared_video->header.timestamp; // 更新最新接收报文的时间
// 检查视频报文的封装格式是否正确,错误报文直接丢弃,判断依据就是RTMP封装的第一个字节frame_type
if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) {
return err;
}
SrsSharedPtrMessage msg;
msg.create(shared_video);// 将shared_video中的视频数据转移到msg对象内
// 如果不需要mix_correct算法,则直接交给on_video_imp处理
if (!mix_correct) { return on_video_imp(&msg); }
// 否则将音视频报文放入算法队列,执行mix_correct
mix_queue->push(msg.copy());
SrsSharedPtrMessage* m = mix_queue->pop(); // 从算法队列取报文并处理
if (m->is_audio()) {
err = on_audio_imp(m);
} else {
err = on_video_imp(m);
}
}
srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage* msg)
{
// AVC sequence header就是SPS+PPS的RTMP包,具体根据报文的第1、2字节进行判断:
// 第一个字节是FrameType,其中:高4位表示是否是关键帧(1:关键帧,2、3、4、5表示其它帧类型),
// 低4位表示编码类型(7表示H264编码,12表示H265编码)
// 第二个字节:0表示SPS+PPS数据,1表示H264视频帧
// 这里就是根据上面的原则判断报文是否是AVC sequence header(SPS+PPS)
bool is_sequence_header = SrsFlvAudio::sh(msg->payload, msg->size);
// 如果是AVC sequence header数据,需要和meta对象中保存的数据比较,是否为重复数据
// 将最新的SPS+PPS数据保存到meta对象,
if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) {
return srs_error_wrap(err, "meta update video");
}
// 服务器只有工作在源站模式才能走到这里,此函数内部执行源站的全部工作,内容多且独立,后面单独分析
// 1) hls->on_video() HLS处理
// 2) dash->on_video() DASH处理
// 3) dvr->on_video() DVR处理
// 4) hds->on_video() HDS处理
// 5) forwarder->on_video() 直接向指定站点转发视频数据
if ((err = hub->on_video(msg, is_sequence_header)) != srs_success) {
return srs_error_wrap(err, "hub consume video");
}
// 如果是类似RTMP推流-->WebRTC拉流场景,则有相应的bridger_->on_video()处理
if (bridger_ && (err = bridger_->on_video(msg)) != srs_success) {
return srs_error_wrap(err, "bridger consume video");
}
// 典型的RTMP直播场景,一个推流端可能同时对应多个拉流端,
// 每个拉流端都需要创建一个属于自己的SrsLiveConsumer消费者对象
// SRS接收到推流客户端的数据后,在这里将数据复制到每个拉流端的SrsLiveConsumer缓存队列中
for (int i = 0; i < (int)consumers.size(); i++) {
SrsLiveConsumer* consumer = consumers.at(i); // 遍历拉流端队列并复制消息
if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
return srs_error_wrap(err, "consume video");
}
}
// AVC sequence header(SPS+PPS)数据不需要GOP缓存,在这里直接返回
if (is_sequence_header) { return err; }
gop_cache->cache(msg); // 普通视频帧放入GOP缓存
// 关于ATC,大概描述如下
// SRS默认ATC是关闭,即给客户端的RTMP流永远从0开始。
// 开启ATC之后,RTMP流的时间戳就是ATC时间(即绝对时间),这样的目的大概是为了实现HLS的主备时间一致
if (atc) {
if (meta->vsh()) { meta->vsh()->timestamp = msg->timestamp; }
if (meta->data()) { meta->data()->timestamp = msg->timestamp; }
}
}
srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg)
{
// 判断报文是否是AAC sequence header数据
bool is_aac_sequence_header = SrsFlvAudio::sh(msg->payload, msg->size);
bool is_sequence_header = is_aac_sequence_header;
// 如果是AAC sequence header数据,需要和meta对象中保存的数据比较,是否为重复数据
// 服务器只有工作在源站模式才能走到这里,此函数内部执行源站的全部工作,内容多且独立,后面单独分析
// 1) hls->on_audio() HLS处理
// 2) dash->on_audio() DASH处理
// 3)dvr->on_audio() DVR处理
// 4)hds->on_audio() HDS处理
// 5) forwarder->on_audio() 直接向指定站点转发视频数据
hub->on_audio(msg);
// 如果是类似RTMP推流-->WebRTC拉流场景,则有相应的bridger_->on_audio()处理
if (bridger_ && (err = bridger_->on_audio(msg)) != srs_success) {
return srs_error_wrap(err, "bridger consume audio");
}
// SRS接收到推流客户端的数据后,在这里将数据复制到每个拉流端的SrsLiveConsumer缓存队列中
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
SrsLiveConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
return srs_error_wrap(err, "consume message");
}
}
}
// AVC sequence header(SPS+PPS)数据不需要GOP缓存,在这里直接返回
if (is_sequence_header) { return err; }
gop_cache->cache(msg); // 普通视频帧放入GOP缓存
// 关于ATC,大概描述如下
// SRS默认ATC是关闭,即给客户端的RTMP流永远从0开始。
// 开启ATC之后,RTMP流的时间戳就是ATC时间(即绝对时间),这样的目的大概是为了实现HLS的主备时间一致
if (atc) {
if (meta->vsh()) { meta->vsh()->timestamp = msg->timestamp; }
if (meta->data()) { meta->data()->timestamp = msg->timestamp; }
}
}
srs_error_t SrsLiveConsumer::enqueue(msg)
{
// ATC默认不开启时,音视频数据先进入SrsRtmpJitter对象根据算法检查甚至修改报文的时间戳
// full算法:保证报文的时间戳从0开始,且单调递增
// zero算法:只保证时间戳从0开始
// off算法:不做jitter处理
if (!atc) {
if ((err = jitter->correct(msg, ag)) != srs_success) {
return srs_error_wrap(err, "consume message");
}
}
// 将最新接收到的报文入队列,在队列内部会判断如果缓存的报文过多,则丢弃一部分旧报文
queue->enqueue(msg, NULL);
if (mw_waiting) { // 为了提供读写效率,此标志表示采用批量写方式
srs_utime_t duration = queue->duration(); // 计算队列总缓存报文的总时间
bool match_min_msgs = queue->size() > mw_min_msgs; // 计算队列总缓存报文的总数量
// For ATC, maybe the SH timestamp bigger than A/V packet,
// when encoder republish or overflow.
// @see https://github.com/ossrs/srs/pull/749
if (atc && duration < 0) {
srs_cond_signal(mw_wait);
mw_waiting = false;
return err;
}
// 如果队列缓存报文的总时间超过mw_duration门限且缓存报文的总数量超过mw_min_msgs
// 则调用srs_cond_signal(mw_wait)通知拉流(play)协程从队列中取数据
if (match_min_msgs && duration > mw_duration) {
srs_cond_signal(mw_wait);
mw_waiting = false;
return err;
}
}
}
srs_error_t SrsMessageQueue::enqueue()
{
msgs.push_back(msg); // Message数据包最终进入vector队列
// 设置队列缓存报文的av_start_time 和 av_end_time
if (msg->is_av()) {
if (av_start_time == -1) {
av_start_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS);
}
av_end_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS);
}
// 默认的最大缓存时间(max_queue_size )一般是30秒
// 如果被修改为<= 0,则表示不做判断,一直缓存
if (max_queue_size <= 0) { return err; }
// 如果队列中缓存报文超过max_queue_size,则通过shrink()函数丢弃旧报文
while (av_end_time - av_start_time > max_queue_size) {
shrink();
}
return err;
}
void SrsMessageQueue::shrink()
{
SrsSharedPtrMessage* video_sh = NULL;
SrsSharedPtrMessage* audio_sh = NULL;
int msgs_size = (int)msgs.size();
// remove all msg
// igone the sequence header
for (int i = 0; i < (int)msgs.size(); i++) {
SrsSharedPtrMessage* msg = msgs.at(i);
if (msg->is_video() && SrsFlvVideo::sh(msg->payload, msg->size)) {
srs_freep(video_sh);
video_sh = msg;
continue;
}
else if (msg->is_audio() && SrsFlvAudio::sh(msg->payload, msg->size)) {
srs_freep(audio_sh);
audio_sh = msg;
continue;
}
srs_freep(msg);
}
msgs.clear();
// update av_start_time
av_start_time = av_end_time;
//push_back secquence header and update timestamp
if (video_sh) {
video_sh->timestamp = srsu2ms(av_end_time);
msgs.push_back(video_sh);
}
if (audio_sh) {
audio_sh->timestamp = srsu2ms(av_end_time);
msgs.push_back(audio_sh);
}
if (!_ignore_shrink) {
srs_trace("shrinking, size=%d, removed=%d, max=%dms", (int)msgs.size(), msgs_size - (int)msgs.size(), srsu2msi(max_queue_size));
}
}
综上,推流端处理逻辑大致处理完成,下一章将继续分析拉流端处理逻辑。
学习总结:
通过分析,我们了解了SRS4.0 RTMP服务模块推流端处理的整体逻辑:
1)推流端一共有两个主要的协程,其中SrsRtmpConn::do_publishing()协程主要工作是统计接收的数据,并检测如果没有接收到新数据,则断开推流端连接。SrsRecvThread::cycle()协程真正接收客户端的推流数据。
2)每个推流客户端对应一个SrsLiveSource对象,每个拉流客户端对应一个SrsLiveConsumer对象。SrsRecvThread::cycle()协程将接收到的RTMP数据包通过SrsLiveSource对象最终复制到每个SrsLiveConsumer对象,最终实现从推流端到拉流端的报文转发。
3)如果SRS服务器工作在edge模式,相关的边缘处理逻辑都封装在SrsPublishEdge类中。
如果服务器工作在orgin模式,相关的处理逻辑(HLS / DVR / Forward)都封装在SrsOriginHub类中。
如果要实现SRS内部的RTMP与WebRTC数据互通,则需要进一步分析ISrsLiveSourceBridger类。
整个过程,可以参考下面的数据流程图。