简介
PACER网络报文平滑策略是webrtc Qos策略之一,是针对数据发送端的。如果是单纯的音频数据通信,由于一帧音频数据的长度固定并且音频码率较平稳,不会出现忽高忽低的现象,可以不考虑使用pacer。但是对于视频数据来说,一帧视频的数据量可能很大,已经大于网络的MTU,尤其是I帧(关键帧)数据量更是通常远大于MTU,所以需要封装到多个RTP报文中,如果这些视频RTP报文同时一起发生到网络上可能会引起网络衰减和通信恶化。WebRTC引入Pacer,pacer会根据estimator中计算出来的码率调整发包频率,目的就是为了能让视频数据安装评估的码率均匀的分布在各个时间片里进行发送。
源码分析
RtpTransportControllerSend是RTP发送端传输控制对象。
RtpTransportControllerSend::RtpTransportControllerSend(
Clock* clock,
webrtc::RtcEventLog* event_log,
NetworkStatePredictorFactoryInterface* predictor_factory,
NetworkControllerFactoryInterface* controller_factory,
const BitrateConstraints& bitrate_config,
std::unique_ptr<ProcessThread> process_thread,
TaskQueueFactory* task_queue_factory,
const WebRtcKeyValueConfig* trials)
: clock_(clock),
event_log_(event_log),
bitrate_configurator_(bitrate_config),
pacer_started_(false),
process_thread_(std::move(process_thread)),
use_task_queue_pacer_(IsEnabled(trials, "WebRTC-TaskQueuePacer")),
process_thread_pacer_(use_task_queue_pacer_
? nullptr
: new PacedSender(clock,
&packet_router_,
event_log,
trials,
process_thread_.get())),
task_queue_pacer_(
use_task_queue_pacer_
? new TaskQueuePacedSender(
clock,
&packet_router_,
event_log,
trials,
task_queue_factory,
/*hold_back_window = */ PacingController::kMinSleepTime)
: nullptr),
observer_(nullptr),
controller_factory_override_(controller_factory),
controller_factory_fallback_(
std::make_unique<GoogCcNetworkControllerFactory>(predictor_factory)),
process_interval_(controller_factory_fallback_->GetProcessInterval()),
last_report_block_time_(Timestamp::Millis(clock_->TimeInMilliseconds())),
reset_feedback_on_route_change_(
!IsEnabled(trials, "WebRTC-Bwe-NoFeedbackReset")),
send_side_bwe_with_overhead_(
!IsDisabled(trials, "WebRTC-SendSideBwe-WithOverhead")),
add_pacing_to_cwin_(
IsEnabled(trials, "WebRTC-AddPacingToCongestionWindowPushback")),
relay_bandwidth_cap_("relay_cap", DataRate::PlusInfinity()),
transport_overhead_bytes_per_packet_(0),
network_available_(false),
retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs),
task_queue_(task_queue_factory->CreateTaskQueue(
"rtp_send_controller",
TaskQueueFactory::Priority::NORMAL)) {
ParseFieldTrial({&relay_bandwidth_cap_},
trials->Lookup("WebRTC-Bwe-NetworkRouteConstraints"));
initial_config_.constraints = ConvertConstraints(bitrate_config, clock_);
initial_config_.event_log = event_log;
initial_config_.key_value_config = trials;
RTC_DCHECK(bitrate_config.start_bitrate_bps > 0);
pacer()->SetPacingRates(
DataRate::BitsPerSec(bitrate_config.start_bitrate_bps), DataRate::Zero());
if (absl::StartsWith(trials->Lookup("WebRTC-LazyPacerStart"), "Disabled")) {
EnsureStarted();
}
}
use_task_queue_pacer_是通过配置信息WebRTC-TaskQueuePacer进行控制的,其实就是process_thread_pacer_或task_queue_pacer_其中之一。而不管是是PacedSender还是TaskQueuePacedSender,其本质都是PacingController pacing_controller_。
PacingController
此类实现了一个leaky-bucket数据包调整算法。它处理确定何时发送哪些数据包的逻辑,但处理的实际时间是在外部完成的(例如PacedSender)。此外,数据包准备好发送时的转发也通过PacedSendingController::PacketSender接口在外部处理。它有两种处理模式:
- kPeriodic 周期模式使用IntervalBudget类跟踪比特率预算,并且期望ProcessPackets()被称为固定速率,例如,由PacedSender实现的每5毫秒一次。
- kDynamic 动态模式允许调用ProcessPacket之间的任意时间差。
PacingController::EnqueuePacket 添加数据包
void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
<< "SetPacingRate must be called before InsertPacket.";
RTC_CHECK(packet->packet_type());
// Get priority first and store in temporary, to avoid chance of object being
// moved before GetPriorityForType() being called.
const int priority = GetPriorityForType(*packet->packet_type());
EnqueuePacketInternal(std::move(packet), priority);
}
将数据包添加到队列中,并在到达发送时间时调用PacketRouter::SendPacket()。首先调用GetPriorityForType()获取优先级,然后调用EnqueuePacketInternal将packet添加到RoundRobinPacketQueue packet_queue_队列中。
GetPriorityForType
根据packet类型获取优先级
int GetPriorityForType(RtpPacketMediaType type) {
// Lower number takes priority over higher.
switch (type) {
case RtpPacketMediaType::kAudio:
// Audio is always prioritized over other packet types.
return kFirstPriority + 1;
case RtpPacketMediaType::kRetransmission:
// Send retransmissions before new media.
return kFirstPriority + 2;
case RtpPacketMediaType::kVideo:
case RtpPacketMediaType::kForwardErrorCorrection:
// Video has "normal" priority, in the old speak.
// Send redundancy concurrently to video. If it is delayed it might have a
// lower chance of being useful.
return kFirstPriority + 3;
case RtpPacketMediaType::kPadding:
// Packets that are in themselves likely useless, only sent to keep the
// BWE high.
return kFirstPriority + 4;
}
RTC_CHECK_NOTREACHED();
}
音频总是优先于其他数据包类型。
在新的媒体数据之前发送重传数据包。
视频具有普通的优先级。视频中同时发生冗余数据。如果它发生延迟,它可能更小的概率是有用的。
最低优先级数据包本身可能是无用的,发送它们只是为了保持BWE高。
PacingController::EnqueuePacketInternal
将数据包根据优先级加入到packet_queue_中。
void PacingController::EnqueuePacketInternal(
std::unique_ptr<RtpPacketToSend> packet,
int priority) {
prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));
Timestamp now = CurrentTime();
if (mode_ == ProcessMode::kDynamic && packet_queue_.Empty() &&
NextSendTime() <= now) {
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
UpdateBudgetWithElapsedTime(elapsed_time);
}
packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
}
PacingController::ProcessPackets() 处理数据包
void PacingController::ProcessPackets() {
......
bool first_packet_in_probe = false;
PacedPacketInfo pacing_info;
DataSize recommended_probe_size = DataSize::Zero();
bool is_probing = prober_.is_probing();
if (is_probing) {
// Probe timing is sensitive, and handled explicitly by BitrateProber, so
// use actual send time rather than target.
pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());
if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {
first_packet_in_probe = pacing_info.probe_cluster_bytes_sent == 0;
recommended_probe_size = prober_.RecommendedMinProbeSize();
RTC_DCHECK_GT(recommended_probe_size, DataSize::Zero());
} else {
// No valid probe cluster returned, probe might have timed out.
is_probing = false;
}
}
......
while (!paused_) {
......
std::unique_ptr<RtpPacketToSend> rtp_packet =
GetPendingPacket(pacing_info, target_send_time, now);
......
packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);
for (auto& packet : packet_sender_->FetchFec()) {
EnqueuePacket(std::move(packet));
}
data_sent += packet_size;
// Send done, update send/process time to the target send time.
OnPacketSent(packet_type, packet_size, target_send_time);
// If we are currently probing, we need to stop the send loop when we have
// reached the send target.
if (is_probing && data_sent >= recommended_probe_size) {
break;
}
......
}
}
从比特率探测器prober_中获取探测到的推荐发送值。然后从packet_queue_中POP出最高优先级的数据进行发送,直到已经发送的数据量大于等于推荐值时跳出循环,等待下一轮发送。
如果pacer queue没有更多待发送的报文,但budget却还可以发送更多的数据,这个时候pacer会进行padding报文补充。