ZLMediaKit的媒体源创建流程解析

首先我们创建媒体源:

// Create the media source. The first argument is the virtual host name;
// ZLMediaKit's default vhost is spelled "__defaultVhost__" (the original
// snippet had it misspelled as "__defaultVost__").
m_media = mk_media_create("__defaultVhost__", app.c_str(), name.c_str(), 0, 0, 0);
/**
 * C API entry point that creates a media source.
 *
 * @param vhost        virtual host name, must not be null
 * @param app          application name, must not be null
 * @param stream       stream id, must not be null
 * @param duration     forwarded unchanged to the MediaHelper constructor
 * @param hls_enabled  forwarded unchanged to the MediaHelper constructor
 * @param mp4_enabled  forwarded unchanged to the MediaHelper constructor
 * @return opaque handle; it is really a heap-allocated MediaHelper::Ptr
 */
API_EXPORT mk_media API_CALL mk_media_create(const char *vhost, const char *app, const char *stream,
                                             float duration, int hls_enabled, int mp4_enabled) {
    assert(vhost && app && stream);

    // The handle given to C callers is a pointer to a heap-allocated smart pointer.
    auto *holder = new MediaHelper::Ptr(
        new MediaHelper(vhost, app, stream, duration, hls_enabled, mp4_enabled));
    MediaHelper::Ptr &helper = *holder;
    helper->attachEvent();

    return (mk_media) holder;
}
// In effect, constructing a MediaHelper just creates a DevChannel.
    // Perfect-forwards every constructor argument to DevChannel's constructor
    // and keeps the resulting shared_ptr in _channel.
    MediaHelper(ArgsType &&...args){
        _channel = std::make_shared<DevChannel>(std::forward<ArgsType>(args)...);
    }
    //所以就是MultiMediaSourceMuxer对象的创建
class DevChannel  : public MultiMediaSourceMuxer

  int video_codec = 0; // codec id later passed to mk_media_init_video; 0 means H.264
  初始化媒体源,也就是为媒体源添加视频通道(Track):
  // Initialise the video side of the media source: codec id, frame size and
  // frame rate (the fps value is converted to float for the C API).
  mk_media_init_video(m_media, video_codec, m_param.width, m_param.height,
                        static_cast<float>(m_param.fps));
                        
/**
 * C API entry point that describes the video stream of a media source.
 *
 * @param ctx       handle returned by mk_media_create, must not be null
 * @param codec_id  integer codec id, converted to CodecId
 * @param width     frame width
 * @param height    frame height
 * @param fps       frame rate
 */
API_EXPORT void API_CALL mk_media_init_video(mk_media ctx, int codec_id, int width, int height, float fps){
    assert(ctx);
    // The opaque handle is really a pointer to a MediaHelper::Ptr.
    auto &helper = *((MediaHelper::Ptr *) ctx);

    VideoInfo video;
    video.iWidth = width;
    video.iHeight = height;
    video.iFrameRate = fps;
    video.codecId = static_cast<CodecId>(codec_id);

    // The real work happens in the channel's initVideo.
    helper->getChannel()->initVideo(video);
}

/**
 * Stores a copy of the video description and adds the matching video track.
 * Only H.264 and H.265 are handled; any other codec id only logs a warning.
 *
 * @param info  video parameters (codec, size, frame rate)
 */
void DevChannel::initVideo(const VideoInfo &info) {
    _video = std::make_shared<VideoInfo>(info);

    const auto codec = info.codecId;
    if (codec == CodecH265) {
        addTrack(std::make_shared<H265Track>());
        return;
    }
    if (codec == CodecH264) {
        // The track object itself is created right here.
        addTrack(std::make_shared<H264Track>());
        return;
    }
    WarnL << "不支持该类型的视频编码类型:" << info.codecId;
}
     
/**
 * Forwards a track to the internal muxer (_muxer, a MultiMuxerPrivate::Ptr).
 * L16 audio tracks are rejected with a warning and are not forwarded.
 *
 * @param track  track to add
 */
void MultiMediaSourceMuxer::addTrack(const Track::Ptr &track) {
    if (track->getCodecId() != CodecL16) {
        _muxer->addTrack(track);
        return;
    }
    WarnL << "L16音频格式目前只支持RTSP协议推流拉流!!!";
}

// Author's note: one media source corresponds to one MediaSink.
/**
 * Registers a clone of track_in with this sink.
 *
 * The clone is stored in _track_map under its codec id, a ready-callback is
 * recorded for it, the ticker is reset, and a frame delegate is attached to
 * the clone. Tracks added after all tracks were declared ready are rejected
 * with a warning.
 *
 * @param track_in  track to clone and register
 */
