webrtc-m79-音频处理-音频的接收流程

1 代码

与视频相比,接收流程的前面的一部分是重合的。

void UDPPort::OnReadPacket(rtc::AsyncPacketSocket* socket,
                           const char* data,
                           size_t size,
                           const rtc::SocketAddress& remote_addr,
                           const int64_t& packet_time_us) {
  RTC_DCHECK(socket == socket_);
  RTC_DCHECK(!remote_addr.IsUnresolvedIP());

  // Look for a response from the STUN server.
  // Even if the response doesn't match one of our outstanding requests, we
  // will eat it because it might be a response to a retransmitted packet, and
  // we already cleared the request when we got the first response.
  if (server_addresses_.find(remote_addr) != server_addresses_.end()) {
    requests_.CheckResponse(data, size);
    return;
  }

  if (Connection* conn = GetConnection(remote_addr)) {
    conn->OnReadPacket(data, size, packet_time_us); // 注意这里
  } else {
    Port::OnReadPacket(data, size, remote_addr, PROTO_UDP);
  }
}


void Connection::OnReadPacket(const char* data,
                              size_t size,
                              int64_t packet_time_us) {
  std::unique_ptr<IceMessage> msg;
  std::string remote_ufrag;
  const rtc::SocketAddress& addr(remote_candidate_.address());
  if (!port_->GetStunMessage(data, size, addr, &msg, &remote_ufrag)) {
    // The packet did not parse as a valid STUN message
    // This is a data packet, pass it along.
    last_data_received_ = rtc::TimeMillis();
    UpdateReceiving(last_data_received_);
    recv_rate_tracker_.AddSamples(size);
    SignalReadPacket(this, data, size, packet_time_us); // 注意这里P2PTransportChannel::AddConnection ===>其内   connection->SignalReadPacket.connect(this, &P2PTransportChannel::OnReadPacket); // 注意这里

    // If timed out sending writability checks, start up again
    if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT)) {
      RTC_LOG(LS_WARNING)
          << "Received a data packet on a timed-out Connection. "
             "Resetting state to STATE_WRITE_INIT.";
      set_write_state(STATE_WRITE_INIT);
    }
  } else if (!msg) {
    // The packet was STUN, but failed a check and was handled internally.
  } else {
    // The packet is STUN and passed the Port checks.
    // Perform our own checks to ensure this packet is valid.
    // If this is a STUN request, then update the receiving bit and respond.
    // If this is a STUN response, then update the writable bit.
    // Log at LS_INFO if we receive a ping on an unwritable connection.
    rtc::LoggingSeverity sev = (!writable() ? rtc::LS_INFO : rtc::LS_VERBOSE);
    switch (msg->type()) {
      case STUN_BINDING_REQUEST:
        RTC_LOG_V(sev) << ToString() << ": Received STUN ping, id="
                       << rtc::hex_encode(msg->transaction_id());

        if (remote_ufrag == remote_candidate_.username()) {
          HandleBindingRequest(msg.get());
        } else {
          // The packet had the right local username, but the remote username
          // was not the right one for the remote address.
          RTC_LOG(LS_ERROR)
              << ToString()
              << ": Received STUN request with bad remote username "
              << remote_ufrag;
          port_->SendBindingErrorResponse(msg.get(), addr,
                                          STUN_ERROR_UNAUTHORIZED,
                                          STUN_ERROR_REASON_UNAUTHORIZED);
        }
        break;

      // Response from remote peer. Does it match request sent?
      // This doesn't just check, it makes callbacks if transaction
      // id's match.
      case STUN_BINDING_RESPONSE:
      case STUN_BINDING_ERROR_RESPONSE:
        if (msg->ValidateMessageIntegrity(data, size,
                                          remote_candidate().password())) {
          requests_.CheckResponse(msg.get());
        }
        // Otherwise silently discard the response message.
        break;

      // Remote end point sent an STUN indication instead of regular binding
      // request. In this case |last_ping_received_| will be updated but no
      // response will be sent.
      case STUN_BINDING_INDICATION:
        ReceivedPing(msg->transaction_id());
        break;

      default:
        RTC_NOTREACHED();
        break;
    }
  }
}


void P2PTransportChannel::OnReadPacket(Connection* connection,
                                       const char* data,
                                       size_t len,
                                       int64_t packet_time_us) {
  RTC_DCHECK_RUN_ON(network_thread_);

  // Do not deliver, if packet doesn't belong to the correct transport channel.
  if (!FindConnection(connection))
    return;

  // Let the client know of an incoming packet
  SignalReadPacket(this, data, len, packet_time_us, 0); // 注意这里DtlsTransport::ConnectToIceTransport ===> ice_transport_->SignalReadPacket.connect(this, &DtlsTransport::OnReadPacket); // 注意这里

  // May need to switch the sending connection based on the receiving media path
  // if this is the controlled side.
  if (ice_role_ == ICEROLE_CONTROLLED) {
    MaybeSwitchSelectedConnection(connection, "data received");
  }
}


