Neteq
MCU模块
底层 Socket 收到一个 UDP 包后,触发从 UDP 包到 RTP 包的解析,经过对 SSRC 和 PayloadType 的匹配,找到对应的音频流接收的 Channel,然后从 InsertPacketInternal 输入到 NetEQ 的接收模块中。
1、数据包的插入
int NetEqImpl::InsertPacketInternal(const RTPHeader& rtp_header,
rtc::ArrayView<const uint8_t> payload) {
if (payload.empty()) {
RTC_LOG_F(LS_ERROR) << "payload is empty";
return kInvalidPointer;
}
//设置接收时间戳和统计
int64_t receive_time_ms = clock_->TimeInMilliseconds();
stats_->ReceivedPacket();
//构造packet包
PacketList packet_list;
// Insert packet in a packet list.
packet_list.push_back([&rtp_header, &payload, &receive_time_ms] {
// Convert to Packet.
Packet packet;
packet.payload_type = rtp_header.payloadType;
packet.sequence_number = rtp_header.sequenceNumber;
packet.timestamp = rtp_header.timestamp;
packet.payload.SetData(payload.data(), payload.size());
packet.packet_info = RtpPacketInfo(rtp_header, receive_time_ms);
// Waiting time will be set upon inserting the packet in the buffer.
RTC_DCHECK(!packet.waiting_time);
return packet;
}());
bool update_sample_rate_and_channels = first_packet_;
//如果是首包,重置timestamp_scaler_
if (update_sample_rate_and_channels) {
// Reset timestamp scaling.
timestamp_scaler_->Reset();
}
if (!decoder_database_->IsRed(rtp_header.payloadType)) {
// Scale timestamp to internal domain (only for some codecs).
timestamp_scaler_->ToInternal(&packet_list);
}
//先保留数据
// Store these for later use, since the first packet may very well disappear
// before we need these values.
uint32_t main_timestamp = packet_list.front().timestamp;
uint8_t main_payload_type = packet_list.front().payload_type;
uint16_t main_sequence_number = packet_list.front().sequence_number;
// Reinitialize NetEq if it's needed (changed SSRC or first call).
//首包,需要重置packet_buffer
if (update_sample_rate_and_channels) {
// Note: |first_packet_| will be cleared further down in this method, once
// the packet has been successfully inserted into the packet buffer.
// Flush the packet buffer and DTMF buffer.
packet_buffer_->Flush();
dtmf_buffer_->Flush();
// Update audio buffer timestamp.
sync_buffer_->IncreaseEndTimestamp(main_timestamp - timestamp_);
// Update codecs.
timestamp_ = main_timestamp;
}
//允许重传
if (nack_enabled_) {
RTC_DCHECK(nack_);
if (update_sample_rate_and_channels) {
nack_->Reset();
}
nack_->UpdateLastReceivedPacket(rtp_header.sequenceNumber,
rtp_header.timestamp);
}
// Registers the arrival of an RTP packet with the NACK tracker.
//
// |sequence_number| and |timestamp| are the RTP sequence number and RTP
// timestamp of the packet that was just received.
void NackTracker::UpdateLastReceivedPacket(uint16_t sequence_number,
                                           uint32_t timestamp) {
  if (any_rtp_received_) {
    // A repeat of the most recently received sequence number changes nothing.
    if (sequence_number == sequence_num_last_received_rtp_) {
      return;
    }

    // A packet that has arrived must not stay in the NACK list.
    nack_list_.erase(sequence_number);

    // A sequence number older than the last received one requires no further
    // bookkeeping.
    const bool is_old_packet = IsNewerSequenceNumber(
        sequence_num_last_received_rtp_, sequence_number);
    if (is_old_packet) {
      return;
    }

    // Refresh the samples-per-packet estimate and the NACK list, then remember
    // this packet as the most recent one and cap the size of the list.
    UpdateSamplesPerPacket(sequence_number, timestamp);
    UpdateList(sequence_number);
    sequence_num_last_received_rtp_ = sequence_number;
    timestamp_last_received_rtp_ = timestamp;
    LimitNackListSize();
    return;
  }

  // First packet ever seen: only record its sequence number and timestamp.
  sequence_num_last_received_rtp_ = sequence_number;
  timestamp_last_received_rtp_ = timestamp;
  any_rtp_received_ = true;

  // Nothing has been decoded yet, so seed the "last decoded" values as well to
  // get a reasonable time-to-play estimate.
  if (!any_rtp_decoded_) {
    sequence_num_last_decoded_rtp_ = sequence_number;
    timestamp_last_decoded_rtp_ = timestamp;
  }
}
// Refreshes the NACK list after the packet with sequence number
// |sequence_number_current_received_rtp| has been received.
void NackTracker::UpdateList(uint16_t sequence_number_current_received_rtp) {
  // Some of the packets which were considered late are now considered missing.
  ChangeFromLateToMissing(sequence_number_current_received_rtp);

  // Nothing to add unless the received packet is newer than the one expected
  // right after the last received packet, i.e. there is a gap.
  if (!IsNewerSequenceNumber(sequence_number_current_received_rtp,
                             sequence_num_last_received_rtp_ + 1)) {
    return;
  }

  // NOTE(review): according to the original notes, AddToList() walks the
  // sequence numbers from sequence_num_last_received_rtp_ + 1 up to the
  // current one and marks those below
  // (current - nack_threshold_packets_) as missing; AddToList() is not
  // visible here, so confirm against its implementation.
  AddToList(sequence_number_current_received_rtp);
}
//对冗余包进行拆分并插入packet_list
if (decoder_database_->IsRed(rtp_header.payloadType)) {
if (!red_payload_splitter_->SplitRed(&packet_list)) {
return kRedundancySplitError;
}
// Only accept a few RED payloads of the same type as the main data,
// DTMF events and CNG.
red_payload_splitter_->CheckRedPayloads(&packet_list, *decoder_database_);
if (packet_list.empty()) {
return kRedundancySplitError;
}
}
// 将packet的包头信息拆解出来,插入parsed_packet_list,根据接收类型(正常包,red包)统计接收fec包个数,将parsed_packet_list插入到packet_buffer_中
PacketList parsed_packet_list;
while (!packet_list.empty()) {
Packet& packet = packet_list.front();
const DecoderDatabase::DecoderInfo* info =
decoder_database_->GetDecoderInfo(packet.payload_type);
if (!info) {
RTC_LOG(LS_WARNING) << "SplitAudio unknown payload type";
return kUnknownRtpPayloadType;
}
if (info->IsComfortNoise()) {
// Carry comfort noise packets along.
parsed_packet_list.splice(parsed_packet_list.end(), packet_list,
packet_list.begin());
} else {
const auto sequence_number = packet.sequence_number;
const auto payload_type = packet.payload_type;
const Packet::Priority original_priority = packet.priority;
const auto& packet_info = packet.packet_info;
auto packet_from_result = [&](AudioDecoder::ParseResult& result) {
Packet new_packet;
new_packet.sequence_number = sequence_number;
new_packet.payload_type = payload_type;
new_packet.timestamp = result.timestamp;
new_packet.priority.codec_level = result.priority;
new_packet.priority.red_level = original_priority.red_level;
new_packet.packet_info = packet_info;
new_packet.frame = std::move(result.frame);
return new_packet;
};
std::vector<AudioDecoder::ParseResult> results =
info->GetDecoder()->ParsePayload(std::move(packet.payload),
packet.timestamp);
if (results.empty()) {
packet_list.pop_front();
} else {
bool first = true;
for (auto& result : results) {
RTC_DCHECK(result.frame);
RTC_DCHECK_GE(result.priority, 0);
if (first) {
// Re-use the node and move it to parsed_packet_list.
packet_list.front() = packet_from_result(result);
parsed_packet_list.splice(parsed_packet_list.end(), packet_list,
packet_list.begin());
first = false;
} else {
parsed_packet_list.push_back(packet_from_result(result));
}
}
}
}
}
//按照一定优先级,采集时间戳>seq>优先级(解码类型、red类型),插入队列中,
//如果队列中含有包有相同的时间戳,则替换。
const int ret = packet_buffer_->InsertPacketList(
&parsed_packet_list, *decoder_database_, &current_rtp_payload_type_,
&current_cng_rtp_payload_type_, stats_.get());
if (ret == PacketBuffer::kFlushed) {
// Reset DSP timestamp etc. if packet buffer flushed.
new_codec_ = true;
update_sample_rate_and_channels = true;
} else if (ret != PacketBuffer::kOK) {
return kOtherError;
}
//设置解码器采样率和通道数
if (update_sample_rate_and_channels && !packet_buffer_->Empty()) {
// We do not use |current_rtp_payload_type_| to |set payload_type|, but
// get the next RTP header from |packet_buffer_| to obtain the payload type.
// The reason for it is the following corner case. If NetEq receives a
// CNG packet with a sample rate different than the current CNG then it
// flushes its buffer, assuming send codec must have been changed. However,
// payload type of the hypothetically new send codec is not known.
const Packet* next_packet = packet_buffer_->PeekNextPacket();
RTC_DCHECK(next_packet);
const int payload_type = next_packet->payload_type;
size_t channels = 1;
if (!decoder_database_->IsComfortNoise(payload_type)) {
AudioDecoder* decoder = decoder_database_->GetDecoder(payload_type);
assert(decoder); // Payloads are already checked to be valid.
channels = decoder->Channels();
}
const DecoderDatabase::DecoderInfo* decoder_info =
decoder_database_->GetDecoderInfo(payload_type);
assert(decoder_info);
if (decoder_info->SampleRateHz() != fs_hz_ ||
channels != algorithm_buffer_->Channels()) {
SetSampleRateAndChannels(decoder_info->SampleRateHz(), channels);
}
if (nack_enabled_) {
RTC_DCHECK(nack_);
// Update the sample rate even if the rate is not new, because of Reset().
nack_->UpdateSampleRate(fs_hz_);
}
}
const DecoderDatabase::DecoderInfo* dec_info =
decoder_database_->GetDecoderInfo(main_payload_type);
assert(dec_info); // Already checked that the payload type is known.
delay_manager_->LastDecodedWasCngOrDtmf(dec_info->IsComfortNoise() ||
dec_info->IsDtmf());
if (delay_manager_->last_pack_cng_or_dtmf() == 0) {
// Calculate the total speech length carried in each packet.
if (number_of_primary_packets > 0) {
const size_t packet_length_samples =
number_of_primary_packets * decoder_frame_length_;
//设置包长
if (packet_length_samples != decision_logic_->packet_length_samples()) {
decision_logic_->set_packet_length_samples(packet_length_samples);
delay_manager_->SetPacketAudioLength(
rtc::dchecked_cast<int>((1000 * packet_length_samples) / fs_hz_));
}
}
// Update statistics.
if ((enable_rtx_handling_ || (int32_t)(main_timestamp - timestamp_) >= 0) &&
!new_codec_) {
// Only update statistics if incoming packet is not older than last played
// out packet or RTX handling is enabled, and if new codec flag is not
// set.
delay_manager_->Update(main_sequence_number, main_timestamp, fs_hz_);
}
// Feeds a newly arrived packet into the delay statistics and recomputes
// |target_level_|.
//
// |sequence_number| and |timestamp| belong to the arriving packet;
// |sample_rate_hz| is used to convert timestamp differences to milliseconds.
//
// Returns -1 if |sample_rate_hz| is not positive and 0 otherwise. The very
// first packet only primes the stopwatch and the stored sequence number and
// timestamp; no statistics are updated for it.
int DelayManager::Update(uint16_t sequence_number,
uint32_t timestamp,
int sample_rate_hz) {
if (sample_rate_hz <= 0) {
return -1;
}
if (!first_packet_received_) {
// Prepare for next packet arrival.
packet_iat_stopwatch_ = tick_timer_->GetNewStopwatch();
last_seq_no_ = sequence_number;
last_timestamp_ = timestamp;
first_packet_received_ = true;
return 0;
}
// Try calculating packet length from current and previous timestamps.
int packet_len_ms;
// Packet length: timestamp difference per packet, divided by the sample rate
// to get a duration in milliseconds.
if (!IsNewerTimestamp(timestamp, last_timestamp_) ||
!IsNewerSequenceNumber(sequence_number, last_seq_no_)) {
// Wrong timestamp or sequence order; use stored value.
packet_len_ms = packet_len_ms_;
} else {
// Calculate timestamps per packet and derive packet length in ms.
int64_t packet_len_samp =
static_cast<uint32_t>(timestamp - last_timestamp_) /
static_cast<uint16_t>(sequence_number - last_seq_no_);
packet_len_ms =
rtc::saturated_cast<int>(1000 * packet_len_samp / sample_rate_hz);
}
bool reordered = false;
if (packet_len_ms > 0) {
// Cannot update statistics unless |packet_len_ms| is valid.
// Gaps and re-ordering in the sequence numbers are compensated for below.
// Inter-arrival time (IAT) in integer "packet times" (rounding down). This
// is the value added to the inter-arrival time histogram.
// iat_packets = iat_ms / packet_len_ms, i.e. the time elapsed on the
// stopwatch expressed in packet durations.
int iat_ms = packet_iat_stopwatch_->ElapsedMs();
int iat_packets = iat_ms / packet_len_ms;
// Check for discontinuous packet sequence and re-ordering.
if (IsNewerSequenceNumber(sequence_number, last_seq_no_ + 1)) {
// Compensate for gap in the sequence numbers. Reduce IAT with the
// expected extra time due to lost packets.
int packet_offset =
static_cast<uint16_t>(sequence_number - last_seq_no_ - 1);
iat_packets -= packet_offset;
iat_ms -= packet_offset * packet_len_ms;
} else if (!IsNewerSequenceNumber(sequence_number, last_seq_no_)) {
// Packet is not newer than the last one: treat it as re-ordered and add
// the corresponding number of packet durations to the IAT.
int packet_offset =
static_cast<uint16_t>(last_seq_no_ + 1 - sequence_number);
iat_packets += packet_offset;
iat_ms += packet_offset * packet_len_ms;
reordered = true;
}
// Delay relative to the nominal spacing of one packet duration.
int iat_delay = iat_ms - packet_len_ms;
int relative_delay;
if (reordered) {
relative_delay = std::max(iat_delay, 0);
} else {
// Record this delay sample in the delay history.
// NOTE(review): the original notes say the history spans 2000 ms; that is
// not visible here - confirm in UpdateDelayHistory().
UpdateDelayHistory(iat_delay, timestamp, sample_rate_hz);
// NOTE(review): per the original notes this accumulates the delay history
// and the result is never negative - confirm in
// CalculateRelativePacketArrivalDelay().
relative_delay = CalculateRelativePacketArrivalDelay();
}
statistics_->RelativePacketArrivalDelay(relative_delay);
switch (histogram_mode_) {
case RELATIVE_ARRIVAL_DELAY: {
// Map the relative delay to a histogram bucket of width kBucketSizeMs and
// add it to the histogram.
// NOTE(review): the original notes state that kBucketSizeMs is 20, that
// Add() re-weights every bucket with a forget factor, and that the forget
// factor converges depending on start_forget_weight_ and the number of
// samples added (more samples -> smoother changes). None of this is
// visible here. Values quoted in the notes:
//int forget_factor = (1 << 15) * (1 - start_forget_weight_.value() / (add_count_ + 1));
//int quantile = 1041529569; // 0.97 in Q30.
// int forget_factor = 32745; // 0.9993 in Q15.
const int index = relative_delay / kBucketSizeMs;
if (index < histogram_->NumBuckets()) {
// Maximum delay to register is 2000 ms.
// Add the delay sample to the corresponding histogram bucket.
histogram_->Add(index);
}
break;
}
case INTER_ARRIVAL_TIME: {
// Saturate IAT between 0 and maximum value.
iat_packets =
std::max(std::min(iat_packets, histogram_->NumBuckets() - 1), 0);
histogram_->Add(iat_packets);
break;
}
}
// Calculate new |target_level_| based on updated statistics.
target_level_ = CalculateTargetLevel(iat_packets, reordered);
LimitTargetLevel();
} // End if (packet_len_ms > 0).
// With RTX handling enabled, a re-ordered packet returns early (while fewer
// than kMaxReorderedPackets have been counted) so that the stopwatch and the
// stored sequence number and timestamp are left untouched.
if (enable_rtx_handling_ && reordered &&
num_reordered_packets_ < kMaxReorderedPackets) {
++num_reordered_packets_;
return 0;
}
num_reordered_packets_ = 0;
// Prepare for next packet arrival.
packet_iat_stopwatch_ = tick_timer_->GetNewStopwatch();
last_seq_no_ = sequence_number;
last_timestamp_ = timestamp;
return 0;
}
RELATIVE_ARRIVAL_DELAY 模式会把历史抖动关联起来统计,抖动变化比较平缓,收敛比较慢。
INTER_ARRIVAL_TIME 模式直接把当前抖动添加到直方图中,变化更及时,对抖动的感知应该会更强。
int DelayManager::CalculateTargetLevel(int iat_packets, bool reordered) {
int limit_probability = histogram_quantile_;
//统计buckets_中所以iat的和大于0.97
int bucket_index = histogram_->Quantile(limit_probability);
int target_level;
switch (histogram_mode_) {
case RELATIVE_ARRIVAL_DELAY: {
//这种模式下,返回的index是从0开始,直到概率和大于0.97,返会对应index的抖动大小
//既没有取平均值,也不是终值,取得是抖动比较大的,然后计算target_level
target_level = 1;
if (packet_len_ms_ > 0) {
target_level += bucket_index * kBucketSizeMs / packet_len_ms_;
}
base_target_level_ = target_level;
break;
}
case INTER_ARRIVAL_TIME: {
//
target_level = std::max(bucket_index, 1);
base_target_level_ = target_level;
// Update detector for delay peaks.
bool delay_peak_found =
peak_detector_.Update(iat_packets, reordered, target_level);
if (delay_peak_found) {
target_level = std::max(target_level, peak_detector_.MaxPeakHeight());
}
break;
}
}
// Sanity check. |target_level| must be strictly positive.
target_level = std::max(target_level, 1);
// Scale to Q8 and assign to member variable.
target_level_ = target_level << 8;
if (extra_delay_ms_ && packet_len_ms_ > 0) {
int extra_delay = (extra_delay_ms_.value() << 8) / packet_len_ms_;
target_level_ += extra_delay;
}
return target_level_;
}
bool DelayPeakDetector::Update(int inter_arrival_time,
bool reordered,
int target_level) {
if (ignore_reordered_packets_ && reordered) {
return CheckPeakConditions();
}
//当iat>B+ + peak_detection_threshold_ 或者iat>2B时判断检查到了一个抖动峰值
if (inter_arrival_time > target_level + peak_detection_threshold_ ||
inter_arrival_time > 2 * target_level) {
// A delay peak is observed.
if (!peak_period_stopwatch_) {
// This is the first peak. Reset the period counter.
peak_period_stopwatch_ = tick_timer_->GetNewStopwatch();
} else if (peak_period_stopwatch_->ElapsedMs() > 0) {
//kMaxPeakPeriodMs为10000,10s
//如果距离上一次检测到的峰值不到10s,判断为在一个抖动峰值期间内
if (peak_period_stopwatch_->ElapsedMs() <= kMaxPeakPeriodMs) {
// This is not the first peak, and the period is valid.
// Store peak data in the vector.
Peak peak_data;
peak_data.period_ms = peak_period_stopwatch_->ElapsedMs();
peak_data.peak_height_packets = inter_arrival_time;
peak_history_.push_back(peak_data);
while (peak_history_.size() > kMaxNumPeaks) {
// Delete the oldest data point.
peak_history_.pop_front();
}
peak_period_stopwatch_ = tick_timer_->GetNewStopwatch();
} else if (peak_period_stopwatch_->ElapsedMs() <= 2 * kMaxPeakPeriodMs) {
// Invalid peak due to too long period. Reset period counter and start
// looking for next peak.
// //如果距离上一次检测到的峰值在10s~20s内,则不再一个抖动峰值期间内,待后续抖动峰值是否连续
peak_period_stopwatch_ = tick_timer_->GetNewStopwatch();
} else {
//如果距离上一次检测到的峰值时间超过2*kMaxPeakPeriodMs,即超过20s,则认为网络已经变化,重置峰值检测
//清空peak_history_和峰值监测时间 peak_period_stopwatch_.reset();
// More than 2 times the maximum period has elapsed since the last peak
// was registered. It seams that the network conditions have changed.
// Reset the peak statistics.
Reset();
}
}
}
return CheckPeakConditions();
}
// Re-evaluates and returns |peak_found_|. A peak is reported when the history
// holds at least kMinPeaksToTrigger peaks and the time elapsed on the peak
// period stopwatch does not exceed 2 * MaxPeakPeriod().
bool DelayPeakDetector::CheckPeakConditions() {
  peak_found_ =
      peak_history_.size() >= kMinPeaksToTrigger &&
      peak_period_stopwatch_->ElapsedMs() <= 2 * MaxPeakPeriod();
  return peak_found_;
}
}
case INTER_ARRIVAL_TIME: {
target_level = std::max(bucket_index, 1);
base_target_level_ = target_level;
// Update detector for delay peaks.
bool delay_peak_found =
peak_detector_.Update(iat_packets, reordered, target_level);
if (delay_peak_found) {
//出现峰值时,目标抖动取历史峰值的最大值。peak_history_的size为8,如果想让抖动更新更及时,可以减小到5,同时缩短抖动峰值周期的判断时间
target_level = std::max(target_level, peak_detector_.MaxPeakHeight());
}
break;
}
总结:
1、抖动更新模式选择 INTER_ARRIVAL_TIME,更为及时
2、packet_buffer_的max_number_of_packets_最大值可以通过Dependencies配置更改,网络较好的情况下,可以适当减小,默认是200,一个包长20ms,就是4s的抖动。
3、历史抖动delay_history_的size减小,则抖动更新更为灵敏,但是可能出现剧烈抖动,size增大,抖动更新更为平缓,更为稳定。默认size大小为2000ms。
4、遗忘因子的计算:int forget_factor = (1 << 15) * (1 - start_forget_weight_.value() / (add_count_ + 1));初始值越大,收敛越缓慢,随着add_count_ 的增大,变化越来越小,add_count_ 可以设置一个固定最大值,动态更新iat概率。默认值为 int forget_factor = 32745; // 0.9993 in Q15。int quantile = 1041529569; // 0.97 in Q30.适当减小也可增加收敛速度。
5、峰值检测:峰值周期的判断阈值为 kMaxPeakPeriodMs,默认 10s,即 10s 内出现的抖动峰值被认为处于同一个峰值区间内。kMaxPeakPeriodMs 增大,意味着假定网络变化比较缓慢;减小,意味着假定网络变化比较迅速。
kMaxNumPeaks 为 peak_history_ 的 size,默认为 8,即保留的历史峰值个数,可适当减小,使结果更为精准。同时可能偶现尖峰,可以在 peak_history_ 中对峰值进行过滤。