WebRTC进阶流媒体服务器开发(六)Mediasoup源码分析之Mediasoup主业务流程

一:主业务的创建

主要场景是对房间的管理,多方进行音视频互动。

WebRTC进阶流媒体服务器开发(六)Mediasoup源码分析之Mediasoup主业务流程

Router代表房间,Transport代表一个传输,每个用户加入房间都会创建一个对应的连接。
Producer生产者,共享的音视频流中,每个音频、视频流都会产生一个生产者
Consumer消费者,对于每个加入房间的用户,都可以消费其他用户的音视频数据

1.首先调用CreateRouter,创建房间Router,然后加入worker的管理列表中。对于每个worker都会包含多个Router

2.创建Router之后,调用CreateTransport创建Transport,返回accept告诉Router创建transport成功,返回给Router

3.Router发送connect信令给Transport,连接创建成功

4.连接创建成功之后,调用CreateProducer创建生产者(可以创建多个)

5.如果需要获取其他用户的数据,则需要去CreateConsumer创建消费者

二:主业务源码分析

见worker.cpp中OnChannelRequest方法,处理请求之后,使用Request->Accept(data);返回确认消息

inline void Worker::OnChannelRequest(Channel::ChannelSocket* /*channel*/, Channel::ChannelRequest* request)
{
    MS_TRACE();

    MS_DEBUG_DEV(
      "Channel request received [method:%s, id:%" PRIu32 "]", request->method.c_str(), request->id);

    switch (request->methodId)
    {
        case Channel::ChannelRequest::MethodId::WORKER_CLOSE:
        {
            if (this->closed)
                return;

            MS_DEBUG_DEV("Worker close request, stopping");

            Close();

            break;
        }

        case Channel::ChannelRequest::MethodId::WORKER_DUMP:
        {
            json data = json::object();

            FillJson(data);

            request->Accept(data);

            break;
        }

        case Channel::ChannelRequest::MethodId::WORKER_GET_RESOURCE_USAGE:
        {
            json data = json::object();

            FillJsonResourceUsage(data);

            request->Accept(data);

            break;
        }

        case Channel::ChannelRequest::MethodId::WORKER_UPDATE_SETTINGS:
        {
            Settings::HandleRequest(request);

            break;
        }

        case Channel::ChannelRequest::MethodId::WORKER_CREATE_ROUTER:  //创建房间
        {
            std::string routerId; 
            // This may throw.
            SetNewRouterIdFromInternal(request->internal, routerId);   //根据request获取routerId(在应用层随机产生的)
            auto* router = new RTC::Router(routerId);   //创建router
            this->mapRouters[routerId] = router;   //存放在字典中
            MS_DEBUG_DEV("Router created [routerId:%s]", routerId.c_str()); 
            request->Accept();   //回复消息,创建完成
            break;
        }

        case Channel::ChannelRequest::MethodId::ROUTER_CLOSE:
        {
            // This may throw.
            RTC::Router* router = GetRouterFromInternal(request->internal);

            // Remove it from the map and delete it.
            this->mapRouters.erase(router->id);
            delete router;

            MS_DEBUG_DEV("Router closed [id:%s]", router->id.c_str());

            request->Accept();

            break;
        }

        // Any other request must be delivered to the corresponding Router.
        default:  //其他的信令处理,比如创建transport,accept没有在这里处理,在HandleRequest中调用!!!!
        {
            // This may throw.
            RTC::Router* router = GetRouterFromInternal(request->internal);

            router->HandleRequest(request);

            break;
        }
    }
}