void DtlsTransport::OnReadPacket(rtc::PacketTransportInternal* transport,
                                 const char* data,
                                 size_t size,
                                 const int64_t& packet_time_us,
                                 int flags) {
  RTC_DCHECK_RUN_ON(&thread_checker_);
  RTC_DCHECK(transport == ice_transport_);
  RTC_DCHECK(flags == 0);

  if (!dtls_active_) {
    // Not doing DTLS.
    SignalReadPacket(this, data, size, packet_time_us, 0);
    return;
  }

  switch (dtls_state()) {
    case DTLS_TRANSPORT_NEW:
      if (dtls_) {
        RTC_LOG(LS_INFO) << ToString()
                         << ": Packet received before DTLS started.";
      } else {
        RTC_LOG(LS_WARNING) << ToString()
                            << ": Packet received before we know if we are "
                               "doing DTLS or not.";
      }
      // Cache a client hello packet received before DTLS has actually started.
      if (IsDtlsClientHelloPacket(data, size)) {
        RTC_LOG(LS_INFO) << ToString()
                         << ": Caching DTLS ClientHello packet until DTLS is "
                            "started.";
        cached_client_hello_.SetData(data, size);
        // If we haven't started setting up DTLS yet (because we don't have a
        // remote fingerprint/role), we can use the client hello as a clue that
        // the peer has chosen the client role, and proceed with the handshake.
        // The fingerprint will be verified when it's set.
        if (!dtls_ && local_certificate_) {
          SetDtlsRole(rtc::SSL_SERVER);
          SetupDtls();
        }
      } else {
        RTC_LOG(LS_INFO) << ToString()
                         << ": Not a DTLS ClientHello packet; dropping.";
      }
      break;

    case DTLS_TRANSPORT_CONNECTING:
    case DTLS_TRANSPORT_CONNECTED:
      // We should only get DTLS or SRTP packets; STUN's already been demuxed.
      // Is this potentially a DTLS packet?
      if (IsDtlsPacket(data, size)) {
        if (!HandleDtlsPacket(data, size)) {
          RTC_LOG(LS_ERROR) << ToString() << ": Failed to handle DTLS packet.";
          return;
        }
      } else {
        // Not a DTLS packet; our handshake should be complete by now.
        if (dtls_state() != DTLS_TRANSPORT_CONNECTED) {
          RTC_LOG(LS_ERROR) << ToString()
                            << ": Received non-DTLS packet before DTLS "
                               "complete.";
          return;
        }

        // And it had better be a SRTP packet.
        if (!IsRtpPacket(data, size)) {
          RTC_LOG(LS_ERROR)
              << ToString() << ": Received unexpected non-DTLS packet.";
          return;
        }

        // Sanity check.
        RTC_DCHECK(!srtp_ciphers_.empty());

        // Signal this upwards as a bypass packet.
        SignalReadPacket(this, data, size, packet_time_us, PF_SRTP_BYPASS); // RtpTransport::SetRtpPacketTransport ===> new_packet_transport->SignalReadPacket.connect(this, &RtpTransport::OnReadPacket); // 注意这里
      } // 此时的 this 指针实际上指向的就是 webrtc::DtlsSrtpTransport
      break;
    case DTLS_TRANSPORT_FAILED:
    case DTLS_TRANSPORT_CLOSED:
      // This shouldn't be happening. Drop the packet.
      break;
  }
}


// webrtc::DtlsSrtpTransport 的继承关系
class webrtc::RtpTransport : public webrtc::RtpTransportInternal
class webrtc::SrtpTransport : public webrtc::RtpTransport
class webrtc::DtlsSrtpTransport : public webrtc::SrtpTransport


void RtpTransport::OnReadPacket(rtc::PacketTransportInternal* transport,
                                const char* data,
                                size_t len,
                                const int64_t& packet_time_us,
                                int flags) {
  TRACE_EVENT0("webrtc", "RtpTransport::OnReadPacket");

  // When using RTCP multiplexing we might get RTCP packets on the RTP
  // transport. We check the RTP payload type to determine if it is RTCP.
  auto array_view = rtc::MakeArrayView(data, len);
  cricket::RtpPacketType packet_type = cricket::InferRtpPacketType(array_view);
  // Filter out the packet that is neither RTP nor RTCP.
  if (packet_type == cricket::RtpPacketType::kUnknown) {
    return;
  }

  // Protect ourselves against crazy data.
  if (!cricket::IsValidRtpPacketSize(packet_type, len)) {
    RTC_LOG(LS_ERROR) << "Dropping incoming "
                      << cricket::RtpPacketTypeToString(packet_type)
                      << " packet: wrong size=" << len;
    return;
  }

  rtc::CopyOnWriteBuffer packet(data, len);
  if (packet_type == cricket::RtpPacketType::kRtcp) {
    OnRtcpPacketReceived(std::move(packet), packet_time_us); // 注意这里
  } else {
    OnRtpPacketReceived(std::move(packet), packet_time_us); // 多态 SrtpTransport::OnRtpPacketReceived
  }
}



void SrtpTransport::OnRtpPacketReceived(rtc::CopyOnWriteBuffer packet,
                                        int64_t packet_time_us) {
  if (!IsSrtpActive()) {
    RTC_LOG(LS_WARNING)
        << "Inactive SRTP transport received an RTP packet. Drop it.";
    return;
  }
  TRACE_EVENT0("webrtc", "SRTP Decode");
  char* data = packet.data<char>();
  int len = rtc::checked_cast<int>(packet.size());
  if (!UnprotectRtp(data, len, &len)) { // 注意这里
    int seq_num = -1;
    uint32_t ssrc = 0;
    cricket::GetRtpSeqNum(data, len, &seq_num);
    cricket::GetRtpSsrc(data, len, &ssrc);

    // Limit the error logging to avoid excessive logs when there are lots of
    // bad packets.
    const int kFailureLogThrottleCount = 100;
    if (decryption_failure_count_ % kFailureLogThrottleCount == 0) {
      RTC_LOG(LS_ERROR) << "Failed to unprotect RTP packet: size=" << len
                        << ", seqnum=" << seq_num << ", SSRC=" << ssrc
                        << ", previous failure count: "
                        << decryption_failure_count_;
    }
    ++decryption_failure_count_;
    return;
  }
  packet.SetSize(len);
  DemuxPacket(std::move(packet), packet_time_us); // 注意这里
}