void MediaSink::addTrack(const Track::Ptr &track_in) {
    lock_guard<recursive_mutex> guard(_mtx);
    if (_all_track_ready) {
        WarnL << "all track is ready, add this track too late!";
        return;
    }

    // Clone the track: only its data is copied, not its frame-forwarding relationships.
    auto cloned = track_in->clone();
    const auto codec = cloned->getCodecId();

    // Register the track itself.
    _track_map[codec] = cloned;

    // Register the ready-callback. Per the author, the onTrackReady that ends
    // up running is the one implemented in MultiMuxerPrivate.
    _track_ready_callback[codec] = [this, cloned]() {
        onTrackReady(cloned);
    };
    _ticker.resetTime();

    // Attach a delegate that receives every frame written to the cloned track.
    cloned->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame) {
        if (!_all_track_ready) {
            // Some track is not ready yet: cache a cacheable copy of the frame for later.
            _frame_unread[frame->getCodecId()].emplace_back(Frame::getCacheAbleFrame(frame));
            return;
        }
        // Per the author, the onTrackFrame that runs is MultiMuxerPrivate's.
        onTrackFrame(frame);
    }));
}

设置好以后,下一步添加数据:

/**
 * C API entry point that feeds one chunk of H.264 data into a media source.
 *
 * @param ctx   handle returned by mk_media_create, must not be null
 * @param data  pointer to the H.264 bytes, must not be null
 * @param len   number of bytes, must be > 0
 * @param dts   decode timestamp, forwarded to DevChannel::inputH264
 * @param pts   presentation timestamp, forwarded to DevChannel::inputH264
 */
API_EXPORT void API_CALL mk_media_input_h264(mk_media ctx, void *data, int len, uint32_t dts, uint32_t pts) {
    assert(ctx && data && len > 0);
    // The opaque handle is really a pointer to a MediaHelper::Ptr.
    auto &helper = *((MediaHelper::Ptr *) ctx);
    helper->getChannel()->inputH264(static_cast<char *>(data), len, dts, pts);
}

/**
 * Wraps raw H.264 bytes into an H264Frame and feeds it to inputFrame.
 *
 * @param data  H.264 bytes
 * @param len   number of bytes
 * @param dts   decode timestamp; 0 means "use the elapsed time of _aTicker[0]"
 * @param pts   presentation timestamp; 0 means "same as dts"
 */
void DevChannel::inputH264(const char *data, int len, uint32_t dts, uint32_t pts) {
    if (!dts) {
        dts = static_cast<uint32_t>(_aTicker[0].elapsedTime());
    }
    if (!pts) {
        pts = dts;
    }

    // rtmp/hls/mp4 need to cache frames that share a timestamp, so using a
    // FrameNoCacheAble frame here would cause repeated copies when it is later
    // converted to a cacheable frame. Copying once here is cheaper.
    auto h264 = FrameImp::create<H264Frame>();
    h264->_prefix_size = prefixSize(data, len);
    h264->_buffer.assign(data, len);
    h264->_pts = pts;
    h264->_dts = dts;

    inputFrame(h264);
}


/**
 * Feeds one frame into the internal muxer and, when ENABLE_RTPPROXY is
 * defined, into every registered RTP sender as well.
 *
 * @param frame_in  incoming frame; it is wrapped in a FrameModifyStamp when
 *                  the General::kModifyStamp setting is enabled
 */
void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) {
    // Reads the General::kModifyStamp setting into the bool modify_stamp.
    GET_CONFIG(bool, modify_stamp, General::kModifyStamp);
    auto frame = frame_in;
    if (modify_stamp) {
        // Timestamp overriding is enabled: wrap the frame together with the
        // stamp object that belongs to the frame's track type.
        frame = std::make_shared<FrameModifyStamp>(frame, _stamp[frame->getTrackType()]);
    }
    // _muxer is declared as: MultiMuxerPrivate::Ptr _muxer;
    _muxer->inputFrame(frame);

#if defined(ENABLE_RTPPROXY)
    // The sender map is guarded by its own mutex while the frame is fanned out.
    lock_guard<mutex> lck(_rtp_sender_mtx);
    for (auto &pr : _rtp_sender) {
        pr.second->inputFrame(frame);
    }
#endif //ENABLE_RTPPROXY

}


/**
 * Dispatches a frame to the track registered for the frame's codec id, then
 * re-evaluates track readiness. Frames whose codec has no registered track
 * are dropped silently.
 *
 * @param frame  frame to dispatch
 */