(一)进入Router.cpp中----创建Transport

    void Router::SetNewTransportIdFromInternal(json& internal, std::string& transportId) const
    {
        MS_TRACE();

        auto jsonTransportIdIt = internal.find("transportId");  //查找transportId,在jsonRequest中的5元组中的transportId中
if (jsonTransportIdIt == internal.end() || !jsonTransportIdIt->is_string()) MS_THROW_ERROR("missing internal.transportId"); transportId.assign(jsonTransportIdIt->get<std::string>());  //赋值 if (this->mapTransports.find(transportId) != this->mapTransports.end()) MS_THROW_ERROR("a Transport with same transportId already exists"); }
    void Router::HandleRequest(Channel::ChannelRequest* request)
    {
        MS_TRACE();

        switch (request->methodId)
        {
            case Channel::ChannelRequest::MethodId::ROUTER_DUMP:
            {
                json data = json::object();

                FillJson(data);

                request->Accept(data);

                break;
            }

            case Channel::ChannelRequest::MethodId::ROUTER_CREATE_WEBRTC_TRANSPORT:  //多个transport,这里主要查看WebRtcTransport
            {
                std::string transportId;

                // This may throw.
                SetNewTransportIdFromInternal(request->internal, transportId);  //从request中获取transportId,存放到变量transportId中

                // This may throw.
                auto* webRtcTransport = new RTC::WebRtcTransport(transportId, this, request->data);  //创建WebRtcTransport对象,传入transportId和request5元组中的data数据

                // Insert into the map.
                this->mapTransports[transportId] = webRtcTransport;  //放入到map中去

                MS_DEBUG_DEV("WebRtcTransport created [transportId:%s]", transportId.c_str());

                json data = json::object();  //声明一个JSON格式对象

                webRtcTransport->FillJson(data);  //开始将webRtcTransport中的响应,返回确认消息到data

                request->Accept(data);  //将接受的数据返回给JS层

                break;
            }
            case Channel::ChannelRequest::MethodId::ROUTER_CREATE_PLAIN_TRANSPORT:case Channel::ChannelRequest::MethodId::ROUTER_CREATE_PIPE_TRANSPORT:case Channel::ChannelRequest::MethodId::ROUTER_CREATE_DIRECT_TRANSPORT:case Channel::ChannelRequest::MethodId::ROUTER_CREATE_AUDIO_LEVEL_OBSERVER:case Channel::ChannelRequest::MethodId::TRANSPORT_CLOSE:case Channel::ChannelRequest::MethodId::RTP_OBSERVER_CLOSE:case Channel::ChannelRequest::MethodId::RTP_OBSERVER_PAUSE:case Channel::ChannelRequest::MethodId::RTP_OBSERVER_RESUME:case Channel::ChannelRequest::MethodId::RTP_OBSERVER_ADD_PRODUCER:case Channel::ChannelRequest::MethodId::RTP_OBSERVER_REMOVE_PRODUCER:// Any other request must be delivered to the corresponding Transport.
            default:
            {
                // This may throw.
                RTC::Transport* transport = GetTransportFromInternal(request->internal);

                transport->HandleRequest(request);

                break;
            }
        }
    }

查看WebRtcTransport.cpp文件,构造函数

    WebRtcTransport::WebRtcTransport(const std::string& id, RTC::Transport::Listener* listener, json& data)
      : RTC::Transport::Transport(id, listener, data)
    {
     //主要实现JSON数据的解析---request->data就是data
        bool enableUdp{ true };
        auto jsonEnableUdpIt = data.find("enableUdp");bool enableTcp{ false };
        auto jsonEnableTcpIt = data.find("enableTcp");bool preferUdp{ false };
        auto jsonPreferUdpIt = data.find("preferUdp");bool preferTcp{ false };
        auto jsonPreferTcpIt = data.find("preferTcp");
        auto jsonListenIpsIt = data.find("listenIps");
        std::vector<ListenIp> listenIps(jsonListenIpsIt->size());

        for (size_t i{ 0 }; i < jsonListenIpsIt->size(); ++i)
        {
            auto& jsonListenIp = (*jsonListenIpsIt)[i];
            auto& listenIp     = listenIps[i];

            if (!jsonListenIp.is_object())
                MS_THROW_TYPE_ERROR("wrong listenIp (not an object)");

            auto jsonIpIt = jsonListenIp.find("ip");

            if (jsonIpIt == jsonListenIp.end())
                MS_THROW_TYPE_ERROR("missing listenIp.ip");
            else if (!jsonIpIt->is_string())
                MS_THROW_TYPE_ERROR("wrong listenIp.ip (not an string");

            listenIp.ip.assign(jsonIpIt->get<std::string>());

            // This may throw.
            Utils::IP::NormalizeIp(listenIp.ip);

            auto jsonAnnouncedIpIt = jsonListenIp.find("announcedIp");

            if (jsonAnnouncedIpIt != jsonListenIp.end())
            {
                if (!jsonAnnouncedIpIt->is_string())
                    MS_THROW_TYPE_ERROR("wrong listenIp.announcedIp (not an string)");

                listenIp.announcedIp.assign(jsonAnnouncedIpIt->get<std::string>());
            }
        }

        try
        {
            uint16_t iceLocalPreferenceDecrement{ 0 };

            if (enableUdp && enableTcp)
                this->iceCandidates.reserve(2 * jsonListenIpsIt->size());
            else
                this->iceCandidates.reserve(jsonListenIpsIt->size());

            for (auto& listenIp : listenIps)
            {
                if (enableUdp)
                {
                    uint16_t iceLocalPreference =
                      IceCandidateDefaultLocalPriority - iceLocalPreferenceDecrement;

                    if (preferUdp)
                        iceLocalPreference += 1000;

                    uint32_t icePriority = generateIceCandidatePriority(iceLocalPreference);

                    // This may throw.
                    auto* udpSocket = new RTC::UdpSocket(this, listenIp.ip);

                    this->udpSockets[udpSocket] = listenIp.announcedIp;

                    if (listenIp.announcedIp.empty())
                        this->iceCandidates.emplace_back(udpSocket, icePriority);
                    else
                        this->iceCandidates.emplace_back(udpSocket, icePriority, listenIp.announcedIp);
                }

                if (enableTcp)
                {
                    uint16_t iceLocalPreference =
                      IceCandidateDefaultLocalPriority - iceLocalPreferenceDecrement;

                    if (preferTcp)
                        iceLocalPreference += 1000;

                    uint32_t icePriority = generateIceCandidatePriority(iceLocalPreference);

                    // This may throw.
                    auto* tcpServer = new RTC::TcpServer(this, this, listenIp.ip);

                    this->tcpServers[tcpServer] = listenIp.announcedIp;

                    if (listenIp.announcedIp.empty())
                        this->iceCandidates.emplace_back(tcpServer, icePriority);
                    else
                        this->iceCandidates.emplace_back(tcpServer, icePriority, listenIp.announcedIp);
                }

                // Decrement initial ICE local preference for next IP.
                iceLocalPreferenceDecrement += 100;
            }

            // Create a ICE server.
            this->iceServer = new RTC::IceServer(
              this, Utils::Crypto::GetRandomString(16), Utils::Crypto::GetRandomString(32));

            // Create a DTLS transport.
            this->dtlsTransport = new RTC::DtlsTransport(this);
        }
        catch (const MediaSoupError& error)
        {
            // Must delete everything since the destructor won't be called.
        }
    }

Mediasoup手册:https://mediasoup.org/documentation/v3/mediasoup/api/#WebRtcTransport

(二)进入Router.cpp中----创建connect连接

    void Router::HandleRequest(Channel::ChannelRequest* request)
    {
        MS_TRACE();

        switch (request->methodId)
        {
       //因为connect不属于Router,所以最后也是进入default中去!!!
            // Any other request must be delivered to the corresponding Transport.
            default:
            {
                // This may throw.
                RTC::Transport* transport = GetTransportFromInternal(request->internal);  //从request中获取transport对象

                transport->HandleRequest(request);

                break;
            }
        }
    }

进入WebRtcTransport.cpp中去,查看对应的HandleRequest方法,创建connect连接!

    void WebRtcTransport::HandleRequest(Channel::ChannelRequest* request)
    {
        MS_TRACE();

        switch (request->methodId)
        {
            case Channel::ChannelRequest::MethodId::TRANSPORT_CONNECT:  //涉及到很多协议!
            {
                // Ensure this method is not called twice.
                if (this->connectCalled)
                    MS_THROW_ERROR("connect() already called");

                RTC::DtlsTransport::Fingerprint dtlsRemoteFingerprint;
                RTC::DtlsTransport::Role dtlsRemoteRole;

                auto jsonDtlsParametersIt = request->data.find("dtlsParameters");

                if (jsonDtlsParametersIt == request->data.end() || !jsonDtlsParametersIt->is_object())
                    MS_THROW_TYPE_ERROR("missing dtlsParameters");

                auto jsonFingerprintsIt = jsonDtlsParametersIt->find("fingerprints");

                if (jsonFingerprintsIt == jsonDtlsParametersIt->end() || !jsonFingerprintsIt->is_array())
                {
                    MS_THROW_TYPE_ERROR("missing dtlsParameters.fingerprints");
                }
                else if (jsonFingerprintsIt->empty())
                {
                    MS_THROW_TYPE_ERROR("empty dtlsParameters.fingerprints array");
                }

                // NOTE: Just take the first fingerprint.
                for (auto& jsonFingerprint : *jsonFingerprintsIt)
                {
                    if (!jsonFingerprint.is_object())
                        MS_THROW_TYPE_ERROR("wrong entry in dtlsParameters.fingerprints (not an object)");

                    auto jsonAlgorithmIt = jsonFingerprint.find("algorithm");

                    if (jsonAlgorithmIt == jsonFingerprint.end())
                        MS_THROW_TYPE_ERROR("missing fingerprint.algorithm");
                    else if (!jsonAlgorithmIt->is_string())
                        MS_THROW_TYPE_ERROR("wrong fingerprint.algorithm (not a string)");

                    dtlsRemoteFingerprint.algorithm =
                      RTC::DtlsTransport::GetFingerprintAlgorithm(jsonAlgorithmIt->get<std::string>());

                    if (dtlsRemoteFingerprint.algorithm == RTC::DtlsTransport::FingerprintAlgorithm::NONE)
                    {
                        MS_THROW_TYPE_ERROR("invalid fingerprint.algorithm value");
                    }

                    auto jsonValueIt = jsonFingerprint.find("value");

                    if (jsonValueIt == jsonFingerprint.end())
                        MS_THROW_TYPE_ERROR("missing fingerprint.value");
                    else if (!jsonValueIt->is_string())
                        MS_THROW_TYPE_ERROR("wrong fingerprint.value (not a string)");

                    dtlsRemoteFingerprint.value = jsonValueIt->get<std::string>();

                    // Just use the first fingerprint.
                    break;
                }

                auto jsonRoleIt = jsonDtlsParametersIt->find("role");

                if (jsonRoleIt != jsonDtlsParametersIt->end())
                {
                    if (!jsonRoleIt->is_string())
                        MS_THROW_TYPE_ERROR("wrong dtlsParameters.role (not a string)");

                    dtlsRemoteRole = RTC::DtlsTransport::StringToRole(jsonRoleIt->get<std::string>());

                    if (dtlsRemoteRole == RTC::DtlsTransport::Role::NONE)
                        MS_THROW_TYPE_ERROR("invalid dtlsParameters.role value");
                }
                else
                {
                    dtlsRemoteRole = RTC::DtlsTransport::Role::AUTO;
                }

                // Set local DTLS role.
                switch (dtlsRemoteRole)
                {
                    case RTC::DtlsTransport::Role::CLIENT:
                    {
                        this->dtlsRole = RTC::DtlsTransport::Role::SERVER;

                        break;
                    }
                    // If the peer has role "auto" we become "client" since we are ICE controlled.
                    case RTC::DtlsTransport::Role::SERVER:
                    case RTC::DtlsTransport::Role::AUTO:
                    {
                        this->dtlsRole = RTC::DtlsTransport::Role::CLIENT;

                        break;
                    }
                    case RTC::DtlsTransport::Role::NONE:
                    {
                        MS_THROW_TYPE_ERROR("invalid remote DTLS role");
                    }
                }

                this->connectCalled = true;

                // Pass the remote fingerprint to the DTLS transport.
                if (this->dtlsTransport->SetRemoteFingerprint(dtlsRemoteFingerprint))
                {
                    // If everything is fine, we may run the DTLS transport if ready.
                    MayRunDtlsTransport();
                }

                // Tell the caller about the selected local DTLS role.
                json data = json::object();

                switch (this->dtlsRole)
                {
                    case RTC::DtlsTransport::Role::CLIENT:
                        data["dtlsLocalRole"] = "client";
                        break;

                    case RTC::DtlsTransport::Role::SERVER:
                        data["dtlsLocalRole"] = "server";
                        break;

                    default:
                        MS_ABORT("invalid local DTLS role");
                }

                request->Accept(data);

                break;
            }

            case Channel::ChannelRequest::MethodId::TRANSPORT_RESTART_ICE:
            {
                std::string usernameFragment = Utils::Crypto::GetRandomString(16);
                std::string password         = Utils::Crypto::GetRandomString(32);

                this->iceServer->SetUsernameFragment(usernameFragment);
                this->iceServer->SetPassword(password);

                MS_DEBUG_DEV(
                  "WebRtcTransport ICE usernameFragment and password changed [id:%s]", this->id.c_str());

                // Reply with the updated ICE local parameters.
                json data = json::object();

                data["iceParameters"]    = json::object();
                auto jsonIceParametersIt = data.find("iceParameters");

                (*jsonIceParametersIt)["usernameFragment"] = this->iceServer->GetUsernameFragment();
                (*jsonIceParametersIt)["password"]         = this->iceServer->GetPassword();
                (*jsonIceParametersIt)["iceLite"]          = true;

                request->Accept(data);

                break;
            }

            default:
            {
                // Pass it to the parent class.
                RTC::Transport::HandleRequest(request);
            }
        }
    }

(三)进入WebRtcTransport.cpp中----使用CreateProducer创建生产者(消费者)

            default:  //如前面的WebRtcTransport中的是我itch所示,进入default中去,调用父类方法
            {
                // Pass it to the parent class.
                RTC::Transport::HandleRequest(request);
            }

进入Transport.cpp文件中去!,创建生产者和消费者(两个方式一致)

    void Transport::HandleRequest(Channel::ChannelRequest* request)
    {
        MS_TRACE();

        switch (request->methodId)
        {

            case Channel::ChannelRequest::MethodId::TRANSPORT_PRODUCE:  //创建生产者
            {
                std::string producerId;

                // This may throw.
                SetNewProducerIdFromInternal(request->internal, producerId);  //获取producerId

                // This may throw.
                auto* producer = new RTC::Producer(producerId, this, request->data);  //根据上面的Id创建producer生产者

                // Insert the Producer into the RtpListener.
                // This may throw. If so, delete the Producer and throw.
                try
                {
                    this->rtpListener.AddProducer(producer);  //加入数据
                }
                catch (const MediaSoupError& error)
                {
                    delete producer;

                    throw;
                }

                // Notify the listener.
                // This may throw if a Producer with same id already exists.
                try
                {
                    this->listener->OnTransportNewProducer(this, producer);  //router中加入生产者
                }
                catch (const MediaSoupError& error)
                {
                    this->rtpListener.RemoveProducer(producer);

                    delete producer;

                    throw;
                }

                // Insert into the map.
                this->mapProducers[producerId] = producer;

                MS_DEBUG_DEV("Producer created [producerId:%s]", producerId.c_str());

                // Take the transport related RTP header extensions of the Producer and
                // add them to the Transport.
                // NOTE: Producer::GetRtpHeaderExtensionIds() returns the original
                // header extension ids of the Producer (and not their mapped values).
                const auto& producerRtpHeaderExtensionIds = producer->GetRtpHeaderExtensionIds();

                if (producerRtpHeaderExtensionIds.mid != 0u)
                {
                    this->recvRtpHeaderExtensionIds.mid = producerRtpHeaderExtensionIds.mid;
                }

                if (producerRtpHeaderExtensionIds.rid != 0u)
                {
                    this->recvRtpHeaderExtensionIds.rid = producerRtpHeaderExtensionIds.rid;
                }

                if (producerRtpHeaderExtensionIds.rrid != 0u)
                {
                    this->recvRtpHeaderExtensionIds.rrid = producerRtpHeaderExtensionIds.rrid;
                }

                if (producerRtpHeaderExtensionIds.absSendTime != 0u)
                {
                    this->recvRtpHeaderExtensionIds.absSendTime = producerRtpHeaderExtensionIds.absSendTime;
                }

                if (producerRtpHeaderExtensionIds.transportWideCc01 != 0u)
                {
                    this->recvRtpHeaderExtensionIds.transportWideCc01 =
                      producerRtpHeaderExtensionIds.transportWideCc01;
                }

                // Create status response.
                json data = json::object();

                data["type"] = RTC::RtpParameters::GetTypeString(producer->GetType());

                request->Accept(data);

                // Check if TransportCongestionControlServer or REMB server must be
                // created.
                const auto& rtpHeaderExtensionIds = producer->GetRtpHeaderExtensionIds();
                const auto& codecs                = producer->GetRtpParameters().codecs;

                // Set TransportCongestionControlServer.
                if (!this->tccServer)
                {
                    bool createTccServer{ false };
                    RTC::BweType bweType;

                    // Use transport-cc if:
                    // - there is transport-wide-cc-01 RTP header extension, and
                    // - there is "transport-cc" in codecs RTCP feedback.
                    //
                    // clang-format off
                    if (
                        rtpHeaderExtensionIds.transportWideCc01 != 0u &&
                        std::any_of(
                            codecs.begin(), codecs.end(), [](const RTC::RtpCodecParameters& codec)
                            {
                                return std::any_of(
                                    codec.rtcpFeedback.begin(), codec.rtcpFeedback.end(), [](const RTC::RtcpFeedback& fb)
                                    {
                                        return fb.type == "transport-cc";
                                    });
                            })
                    )
                    // clang-format on
                    {
                        MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlServer with transport-cc");

                        createTccServer = true;
                        bweType         = RTC::BweType::TRANSPORT_CC;
                    }
                    // Use REMB if:
                    // - there is abs-send-time RTP header extension, and
                    // - there is "remb" in codecs RTCP feedback.
                    //
                    // clang-format off
                    else if (
                        rtpHeaderExtensionIds.absSendTime != 0u &&
                        std::any_of(
                            codecs.begin(), codecs.end(), [](const RTC::RtpCodecParameters& codec)
                            {
                                return std::any_of(
                                    codec.rtcpFeedback.begin(), codec.rtcpFeedback.end(), [](const RTC::RtcpFeedback& fb)
                                    {
                                        return fb.type == "goog-remb";
                                    });
                            })
                    )
                    // clang-format on
                    {
                        MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlServer with REMB");

                        createTccServer = true;
                        bweType         = RTC::BweType::REMB;
                    }

                    if (createTccServer)
                    {
                        this->tccServer = new RTC::TransportCongestionControlServer(this, bweType, RTC::MtuSize);

                        if (this->maxIncomingBitrate != 0u)
                            this->tccServer->SetMaxIncomingBitrate(this->maxIncomingBitrate);

                        if (IsConnected())
                            this->tccServer->TransportConnected();
                    }
                }

                break;
            }

进入Producer.cpp中去创建Producer生产者:https://mediasoup.org/documentation/v3/mediasoup/api/#Producer

WebRTC进阶流媒体服务器开发(六)Mediasoup源码分析之Mediasoup主业务流程 

    Producer::Producer(const std::string& id, RTC::Producer::Listener* listener, json& data)
      : id(id), listener(listener)
    {
        MS_TRACE();

        auto jsonKindIt = data.find("kind");  //对JSON数据进行解析!---kind包含音视频---下面层层解析!!!

        if (jsonKindIt == data.end() || !jsonKindIt->is_string())
        {
            MS_THROW_TYPE_ERROR("missing kind");
        }

        // This may throw.
        this->kind = RTC::Media::GetKind(jsonKindIt->get<std::string>());

        if (this->kind == RTC::Media::Kind::ALL)
        {
            MS_THROW_TYPE_ERROR("invalid empty kind");
        }

        auto jsonRtpParametersIt = data.find("rtpParameters");

        if (jsonRtpParametersIt == data.end() || !jsonRtpParametersIt->is_object())
        {
            MS_THROW_TYPE_ERROR("missing rtpParameters");
        }

        // This may throw.
        this->rtpParameters = RTC::RtpParameters(*jsonRtpParametersIt);

        // Evaluate type.
        this->type = RTC::RtpParameters::GetType(this->rtpParameters);

        // Reserve a slot in rtpStreamByEncodingIdx and rtpStreamsScores vectors
        // for each RTP stream.
        this->rtpStreamByEncodingIdx.resize(this->rtpParameters.encodings.size(), nullptr);
        this->rtpStreamScores.resize(this->rtpParameters.encodings.size(), 0u);

        auto& encoding   = this->rtpParameters.encodings[0];
        auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding);

        if (!RTC::Codecs::Tools::IsValidTypeForCodec(this->type, mediaCodec->mimeType))
        {
            MS_THROW_TYPE_ERROR(
              "%s codec not supported for %s",
              mediaCodec->mimeType.ToString().c_str(),
              RTC::RtpParameters::GetTypeString(this->type).c_str());
        }

        auto jsonRtpMappingIt = data.find("rtpMapping");

        if (jsonRtpMappingIt == data.end() || !jsonRtpMappingIt->is_object())
        {
            MS_THROW_TYPE_ERROR("missing rtpMapping");
        }

        auto jsonCodecsIt = jsonRtpMappingIt->find("codecs");  //获取编码器

        if (jsonCodecsIt == jsonRtpMappingIt->end() || !jsonCodecsIt->is_array())
        {
            MS_THROW_TYPE_ERROR("missing rtpMapping.codecs");
        }

        for (auto& codec : *jsonCodecsIt)
        {
            if (!codec.is_object())
            {
                MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.codecs (not an object)");
            }

            auto jsonPayloadTypeIt = codec.find("payloadType");

            // clang-format off
            if (
                jsonPayloadTypeIt == codec.end() ||
                !Utils::Json::IsPositiveInteger(*jsonPayloadTypeIt)
            )
            // clang-format on
            {
                MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.codecs (missing payloadType)");
            }

            auto jsonMappedPayloadTypeIt = codec.find("mappedPayloadType");

            // clang-format off
            if (
                jsonMappedPayloadTypeIt == codec.end() ||
                !Utils::Json::IsPositiveInteger(*jsonMappedPayloadTypeIt)
            )
            // clang-format on
            {
                MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.codecs (missing mappedPayloadType)");
            }

            this->rtpMapping.codecs[jsonPayloadTypeIt->get<uint8_t>()] =
              jsonMappedPayloadTypeIt->get<uint8_t>();
        }

        auto jsonEncodingsIt = jsonRtpMappingIt->find("encodings");

        if (jsonEncodingsIt == jsonRtpMappingIt->end() || !jsonEncodingsIt->is_array())
        {
            MS_THROW_TYPE_ERROR("missing rtpMapping.encodings");
        }

        this->rtpMapping.encodings.reserve(jsonEncodingsIt->size());

        for (auto& encoding : *jsonEncodingsIt)
        {
            if (!encoding.is_object())
            {
                MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.encodings");
            }

            this->rtpMapping.encodings.emplace_back();

            auto& encodingMapping = this->rtpMapping.encodings.back();

            // ssrc is optional.
            auto jsonSsrcIt = encoding.find("ssrc");

            // clang-format off
            if (
                jsonSsrcIt != encoding.end() &&
                Utils::Json::IsPositiveInteger(*jsonSsrcIt)
            )
            // clang-format on
            {
                encodingMapping.ssrc = jsonSsrcIt->get<uint32_t>();
            }

            // rid is optional.
            auto jsonRidIt = encoding.find("rid");

            if (jsonRidIt != encoding.end() && jsonRidIt->is_string())
            {
                encodingMapping.rid = jsonRidIt->get<std::string>();
            }

            // However ssrc or rid must be present (if more than 1 encoding).
            // clang-format off
            if (
                jsonEncodingsIt->size() > 1 &&
                jsonSsrcIt == encoding.end() &&
                jsonRidIt == encoding.end()
            )
            // clang-format on
            {
                MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.encodings (missing ssrc or rid)");
            }

            // If there is no mid and a single encoding, ssrc or rid must be present.
            // clang-format off
            if (
                this->rtpParameters.mid.empty() &&
                jsonEncodingsIt->size() == 1 &&
                jsonSsrcIt == encoding.end() &&
                jsonRidIt == encoding.end()
            )
            // clang-format on
            {
                MS_THROW_TYPE_ERROR(
                  "wrong entry in rtpMapping.encodings (missing ssrc or rid, or rtpParameters.mid)");
            }

            // mappedSsrc is mandatory.
            auto jsonMappedSsrcIt = encoding.find("mappedSsrc");

            // clang-format off
            if (
                jsonMappedSsrcIt == encoding.end() ||
                !Utils::Json::IsPositiveInteger(*jsonMappedSsrcIt)
            )
            // clang-format on
            {
                MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.encodings (missing mappedSsrc)");
            }

            encodingMapping.mappedSsrc = jsonMappedSsrcIt->get<uint32_t>();
        }

        auto jsonPausedIt = data.find("paused");

        if (jsonPausedIt != data.end() && jsonPausedIt->is_boolean())
        {
            this->paused = jsonPausedIt->get<bool>();
        }

        // The number of encodings in rtpParameters must match the number of encodings
        // in rtpMapping.
        if (this->rtpParameters.encodings.size() != this->rtpMapping.encodings.size())
        {
            MS_THROW_TYPE_ERROR("rtpParameters.encodings size does not match rtpMapping.encodings size");
        }

        // Fill RTP header extension ids.
        // This may throw.
        for (auto& exten : this->rtpParameters.headerExtensions)
        {
            if (exten.id == 0u)
            {
                MS_THROW_TYPE_ERROR("RTP extension id cannot be 0");
            }

            if (this->rtpHeaderExtensionIds.mid == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::MID)
            {
                this->rtpHeaderExtensionIds.mid = exten.id;
            }

            if (this->rtpHeaderExtensionIds.rid == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::RTP_STREAM_ID)
            {
                this->rtpHeaderExtensionIds.rid = exten.id;
            }

            if (this->rtpHeaderExtensionIds.rrid == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::REPAIRED_RTP_STREAM_ID)
            {
                this->rtpHeaderExtensionIds.rrid = exten.id;
            }

            if (this->rtpHeaderExtensionIds.absSendTime == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::ABS_SEND_TIME)
            {
                this->rtpHeaderExtensionIds.absSendTime = exten.id;
            }

            if (this->rtpHeaderExtensionIds.transportWideCc01 == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::TRANSPORT_WIDE_CC_01)
            {
                this->rtpHeaderExtensionIds.transportWideCc01 = exten.id;
            }

            // NOTE: Remove this once framemarking draft becomes RFC.
            if (this->rtpHeaderExtensionIds.frameMarking07 == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::FRAME_MARKING_07)
            {
                this->rtpHeaderExtensionIds.frameMarking07 = exten.id;
            }

            if (this->rtpHeaderExtensionIds.frameMarking == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::FRAME_MARKING)
            {
                this->rtpHeaderExtensionIds.frameMarking = exten.id;
            }

            if (this->rtpHeaderExtensionIds.ssrcAudioLevel == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::SSRC_AUDIO_LEVEL)
            {
                this->rtpHeaderExtensionIds.ssrcAudioLevel = exten.id;
            }

            if (this->rtpHeaderExtensionIds.videoOrientation == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::VIDEO_ORIENTATION)
            {
                this->rtpHeaderExtensionIds.videoOrientation = exten.id;
            }

            if (this->rtpHeaderExtensionIds.toffset == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::TOFFSET)
            {
                this->rtpHeaderExtensionIds.toffset = exten.id;
            }
        }

        // Set the RTCP report generation interval.
        if (this->kind == RTC::Media::Kind::AUDIO)
            this->maxRtcpInterval = RTC::RTCP::MaxAudioIntervalMs;
        else
            this->maxRtcpInterval = RTC::RTCP::MaxVideoIntervalMs;

        // Create a KeyFrameRequestManager.
        if (this->kind == RTC::Media::Kind::VIDEO)
        {
            auto jsonKeyFrameRequestDelayIt = data.find("keyFrameRequestDelay");
            uint32_t keyFrameRequestDelay   = 0u;

            // clang-format off
            if (
                jsonKeyFrameRequestDelayIt != data.end() &&
                jsonKeyFrameRequestDelayIt->is_number_integer()
            )
            // clang-format on
            {
                keyFrameRequestDelay = jsonKeyFrameRequestDelayIt->get<uint32_t>();
            }

            this->keyFrameRequestManager = new RTC::KeyFrameRequestManager(this, keyFrameRequestDelay);
        }
    }

进入router.cpp中查看添加生产者方法

    inline void Router::OnTransportNewProducer(RTC::Transport* /*transport*/, RTC::Producer* producer)
    {
        MS_TRACE();

        MS_ASSERT(
          this->mapProducerConsumers.find(producer) == this->mapProducerConsumers.end(),
          "Producer already present in mapProducerConsumers");

        if (this->mapProducers.find(producer->id) != this->mapProducers.end())
        {
            MS_THROW_ERROR("Producer already present in mapProducers [producerId:%s]", producer->id.c_str());
        }

        // Insert the Producer in the maps.
        this->mapProducers[producer->id] = producer;  //添加到map中去!
        this->mapProducerConsumers[producer];
        this->mapProducerRtpObservers[producer];
    }

(四)进入Transport.cpp中----查看TRANSPORT_CONSUME创建消费者

            case Channel::ChannelRequest::MethodId::TRANSPORT_CONSUME:
            {
                auto jsonProducerIdIt = request->internal.find("producerId");  //获取生产者id

                if (jsonProducerIdIt == request->internal.end() || !jsonProducerIdIt->is_string())
                    MS_THROW_ERROR("missing internal.producerId");

                std::string producerId = jsonProducerIdIt->get<std::string>();
                std::string consumerId;

                // This may throw.
                SetNewConsumerIdFromInternal(request->internal, consumerId);  //获取消费者id

                // Get type.
                auto jsonTypeIt = request->data.find("type");  //获取消费者类型

                if (jsonTypeIt == request->data.end() || !jsonTypeIt->is_string())
                    MS_THROW_TYPE_ERROR("missing type");

                // This may throw.
                auto type = RTC::RtpParameters::GetType(jsonTypeIt->get<std::string>());

                RTC::Consumer* consumer{ nullptr };

                switch (type)  //多种consumer
                {
                    case RTC::RtpParameters::Type::NONE:
                    {
                        MS_THROW_TYPE_ERROR("invalid type 'none'");
                        break;
                    }
                    case RTC::RtpParameters::Type::SIMPLE:  //普通音视频
                    {
                        consumer = new RTC::SimpleConsumer(consumerId, producerId, this, request->data);
                        break;
                    }

                    case RTC::RtpParameters::Type::SIMULCAST:
                        consumer = new RTC::SimulcastConsumer(consumerId, producerId, this, request->data);
                        break;
                    case RTC::RtpParameters::Type::SVC:
                        consumer = new RTC::SvcConsumer(consumerId, producerId, this, request->data);
                        break;
                    case RTC::RtpParameters::Type::PIPE:
                        consumer = new RTC::PipeConsumer(consumerId, producerId, this, request->data);
                        break;
                }

                // Notify the listener.
                // This may throw if no Producer is found.
this->listener->OnTransportNewConsumer(this, consumer, producerId);
// Insert into the maps.
                this->mapConsumers[consumerId] = consumer;

                for (auto ssrc : consumer->GetMediaSsrcs())
                {
                    this->mapSsrcConsumer[ssrc] = consumer;
                }

                for (auto ssrc : consumer->GetRtxSsrcs())
                {
                    this->mapRtxSsrcConsumer[ssrc] = consumer;
                }
// Create status response.
                json data = json::object();

                data["paused"]         = consumer->IsPaused();
                data["producerPaused"] = consumer->IsProducerPaused();

                consumer->FillJsonScore(data["score"]);

                auto preferredLayers = consumer->GetPreferredLayers();

                if (preferredLayers.spatial > -1 && preferredLayers.temporal > -1)
                {
                    data["preferredLayers"]["spatialLayer"]  = preferredLayers.spatial;
                    data["preferredLayers"]["temporalLayer"] = preferredLayers.temporal;
                }

                request->Accept(data);

                // Check if Transport Congestion Control client must be created.
                const auto& rtpHeaderExtensionIds = consumer->GetRtpHeaderExtensionIds();
                const auto& codecs                = consumer->GetRtpParameters().codecs;

                // Set TransportCongestionControlClient.
                if (!this->tccClient)
                {
                    bool createTccClient{ false };
                    RTC::BweType bweType;

                    // Use transport-cc if:
                    // - it's a video Consumer, and
                    // - there is transport-wide-cc-01 RTP header extension, and
                    // - there is "transport-cc" in codecs RTCP feedback.
                    //
                    // clang-format off
                    if (
                        consumer->GetKind() == RTC::Media::Kind::VIDEO &&
                        rtpHeaderExtensionIds.transportWideCc01 != 0u &&
                        std::any_of(
                            codecs.begin(), codecs.end(), [](const RTC::RtpCodecParameters& codec)
                            {
                                return std::any_of(
                                    codec.rtcpFeedback.begin(), codec.rtcpFeedback.end(), [](const RTC::RtcpFeedback& fb)
                                    {
                                        return fb.type == "transport-cc";
                                    });
                            })
                    )
                    // clang-format on
                    {
                        MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlClient with transport-cc");

                        createTccClient = true;
                        bweType         = RTC::BweType::TRANSPORT_CC;
                    }
                    // Use REMB if:
                    // - it's a video Consumer, and
                    // - there is abs-send-time RTP header extension, and
                    // - there is "remb" in codecs RTCP feedback.
                    //
                    // clang-format off
                    else if (
                        consumer->GetKind() == RTC::Media::Kind::VIDEO &&
                        rtpHeaderExtensionIds.absSendTime != 0u &&
                        std::any_of(
                            codecs.begin(), codecs.end(), [](const RTC::RtpCodecParameters& codec)
                            {
                                return std::any_of(
                                    codec.rtcpFeedback.begin(), codec.rtcpFeedback.end(), [](const RTC::RtcpFeedback& fb)
                                    {
                                        return fb.type == "goog-remb";
                                    });
                            })
                    )
                    // clang-format on
                    {
                        MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlClient with REMB");

                        createTccClient = true;
                        bweType         = RTC::BweType::REMB;
                    }

                    if (createTccClient)
                    {
                        // Tell all the Consumers that we are gonna manage their bitrate.
                        for (auto& kv : this->mapConsumers)
                        {
                            auto* consumer = kv.second;

                            consumer->SetExternallyManagedBitrate();
                        };

                        this->tccClient = new RTC::TransportCongestionControlClient(
                          this, bweType, this->initialAvailableOutgoingBitrate, this->maxOutgoingBitrate);

                        if (IsConnected())
                            this->tccClient->TransportConnected();
                    }
                }

                // If applicable, tell the new Consumer that we are gonna manage its
                // bitrate.
                if (this->tccClient)
                    consumer->SetExternallyManagedBitrate();

#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
                // Create SenderBandwidthEstimator if:
                // - not already created,
                // - it's a video Consumer, and
                // - there is transport-wide-cc-01 RTP header extension, and
                // - there is "transport-cc" in codecs RTCP feedback.
                //
                // clang-format off
                if (
                    !this->senderBwe &&
                    consumer->GetKind() == RTC::Media::Kind::VIDEO &&
                    rtpHeaderExtensionIds.transportWideCc01 != 0u &&
                    std::any_of(
                        codecs.begin(), codecs.end(), [](const RTC::RtpCodecParameters& codec)
                        {
                            return std::any_of(
                                codec.rtcpFeedback.begin(), codec.rtcpFeedback.end(), [](const RTC::RtcpFeedback& fb)
                                {
                                    return fb.type == "transport-cc";
                                });
                        })
                )
                // clang-format on
                {
                    MS_DEBUG_TAG(bwe, "enabling SenderBandwidthEstimator");

                    // Tell all the Consumers that we are gonna manage their bitrate.
                    for (auto& kv : this->mapConsumers)
                    {
                        auto* consumer = kv.second;

                        consumer->SetExternallyManagedBitrate();
                    };

                    this->senderBwe =
                      new RTC::SenderBandwidthEstimator(this, this->initialAvailableOutgoingBitrate);

                    if (IsConnected())
                        this->senderBwe->TransportConnected();
                }

                // If applicable, tell the new Consumer that we are gonna manage its
                // bitrate.
                if (this->senderBwe)
                    consumer->SetExternallyManagedBitrate();
#endif

                if (IsConnected())
                    consumer->TransportConnected();

                break;
            }

进入SimpleConsumer.cpp文件中去,查看创建SimpleConsumer构造函数

    SimpleConsumer::SimpleConsumer(
      const std::string& id, const std::string& producerId, RTC::Consumer::Listener* listener, json& data)
      : RTC::Consumer::Consumer(id, producerId, listener, data, RTC::RtpParameters::Type::SIMPLE)
    {
        MS_TRACE();

        // Ensure there is a single encoding.
        if (this->consumableRtpEncodings.size() != 1u)
            MS_THROW_TYPE_ERROR("invalid consumableRtpEncodings with size != 1");

        auto& encoding         = this->rtpParameters.encodings[0];
        const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding);

        this->keyFrameSupported = RTC::Codecs::Tools::CanBeKeyFrame(mediaCodec->mimeType);

        // Create RtpStreamSend instance for sending a single stream to the remote.
        CreateRtpStream();  //创建RTP流
    void SimpleConsumer::CreateRtpStream()
    {
        MS_TRACE();

        auto& encoding         = this->rtpParameters.encodings[0];
        const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding);

        MS_DEBUG_TAG(
          rtp, "[ssrc:%" PRIu32 ", payloadType:%" PRIu8 "]", encoding.ssrc, mediaCodec->payloadType);

        // Set stream params.
        RTC::RtpStream::Params params;

        params.ssrc        = encoding.ssrc;
        params.payloadType = mediaCodec->payloadType;
        params.mimeType    = mediaCodec->mimeType;
        params.clockRate   = mediaCodec->clockRate;
        params.cname       = this->rtpParameters.rtcp.cname;

        // Check in band FEC in codec parameters.
        if (mediaCodec->parameters.HasInteger("useinbandfec") && mediaCodec->parameters.GetInteger("useinbandfec") == 1)
        {
            MS_DEBUG_TAG(rtp, "in band FEC enabled");

            params.useInBandFec = true;
        }

        // Check DTX in codec parameters.
        if (mediaCodec->parameters.HasInteger("usedtx") && mediaCodec->parameters.GetInteger("usedtx") == 1)
        {
            MS_DEBUG_TAG(rtp, "DTX enabled");

            params.useDtx = true;
        }

        // Check DTX in the encoding.
        if (encoding.dtx)
        {
            MS_DEBUG_TAG(rtp, "DTX enabled");

            params.useDtx = true;
        }

        for (const auto& fb : mediaCodec->rtcpFeedback)
        {
            if (!params.useNack && fb.type == "nack" && fb.parameter.empty())
            {
                MS_DEBUG_2TAGS(rtp, rtcp, "NACK supported");

                params.useNack = true;
            }
            else if (!params.usePli && fb.type == "nack" && fb.parameter == "pli")
            {
                MS_DEBUG_2TAGS(rtp, rtcp, "PLI supported");

                params.usePli = true;
            }
            else if (!params.useFir && fb.type == "ccm" && fb.parameter == "fir")
            {
                MS_DEBUG_2TAGS(rtp, rtcp, "FIR supported");

                params.useFir = true;
            }
        }

        // Create a RtpStreamSend for sending a single media stream.
        size_t bufferSize = params.useNack ? 600u : 0u;
  
        this->rtpStream = new RTC::RtpStreamSend(this, params, bufferSize);  //创建RTPStreamSend发送给远端客户端
        this->rtpStreams.push_back(this->rtpStream);  //保存在数据结构中去!

        // If the Consumer is paused, tell the RtpStreamSend.
        if (IsPaused() || IsProducerPaused())
            this->rtpStream->Pause();

        const auto* rtxCodec = this->rtpParameters.GetRtxCodecForEncoding(encoding);

        if (rtxCodec && encoding.hasRtx)
            this->rtpStream->SetRtx(rtxCodec->payloadType, encoding.rtx.ssrc);
    }

创建Consumer时,会创建一个RtpStreamSend对象,为后面向客户端发送数据做准备。

当共享者的数据传输上来,服务端通过ssrc找到对应的生产者,通过生产者获取消费者,最后通过RtpStreamSend对象发送给最终的消费者!!

 

上一篇:OKLink行业观察:再斥1.7亿美金购入比特币,Square想干什么?


下一篇:Tornado.Cash终于,终于发币了!TORN治理机制都在这里