void RtpTransport::DemuxPacket(rtc::CopyOnWriteBuffer packet,
                               int64_t packet_time_us) {
  webrtc::RtpPacketReceived parsed_packet(&header_extension_map_);
  if (!parsed_packet.Parse(std::move(packet))) { // 注意这里
    RTC_LOG(LS_ERROR)
        << "Failed to parse the incoming RTP packet before demuxing. Drop it.";
    return;
  }

  if (packet_time_us != -1) {
    parsed_packet.set_arrival_time_ms((packet_time_us + 500) / 1000);
  }
  if (!rtp_demuxer_.OnRtpPacket(parsed_packet)) { // 注意这里
    RTC_LOG(LS_WARNING) << "Failed to demux RTP packet: "
                        << RtpDemuxer::DescribePacket(parsed_packet);
  }
}


bool RtpDemuxer::OnRtpPacket(const RtpPacketReceived& packet) {
  RtpPacketSinkInterface* sink = ResolveSink(packet);
  if (sink != nullptr) {
    sink->OnRtpPacket(packet); // 参考链接: https://blog.csdn.net/zhengbin6072/article/details/108411342
    return true;
  }
  return false;
}

void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) {
  // Take packet time from the |parsed_packet|.
  // RtpPacketReceived.arrival_time_ms = (timestamp_us + 500) / 1000;
  int64_t packet_time_us = -1;
  if (parsed_packet.arrival_time_ms() > 0) {
    packet_time_us = parsed_packet.arrival_time_ms() * 1000;
  }

  if (!has_received_packet_) {
    has_received_packet_ = true;
    signaling_thread()->Post(RTC_FROM_HERE, this, MSG_FIRSTPACKETRECEIVED);
  }

  if (!srtp_active() && srtp_required_) {
    // Our session description indicates that SRTP is required, but we got a
    // packet before our SRTP filter is active. This means either that
    // a) we got SRTP packets before we received the SDES keys, in which case
    //    we can't decrypt it anyway, or
    // b) we got SRTP packets before DTLS completed on both the RTP and RTCP
    //    transports, so we haven't yet extracted keys, even if DTLS did
    //    complete on the transport that the packets are being sent on. It's
    //    really good practice to wait for both RTP and RTCP to be good to go
    //    before sending  media, to prevent weird failure modes, so it's fine
    //    for us to just eat packets here. This is all sidestepped if RTCP mux
    //    is used anyway.
    RTC_LOG(LS_WARNING) << "Can't process incoming RTP packet when "
                           "SRTP is inactive and crypto is required";
    return;
  }

  auto packet_buffer = parsed_packet.Buffer();

  invoker_.AsyncInvoke<void>(
      RTC_FROM_HERE, worker_thread_, [this, packet_buffer, packet_time_us] {
        RTC_DCHECK(worker_thread_->IsCurrent());
        media_channel_->OnPacketReceived(packet_buffer, packet_time_us); // 以音频为例
      });
}


void WebRtcVoiceMediaChannel::OnPacketReceived(rtc::CopyOnWriteBuffer packet,
                                               int64_t packet_time_us) {
  RTC_DCHECK(worker_thread_checker_.IsCurrent());

  webrtc::PacketReceiver::DeliveryStatus delivery_result =
      call_->Receiver()->DeliverPacket(webrtc::MediaType::AUDIO, packet, // 
                                       packet_time_us);

  if (delivery_result != webrtc::PacketReceiver::DELIVERY_UNKNOWN_SSRC) {
    return;
  }

  // Create an unsignaled receive stream for this previously not received ssrc.
  // If there already is N unsignaled receive streams, delete the oldest.
  // See: https://bugs.chromium.org/p/webrtc/issues/detail?id=5208
  uint32_t ssrc = 0;
  if (!GetRtpSsrc(packet.cdata(), packet.size(), &ssrc)) {
    return;
  }
  RTC_DCHECK(!absl::c_linear_search(unsignaled_recv_ssrcs_, ssrc));

  // Add new stream.
  StreamParams sp = unsignaled_stream_params_;
  sp.ssrcs.push_back(ssrc);
  RTC_LOG(LS_INFO) << "Creating unsignaled receive stream for SSRC=" << ssrc;
  if (!AddRecvStream(sp)) {
    RTC_LOG(LS_WARNING) << "Could not create unsignaled receive stream.";
    return;
  }
  unsignaled_recv_ssrcs_.push_back(ssrc);
  RTC_HISTOGRAM_COUNTS_LINEAR("WebRTC.Audio.NumOfUnsignaledStreams",
                              unsignaled_recv_ssrcs_.size(), 1, 100, 101);

  // Remove oldest unsignaled stream, if we have too many.
  if (unsignaled_recv_ssrcs_.size() > kMaxUnsignaledRecvStreams) {
    uint32_t remove_ssrc = unsignaled_recv_ssrcs_.front();
    RTC_DLOG(LS_INFO) << "Removing unsignaled receive stream with SSRC="
                      << remove_ssrc;
    RemoveRecvStream(remove_ssrc);
  }
  RTC_DCHECK_GE(kMaxUnsignaledRecvStreams, unsignaled_recv_ssrcs_.size());

  SetOutputVolume(ssrc, default_recv_volume_);
  SetBaseMinimumPlayoutDelayMs(ssrc, default_recv_base_minimum_delay_ms_);

  // The default sink can only be attached to one stream at a time, so we hook
  // it up to the *latest* unsignaled stream we've seen, in order to support the
  // case where the SSRC of one unsignaled stream changes.
  if (default_sink_) {
    for (uint32_t drop_ssrc : unsignaled_recv_ssrcs_) {
      auto it = recv_streams_.find(drop_ssrc);
      it->second->SetRawAudioSink(nullptr);
    }
    std::unique_ptr<webrtc::AudioSinkInterface> proxy_sink(
        new ProxySink(default_sink_.get()));
    SetRawAudioSink(ssrc, std::move(proxy_sink));
  }

  delivery_result = call_->Receiver()->DeliverPacket(webrtc::MediaType::AUDIO,
                                                     packet, packet_time_us);
  RTC_DCHECK_NE(webrtc::PacketReceiver::DELIVERY_UNKNOWN_SSRC, delivery_result);
}


