1 /*
2 * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10 #include "call/rtp_transport_controller_send.h"
11
12 #include <memory>
13 #include <utility>
14 #include <vector>
15
16 #include "absl/strings/match.h"
17 #include "absl/strings/string_view.h"
18 #include "absl/types/optional.h"
19 #include "api/task_queue/pending_task_safety_flag.h"
20 #include "api/transport/goog_cc_factory.h"
21 #include "api/transport/network_types.h"
22 #include "api/units/data_rate.h"
23 #include "api/units/time_delta.h"
24 #include "api/units/timestamp.h"
25 #include "call/rtp_video_sender.h"
26 #include "logging/rtc_event_log/events/rtc_event_remote_estimate.h"
27 #include "logging/rtc_event_log/events/rtc_event_route_change.h"
28 #include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
29 #include "rtc_base/checks.h"
30 #include "rtc_base/logging.h"
31 #include "rtc_base/rate_limiter.h"
32
33 namespace webrtc {
34 namespace {
35 static const int64_t kRetransmitWindowSizeMs = 500;
36 static const size_t kMaxOverheadBytes = 500;
37
38 constexpr TimeDelta kPacerQueueUpdateInterval = TimeDelta::Millis(25);
39
ConvertConstraints(int min_bitrate_bps,int max_bitrate_bps,int start_bitrate_bps,Clock * clock)40 TargetRateConstraints ConvertConstraints(int min_bitrate_bps,
41 int max_bitrate_bps,
42 int start_bitrate_bps,
43 Clock* clock) {
44 TargetRateConstraints msg;
45 msg.at_time = Timestamp::Millis(clock->TimeInMilliseconds());
46 msg.min_data_rate = min_bitrate_bps >= 0
47 ? DataRate::BitsPerSec(min_bitrate_bps)
48 : DataRate::Zero();
49 msg.max_data_rate = max_bitrate_bps > 0
50 ? DataRate::BitsPerSec(max_bitrate_bps)
51 : DataRate::Infinity();
52 if (start_bitrate_bps > 0)
53 msg.starting_rate = DataRate::BitsPerSec(start_bitrate_bps);
54 return msg;
55 }
56
ConvertConstraints(const BitrateConstraints & contraints,Clock * clock)57 TargetRateConstraints ConvertConstraints(const BitrateConstraints& contraints,
58 Clock* clock) {
59 return ConvertConstraints(contraints.min_bitrate_bps,
60 contraints.max_bitrate_bps,
61 contraints.start_bitrate_bps, clock);
62 }
63
IsEnabled(const FieldTrialsView & trials,absl::string_view key)64 bool IsEnabled(const FieldTrialsView& trials, absl::string_view key) {
65 return absl::StartsWith(trials.Lookup(key), "Enabled");
66 }
67
IsRelayed(const rtc::NetworkRoute & route)68 bool IsRelayed(const rtc::NetworkRoute& route) {
69 return route.local.uses_turn() || route.remote.uses_turn();
70 }
71 } // namespace
72
PacerSettings(const FieldTrialsView & trials)73 RtpTransportControllerSend::PacerSettings::PacerSettings(
74 const FieldTrialsView& trials)
75 : holdback_window("holdback_window", TimeDelta::Millis(5)),
76 holdback_packets("holdback_packets", 3) {
77 ParseFieldTrial({&holdback_window, &holdback_packets},
78 trials.Lookup("WebRTC-TaskQueuePacer"));
79 }
80
RtpTransportControllerSend(Clock * clock,const RtpTransportConfig & config)81 RtpTransportControllerSend::RtpTransportControllerSend(
82 Clock* clock,
83 const RtpTransportConfig& config)
84 : clock_(clock),
85 event_log_(config.event_log),
86 task_queue_factory_(config.task_queue_factory),
87 bitrate_configurator_(config.bitrate_config),
88 pacer_started_(false),
89 pacer_settings_(*config.trials),
90 pacer_(clock,
91 &packet_router_,
92 *config.trials,
93 config.task_queue_factory,
94 pacer_settings_.holdback_window.Get(),
95 pacer_settings_.holdback_packets.Get(),
96 config.pacer_burst_interval),
97 observer_(nullptr),
98 controller_factory_override_(config.network_controller_factory),
99 controller_factory_fallback_(
100 std::make_unique<GoogCcNetworkControllerFactory>(
101 config.network_state_predictor_factory)),
102 process_interval_(controller_factory_fallback_->GetProcessInterval()),
103 last_report_block_time_(Timestamp::Millis(clock_->TimeInMilliseconds())),
104 reset_feedback_on_route_change_(
105 !IsEnabled(*config.trials, "WebRTC-Bwe-NoFeedbackReset")),
106 add_pacing_to_cwin_(
107 IsEnabled(*config.trials,
108 "WebRTC-AddPacingToCongestionWindowPushback")),
109 relay_bandwidth_cap_("relay_cap", DataRate::PlusInfinity()),
110 transport_overhead_bytes_per_packet_(0),
111 network_available_(false),
112 congestion_window_size_(DataSize::PlusInfinity()),
113 is_congested_(false),
114 retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs),
115 task_queue_(*config.trials,
116 "rtp_send_controller",
117 config.task_queue_factory),
118 field_trials_(*config.trials) {
119 ParseFieldTrial({&relay_bandwidth_cap_},
120 config.trials->Lookup("WebRTC-Bwe-NetworkRouteConstraints"));
121 initial_config_.constraints =
122 ConvertConstraints(config.bitrate_config, clock_);
123 initial_config_.event_log = config.event_log;
124 initial_config_.key_value_config = config.trials;
125 RTC_DCHECK(config.bitrate_config.start_bitrate_bps > 0);
126
127 pacer_.SetPacingRates(
128 DataRate::BitsPerSec(config.bitrate_config.start_bitrate_bps),
129 DataRate::Zero());
130 }
131
~RtpTransportControllerSend()132 RtpTransportControllerSend::~RtpTransportControllerSend() {
133 RTC_DCHECK_RUN_ON(&main_thread_);
134 RTC_DCHECK(video_rtp_senders_.empty());
135 if (task_queue_.IsCurrent()) {
136 // If these repeated tasks run on a task queue owned by
137 // `task_queue_`, they are stopped when the task queue is deleted.
138 // Otherwise, stop them here.
139 pacer_queue_update_task_.Stop();
140 controller_task_.Stop();
141 }
142 }
143
CreateRtpVideoSender(const std::map<uint32_t,RtpState> & suspended_ssrcs,const std::map<uint32_t,RtpPayloadState> & states,const RtpConfig & rtp_config,int rtcp_report_interval_ms,Transport * send_transport,const RtpSenderObservers & observers,RtcEventLog * event_log,std::unique_ptr<FecController> fec_controller,const RtpSenderFrameEncryptionConfig & frame_encryption_config,rtc::scoped_refptr<FrameTransformerInterface> frame_transformer)144 RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender(
145 const std::map<uint32_t, RtpState>& suspended_ssrcs,
146 const std::map<uint32_t, RtpPayloadState>& states,
147 const RtpConfig& rtp_config,
148 int rtcp_report_interval_ms,
149 Transport* send_transport,
150 const RtpSenderObservers& observers,
151 RtcEventLog* event_log,
152 std::unique_ptr<FecController> fec_controller,
153 const RtpSenderFrameEncryptionConfig& frame_encryption_config,
154 rtc::scoped_refptr<FrameTransformerInterface> frame_transformer) {
155 RTC_DCHECK_RUN_ON(&main_thread_);
156 video_rtp_senders_.push_back(std::make_unique<RtpVideoSender>(
157 clock_, suspended_ssrcs, states, rtp_config, rtcp_report_interval_ms,
158 send_transport, observers,
159 // TODO(holmer): Remove this circular dependency by injecting
160 // the parts of RtpTransportControllerSendInterface that are really used.
161 this, event_log, &retransmission_rate_limiter_, std::move(fec_controller),
162 frame_encryption_config.frame_encryptor,
163 frame_encryption_config.crypto_options, std::move(frame_transformer),
164 field_trials_, task_queue_factory_));
165 return video_rtp_senders_.back().get();
166 }
167
DestroyRtpVideoSender(RtpVideoSenderInterface * rtp_video_sender)168 void RtpTransportControllerSend::DestroyRtpVideoSender(
169 RtpVideoSenderInterface* rtp_video_sender) {
170 RTC_DCHECK_RUN_ON(&main_thread_);
171 std::vector<std::unique_ptr<RtpVideoSenderInterface>>::iterator it =
172 video_rtp_senders_.end();
173 for (it = video_rtp_senders_.begin(); it != video_rtp_senders_.end(); ++it) {
174 if (it->get() == rtp_video_sender) {
175 break;
176 }
177 }
178 RTC_DCHECK(it != video_rtp_senders_.end());
179 video_rtp_senders_.erase(it);
180 }
181
UpdateControlState()182 void RtpTransportControllerSend::UpdateControlState() {
183 absl::optional<TargetTransferRate> update = control_handler_->GetUpdate();
184 if (!update)
185 return;
186 retransmission_rate_limiter_.SetMaxRate(update->target_rate.bps());
187 // We won't create control_handler_ until we have an observers.
188 RTC_DCHECK(observer_ != nullptr);
189 observer_->OnTargetTransferRate(*update);
190 }
191
UpdateCongestedState()192 void RtpTransportControllerSend::UpdateCongestedState() {
193 bool congested = transport_feedback_adapter_.GetOutstandingData() >=
194 congestion_window_size_;
195 if (congested != is_congested_) {
196 is_congested_ = congested;
197 pacer_.SetCongested(congested);
198 }
199 }
200
GetWorkerQueue()201 MaybeWorkerThread* RtpTransportControllerSend::GetWorkerQueue() {
202 return &task_queue_;
203 }
204
packet_router()205 PacketRouter* RtpTransportControllerSend::packet_router() {
206 return &packet_router_;
207 }
208
209 NetworkStateEstimateObserver*
network_state_estimate_observer()210 RtpTransportControllerSend::network_state_estimate_observer() {
211 return this;
212 }
213
214 TransportFeedbackObserver*
transport_feedback_observer()215 RtpTransportControllerSend::transport_feedback_observer() {
216 return this;
217 }
218
packet_sender()219 RtpPacketSender* RtpTransportControllerSend::packet_sender() {
220 return &pacer_;
221 }
222
SetAllocatedSendBitrateLimits(BitrateAllocationLimits limits)223 void RtpTransportControllerSend::SetAllocatedSendBitrateLimits(
224 BitrateAllocationLimits limits) {
225 RTC_DCHECK_RUN_ON(&task_queue_);
226 streams_config_.min_total_allocated_bitrate = limits.min_allocatable_rate;
227 streams_config_.max_padding_rate = limits.max_padding_rate;
228 streams_config_.max_total_allocated_bitrate = limits.max_allocatable_rate;
229 UpdateStreamsConfig();
230 }
SetPacingFactor(float pacing_factor)231 void RtpTransportControllerSend::SetPacingFactor(float pacing_factor) {
232 RTC_DCHECK_RUN_ON(&task_queue_);
233 streams_config_.pacing_factor = pacing_factor;
234 UpdateStreamsConfig();
235 }
SetQueueTimeLimit(int limit_ms)236 void RtpTransportControllerSend::SetQueueTimeLimit(int limit_ms) {
237 pacer_.SetQueueTimeLimit(TimeDelta::Millis(limit_ms));
238 }
239 StreamFeedbackProvider*
GetStreamFeedbackProvider()240 RtpTransportControllerSend::GetStreamFeedbackProvider() {
241 return &feedback_demuxer_;
242 }
243
RegisterTargetTransferRateObserver(TargetTransferRateObserver * observer)244 void RtpTransportControllerSend::RegisterTargetTransferRateObserver(
245 TargetTransferRateObserver* observer) {
246 task_queue_.RunOrPost([this, observer] {
247 RTC_DCHECK_RUN_ON(&task_queue_);
248 RTC_DCHECK(observer_ == nullptr);
249 observer_ = observer;
250 observer_->OnStartRateUpdate(*initial_config_.constraints.starting_rate);
251 MaybeCreateControllers();
252 });
253 }
254
IsRelevantRouteChange(const rtc::NetworkRoute & old_route,const rtc::NetworkRoute & new_route) const255 bool RtpTransportControllerSend::IsRelevantRouteChange(
256 const rtc::NetworkRoute& old_route,
257 const rtc::NetworkRoute& new_route) const {
258 // TODO(bugs.webrtc.org/11438): Experiment with using more information/
259 // other conditions.
260 bool connected_changed = old_route.connected != new_route.connected;
261 bool route_ids_changed =
262 old_route.local.network_id() != new_route.local.network_id() ||
263 old_route.remote.network_id() != new_route.remote.network_id();
264 if (relay_bandwidth_cap_->IsFinite()) {
265 bool relaying_changed = IsRelayed(old_route) != IsRelayed(new_route);
266 return connected_changed || route_ids_changed || relaying_changed;
267 } else {
268 return connected_changed || route_ids_changed;
269 }
270 }
271
OnNetworkRouteChanged(absl::string_view transport_name,const rtc::NetworkRoute & network_route)272 void RtpTransportControllerSend::OnNetworkRouteChanged(
273 absl::string_view transport_name,
274 const rtc::NetworkRoute& network_route) {
275 // Check if the network route is connected.
276
277 if (!network_route.connected) {
278 // TODO(honghaiz): Perhaps handle this in SignalChannelNetworkState and
279 // consider merging these two methods.
280 return;
281 }
282
283 absl::optional<BitrateConstraints> relay_constraint_update =
284 ApplyOrLiftRelayCap(IsRelayed(network_route));
285
286 // Check whether the network route has changed on each transport.
287 auto result = network_routes_.insert(
288 // Explicit conversion of transport_name to std::string here is necessary
289 // to support some platforms that cannot yet deal with implicit
290 // conversion in these types of situations.
291 std::make_pair(std::string(transport_name), network_route));
292 auto kv = result.first;
293 bool inserted = result.second;
294 if (inserted || !(kv->second == network_route)) {
295 RTC_LOG(LS_INFO) << "Network route changed on transport " << transport_name
296 << ": new_route = " << network_route.DebugString();
297 if (!inserted) {
298 RTC_LOG(LS_INFO) << "old_route = " << kv->second.DebugString();
299 }
300 }
301
302 if (inserted) {
303 if (relay_constraint_update.has_value()) {
304 UpdateBitrateConstraints(*relay_constraint_update);
305 }
306 task_queue_.RunOrPost([this, network_route] {
307 RTC_DCHECK_RUN_ON(&task_queue_);
308 transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
309 });
310 // No need to reset BWE if this is the first time the network connects.
311 return;
312 }
313
314 const rtc::NetworkRoute old_route = kv->second;
315 kv->second = network_route;
316
317 // Check if enough conditions of the new/old route has changed
318 // to trigger resetting of bitrates (and a probe).
319 if (IsRelevantRouteChange(old_route, network_route)) {
320 BitrateConstraints bitrate_config = bitrate_configurator_.GetConfig();
321 RTC_LOG(LS_INFO) << "Reset bitrates to min: "
322 << bitrate_config.min_bitrate_bps
323 << " bps, start: " << bitrate_config.start_bitrate_bps
324 << " bps, max: " << bitrate_config.max_bitrate_bps
325 << " bps.";
326 RTC_DCHECK_GT(bitrate_config.start_bitrate_bps, 0);
327
328 if (event_log_) {
329 event_log_->Log(std::make_unique<RtcEventRouteChange>(
330 network_route.connected, network_route.packet_overhead));
331 }
332 NetworkRouteChange msg;
333 msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
334 msg.constraints = ConvertConstraints(bitrate_config, clock_);
335 task_queue_.RunOrPost([this, msg, network_route] {
336 RTC_DCHECK_RUN_ON(&task_queue_);
337 transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
338 if (reset_feedback_on_route_change_) {
339 transport_feedback_adapter_.SetNetworkRoute(network_route);
340 }
341 if (controller_) {
342 PostUpdates(controller_->OnNetworkRouteChange(msg));
343 } else {
344 UpdateInitialConstraints(msg.constraints);
345 }
346 is_congested_ = false;
347 pacer_.SetCongested(false);
348 });
349 }
350 }
OnNetworkAvailability(bool network_available)351 void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) {
352 RTC_DCHECK_RUN_ON(&main_thread_);
353 RTC_LOG(LS_VERBOSE) << "SignalNetworkState "
354 << (network_available ? "Up" : "Down");
355 NetworkAvailability msg;
356 msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
357 msg.network_available = network_available;
358 task_queue_.RunOrPost([this, msg]() {
359 RTC_DCHECK_RUN_ON(&task_queue_);
360 if (network_available_ == msg.network_available)
361 return;
362 network_available_ = msg.network_available;
363 if (network_available_) {
364 pacer_.Resume();
365 } else {
366 pacer_.Pause();
367 }
368 is_congested_ = false;
369 pacer_.SetCongested(false);
370
371 if (controller_) {
372 control_handler_->SetNetworkAvailability(network_available_);
373 PostUpdates(controller_->OnNetworkAvailability(msg));
374 UpdateControlState();
375 } else {
376 MaybeCreateControllers();
377 }
378 });
379
380 for (auto& rtp_sender : video_rtp_senders_) {
381 rtp_sender->OnNetworkAvailability(network_available);
382 }
383 }
GetBandwidthObserver()384 RtcpBandwidthObserver* RtpTransportControllerSend::GetBandwidthObserver() {
385 return this;
386 }
GetPacerQueuingDelayMs() const387 int64_t RtpTransportControllerSend::GetPacerQueuingDelayMs() const {
388 return pacer_.OldestPacketWaitTime().ms();
389 }
GetFirstPacketTime() const390 absl::optional<Timestamp> RtpTransportControllerSend::GetFirstPacketTime()
391 const {
392 return pacer_.FirstSentPacketTime();
393 }
EnablePeriodicAlrProbing(bool enable)394 void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) {
395 task_queue_.RunOrPost([this, enable]() {
396 RTC_DCHECK_RUN_ON(&task_queue_);
397 streams_config_.requests_alr_probing = enable;
398 UpdateStreamsConfig();
399 });
400 }
OnSentPacket(const rtc::SentPacket & sent_packet)401 void RtpTransportControllerSend::OnSentPacket(
402 const rtc::SentPacket& sent_packet) {
403 // Normally called on the network thread !
404
405 // We can not use SafeTask here if we are using an owned task queue, because
406 // the safety flag will be destroyed when RtpTransportControllerSend is
407 // destroyed on the worker thread. But we must use SafeTask if we are using
408 // the worker thread, since the worker thread outlive
409 // RtpTransportControllerSend.
410 task_queue_.TaskQueueForPost()->PostTask(
411 task_queue_.MaybeSafeTask(safety_.flag(), [this, sent_packet]() {
412 RTC_DCHECK_RUN_ON(&task_queue_);
413 absl::optional<SentPacket> packet_msg =
414 transport_feedback_adapter_.ProcessSentPacket(sent_packet);
415 if (packet_msg) {
416 // Only update outstanding data if:
417 // 1. Packet feedback is used.
418 // 2. The packet has not yet received an acknowledgement.
419 // 3. It is not a retransmission of an earlier packet.
420 UpdateCongestedState();
421 if (controller_)
422 PostUpdates(controller_->OnSentPacket(*packet_msg));
423 }
424 }));
425 }
426
OnReceivedPacket(const ReceivedPacket & packet_msg)427 void RtpTransportControllerSend::OnReceivedPacket(
428 const ReceivedPacket& packet_msg) {
429 task_queue_.RunOrPost([this, packet_msg]() {
430 RTC_DCHECK_RUN_ON(&task_queue_);
431 if (controller_)
432 PostUpdates(controller_->OnReceivedPacket(packet_msg));
433 });
434 }
435
UpdateBitrateConstraints(const BitrateConstraints & updated)436 void RtpTransportControllerSend::UpdateBitrateConstraints(
437 const BitrateConstraints& updated) {
438 TargetRateConstraints msg = ConvertConstraints(updated, clock_);
439 task_queue_.RunOrPost([this, msg]() {
440 RTC_DCHECK_RUN_ON(&task_queue_);
441 if (controller_) {
442 PostUpdates(controller_->OnTargetRateConstraints(msg));
443 } else {
444 UpdateInitialConstraints(msg);
445 }
446 });
447 }
448
SetSdpBitrateParameters(const BitrateConstraints & constraints)449 void RtpTransportControllerSend::SetSdpBitrateParameters(
450 const BitrateConstraints& constraints) {
451 absl::optional<BitrateConstraints> updated =
452 bitrate_configurator_.UpdateWithSdpParameters(constraints);
453 if (updated.has_value()) {
454 UpdateBitrateConstraints(*updated);
455 } else {
456 RTC_LOG(LS_VERBOSE)
457 << "WebRTC.RtpTransportControllerSend.SetSdpBitrateParameters: "
458 "nothing to update";
459 }
460 }
461
SetClientBitratePreferences(const BitrateSettings & preferences)462 void RtpTransportControllerSend::SetClientBitratePreferences(
463 const BitrateSettings& preferences) {
464 absl::optional<BitrateConstraints> updated =
465 bitrate_configurator_.UpdateWithClientPreferences(preferences);
466 if (updated.has_value()) {
467 UpdateBitrateConstraints(*updated);
468 } else {
469 RTC_LOG(LS_VERBOSE)
470 << "WebRTC.RtpTransportControllerSend.SetClientBitratePreferences: "
471 "nothing to update";
472 }
473 }
474
475 absl::optional<BitrateConstraints>
ApplyOrLiftRelayCap(bool is_relayed)476 RtpTransportControllerSend::ApplyOrLiftRelayCap(bool is_relayed) {
477 DataRate cap = is_relayed ? relay_bandwidth_cap_ : DataRate::PlusInfinity();
478 return bitrate_configurator_.UpdateWithRelayCap(cap);
479 }
480
OnTransportOverheadChanged(size_t transport_overhead_bytes_per_packet)481 void RtpTransportControllerSend::OnTransportOverheadChanged(
482 size_t transport_overhead_bytes_per_packet) {
483 RTC_DCHECK_RUN_ON(&main_thread_);
484 if (transport_overhead_bytes_per_packet >= kMaxOverheadBytes) {
485 RTC_LOG(LS_ERROR) << "Transport overhead exceeds " << kMaxOverheadBytes;
486 return;
487 }
488
489 pacer_.SetTransportOverhead(
490 DataSize::Bytes(transport_overhead_bytes_per_packet));
491
492 // TODO(holmer): Call AudioRtpSenders when they have been moved to
493 // RtpTransportControllerSend.
494 for (auto& rtp_video_sender : video_rtp_senders_) {
495 rtp_video_sender->OnTransportOverheadChanged(
496 transport_overhead_bytes_per_packet);
497 }
498 }
499
AccountForAudioPacketsInPacedSender(bool account_for_audio)500 void RtpTransportControllerSend::AccountForAudioPacketsInPacedSender(
501 bool account_for_audio) {
502 pacer_.SetAccountForAudioPackets(account_for_audio);
503 }
504
IncludeOverheadInPacedSender()505 void RtpTransportControllerSend::IncludeOverheadInPacedSender() {
506 pacer_.SetIncludeOverhead();
507 }
508
EnsureStarted()509 void RtpTransportControllerSend::EnsureStarted() {
510 if (!pacer_started_) {
511 pacer_started_ = true;
512 pacer_.EnsureStarted();
513 }
514 }
515
OnReceivedEstimatedBitrate(uint32_t bitrate)516 void RtpTransportControllerSend::OnReceivedEstimatedBitrate(uint32_t bitrate) {
517 RemoteBitrateReport msg;
518 msg.receive_time = Timestamp::Millis(clock_->TimeInMilliseconds());
519 msg.bandwidth = DataRate::BitsPerSec(bitrate);
520 task_queue_.RunOrPost([this, msg]() {
521 RTC_DCHECK_RUN_ON(&task_queue_);
522 if (controller_)
523 PostUpdates(controller_->OnRemoteBitrateReport(msg));
524 });
525 }
526
OnReceivedRtcpReceiverReport(const ReportBlockList & report_blocks,int64_t rtt_ms,int64_t now_ms)527 void RtpTransportControllerSend::OnReceivedRtcpReceiverReport(
528 const ReportBlockList& report_blocks,
529 int64_t rtt_ms,
530 int64_t now_ms) {
531 task_queue_.RunOrPost([this, report_blocks, now_ms, rtt_ms]() {
532 RTC_DCHECK_RUN_ON(&task_queue_);
533 OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms);
534 RoundTripTimeUpdate report;
535 report.receive_time = Timestamp::Millis(now_ms);
536 report.round_trip_time = TimeDelta::Millis(rtt_ms);
537 report.smoothed = false;
538 if (controller_ && !report.round_trip_time.IsZero())
539 PostUpdates(controller_->OnRoundTripTimeUpdate(report));
540 });
541 }
542
OnAddPacket(const RtpPacketSendInfo & packet_info)543 void RtpTransportControllerSend::OnAddPacket(
544 const RtpPacketSendInfo& packet_info) {
545 Timestamp creation_time = Timestamp::Millis(clock_->TimeInMilliseconds());
546
547 task_queue_.RunOrPost([this, packet_info, creation_time]() {
548 RTC_DCHECK_RUN_ON(&task_queue_);
549 feedback_demuxer_.AddPacket(packet_info);
550 transport_feedback_adapter_.AddPacket(
551 packet_info, transport_overhead_bytes_per_packet_, creation_time);
552 });
553 }
554
OnTransportFeedback(const rtcp::TransportFeedback & feedback)555 void RtpTransportControllerSend::OnTransportFeedback(
556 const rtcp::TransportFeedback& feedback) {
557 auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds());
558 task_queue_.RunOrPost([this, feedback, feedback_time]() {
559 RTC_DCHECK_RUN_ON(&task_queue_);
560 feedback_demuxer_.OnTransportFeedback(feedback);
561 absl::optional<TransportPacketsFeedback> feedback_msg =
562 transport_feedback_adapter_.ProcessTransportFeedback(feedback,
563 feedback_time);
564 if (feedback_msg) {
565 if (controller_)
566 PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg));
567
568 // Only update outstanding data if any packet is first time acked.
569 UpdateCongestedState();
570 }
571 });
572 }
573
OnRemoteNetworkEstimate(NetworkStateEstimate estimate)574 void RtpTransportControllerSend::OnRemoteNetworkEstimate(
575 NetworkStateEstimate estimate) {
576 if (event_log_) {
577 event_log_->Log(std::make_unique<RtcEventRemoteEstimate>(
578 estimate.link_capacity_lower, estimate.link_capacity_upper));
579 }
580 estimate.update_time = Timestamp::Millis(clock_->TimeInMilliseconds());
581 task_queue_.RunOrPost([this, estimate] {
582 RTC_DCHECK_RUN_ON(&task_queue_);
583 if (controller_)
584 PostUpdates(controller_->OnNetworkStateEstimate(estimate));
585 });
586 }
587
MaybeCreateControllers()588 void RtpTransportControllerSend::MaybeCreateControllers() {
589 RTC_DCHECK(!controller_);
590 RTC_DCHECK(!control_handler_);
591
592 if (!network_available_ || !observer_)
593 return;
594 control_handler_ = std::make_unique<CongestionControlHandler>();
595
596 initial_config_.constraints.at_time =
597 Timestamp::Millis(clock_->TimeInMilliseconds());
598 initial_config_.stream_based_config = streams_config_;
599
600 // TODO(srte): Use fallback controller if no feedback is available.
601 if (controller_factory_override_) {
602 RTC_LOG(LS_INFO) << "Creating overridden congestion controller";
603 controller_ = controller_factory_override_->Create(initial_config_);
604 process_interval_ = controller_factory_override_->GetProcessInterval();
605 } else {
606 RTC_LOG(LS_INFO) << "Creating fallback congestion controller";
607 controller_ = controller_factory_fallback_->Create(initial_config_);
608 process_interval_ = controller_factory_fallback_->GetProcessInterval();
609 }
610 UpdateControllerWithTimeInterval();
611 StartProcessPeriodicTasks();
612 }
613
UpdateInitialConstraints(TargetRateConstraints new_contraints)614 void RtpTransportControllerSend::UpdateInitialConstraints(
615 TargetRateConstraints new_contraints) {
616 if (!new_contraints.starting_rate)
617 new_contraints.starting_rate = initial_config_.constraints.starting_rate;
618 RTC_DCHECK(new_contraints.starting_rate);
619 initial_config_.constraints = new_contraints;
620 }
621
StartProcessPeriodicTasks()622 void RtpTransportControllerSend::StartProcessPeriodicTasks() {
623 RTC_DCHECK_RUN_ON(&task_queue_);
624 if (!pacer_queue_update_task_.Running()) {
625 pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart(
626 task_queue_.TaskQueueForDelayedTasks(), kPacerQueueUpdateInterval,
627 [this]() {
628 RTC_DCHECK_RUN_ON(&task_queue_);
629 TimeDelta expected_queue_time = pacer_.ExpectedQueueTime();
630 control_handler_->SetPacerQueue(expected_queue_time);
631 UpdateControlState();
632 return kPacerQueueUpdateInterval;
633 });
634 }
635 controller_task_.Stop();
636 if (process_interval_.IsFinite()) {
637 controller_task_ = RepeatingTaskHandle::DelayedStart(
638 task_queue_.TaskQueueForDelayedTasks(), process_interval_, [this]() {
639 RTC_DCHECK_RUN_ON(&task_queue_);
640 UpdateControllerWithTimeInterval();
641 return process_interval_;
642 });
643 }
644 }
645
UpdateControllerWithTimeInterval()646 void RtpTransportControllerSend::UpdateControllerWithTimeInterval() {
647 RTC_DCHECK(controller_);
648 ProcessInterval msg;
649 msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
650 if (add_pacing_to_cwin_)
651 msg.pacer_queue = pacer_.QueueSizeData();
652 PostUpdates(controller_->OnProcessInterval(msg));
653 }
654
UpdateStreamsConfig()655 void RtpTransportControllerSend::UpdateStreamsConfig() {
656 streams_config_.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
657 if (controller_)
658 PostUpdates(controller_->OnStreamsConfig(streams_config_));
659 }
660
PostUpdates(NetworkControlUpdate update)661 void RtpTransportControllerSend::PostUpdates(NetworkControlUpdate update) {
662 if (update.congestion_window) {
663 congestion_window_size_ = *update.congestion_window;
664 UpdateCongestedState();
665 }
666 if (update.pacer_config) {
667 pacer_.SetPacingRates(update.pacer_config->data_rate(),
668 update.pacer_config->pad_rate());
669 }
670 if (!update.probe_cluster_configs.empty()) {
671 pacer_.CreateProbeClusters(std::move(update.probe_cluster_configs));
672 }
673 if (update.target_rate) {
674 control_handler_->SetTargetRate(*update.target_rate);
675 UpdateControlState();
676 }
677 }
678
OnReceivedRtcpReceiverReportBlocks(const ReportBlockList & report_blocks,int64_t now_ms)679 void RtpTransportControllerSend::OnReceivedRtcpReceiverReportBlocks(
680 const ReportBlockList& report_blocks,
681 int64_t now_ms) {
682 if (report_blocks.empty())
683 return;
684
685 int total_packets_lost_delta = 0;
686 int total_packets_delta = 0;
687
688 // Compute the packet loss from all report blocks.
689 for (const RTCPReportBlock& report_block : report_blocks) {
690 auto it = last_report_blocks_.find(report_block.source_ssrc);
691 if (it != last_report_blocks_.end()) {
692 auto number_of_packets = report_block.extended_highest_sequence_number -
693 it->second.extended_highest_sequence_number;
694 total_packets_delta += number_of_packets;
695 auto lost_delta = report_block.packets_lost - it->second.packets_lost;
696 total_packets_lost_delta += lost_delta;
697 }
698 last_report_blocks_[report_block.source_ssrc] = report_block;
699 }
700 // Can only compute delta if there has been previous blocks to compare to. If
701 // not, total_packets_delta will be unchanged and there's nothing more to do.
702 if (!total_packets_delta)
703 return;
704 int packets_received_delta = total_packets_delta - total_packets_lost_delta;
705 // To detect lost packets, at least one packet has to be received. This check
706 // is needed to avoid bandwith detection update in
707 // VideoSendStreamTest.SuspendBelowMinBitrate
708
709 if (packets_received_delta < 1)
710 return;
711 Timestamp now = Timestamp::Millis(now_ms);
712 TransportLossReport msg;
713 msg.packets_lost_delta = total_packets_lost_delta;
714 msg.packets_received_delta = packets_received_delta;
715 msg.receive_time = now;
716 msg.start_time = last_report_block_time_;
717 msg.end_time = now;
718 if (controller_)
719 PostUpdates(controller_->OnTransportLossReport(msg));
720 last_report_block_time_ = now;
721 }
722
723 } // namespace webrtc
724