void MediaSink::inputFrame(const Frame::Ptr &frame) {
    lock_guard<recursive_mutex> guard(_mtx);

    const auto found = _track_map.find(frame->getCodecId());
    if (found != _track_map.end()) {
        // For an H.264 stream this lands in H264Track::inputFrame.
        found->second->inputFrame(frame);
        checkTrackIfReady(nullptr);
    }
}

/**
 * Entry point for H.264 data on this track.
 *
 * When the first NAL after the start-code prefix is an IDR or B/P slice the
 * frame is passed on as-is; otherwise the buffer is split with splitH264 and
 * each resulting piece is passed on individually.
 *
 * @param frame  incoming H.264 frame
 */
void H264Track::inputFrame(const Frame::Ptr &frame) {
    using H264FrameInternal = FrameInternal<H264FrameNoCacheAble>;

    const int nal_type = H264_TYPE(*((uint8_t *) frame->data() + frame->prefixSize()));
    const bool is_slice = (nal_type == H264Frame::NAL_B_P) || (nal_type == H264Frame::NAL_IDR);
    if (is_slice) {
        inputFrame_l(frame);
        return;
    }

    // Not an I/B/P frame: split it in case several NAL units are glued together.
    splitH264(frame->data(), frame->size(), frame->prefixSize(), [&](const char *ptr, size_t len, size_t prefix) {
        H264FrameInternal::Ptr piece = std::make_shared<H264FrameInternal>(frame, (char *) ptr, len, prefix);
        inputFrame_l(piece);
    });
}

// The track extracts SPS/PPS from the incoming NAL units here.
/**
 * Handles a single NAL unit: SPS and PPS payloads are remembered, IDR frames
 * are preceded by the remembered config frames, AUD units are dropped, and
 * everything else is forwarded to VideoTrack::inputFrame.
 *
 * @param frame  frame whose first byte after the prefix is the NAL header
 */
void H264Track::inputFrame_l(const Frame::Ptr &frame){
    int type = H264_TYPE(*((uint8_t *) frame->data() + frame->prefixSize()));
    switch (type) {
        case H264Frame::NAL_SPS: {
            // Remember the SPS payload (without the start-code prefix); it is not forwarded here.
            _sps = string(frame->data() + frame->prefixSize(), frame->size() - frame->prefixSize());
            break;
        }
        case H264Frame::NAL_PPS: {
            // Remember the PPS payload (without the start-code prefix); it is not forwarded here.
            _pps = string(frame->data() + frame->prefixSize(), frame->size() - frame->prefixSize());
            break;
        }
        case H264Frame::NAL_IDR: {
            // Emit the stored SPS/PPS ahead of the key frame, then the key frame itself.
            insertConfigFrame(frame);
            VideoTrack::inputFrame(frame);
            break;
        }
        case H264Frame::NAL_AUD: {
            // AUD units are ignored.
            break;
        }

        default:
            VideoTrack::inputFrame(frame);
            break;
    }

    // Updated only after the switch, so insertConfigFrame above still sees
    // whether the PREVIOUS NAL unit was an IDR.
    _is_idr = type == H264Frame::NAL_IDR;
    // Fire onReady once ready() holds while _width is still 0
    // (presumably onReady fills in _width — confirm against its definition).
    if (_width == 0 && ready()) {
        onReady();
    }
}


/**
 * Emits the stored SPS and then the stored PPS as separate frames, each with
 * a 4-byte start code and the dts of the given frame. Does nothing when
 * _is_idr is already set; an empty SPS or PPS is skipped.
 *
 * @param frame  frame whose dts is copied onto the emitted config frames
 */
void H264Track::insertConfigFrame(const Frame::Ptr &frame){
    if (_is_idr) {
        return;
    }

    // SPS first, then PPS, matching the order the original code emitted them in.
    for (const auto *nal : {&_sps, &_pps}) {
        if (nal->empty()) {
            continue;
        }
        auto cfg = FrameImp::create<H264Frame>();
        cfg->_prefix_size = 4;
        cfg->_buffer.assign("\x00\x00\x00\x01",4);
        cfg->_buffer.append(*nal);
        cfg->_dts = frame->dts();
        VideoTrack::inputFrame(cfg);
    }
}


/**
 * Re-evaluates track readiness and decides whether to declare all tracks ready.
 *
 * @param track  the track to check, or nullptr to check every track in _track_map
 */