PacketReceiver::DeliveryStatus Call::DeliverPacket(
    MediaType media_type,
    rtc::CopyOnWriteBuffer packet,
    int64_t packet_time_us) {
  RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
  if (IsRtcp(packet.cdata(), packet.size()))
    return DeliverRtcp(media_type, packet.cdata(), packet.size());

  return DeliverRtp(media_type, std::move(packet), packet_time_us);//
}



PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
                                                rtc::CopyOnWriteBuffer packet,
                                                int64_t packet_time_us) {
  TRACE_EVENT0("webrtc", "Call::DeliverRtp");

  RtpPacketReceived parsed_packet;
  if (!parsed_packet.Parse(std::move(packet)))
    return DELIVERY_PACKET_ERROR;

  if (packet_time_us != -1) {
    if (receive_time_calculator_) {
      // Repair packet_time_us for clock resets by comparing a new read of
      // the same clock (TimeUTCMicros) to a monotonic clock reading.
      packet_time_us = receive_time_calculator_->ReconcileReceiveTimes(
          packet_time_us, rtc::TimeUTCMicros(), clock_->TimeInMicroseconds());
    }
    parsed_packet.set_arrival_time_ms((packet_time_us + 500) / 1000);
  } else {
    parsed_packet.set_arrival_time_ms(clock_->TimeInMilliseconds());
  }

  // We might get RTP keep-alive packets in accordance with RFC6263 section 4.6.
  // These are empty (zero length payload) RTP packets with an unsignaled
  // payload type.
  const bool is_keep_alive_packet = parsed_packet.payload_size() == 0;

  RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO ||
             is_keep_alive_packet);

  ReadLockScoped read_lock(*receive_crit_);
  auto it = receive_rtp_config_.find(parsed_packet.Ssrc());
  if (it == receive_rtp_config_.end()) {
    RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
                      << parsed_packet.Ssrc();
    // Destruction of the receive stream, including deregistering from the
    // RtpDemuxer, is not protected by the |receive_crit_| lock. But
    // deregistering in the |receive_rtp_config_| map is protected by that lock.
    // So by not passing the packet on to demuxing in this case, we prevent
    // incoming packets to be passed on via the demuxer to a receive stream
    // which is being torned down.
    return DELIVERY_UNKNOWN_SSRC;
  }

  parsed_packet.IdentifyExtensions(it->second.extensions);

  NotifyBweOfReceivedPacket(parsed_packet, media_type);

  // RateCounters expect input parameter as int, save it as int,
  // instead of converting each time it is passed to RateCounter::Add below.
  int length = static_cast<int>(parsed_packet.size());
  if (media_type == MediaType::AUDIO) {
    if (audio_receiver_controller_.OnRtpPacket(parsed_packet)) { // RtpStreamReceiverController::OnRtpPacket
      received_bytes_per_second_counter_.Add(length);
      received_audio_bytes_per_second_counter_.Add(length);
      event_log_->Log(
          std::make_unique<RtcEventRtpPacketIncoming>(parsed_packet));
      const int64_t arrival_time_ms = parsed_packet.arrival_time_ms();
      if (!first_received_rtp_audio_ms_) {
        first_received_rtp_audio_ms_.emplace(arrival_time_ms);
      }
      last_received_rtp_audio_ms_.emplace(arrival_time_ms);
      return DELIVERY_OK;
    }
  } else if (media_type == MediaType::VIDEO) {
    parsed_packet.set_payload_type_frequency(kVideoPayloadTypeFrequency);
    if (video_receiver_controller_.OnRtpPacket(parsed_packet)) {
      received_bytes_per_second_counter_.Add(length);
      received_video_bytes_per_second_counter_.Add(length);
      event_log_->Log(
          std::make_unique<RtcEventRtpPacketIncoming>(parsed_packet));
      const int64_t arrival_time_ms = parsed_packet.arrival_time_ms();
      if (!first_received_rtp_video_ms_) {
        first_received_rtp_video_ms_.emplace(arrival_time_ms);
      }
      last_received_rtp_video_ms_.emplace(arrival_time_ms);
      return DELIVERY_OK;
    }
  }
  return DELIVERY_UNKNOWN_SSRC;
}


bool RtpStreamReceiverController::OnRtpPacket(const RtpPacketReceived& packet) {
  rtc::CritScope cs(&lock_);
  return demuxer_.OnRtpPacket(packet);
}


bool RtpDemuxer::OnRtpPacket(const RtpPacketReceived& packet) {
  RtpPacketSinkInterface* sink = ResolveSink(packet);
  if (sink != nullptr) {
    sink->OnRtpPacket(packet); // 这里的 sink 指向的是 webrtc::voe::ChannelReceive
    return true;
  }
  return false;
}


											AudioReceiveStream::AudioReceiveStream
											===>
											    rtp_stream_receiver_ = receiver_controller->CreateReceiver( // RtpStreamReceiverController::CreateReceiver
											        config.rtp.remote_ssrc, channel_receive_.get()); // channel_receive_ 实际上指向的也是 webrtc::voe::ChannelReceive
											        
											// webrtc::voe::ChannelReceive
											// class ChannelReceiveInterface : public RtpPacketSinkInterface
											// class ChannelReceive : public ChannelReceiveInterface,
											                       public MediaTransportAudioSinkInterface
											
											// class webrtc::RtpStreamReceiverController::Receiver : public RtpStreamReceiverInterface                      
											std::unique_ptr<RtpStreamReceiverInterface>
											RtpStreamReceiverController::CreateReceiver(uint32_t ssrc,
											                                            RtpPacketSinkInterface* sink) { // sink 指向的就是 webrtc::voe::ChannelReceive
											  return std::make_unique<Receiver>(this, ssrc, sink); // webrtc::RtpStreamReceiverController::Receiver
											}
											
											
											RtpStreamReceiverController::Receiver::Receiver(
											    RtpStreamReceiverController* controller,
											    uint32_t ssrc,
											    RtpPacketSinkInterface* sink) // sink 指向的就是 webrtc::voe::ChannelReceive
											    : controller_(controller), sink_(sink) {
											  const bool sink_added = controller_->AddSink(ssrc, sink_); // 
											  if (!sink_added) {
											    RTC_LOG(LS_ERROR)
											        << "RtpStreamReceiverController::Receiver::Receiver: Sink "
											        << "could not be added for SSRC=" << ssrc << ".";
											  }
											}



