WebRTC 中 NetEq 源码分析

NetEq

MCU模块

底层 Socket 收到一个 UDP 包后,触发从 UDP 包到 RTP 包的解析,经过对 SSRC 和 PayloadType 的匹配,找到对应的音频流接收的 Channel,然后从 InsertPacketInternal 输入到 NetEQ 的接收模块中。

1、数据包的插入

// First part of NetEqImpl::InsertPacketInternal as quoted by the article.
// It rejects empty payloads, wraps the payload in a Packet, performs the
// first-packet resets and notifies the NACK tracker. The remainder of the
// function body is quoted further down, after the NackTracker excerpts.
//
// rtp_header: header of the received RTP packet.
// payload:    payload bytes of the packet; an empty payload yields
//             kInvalidPointer.
int NetEqImpl::InsertPacketInternal(const RTPHeader& rtp_header,
                                    rtc::ArrayView<const uint8_t> payload) {
  if (payload.empty()) {
    RTC_LOG_F(LS_ERROR) << "payload is empty";
    return kInvalidPointer;
  }

  // Record the receive time and count the packet in the statistics.
  int64_t receive_time_ms = clock_->TimeInMilliseconds();
  stats_->ReceivedPacket();
  
  // Build the Packet object.
  PacketList packet_list;
  // Insert packet in a packet list.
  packet_list.push_back([&rtp_header, &payload, &receive_time_ms] {
    // Convert to Packet.
    Packet packet;
    packet.payload_type = rtp_header.payloadType;
    packet.sequence_number = rtp_header.sequenceNumber;
    packet.timestamp = rtp_header.timestamp;
    packet.payload.SetData(payload.data(), payload.size());
    packet.packet_info = RtpPacketInfo(rtp_header, receive_time_ms);
    // Waiting time will be set upon inserting the packet in the buffer.
    RTC_DCHECK(!packet.waiting_time);
    return packet;
  }());

  bool update_sample_rate_and_channels = first_packet_;
  // On the first packet, reset timestamp_scaler_.
  if (update_sample_rate_and_channels) {
    // Reset timestamp scaling.
    timestamp_scaler_->Reset();
  }
  
  if (!decoder_database_->IsRed(rtp_header.payloadType)) {
    // Scale timestamp to internal domain (only for some codecs).
    timestamp_scaler_->ToInternal(&packet_list);
  }
  // Save the values of the main packet up front.
  // Store these for later use, since the first packet may very well disappear
  // before we need these values.
  uint32_t main_timestamp = packet_list.front().timestamp;
  uint8_t main_payload_type = packet_list.front().payload_type;
  uint16_t main_sequence_number = packet_list.front().sequence_number;

  // Reinitialize NetEq if it's needed (changed SSRC or first call).
  // First packet: the packet buffer has to be flushed.
  if (update_sample_rate_and_channels) {
    // Note: |first_packet_| will be cleared further down in this method, once
    // the packet has been successfully inserted into the packet buffer.

    // Flush the packet buffer and DTMF buffer.
    packet_buffer_->Flush();
    dtmf_buffer_->Flush();

    // Update audio buffer timestamp.
    sync_buffer_->IncreaseEndTimestamp(main_timestamp - timestamp_);

    // Update codecs.
    timestamp_ = main_timestamp;
  }
  // NACK (retransmission requests) is enabled: feed the tracker with the
  // sequence number and timestamp of this packet.
  if (nack_enabled_) {
    RTC_DCHECK(nack_);
    if (update_sample_rate_and_channels) {
      nack_->Reset();
    }
    nack_->UpdateLastReceivedPacket(rtp_header.sequenceNumber,
                                    rtp_header.timestamp);
  }

// Informs the NACK tracker that the RTP packet identified by
// |sequence_number| / |timestamp| has arrived.
//
// The very first packet only seeds the "last received" state (and the "last
// decoded" state when nothing has been decoded yet). For later packets, the
// sequence number is removed from nack_list_, and only packets that are not
// older than the last received one advance the tracker state.
void NackTracker::UpdateLastReceivedPacket(uint16_t sequence_number,
                                           uint32_t timestamp) {
  const bool is_first_packet = !any_rtp_received_;
  if (is_first_packet) {
    // Nothing to compare against yet: just remember this packet.
    sequence_num_last_received_rtp_ = sequence_number;
    timestamp_last_received_rtp_ = timestamp;
    any_rtp_received_ = true;
    if (!any_rtp_decoded_) {
      // No packet decoded so far; use these values so that the time-to-play
      // estimate has a reasonable starting point.
      sequence_num_last_decoded_rtp_ = sequence_number;
      timestamp_last_decoded_rtp_ = timestamp;
    }
    return;
  }

  // Same sequence number as the last received packet: nothing to do.
  if (sequence_num_last_received_rtp_ == sequence_number) {
    return;
  }

  // A packet that has been received must not stay in the NACK list.
  nack_list_.erase(sequence_number);

  // An out-of-date sequence number (older than the last received one)
  // requires no further bookkeeping.
  const bool is_old_packet =
      IsNewerSequenceNumber(sequence_num_last_received_rtp_, sequence_number);
  if (is_old_packet) {
    return;
  }

  // Refresh the samples-per-packet estimate, then the NACK list itself.
  UpdateSamplesPerPacket(sequence_number, timestamp);
  UpdateList(sequence_number);

  // This packet becomes the new "last received" reference.
  sequence_num_last_received_rtp_ = sequence_number;
  timestamp_last_received_rtp_ = timestamp;
  LimitNackListSize();
}
// Updates the NACK list after the packet with sequence number
// |sequence_number_current_received_rtp| has been received.
void NackTracker::UpdateList(uint16_t sequence_number_current_received_rtp) {
  // Some of the packets which were considered late are now considered
  // missing.
  // NOTE(review): per the article's author the cut-off is
  // sequence_number_current_received_rtp - nack_threshold_packets_; the body
  // of ChangeFromLateToMissing is not shown here - confirm.
  ChangeFromLateToMissing(sequence_number_current_received_rtp);

  // Only when the current packet is newer than (last received + 1), i.e. when
  // at least one sequence number was skipped, are new entries added.
  // NOTE(review): presumably AddToList inserts the skipped sequence numbers
  // into nack_list_ - its body is not shown here.
  const bool sequence_gap_detected =
      IsNewerSequenceNumber(sequence_number_current_received_rtp,
                            sequence_num_last_received_rtp_ + 1);
  if (sequence_gap_detected) {
    AddToList(sequence_number_current_received_rtp);
  }
}
  // Continuation of NetEqImpl::InsertPacketInternal.
  // Split RED (redundancy) payloads into individual packets in packet_list.
  if (decoder_database_->IsRed(rtp_header.payloadType)) {
    if (!red_payload_splitter_->SplitRed(&packet_list)) {
      return kRedundancySplitError;
    }
    // Only accept a few RED payloads of the same type as the main data,
    // DTMF events and CNG.
    red_payload_splitter_->CheckRedPayloads(&packet_list, *decoder_database_);
    if (packet_list.empty()) {
      return kRedundancySplitError;
    }
  }

  // Parse every packet's payload with its decoder and move the resulting
  // packets into parsed_packet_list, which is later handed to packet_buffer_.
  // NOTE(review): the article's author mentions that the number of received
  // primary/FEC packets is counted here; that counting code is not part of
  // this excerpt.
 PacketList parsed_packet_list;
  while (!packet_list.empty()) {
    Packet& packet = packet_list.front();
    const DecoderDatabase::DecoderInfo* info =
        decoder_database_->GetDecoderInfo(packet.payload_type);
    if (!info) {
      RTC_LOG(LS_WARNING) << "SplitAudio unknown payload type";
      return kUnknownRtpPayloadType;
    }

    if (info->IsComfortNoise()) {
      // Carry comfort noise packets along.
      parsed_packet_list.splice(parsed_packet_list.end(), packet_list,
                                packet_list.begin());
    } else {
      // Copy the fields that every packet produced from this payload shares.
      const auto sequence_number = packet.sequence_number;
      const auto payload_type = packet.payload_type;
      const Packet::Priority original_priority = packet.priority;
      const auto& packet_info = packet.packet_info;
      // Builds a Packet from one ParseResult, taking over the result's frame.
      auto packet_from_result = [&](AudioDecoder::ParseResult& result) {
        Packet new_packet;
        new_packet.sequence_number = sequence_number;
        new_packet.payload_type = payload_type;
        new_packet.timestamp = result.timestamp;
        new_packet.priority.codec_level = result.priority;
        new_packet.priority.red_level = original_priority.red_level;
        new_packet.packet_info = packet_info;
        new_packet.frame = std::move(result.frame);
        return new_packet;
      };

      std::vector<AudioDecoder::ParseResult> results =
          info->GetDecoder()->ParsePayload(std::move(packet.payload),
                                           packet.timestamp);
      if (results.empty()) {
        // Nothing could be parsed from this payload; drop the packet.
        packet_list.pop_front();
      } else {
        bool first = true;
        for (auto& result : results) {
          RTC_DCHECK(result.frame);
          RTC_DCHECK_GE(result.priority, 0);
          if (first) {
            // Re-use the node and move it to parsed_packet_list.
            packet_list.front() = packet_from_result(result);
            parsed_packet_list.splice(parsed_packet_list.end(), packet_list,
                                      packet_list.begin());
            first = false;
          } else {
            parsed_packet_list.push_back(packet_from_result(result));
          }
        }
      }
    }
  }
  // Insert the parsed packets into the packet buffer.
  // NOTE(review): per the article's author the buffer is ordered by
  // timestamp, then sequence number, then priority (codec level / RED level),
  // and a buffered packet with the same timestamp is replaced.
  // InsertPacketList itself is not shown here - confirm.
  const int ret = packet_buffer_->InsertPacketList(
      &parsed_packet_list, *decoder_database_, &current_rtp_payload_type_,
      &current_cng_rtp_payload_type_, stats_.get());
  if (ret == PacketBuffer::kFlushed) {
    // Reset DSP timestamp etc. if packet buffer flushed.
    new_codec_ = true;
    update_sample_rate_and_channels = true;
  } else if (ret != PacketBuffer::kOK) {
    return kOtherError;
  }
// Update the decoder sample rate and the number of channels.
if (update_sample_rate_and_channels && !packet_buffer_->Empty()) {
    // We do not use |current_rtp_payload_type_| to |set payload_type|, but
    // get the next RTP header from |packet_buffer_| to obtain the payload type.
    // The reason for it is the following corner case. If NetEq receives a
    // CNG packet with a sample rate different than the current CNG then it
    // flushes its buffer, assuming send codec must have been changed. However,
    // payload type of the hypothetically new send codec is not known.
    const Packet* next_packet = packet_buffer_->PeekNextPacket();
    RTC_DCHECK(next_packet);
    const int payload_type = next_packet->payload_type;
    size_t channels = 1;
    if (!decoder_database_->IsComfortNoise(payload_type)) {
      AudioDecoder* decoder = decoder_database_->GetDecoder(payload_type);
      assert(decoder);  // Payloads are already checked to be valid.
      channels = decoder->Channels();
    }
    const DecoderDatabase::DecoderInfo* decoder_info =
        decoder_database_->GetDecoderInfo(payload_type);
    assert(decoder_info);
    if (decoder_info->SampleRateHz() != fs_hz_ ||
        channels != algorithm_buffer_->Channels()) {
      SetSampleRateAndChannels(decoder_info->SampleRateHz(), channels);
    }
    if (nack_enabled_) {
      RTC_DCHECK(nack_);
      // Update the sample rate even if the rate is not new, because of Reset().
      nack_->UpdateSampleRate(fs_hz_);
    }
  }
  const DecoderDatabase::DecoderInfo* dec_info =
      decoder_database_->GetDecoderInfo(main_payload_type);
  assert(dec_info);  // Already checked that the payload type is known.
  delay_manager_->LastDecodedWasCngOrDtmf(dec_info->IsComfortNoise() ||
                                          dec_info->IsDtmf());
  if (delay_manager_->last_pack_cng_or_dtmf() == 0) {
    // Calculate the total speech length carried in each packet.
    // NOTE(review): number_of_primary_packets is used here, but the code that
    // computes it is not included in this excerpt.
    if (number_of_primary_packets > 0) {
      const size_t packet_length_samples =
          number_of_primary_packets * decoder_frame_length_;
       // Propagate a changed packet length (in samples and in ms).
      if (packet_length_samples != decision_logic_->packet_length_samples()) {
        decision_logic_->set_packet_length_samples(packet_length_samples);
        delay_manager_->SetPacketAudioLength(
            rtc::dchecked_cast<int>((1000 * packet_length_samples) / fs_hz_));
      }
    }

    // Update statistics.
    if ((enable_rtx_handling_ || (int32_t)(main_timestamp - timestamp_) >= 0) &&
        !new_codec_) {
      // Only update statistics if incoming packet is not older than last played
      // out packet or RTX handling is enabled, and if new codec flag is not
      // set.
      delay_manager_->Update(main_sequence_number, main_timestamp, fs_hz_);
    }
// Feeds the arrival of one packet into the delay manager: derives the packet
// length, measures the inter-arrival time, registers it in the histogram and
// recomputes target_level_.
//
// sequence_number: RTP sequence number of the arrived packet.
// timestamp:       RTP timestamp of the arrived packet.
// sample_rate_hz:  sample rate used to convert timestamps to milliseconds.
// Returns -1 when sample_rate_hz is not positive, 0 otherwise.
int DelayManager::Update(uint16_t sequence_number,
                         uint32_t timestamp,
                         int sample_rate_hz) {
  if (sample_rate_hz <= 0) {
    return -1;
  }

  if (!first_packet_received_) {
    // Prepare for next packet arrival.
    packet_iat_stopwatch_ = tick_timer_->GetNewStopwatch();
    last_seq_no_ = sequence_number;
    last_timestamp_ = timestamp;
    first_packet_received_ = true;
    return 0;
  }

  // Try calculating packet length from current and previous timestamps.
  int packet_len_ms;
  // Packet length: the timestamp difference divided by the sequence number
  // difference gives the samples per packet; dividing by the sample rate
  // converts that to a duration in milliseconds.
  if (!IsNewerTimestamp(timestamp, last_timestamp_) ||
      !IsNewerSequenceNumber(sequence_number, last_seq_no_)) {
    // Wrong timestamp or sequence order; use stored value.
    packet_len_ms = packet_len_ms_;
  } else {
    // Calculate timestamps per packet and derive packet length in ms.
    int64_t packet_len_samp =
        static_cast<uint32_t>(timestamp - last_timestamp_) /
        static_cast<uint16_t>(sequence_number - last_seq_no_);
    packet_len_ms =
        rtc::saturated_cast<int>(1000 * packet_len_samp / sample_rate_hz);
  }

  bool reordered = false;
  if (packet_len_ms > 0) {
    // Cannot update statistics unless |packet_len_ms| is valid.

    // Inter-arrival time (IAT) in integer "packet times" (rounding down). This
    // is the value added to the inter-arrival time histogram.
    // Jitter measure: iat_ms / packet_len_ms, i.e. the arrival interval
    // expressed in packet durations. It is corrected below when the sequence
    // numbers are not consecutive.
    int iat_ms = packet_iat_stopwatch_->ElapsedMs();
    int iat_packets = iat_ms / packet_len_ms;
    // Check for discontinuous packet sequence and re-ordering.
    if (IsNewerSequenceNumber(sequence_number, last_seq_no_ + 1)) {
      // Compensate for gap in the sequence numbers. Reduce IAT with the
      // expected extra time due to lost packets.
      int packet_offset =
          static_cast<uint16_t>(sequence_number - last_seq_no_ - 1);
      iat_packets -= packet_offset;
      iat_ms -= packet_offset * packet_len_ms;
    } else if (!IsNewerSequenceNumber(sequence_number, last_seq_no_)) {
      // Packet is not newer than the previous one: treat it as reordered and
      // add the time corresponding to the sequence number distance.
      int packet_offset =
          static_cast<uint16_t>(last_seq_no_ + 1 - sequence_number);
      iat_packets += packet_offset;
      iat_ms += packet_offset * packet_len_ms;
      reordered = true;
    }

    int iat_delay = iat_ms - packet_len_ms;
    int relative_delay;
    if (reordered) {
      relative_delay = std::max(iat_delay, 0);
    } else {
      // Store this delay in the delay history.
      // NOTE(review): the article's author states that delay_history_ covers
      // 2000 ms; UpdateDelayHistory is not shown here - confirm.
      UpdateDelayHistory(iat_delay, timestamp, sample_rate_hz);
      // NOTE(review): per the article's author this sums the delay history
      // and the result is never negative; the helper is not shown here.
      relative_delay = CalculateRelativePacketArrivalDelay();
    }
    statistics_->RelativePacketArrivalDelay(relative_delay);

    switch (histogram_mode_) {
      case RELATIVE_ARRIVAL_DELAY: {
        // Map the relative delay to a bucket index and register it.
        // Article author's notes (Histogram::Add is not shown in this
        // excerpt, so treat them as unverified):
        //  - kBucketSizeMs is 20; Add() inserts into the matching bucket and
        //    rescales the probability of every bucket with a forget factor:
        //    int forget_factor = (1 << 15) *
        //        (1 - start_forget_weight_.value() / (add_count_ + 1));
        //  - how fast the forget factor converges depends on the initial
        //    start_forget_weight_ and on the number of samples added so far;
        //    the more samples, the smoother the change.
        //  - quoted defaults: int quantile = 1041529569;  // 0.97 in Q30.
        //                     int forget_factor = 32745;  // 0.9993 in Q15.
        const int index = relative_delay / kBucketSizeMs;
        if (index < histogram_->NumBuckets()) {
          // Maximum delay to register is 2000 ms.
          // Register the delay in the corresponding histogram bucket.
          histogram_->Add(index);
        }
        break;
      }
      case INTER_ARRIVAL_TIME: {
        // Saturate IAT between 0 and maximum value.
        iat_packets =
            std::max(std::min(iat_packets, histogram_->NumBuckets() - 1), 0);
        histogram_->Add(iat_packets);
        break;
      }
    }
    // Calculate new |target_level_| based on updated statistics.
    target_level_ = CalculateTargetLevel(iat_packets, reordered);

    LimitTargetLevel();
  }  // End if (packet_len_ms > 0).

  // With RTX handling enabled, a limited number of consecutive reordered
  // packets return here without resetting the stopwatch or the "last packet"
  // state.
  if (enable_rtx_handling_ && reordered &&
      num_reordered_packets_ < kMaxReorderedPackets) {
    ++num_reordered_packets_;
    return 0;
  }
  num_reordered_packets_ = 0;
  // Prepare for next packet arrival.
  packet_iat_stopwatch_ = tick_timer_->GetNewStopwatch();
  last_seq_no_ = sequence_number;
  last_timestamp_ = timestamp;
  return 0;
}

RELATIVE_ARRIVAL_DELAY 模式:先把当前包的到达延迟写入 delay_history_,再结合历史延迟计算出相对到达延迟后写入直方图,因此抖动变化比较平缓,收敛比较慢。
INTER_ARRIVAL_TIME 模式:直接把当前包的到达间隔(以包个数计的 iat_packets)写入直方图,变化比较及时,对抖动的感知应该会更强。

int DelayManager::CalculateTargetLevel(int iat_packets, bool reordered) {
  int limit_probability = histogram_quantile_;
  //统计buckets_中所以iat的和大于0.97
  int bucket_index = histogram_->Quantile(limit_probability);
  int target_level;
  switch (histogram_mode_) {
    case RELATIVE_ARRIVAL_DELAY: {
     //这种模式下,返回的index是从0开始,直到概率和大于0.97,返会对应index的抖动大小
     //既没有取平均值,也不是终值,取得是抖动比较大的,然后计算target_level
      target_level = 1;
      if (packet_len_ms_ > 0) {
        target_level += bucket_index * kBucketSizeMs / packet_len_ms_;
      }
      base_target_level_ = target_level;
      break;
    }
    case INTER_ARRIVAL_TIME: {
    //
      target_level = std::max(bucket_index, 1);
      base_target_level_ = target_level;
      // Update detector for delay peaks.
      bool delay_peak_found =
          peak_detector_.Update(iat_packets, reordered, target_level);
      if (delay_peak_found) {
        target_level = std::max(target_level, peak_detector_.MaxPeakHeight());
      }
      break;
    }
  }

  // Sanity check. |target_level| must be strictly positive.
  target_level = std::max(target_level, 1);
  // Scale to Q8 and assign to member variable.
  target_level_ = target_level << 8;
  if (extra_delay_ms_ && packet_len_ms_ > 0) {
    int extra_delay = (extra_delay_ms_.value() << 8) / packet_len_ms_;
    target_level_ += extra_delay;
  }
  return target_level_;
}

// Feeds one inter-arrival time sample into the peak detector and returns the
// result of CheckPeakConditions().
//
// inter_arrival_time: inter-arrival time of the latest packet, in packets.
// reordered:          whether the packet arrived out of order; such packets
//                     are skipped when ignore_reordered_packets_ is set.
// target_level:       current target level, used as the peak threshold base.
bool DelayPeakDetector::Update(int inter_arrival_time,
                               bool reordered,
                               int target_level) {
  if (ignore_reordered_packets_ && reordered) {
    return CheckPeakConditions();
  }
  // A delay peak is detected when the inter-arrival time exceeds
  // target_level + peak_detection_threshold_ or exceeds 2 * target_level.
  if (inter_arrival_time > target_level + peak_detection_threshold_ ||
      inter_arrival_time > 2 * target_level) {
    // A delay peak is observed.
    if (!peak_period_stopwatch_) {
      // This is the first peak. Reset the period counter.
      peak_period_stopwatch_ = tick_timer_->GetNewStopwatch();
    } else if (peak_period_stopwatch_->ElapsedMs() > 0) {
      // If the time since the previous peak does not exceed kMaxPeakPeriodMs
      // (10000 ms, i.e. 10 s, according to the article's author), both peaks
      // are treated as belonging to the same peak period.
      if (peak_period_stopwatch_->ElapsedMs() <= kMaxPeakPeriodMs) {
        // This is not the first peak, and the period is valid.
        // Store peak data in the vector.
        Peak peak_data;
        peak_data.period_ms = peak_period_stopwatch_->ElapsedMs();
        peak_data.peak_height_packets = inter_arrival_time;
        peak_history_.push_back(peak_data);
        while (peak_history_.size() > kMaxNumPeaks) {
          // Delete the oldest data point.
          peak_history_.pop_front();
        }
        peak_period_stopwatch_ = tick_timer_->GetNewStopwatch();
      } else if (peak_period_stopwatch_->ElapsedMs() <= 2 * kMaxPeakPeriodMs) {
        // Invalid peak due to too long period. Reset period counter and start
        // looking for next peak.
        // The previous peak lies between one and two times kMaxPeakPeriodMs
        // back, so this peak is not in the same peak period; wait and see
        // whether the following peaks are contiguous.
        peak_period_stopwatch_ = tick_timer_->GetNewStopwatch();
      } else {
        // More than 2 times the maximum period has elapsed since the last peak
        // was registered. It seems that the network conditions have changed.
        // Reset the peak statistics.
        // NOTE(review): per the article's author Reset() clears peak_history_
        // and resets peak_period_stopwatch_; its body is not shown here.
        Reset();
      }
    }
  }
  return CheckPeakConditions();
}
//是否出现峰值检测,peak_history_个数大于kMinPeaksToTrigger (2),上次的峰值监测时间小于历史峰值中出现的最大的峰值区间
bool DelayPeakDetector::CheckPeakConditions() {
  size_t s = peak_history_.size();
  if (s >= kMinPeaksToTrigger &&
      peak_period_stopwatch_->ElapsedMs() <= 2 * MaxPeakPeriod()) {
    peak_found_ = true;
  } else {
    peak_found_ = false;
  }
  return peak_found_;
}
} 
    // Excerpt repeated from DelayManager::CalculateTargetLevel: the
    // INTER_ARRIVAL_TIME branch.
    case INTER_ARRIVAL_TIME: {
      target_level = std::max(bucket_index, 1);
      base_target_level_ = target_level;
      // Update detector for delay peaks.
      bool delay_peak_found =
          peak_detector_.Update(iat_packets, reordered, target_level);
      if (delay_peak_found) {
        // A peak was found: the target level is raised to at least the
        // largest peak height recorded by the detector.
        // Article author's tuning note (unverified here): peak_history_ holds
        // up to 8 entries; reducing that to 5 and shortening the peak-period
        // window would make the target level react faster.
        target_level = std::max(target_level, peak_detector_.MaxPeakHeight());
      }
      break;
    }

总结:
1、抖动更新模式选择 INTER_ARRIVAL_TIME,更为及时
2、packet_buffer_的max_number_of_packets_最大值可以通过Dependencies配置更改,网络较好的情况下,可以适当减小,默认是200,一个包长20ms,就是4s的抖动。
3、历史抖动delay_history_的size减小,则抖动更新更为灵敏,但是可能出现剧烈抖动,size增大,抖动更新更为平缓,更为稳定。默认size大小为2000ms。
4、遗忘因子的计算:int forget_factor = (1 << 15) * (1 - start_forget_weight_.value() / (add_count_ + 1));初始值越大,收敛越缓慢,随着add_count_ 的增大,变化越来越小,add_count_ 可以设置一个固定最大值,动态更新iat概率。默认值为 int forget_factor = 32745; // 0.9993 in Q15。int quantile = 1041529569; // 0.97 in Q30.适当减小也可增加收敛速度。
5、峰值检测:峰值周期的判断阈值为 kMaxPeakPeriodMs,默认 10s,即两次抖动峰值的间隔不超过 10s 时,认为它们处于同一个峰值区间内。增大 kMaxPeakPeriodMs 适用于网络变化比较缓慢的场景,减小则适用于网络变化比较迅速的场景。
kMaxNumPeaks 为 peak_history_ 的最大长度,默认为 8,即保留的历史峰值个数,可适当减小,使结果更贴近当前的网络状况。另外网络中可能偶现尖峰,可以在 peak_history_ 中对这类峰值进行过滤。

参考文档:
添加链接描述
添加链接描述
添加链接描述

上一篇:基于ffmpeg和libvlc的视频剪辑、播放器


下一篇:【mediasoup】判断数据包是否是rtp包