void MediaSink::checkTrackIfReady(const Track::Ptr &track){
    // Step 1: run the pending ready-callbacks for tracks that have become ready.
    if (!_all_track_ready && !_track_ready_callback.empty()) {
        if (track) {
            checkTrackIfReady_l(track);
        } else {
            for (auto &pr : _track_map) {
                checkTrackIfReady_l(pr.second);
            }
        }
    }

    // Step 2: decide whether it is time to call emitAllTrackReady.
    if(!_all_track_ready){
        if(_ticker.elapsedTime() > MAX_WAIT_MS_READY){
            // Waited longer than the limit: stop waiting and ignore tracks that are still not ready.
            emitAllTrackReady();
            return;
        }

        if(!_track_ready_callback.empty()){
            // Still within the timeout and some track is not ready yet: keep waiting.
            return;
        }

        if(_track_map.size() == _max_track_size){
            // The maximum number of tracks has been added and none is pending,
            // so every track is ready.
            emitAllTrackReady();
            return;
        }

        if(_track_map.size() == 1 && _ticker.elapsedTime() > MAX_WAIT_MS_ADD_TRACK){
            // Only one track so far: wait a bounded time after it was added in
            // case another track follows, then go ahead.
            emitAllTrackReady();
            return;
        }
    }
}


/**
 * Fires and removes the pending ready-callback of a track once the track
 * reports ready. Does nothing when no callback is pending for the track's
 * codec id or when the track is not ready yet.
 *
 * @param track  track to check
 */
void MediaSink::checkTrackIfReady_l(const Track::Ptr &track){
    auto pending = _track_ready_callback.find(track->getCodecId());
    if (pending == _track_ready_callback.end()) {
        return;
    }
    if (!track->ready()) {
        return;
    }
    // The track went from not-ready to ready: trigger its callback exactly once.
    pending->second();
    _track_ready_callback.erase(pending);
}

/**
 * @return true once both the SPS and the PPS have been captured
 */
bool H264Track::ready() {
    const bool missing_config = _sps.empty() || _pps.empty();
    return !missing_config;
}


/**
 * Declares the set of tracks final: drops tracks that never became ready,
 * sets _all_track_ready, calls onAllTrackReady and then flushes the frames
 * that were cached while waiting. Does nothing if already declared ready.
 */
void MediaSink::emitAllTrackReady() {
    if (_all_track_ready) {
        return;
    }

    DebugL << "all track ready use " << _ticker.elapsedTime() << "ms";
    if (!_track_ready_callback.empty()) {
        // Pending callbacks remain, i.e. this is the timeout path: forcibly
        // give up on the tracks that are not ready.
        _track_ready_callback.clear();
        // Remove the tracks that are still not ready.
        for (auto it = _track_map.begin(); it != _track_map.end();) {
            if (!it->second->ready()) {
                WarnL << "track not ready for a long time, ignored: " << it->second->getCodecName();
                // erase() returns the next valid iterator, so do not increment again.
                it = _track_map.erase(it);
                continue;
            }
            ++it;
        }
    }

    if (!_track_map.empty()) {
        // At least one valid track remains.
        _all_track_ready = true;
        onAllTrackReady();

        // All tracks are ready: output the previously cached frames in one go.
        for(auto &pr : _frame_unread){
            if (_track_map.find(pr.first) == _track_map.end()) {
                // The track for this codec has been removed; skip its cached frames.
                continue;
            }
            pr.second.for_each([&](const Frame::Ptr &frame) {
                onTrackFrame(frame);
            });
        }
        _frame_unread.clear();
    }
}


/**
 * Propagates the "all tracks ready" notification to each muxer that exists
 * (_rtmp, _rtsp, and _fmp4 when ENABLE_MP4 is defined) and to the track
 * listener, then logs the stream URL and track info.
 */
void MultiMuxerPrivate::onAllTrackReady() {
    // Each target is optional; only non-null ones are notified.
    if (_rtmp) {
        _rtmp->onAllTrackReady();
    }
    if (_rtsp) {
        _rtsp->onAllTrackReady();
    }
#if defined(ENABLE_MP4)
    if (_fmp4) {
        _fmp4->onAllTrackReady();
    }
#endif
    if (_track_listener) {
        _track_listener->onAllTrackReady();
    }
    InfoL << "stream: " << _stream_url << " , codec info: " << getTrackInfoStr(this);
}

   // Once all tracks are ready, pass the result of getSdp() to the media
   // source via setSdp(). NOTE(review): the enclosing class is not shown in
   // this snippet — presumably the RTSP muxer; confirm.
   void onAllTrackReady(){
        _media_src->setSdp(getSdp());
    }
上一篇:MP4解析


下一篇:回溯法