void ChannelReceive::OnRtpPacket(const RtpPacketReceived& packet) {
  int64_t now_ms = rtc::TimeMillis();

  {
    rtc::CritScope cs(&sync_info_lock_);
    last_received_rtp_timestamp_ = packet.Timestamp();
    last_received_rtp_system_time_ms_ = now_ms;
  }

  // Store playout timestamp for the received RTP packet
  UpdatePlayoutTimestamp(false);

  const auto& it = payload_type_frequencies_.find(packet.PayloadType());
  if (it == payload_type_frequencies_.end())
    return;
  // TODO(nisse): Set payload_type_frequency earlier, when packet is parsed.
  RtpPacketReceived packet_copy(packet);
  packet_copy.set_payload_type_frequency(it->second);

  rtp_receive_statistics_->OnRtpPacket(packet_copy);

  RTPHeader header;
  packet_copy.GetHeader(&header);

  ReceivePacket(packet_copy.data(), packet_copy.size(), header); // 
}


void ChannelReceive::ReceivePacket(const uint8_t* packet,
                                   size_t packet_length,
                                   const RTPHeader& header) {
  const uint8_t* payload = packet + header.headerLength;
  assert(packet_length >= header.headerLength);
  size_t payload_length = packet_length - header.headerLength;

  size_t payload_data_length = payload_length - header.paddingLength;

  // E2EE Custom Audio Frame Decryption (This is optional).
  // Keep this buffer around for the lifetime of the OnReceivedPayloadData call.
  rtc::Buffer decrypted_audio_payload;
  if (frame_decryptor_ != nullptr) {
    const size_t max_plaintext_size = frame_decryptor_->GetMaxPlaintextByteSize(
        cricket::MEDIA_TYPE_AUDIO, payload_length);
    decrypted_audio_payload.SetSize(max_plaintext_size);

    const std::vector<uint32_t> csrcs(header.arrOfCSRCs,
                                      header.arrOfCSRCs + header.numCSRCs);
    const FrameDecryptorInterface::Result decrypt_result =
        frame_decryptor_->Decrypt(
            cricket::MEDIA_TYPE_AUDIO, csrcs,
            /*additional_data=*/nullptr,
            rtc::ArrayView<const uint8_t>(payload, payload_data_length),
            decrypted_audio_payload);

    if (decrypt_result.IsOk()) {
      decrypted_audio_payload.SetSize(decrypt_result.bytes_written);
    } else {
      // Interpret failures as a silent frame.
      decrypted_audio_payload.SetSize(0);
    }

    payload = decrypted_audio_payload.data();
    payload_data_length = decrypted_audio_payload.size();
  } else if (crypto_options_.sframe.require_frame_encryption) {
    RTC_DLOG(LS_ERROR)
        << "FrameDecryptor required but not set, dropping packet";
    payload_data_length = 0;
  }

  OnReceivedPayloadData(
      rtc::ArrayView<const uint8_t>(payload, payload_data_length), header); // 
}



void ChannelReceive::OnReceivedPayloadData(
    rtc::ArrayView<const uint8_t> payload,
    const RTPHeader& rtpHeader) {
  // We should not be receiving any RTP packets if media_transport is set.
  RTC_CHECK(!media_transport());

  if (!Playing()) {
    // Avoid inserting into NetEQ when we are not playing. Count the
    // packet as discarded.
    return;
  }

  // Push the incoming payload (parsed and ready for decoding) into the ACM
  if (acm_receiver_.InsertPacket(rtpHeader, payload) != 0) { // 注意这里
    RTC_DLOG(LS_ERROR) << "ChannelReceive::OnReceivedPayloadData() unable to "
                          "push data to the ACM";
    return;
  }

  int64_t round_trip_time = 0;
  _rtpRtcpModule->RTT(remote_ssrc_, &round_trip_time, NULL, NULL, NULL); // 

  std::vector<uint16_t> nack_list = acm_receiver_.GetNackList(round_trip_time);
  if (!nack_list.empty()) {
    // Can't use nack_list.data() since it's not supported by all
    // compilers.
    ResendPackets(&(nack_list[0]), static_cast<int>(nack_list.size()));
  }
}


int AcmReceiver::InsertPacket(const RTPHeader& rtp_header,
                              rtc::ArrayView<const uint8_t> incoming_payload) {
  if (incoming_payload.empty()) {
    neteq_->InsertEmptyPacket(rtp_header);
    return 0;
  }

  int payload_type = rtp_header.payloadType;
  auto format = neteq_->GetDecoderFormat(payload_type);
  if (format && absl::EqualsIgnoreCase(format->sdp_format.name, "red")) {
    // This is a RED packet. Get the format of the audio codec.
    payload_type = incoming_payload[0] & 0x7f;
    format = neteq_->GetDecoderFormat(payload_type);
  }
  if (!format) {
    RTC_LOG_F(LS_ERROR) << "Payload-type " << payload_type
                        << " is not registered.";
    return -1;
  }

  {
    rtc::CritScope lock(&crit_sect_);
    if (absl::EqualsIgnoreCase(format->sdp_format.name, "cn")) {
      if (last_decoder_ && last_decoder_->num_channels > 1) {
        // This is a CNG and the audio codec is not mono, so skip pushing in
        // packets into NetEq.
        return 0;
      }
    } else {
      last_decoder_ = DecoderInfo{/*payload_type=*/payload_type,
                                  /*sample_rate_hz=*/format->sample_rate_hz,
                                  /*num_channels=*/format->num_channels,
                                  /*sdp_format=*/std::move(format->sdp_format)};
    }
  }  // |crit_sect_| is released.

  if (neteq_->InsertPacket(rtp_header, incoming_payload) < 0) { // 多态 NetEqImpl::InsertPacket
    RTC_LOG(LERROR) << "AcmReceiver::InsertPacket "
                    << static_cast<int>(rtp_header.payloadType)
                    << " Failed to insert packet";
    return -1;
  }
  return 0;
}

int NetEqImpl::InsertPacket(const RTPHeader& rtp_header,
                            rtc::ArrayView<const uint8_t> payload) {
  rtc::MsanCheckInitialized(payload);
  TRACE_EVENT0("webrtc", "NetEqImpl::InsertPacket");
  rtc::CritScope lock(&crit_sect_);
  if (InsertPacketInternal(rtp_header, payload) != 0) { // 注意这里
    return kFail;
  }
  return kOK;
}


int NetEqImpl::InsertPacketInternal(const RTPHeader& rtp_header,
                                    rtc::ArrayView<const uint8_t> payload) {
  if (payload.empty()) {
    RTC_LOG_F(LS_ERROR) << "payload is empty";
    return kInvalidPointer;
  }

  int64_t receive_time_ms = clock_->TimeInMilliseconds();
  stats_->ReceivedPacket();

  PacketList packet_list;
  // Insert packet in a packet list.
  packet_list.push_back([&rtp_header, &payload, &receive_time_ms] {
    // Convert to Packet.
    Packet packet;
    packet.payload_type = rtp_header.payloadType;
    packet.sequence_number = rtp_header.sequenceNumber;
    packet.timestamp = rtp_header.timestamp;
    packet.payload.SetData(payload.data(), payload.size());
    packet.packet_info = RtpPacketInfo(rtp_header, receive_time_ms);
    // Waiting time will be set upon inserting the packet in the buffer.
    RTC_DCHECK(!packet.waiting_time);
    return packet;
  }());

  bool update_sample_rate_and_channels = first_packet_;

  if (update_sample_rate_and_channels) {
    // Reset timestamp scaling.
    timestamp_scaler_->Reset();
  }

  if (!decoder_database_->IsRed(rtp_header.payloadType)) {
    // Scale timestamp to internal domain (only for some codecs).
    timestamp_scaler_->ToInternal(&packet_list);
  }

  // Store these for later use, since the first packet may very well disappear
  // before we need these values.
  uint32_t main_timestamp = packet_list.front().timestamp;
  uint8_t main_payload_type = packet_list.front().payload_type;
  uint16_t main_sequence_number = packet_list.front().sequence_number;

  // Reinitialize NetEq if it's needed (changed SSRC or first call).
  if (update_sample_rate_and_channels) {
    // Note: |first_packet_| will be cleared further down in this method, once
    // the packet has been successfully inserted into the packet buffer.

    // Flush the packet buffer and DTMF buffer.
    packet_buffer_->Flush();
    dtmf_buffer_->Flush();

    // Update audio buffer timestamp.
    sync_buffer_->IncreaseEndTimestamp(main_timestamp - timestamp_);

    // Update codecs.
    timestamp_ = main_timestamp;
  }

  if (nack_enabled_) {
    RTC_DCHECK(nack_);
    if (update_sample_rate_and_channels) {
      nack_->Reset();
    }
    nack_->UpdateLastReceivedPacket(rtp_header.sequenceNumber,
                                    rtp_header.timestamp);
  }

  // Check for RED payload type, and separate payloads into several packets.
  if (decoder_database_->IsRed(rtp_header.payloadType)) {
    if (!red_payload_splitter_->SplitRed(&packet_list)) {
      return kRedundancySplitError;
    }
    // Only accept a few RED payloads of the same type as the main data,
    // DTMF events and CNG.
    red_payload_splitter_->CheckRedPayloads(&packet_list, *decoder_database_);
    if (packet_list.empty()) {
      return kRedundancySplitError;
    }
  }

  // Check payload types.
  if (decoder_database_->CheckPayloadTypes(packet_list) ==
      DecoderDatabase::kDecoderNotFound) {
    return kUnknownRtpPayloadType;
  }

  RTC_DCHECK(!packet_list.empty());

  // Update main_timestamp, if new packets appear in the list
  // after RED splitting.
  if (decoder_database_->IsRed(rtp_header.payloadType)) {
    timestamp_scaler_->ToInternal(&packet_list);
    main_timestamp = packet_list.front().timestamp;
    main_payload_type = packet_list.front().payload_type;
    main_sequence_number = packet_list.front().sequence_number;
  }

  // Process DTMF payloads. Cycle through the list of packets, and pick out any
  // DTMF payloads found.
  PacketList::iterator it = packet_list.begin();
  while (it != packet_list.end()) {
    const Packet& current_packet = (*it);
    RTC_DCHECK(!current_packet.payload.empty());
    if (decoder_database_->IsDtmf(current_packet.payload_type)) {
      DtmfEvent event;
      int ret = DtmfBuffer::ParseEvent(current_packet.timestamp,
                                       current_packet.payload.data(),
                                       current_packet.payload.size(), &event);
      if (ret != DtmfBuffer::kOK) {
        return kDtmfParsingError;
      }
      if (dtmf_buffer_->InsertEvent(event) != DtmfBuffer::kOK) {
        return kDtmfInsertError;
      }
      it = packet_list.erase(it);
    } else {
      ++it;
    }
  }

  PacketList parsed_packet_list;
  while (!packet_list.empty()) {
    Packet& packet = packet_list.front();
    const DecoderDatabase::DecoderInfo* info =
        decoder_database_->GetDecoderInfo(packet.payload_type);
    if (!info) {
      RTC_LOG(LS_WARNING) << "SplitAudio unknown payload type";
      return kUnknownRtpPayloadType;
    }

    if (info->IsComfortNoise()) {
      // Carry comfort noise packets along.
      parsed_packet_list.splice(parsed_packet_list.end(), packet_list,
                                packet_list.begin());
    } else {
      const auto sequence_number = packet.sequence_number;
      const auto payload_type = packet.payload_type;
      const Packet::Priority original_priority = packet.priority;
      const auto& packet_info = packet.packet_info;
      auto packet_from_result = [&](AudioDecoder::ParseResult& result) {
        Packet new_packet;
        new_packet.sequence_number = sequence_number;
        new_packet.payload_type = payload_type;
        new_packet.timestamp = result.timestamp;
        new_packet.priority.codec_level = result.priority;
        new_packet.priority.red_level = original_priority.red_level;
        new_packet.packet_info = packet_info;
        new_packet.frame = std::move(result.frame);
        return new_packet;
      };

      std::vector<AudioDecoder::ParseResult> results =
          info->GetDecoder()->ParsePayload(std::move(packet.payload), // 假设音频是 OPUS, AudioDecoderOpusImpl::ParsePayload
                                           packet.timestamp);
      if (results.empty()) {
        packet_list.pop_front();
      } else {
        bool first = true;
        for (auto& result : results) {
          RTC_DCHECK(result.frame);
          RTC_DCHECK_GE(result.priority, 0);
          if (first) {
            // Re-use the node and move it to parsed_packet_list.
            packet_list.front() = packet_from_result(result);
            parsed_packet_list.splice(parsed_packet_list.end(), packet_list,
                                      packet_list.begin());
            first = false;
          } else {
            parsed_packet_list.push_back(packet_from_result(result));
          }
        }
      }
    }
  }

  // Calculate the number of primary (non-FEC/RED) packets.
  const size_t number_of_primary_packets = std::count_if(
      parsed_packet_list.begin(), parsed_packet_list.end(),
      [](const Packet& in) { return in.priority.codec_level == 0; });
  if (number_of_primary_packets < parsed_packet_list.size()) {
    stats_->SecondaryPacketsReceived(parsed_packet_list.size() -
                                     number_of_primary_packets);
  }

  // Insert packets in buffer.
  const int ret = packet_buffer_->InsertPacketList( // 注意这里PacketBuffer::InsertPacketList
      &parsed_packet_list, *decoder_database_, &current_rtp_payload_type_,
      &current_cng_rtp_payload_type_, stats_.get());
  if (ret == PacketBuffer::kFlushed) {
    // Reset DSP timestamp etc. if packet buffer flushed.
    new_codec_ = true;
    update_sample_rate_and_channels = true;
  } else if (ret != PacketBuffer::kOK) {
    return kOtherError;
  }

  if (first_packet_) {
    first_packet_ = false;
    // Update the codec on the next GetAudio call.
    new_codec_ = true;
  }

  if (current_rtp_payload_type_) {
    RTC_DCHECK(decoder_database_->GetDecoderInfo(*current_rtp_payload_type_)) // 注意这里
        << "Payload type " << static_cast<int>(*current_rtp_payload_type_)
        << " is unknown where it shouldn't be";
  }

  if (update_sample_rate_and_channels && !packet_buffer_->Empty()) {
    // We do not use |current_rtp_payload_type_| to |set payload_type|, but
    // get the next RTP header from |packet_buffer_| to obtain the payload type.
    // The reason for it is the following corner case. If NetEq receives a
    // CNG packet with a sample rate different than the current CNG then it
    // flushes its buffer, assuming send codec must have been changed. However,
    // payload type of the hypothetically new send codec is not known.
    const Packet* next_packet = packet_buffer_->PeekNextPacket();
    RTC_DCHECK(next_packet);
    const int payload_type = next_packet->payload_type;
    size_t channels = 1;
    if (!decoder_database_->IsComfortNoise(payload_type)) {
      AudioDecoder* decoder = decoder_database_->GetDecoder(payload_type);
      assert(decoder);  // Payloads are already checked to be valid.
      channels = decoder->Channels();
    }
    const DecoderDatabase::DecoderInfo* decoder_info =
        decoder_database_->GetDecoderInfo(payload_type);
    assert(decoder_info);
    if (decoder_info->SampleRateHz() != fs_hz_ ||
        channels != algorithm_buffer_->Channels()) {
      SetSampleRateAndChannels(decoder_info->SampleRateHz(), channels);
    }
    if (nack_enabled_) {
      RTC_DCHECK(nack_);
      // Update the sample rate even if the rate is not new, because of Reset().
      nack_->UpdateSampleRate(fs_hz_);
    }
  }

  // TODO(hlundin): Move this code to DelayManager class.
  const DecoderDatabase::DecoderInfo* dec_info =
      decoder_database_->GetDecoderInfo(main_payload_type); // 注意这里
  assert(dec_info);  // Already checked that the payload type is known.
  delay_manager_->LastDecodedWasCngOrDtmf(dec_info->IsComfortNoise() ||
                                          dec_info->IsDtmf());
  if (delay_manager_->last_pack_cng_or_dtmf() == 0) {
    // Calculate the total speech length carried in each packet.
    if (number_of_primary_packets > 0) {
      const size_t packet_length_samples =
          number_of_primary_packets * decoder_frame_length_;
      if (packet_length_samples != decision_logic_->packet_length_samples()) {
        decision_logic_->set_packet_length_samples(packet_length_samples);
        delay_manager_->SetPacketAudioLength(
            rtc::dchecked_cast<int>((1000 * packet_length_samples) / fs_hz_));
      }
    }

    // Update statistics.
    if ((enable_rtx_handling_ || (int32_t)(main_timestamp - timestamp_) >= 0) &&
        !new_codec_) {
      // Only update statistics if incoming packet is not older than last played
      // out packet or RTX handling is enabled, and if new codec flag is not
      // set.
      delay_manager_->Update(main_sequence_number, main_timestamp, fs_hz_);
    }
  } else if (delay_manager_->last_pack_cng_or_dtmf() == -1) {
    // This is first "normal" packet after CNG or DTMF.
    // Reset packet time counter and measure time until next packet,
    // but don't update statistics.
    delay_manager_->set_last_pack_cng_or_dtmf(0);
    delay_manager_->ResetPacketIatCount();
  }
  return 0;
}



std::vector<AudioDecoder::ParseResult> AudioDecoderOpusImpl::ParsePayload(
    rtc::Buffer&& payload,
    uint32_t timestamp) {
  std::vector<ParseResult> results;

  if (PacketHasFec(payload.data(), payload.size())) {
    const int duration =
        PacketDurationRedundant(payload.data(), payload.size());
    RTC_DCHECK_GE(duration, 0);
    rtc::Buffer payload_copy(payload.data(), payload.size());
    std::unique_ptr<EncodedAudioFrame> fec_frame(
        new OpusFrame(this, std::move(payload_copy), false)); // 注意这里OpusFrame.decoder_ 指向的就是 AudioDecoderOpusImpl
    results.emplace_back(timestamp - duration, 1, std::move(fec_frame));
  }
  std::unique_ptr<EncodedAudioFrame> frame(
      new OpusFrame(this, std::move(payload), true)); // OpusFrame.decoder_ 指向的就是 AudioDecoderOpusImpl
  results.emplace_back(timestamp, 0, std::move(frame));
  return results;
}



int PacketBuffer::InsertPacketList(
    PacketList* packet_list,
    const DecoderDatabase& decoder_database,
    absl::optional<uint8_t>* current_rtp_payload_type,
    absl::optional<uint8_t>* current_cng_rtp_payload_type,
    StatisticsCalculator* stats) {
  RTC_DCHECK(stats);
  bool flushed = false;
  for (auto& packet : *packet_list) {
    if (decoder_database.IsComfortNoise(packet.payload_type)) {
      if (*current_cng_rtp_payload_type &&
          **current_cng_rtp_payload_type != packet.payload_type) {
        // New CNG payload type implies new codec type.
        *current_rtp_payload_type = absl::nullopt;
        Flush();
        flushed = true;
      }
      *current_cng_rtp_payload_type = packet.payload_type;
    } else if (!decoder_database.IsDtmf(packet.payload_type)) {
      // This must be speech.
      if ((*current_rtp_payload_type &&
           **current_rtp_payload_type != packet.payload_type) ||
          (*current_cng_rtp_payload_type &&
           !EqualSampleRates(packet.payload_type,
                             **current_cng_rtp_payload_type,
                             decoder_database))) {
        *current_cng_rtp_payload_type = absl::nullopt;
        Flush();
        flushed = true;
      }
      *current_rtp_payload_type = packet.payload_type;
    }
    int return_val = InsertPacket(std::move(packet), stats); // 注意这里
    if (return_val == kFlushed) {
      // The buffer flushed, but this is not an error. We can still continue.
      flushed = true;
    } else if (return_val != kOK) {
      // An error occurred. Delete remaining packets in list and return.
      packet_list->clear();
      return return_val;
    }
  }
  packet_list->clear();
  return flushed ? kFlushed : kOK;
}



int PacketBuffer::InsertPacket(Packet&& packet, StatisticsCalculator* stats) {
  if (packet.empty()) {
    RTC_LOG(LS_WARNING) << "InsertPacket invalid packet";
    return kInvalidPacket;
  }

  RTC_DCHECK_GE(packet.priority.codec_level, 0);
  RTC_DCHECK_GE(packet.priority.red_level, 0);

  int return_val = kOK;

  packet.waiting_time = tick_timer_->GetNewStopwatch();

  if (buffer_.size() >= max_number_of_packets_) {
    // Buffer is full. Flush it.
    Flush();
    stats->FlushedPacketBuffer();
    RTC_LOG(LS_WARNING) << "Packet buffer flushed";
    return_val = kFlushed;
  }

  // Get an iterator pointing to the place in the buffer where the new packet
  // should be inserted. The list is searched from the back, since the most
  // likely case is that the new packet should be near the end of the list.
  PacketList::reverse_iterator rit = std::find_if(
      buffer_.rbegin(), buffer_.rend(), NewTimestampIsLarger(packet));

  // The new packet is to be inserted to the right of |rit|. If it has the same
  // timestamp as |rit|, which has a higher priority, do not insert the new
  // packet to list.
  if (rit != buffer_.rend() && packet.timestamp == rit->timestamp) {
    LogPacketDiscarded(packet.priority.codec_level, stats);
    return return_val;
  }

  // The new packet is to be inserted to the left of |it|. If it has the same
  // timestamp as |it|, which has a lower priority, replace |it| with the new
  // packet.
  PacketList::iterator it = rit.base();
  if (it != buffer_.end() && packet.timestamp == it->timestamp) {
    LogPacketDiscarded(it->priority.codec_level, stats);
    it = buffer_.erase(it);
  }
  buffer_.insert(it, std::move(packet));  // Insert the packet at that position.

  return return_val;
}

2 流程图

 

上一篇:WebRTC获取设备信息


下一篇:基于WebRTC编译网页播放平台播放视频video标签在运行却没有画面,如何解决?