• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #include "call/rtp_transport_controller_send.h"
11 
12 #include <memory>
13 #include <utility>
14 #include <vector>
15 
16 #include "absl/strings/match.h"
17 #include "absl/strings/string_view.h"
18 #include "absl/types/optional.h"
19 #include "api/task_queue/pending_task_safety_flag.h"
20 #include "api/transport/goog_cc_factory.h"
21 #include "api/transport/network_types.h"
22 #include "api/units/data_rate.h"
23 #include "api/units/time_delta.h"
24 #include "api/units/timestamp.h"
25 #include "call/rtp_video_sender.h"
26 #include "logging/rtc_event_log/events/rtc_event_remote_estimate.h"
27 #include "logging/rtc_event_log/events/rtc_event_route_change.h"
28 #include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
29 #include "rtc_base/checks.h"
30 #include "rtc_base/logging.h"
31 #include "rtc_base/rate_limiter.h"
32 
33 namespace webrtc {
34 namespace {
35 static const int64_t kRetransmitWindowSizeMs = 500;
36 static const size_t kMaxOverheadBytes = 500;
37 
38 constexpr TimeDelta kPacerQueueUpdateInterval = TimeDelta::Millis(25);
39 
ConvertConstraints(int min_bitrate_bps,int max_bitrate_bps,int start_bitrate_bps,Clock * clock)40 TargetRateConstraints ConvertConstraints(int min_bitrate_bps,
41                                          int max_bitrate_bps,
42                                          int start_bitrate_bps,
43                                          Clock* clock) {
44   TargetRateConstraints msg;
45   msg.at_time = Timestamp::Millis(clock->TimeInMilliseconds());
46   msg.min_data_rate = min_bitrate_bps >= 0
47                           ? DataRate::BitsPerSec(min_bitrate_bps)
48                           : DataRate::Zero();
49   msg.max_data_rate = max_bitrate_bps > 0
50                           ? DataRate::BitsPerSec(max_bitrate_bps)
51                           : DataRate::Infinity();
52   if (start_bitrate_bps > 0)
53     msg.starting_rate = DataRate::BitsPerSec(start_bitrate_bps);
54   return msg;
55 }
56 
ConvertConstraints(const BitrateConstraints & contraints,Clock * clock)57 TargetRateConstraints ConvertConstraints(const BitrateConstraints& contraints,
58                                          Clock* clock) {
59   return ConvertConstraints(contraints.min_bitrate_bps,
60                             contraints.max_bitrate_bps,
61                             contraints.start_bitrate_bps, clock);
62 }
63 
IsEnabled(const FieldTrialsView & trials,absl::string_view key)64 bool IsEnabled(const FieldTrialsView& trials, absl::string_view key) {
65   return absl::StartsWith(trials.Lookup(key), "Enabled");
66 }
67 
IsRelayed(const rtc::NetworkRoute & route)68 bool IsRelayed(const rtc::NetworkRoute& route) {
69   return route.local.uses_turn() || route.remote.uses_turn();
70 }
71 }  // namespace
72 
PacerSettings(const FieldTrialsView & trials)73 RtpTransportControllerSend::PacerSettings::PacerSettings(
74     const FieldTrialsView& trials)
75     : holdback_window("holdback_window", TimeDelta::Millis(5)),
76       holdback_packets("holdback_packets", 3) {
77   ParseFieldTrial({&holdback_window, &holdback_packets},
78                   trials.Lookup("WebRTC-TaskQueuePacer"));
79 }
80 
RtpTransportControllerSend(Clock * clock,const RtpTransportConfig & config)81 RtpTransportControllerSend::RtpTransportControllerSend(
82     Clock* clock,
83     const RtpTransportConfig& config)
84     : clock_(clock),
85       event_log_(config.event_log),
86       task_queue_factory_(config.task_queue_factory),
87       bitrate_configurator_(config.bitrate_config),
88       pacer_started_(false),
89       pacer_settings_(*config.trials),
90       pacer_(clock,
91              &packet_router_,
92              *config.trials,
93              config.task_queue_factory,
94              pacer_settings_.holdback_window.Get(),
95              pacer_settings_.holdback_packets.Get(),
96              config.pacer_burst_interval),
97       observer_(nullptr),
98       controller_factory_override_(config.network_controller_factory),
99       controller_factory_fallback_(
100           std::make_unique<GoogCcNetworkControllerFactory>(
101               config.network_state_predictor_factory)),
102       process_interval_(controller_factory_fallback_->GetProcessInterval()),
103       last_report_block_time_(Timestamp::Millis(clock_->TimeInMilliseconds())),
104       reset_feedback_on_route_change_(
105           !IsEnabled(*config.trials, "WebRTC-Bwe-NoFeedbackReset")),
106       add_pacing_to_cwin_(
107           IsEnabled(*config.trials,
108                     "WebRTC-AddPacingToCongestionWindowPushback")),
109       relay_bandwidth_cap_("relay_cap", DataRate::PlusInfinity()),
110       transport_overhead_bytes_per_packet_(0),
111       network_available_(false),
112       congestion_window_size_(DataSize::PlusInfinity()),
113       is_congested_(false),
114       retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs),
115       task_queue_(*config.trials,
116                   "rtp_send_controller",
117                   config.task_queue_factory),
118       field_trials_(*config.trials) {
119   ParseFieldTrial({&relay_bandwidth_cap_},
120                   config.trials->Lookup("WebRTC-Bwe-NetworkRouteConstraints"));
121   initial_config_.constraints =
122       ConvertConstraints(config.bitrate_config, clock_);
123   initial_config_.event_log = config.event_log;
124   initial_config_.key_value_config = config.trials;
125   RTC_DCHECK(config.bitrate_config.start_bitrate_bps > 0);
126 
127   pacer_.SetPacingRates(
128       DataRate::BitsPerSec(config.bitrate_config.start_bitrate_bps),
129       DataRate::Zero());
130 }
131 
~RtpTransportControllerSend()132 RtpTransportControllerSend::~RtpTransportControllerSend() {
133   RTC_DCHECK_RUN_ON(&main_thread_);
134   RTC_DCHECK(video_rtp_senders_.empty());
135   if (task_queue_.IsCurrent()) {
136     // If these repeated tasks run on a task queue owned by
137     // `task_queue_`, they are stopped when the task queue is deleted.
138     // Otherwise, stop them here.
139     pacer_queue_update_task_.Stop();
140     controller_task_.Stop();
141   }
142 }
143 
CreateRtpVideoSender(const std::map<uint32_t,RtpState> & suspended_ssrcs,const std::map<uint32_t,RtpPayloadState> & states,const RtpConfig & rtp_config,int rtcp_report_interval_ms,Transport * send_transport,const RtpSenderObservers & observers,RtcEventLog * event_log,std::unique_ptr<FecController> fec_controller,const RtpSenderFrameEncryptionConfig & frame_encryption_config,rtc::scoped_refptr<FrameTransformerInterface> frame_transformer)144 RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender(
145     const std::map<uint32_t, RtpState>& suspended_ssrcs,
146     const std::map<uint32_t, RtpPayloadState>& states,
147     const RtpConfig& rtp_config,
148     int rtcp_report_interval_ms,
149     Transport* send_transport,
150     const RtpSenderObservers& observers,
151     RtcEventLog* event_log,
152     std::unique_ptr<FecController> fec_controller,
153     const RtpSenderFrameEncryptionConfig& frame_encryption_config,
154     rtc::scoped_refptr<FrameTransformerInterface> frame_transformer) {
155   RTC_DCHECK_RUN_ON(&main_thread_);
156   video_rtp_senders_.push_back(std::make_unique<RtpVideoSender>(
157       clock_, suspended_ssrcs, states, rtp_config, rtcp_report_interval_ms,
158       send_transport, observers,
159       // TODO(holmer): Remove this circular dependency by injecting
160       // the parts of RtpTransportControllerSendInterface that are really used.
161       this, event_log, &retransmission_rate_limiter_, std::move(fec_controller),
162       frame_encryption_config.frame_encryptor,
163       frame_encryption_config.crypto_options, std::move(frame_transformer),
164       field_trials_, task_queue_factory_));
165   return video_rtp_senders_.back().get();
166 }
167 
DestroyRtpVideoSender(RtpVideoSenderInterface * rtp_video_sender)168 void RtpTransportControllerSend::DestroyRtpVideoSender(
169     RtpVideoSenderInterface* rtp_video_sender) {
170   RTC_DCHECK_RUN_ON(&main_thread_);
171   std::vector<std::unique_ptr<RtpVideoSenderInterface>>::iterator it =
172       video_rtp_senders_.end();
173   for (it = video_rtp_senders_.begin(); it != video_rtp_senders_.end(); ++it) {
174     if (it->get() == rtp_video_sender) {
175       break;
176     }
177   }
178   RTC_DCHECK(it != video_rtp_senders_.end());
179   video_rtp_senders_.erase(it);
180 }
181 
UpdateControlState()182 void RtpTransportControllerSend::UpdateControlState() {
183   absl::optional<TargetTransferRate> update = control_handler_->GetUpdate();
184   if (!update)
185     return;
186   retransmission_rate_limiter_.SetMaxRate(update->target_rate.bps());
187   // We won't create control_handler_ until we have an observers.
188   RTC_DCHECK(observer_ != nullptr);
189   observer_->OnTargetTransferRate(*update);
190 }
191 
UpdateCongestedState()192 void RtpTransportControllerSend::UpdateCongestedState() {
193   bool congested = transport_feedback_adapter_.GetOutstandingData() >=
194                    congestion_window_size_;
195   if (congested != is_congested_) {
196     is_congested_ = congested;
197     pacer_.SetCongested(congested);
198   }
199 }
200 
GetWorkerQueue()201 MaybeWorkerThread* RtpTransportControllerSend::GetWorkerQueue() {
202   return &task_queue_;
203 }
204 
packet_router()205 PacketRouter* RtpTransportControllerSend::packet_router() {
206   return &packet_router_;
207 }
208 
209 NetworkStateEstimateObserver*
network_state_estimate_observer()210 RtpTransportControllerSend::network_state_estimate_observer() {
211   return this;
212 }
213 
214 TransportFeedbackObserver*
transport_feedback_observer()215 RtpTransportControllerSend::transport_feedback_observer() {
216   return this;
217 }
218 
packet_sender()219 RtpPacketSender* RtpTransportControllerSend::packet_sender() {
220   return &pacer_;
221 }
222 
SetAllocatedSendBitrateLimits(BitrateAllocationLimits limits)223 void RtpTransportControllerSend::SetAllocatedSendBitrateLimits(
224     BitrateAllocationLimits limits) {
225   RTC_DCHECK_RUN_ON(&task_queue_);
226   streams_config_.min_total_allocated_bitrate = limits.min_allocatable_rate;
227   streams_config_.max_padding_rate = limits.max_padding_rate;
228   streams_config_.max_total_allocated_bitrate = limits.max_allocatable_rate;
229   UpdateStreamsConfig();
230 }
SetPacingFactor(float pacing_factor)231 void RtpTransportControllerSend::SetPacingFactor(float pacing_factor) {
232   RTC_DCHECK_RUN_ON(&task_queue_);
233   streams_config_.pacing_factor = pacing_factor;
234   UpdateStreamsConfig();
235 }
SetQueueTimeLimit(int limit_ms)236 void RtpTransportControllerSend::SetQueueTimeLimit(int limit_ms) {
237   pacer_.SetQueueTimeLimit(TimeDelta::Millis(limit_ms));
238 }
239 StreamFeedbackProvider*
GetStreamFeedbackProvider()240 RtpTransportControllerSend::GetStreamFeedbackProvider() {
241   return &feedback_demuxer_;
242 }
243 
RegisterTargetTransferRateObserver(TargetTransferRateObserver * observer)244 void RtpTransportControllerSend::RegisterTargetTransferRateObserver(
245     TargetTransferRateObserver* observer) {
246   task_queue_.RunOrPost([this, observer] {
247     RTC_DCHECK_RUN_ON(&task_queue_);
248     RTC_DCHECK(observer_ == nullptr);
249     observer_ = observer;
250     observer_->OnStartRateUpdate(*initial_config_.constraints.starting_rate);
251     MaybeCreateControllers();
252   });
253 }
254 
IsRelevantRouteChange(const rtc::NetworkRoute & old_route,const rtc::NetworkRoute & new_route) const255 bool RtpTransportControllerSend::IsRelevantRouteChange(
256     const rtc::NetworkRoute& old_route,
257     const rtc::NetworkRoute& new_route) const {
258   // TODO(bugs.webrtc.org/11438): Experiment with using more information/
259   // other conditions.
260   bool connected_changed = old_route.connected != new_route.connected;
261   bool route_ids_changed =
262       old_route.local.network_id() != new_route.local.network_id() ||
263       old_route.remote.network_id() != new_route.remote.network_id();
264   if (relay_bandwidth_cap_->IsFinite()) {
265     bool relaying_changed = IsRelayed(old_route) != IsRelayed(new_route);
266     return connected_changed || route_ids_changed || relaying_changed;
267   } else {
268     return connected_changed || route_ids_changed;
269   }
270 }
271 
OnNetworkRouteChanged(absl::string_view transport_name,const rtc::NetworkRoute & network_route)272 void RtpTransportControllerSend::OnNetworkRouteChanged(
273     absl::string_view transport_name,
274     const rtc::NetworkRoute& network_route) {
275   // Check if the network route is connected.
276 
277   if (!network_route.connected) {
278     // TODO(honghaiz): Perhaps handle this in SignalChannelNetworkState and
279     // consider merging these two methods.
280     return;
281   }
282 
283   absl::optional<BitrateConstraints> relay_constraint_update =
284       ApplyOrLiftRelayCap(IsRelayed(network_route));
285 
286   // Check whether the network route has changed on each transport.
287   auto result = network_routes_.insert(
288       // Explicit conversion of transport_name to std::string here is necessary
289       // to support some platforms that cannot yet deal with implicit
290       // conversion in these types of situations.
291       std::make_pair(std::string(transport_name), network_route));
292   auto kv = result.first;
293   bool inserted = result.second;
294   if (inserted || !(kv->second == network_route)) {
295     RTC_LOG(LS_INFO) << "Network route changed on transport " << transport_name
296                      << ": new_route = " << network_route.DebugString();
297     if (!inserted) {
298       RTC_LOG(LS_INFO) << "old_route = " << kv->second.DebugString();
299     }
300   }
301 
302   if (inserted) {
303     if (relay_constraint_update.has_value()) {
304       UpdateBitrateConstraints(*relay_constraint_update);
305     }
306     task_queue_.RunOrPost([this, network_route] {
307       RTC_DCHECK_RUN_ON(&task_queue_);
308       transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
309     });
310     // No need to reset BWE if this is the first time the network connects.
311     return;
312   }
313 
314   const rtc::NetworkRoute old_route = kv->second;
315   kv->second = network_route;
316 
317   // Check if enough conditions of the new/old route has changed
318   // to trigger resetting of bitrates (and a probe).
319   if (IsRelevantRouteChange(old_route, network_route)) {
320     BitrateConstraints bitrate_config = bitrate_configurator_.GetConfig();
321     RTC_LOG(LS_INFO) << "Reset bitrates to min: "
322                      << bitrate_config.min_bitrate_bps
323                      << " bps, start: " << bitrate_config.start_bitrate_bps
324                      << " bps,  max: " << bitrate_config.max_bitrate_bps
325                      << " bps.";
326     RTC_DCHECK_GT(bitrate_config.start_bitrate_bps, 0);
327 
328     if (event_log_) {
329       event_log_->Log(std::make_unique<RtcEventRouteChange>(
330           network_route.connected, network_route.packet_overhead));
331     }
332     NetworkRouteChange msg;
333     msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
334     msg.constraints = ConvertConstraints(bitrate_config, clock_);
335     task_queue_.RunOrPost([this, msg, network_route] {
336       RTC_DCHECK_RUN_ON(&task_queue_);
337       transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
338       if (reset_feedback_on_route_change_) {
339         transport_feedback_adapter_.SetNetworkRoute(network_route);
340       }
341       if (controller_) {
342         PostUpdates(controller_->OnNetworkRouteChange(msg));
343       } else {
344         UpdateInitialConstraints(msg.constraints);
345       }
346       is_congested_ = false;
347       pacer_.SetCongested(false);
348     });
349   }
350 }
OnNetworkAvailability(bool network_available)351 void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) {
352   RTC_DCHECK_RUN_ON(&main_thread_);
353   RTC_LOG(LS_VERBOSE) << "SignalNetworkState "
354                       << (network_available ? "Up" : "Down");
355   NetworkAvailability msg;
356   msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
357   msg.network_available = network_available;
358   task_queue_.RunOrPost([this, msg]() {
359     RTC_DCHECK_RUN_ON(&task_queue_);
360     if (network_available_ == msg.network_available)
361       return;
362     network_available_ = msg.network_available;
363     if (network_available_) {
364       pacer_.Resume();
365     } else {
366       pacer_.Pause();
367     }
368     is_congested_ = false;
369     pacer_.SetCongested(false);
370 
371     if (controller_) {
372       control_handler_->SetNetworkAvailability(network_available_);
373       PostUpdates(controller_->OnNetworkAvailability(msg));
374       UpdateControlState();
375     } else {
376       MaybeCreateControllers();
377     }
378   });
379 
380   for (auto& rtp_sender : video_rtp_senders_) {
381     rtp_sender->OnNetworkAvailability(network_available);
382   }
383 }
GetBandwidthObserver()384 RtcpBandwidthObserver* RtpTransportControllerSend::GetBandwidthObserver() {
385   return this;
386 }
GetPacerQueuingDelayMs() const387 int64_t RtpTransportControllerSend::GetPacerQueuingDelayMs() const {
388   return pacer_.OldestPacketWaitTime().ms();
389 }
GetFirstPacketTime() const390 absl::optional<Timestamp> RtpTransportControllerSend::GetFirstPacketTime()
391     const {
392   return pacer_.FirstSentPacketTime();
393 }
EnablePeriodicAlrProbing(bool enable)394 void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) {
395   task_queue_.RunOrPost([this, enable]() {
396     RTC_DCHECK_RUN_ON(&task_queue_);
397     streams_config_.requests_alr_probing = enable;
398     UpdateStreamsConfig();
399   });
400 }
OnSentPacket(const rtc::SentPacket & sent_packet)401 void RtpTransportControllerSend::OnSentPacket(
402     const rtc::SentPacket& sent_packet) {
403   // Normally called on the network thread !
404 
405   // We can not use SafeTask here if we are using an owned task queue, because
406   // the safety flag will be destroyed when RtpTransportControllerSend is
407   // destroyed on the worker thread. But we must use SafeTask if we are using
408   // the worker thread, since the worker thread outlive
409   // RtpTransportControllerSend.
410   task_queue_.TaskQueueForPost()->PostTask(
411       task_queue_.MaybeSafeTask(safety_.flag(), [this, sent_packet]() {
412         RTC_DCHECK_RUN_ON(&task_queue_);
413         absl::optional<SentPacket> packet_msg =
414             transport_feedback_adapter_.ProcessSentPacket(sent_packet);
415         if (packet_msg) {
416           // Only update outstanding data if:
417           // 1. Packet feedback is used.
418           // 2. The packet has not yet received an acknowledgement.
419           // 3. It is not a retransmission of an earlier packet.
420           UpdateCongestedState();
421           if (controller_)
422             PostUpdates(controller_->OnSentPacket(*packet_msg));
423         }
424       }));
425 }
426 
OnReceivedPacket(const ReceivedPacket & packet_msg)427 void RtpTransportControllerSend::OnReceivedPacket(
428     const ReceivedPacket& packet_msg) {
429   task_queue_.RunOrPost([this, packet_msg]() {
430     RTC_DCHECK_RUN_ON(&task_queue_);
431     if (controller_)
432       PostUpdates(controller_->OnReceivedPacket(packet_msg));
433   });
434 }
435 
UpdateBitrateConstraints(const BitrateConstraints & updated)436 void RtpTransportControllerSend::UpdateBitrateConstraints(
437     const BitrateConstraints& updated) {
438   TargetRateConstraints msg = ConvertConstraints(updated, clock_);
439   task_queue_.RunOrPost([this, msg]() {
440     RTC_DCHECK_RUN_ON(&task_queue_);
441     if (controller_) {
442       PostUpdates(controller_->OnTargetRateConstraints(msg));
443     } else {
444       UpdateInitialConstraints(msg);
445     }
446   });
447 }
448 
SetSdpBitrateParameters(const BitrateConstraints & constraints)449 void RtpTransportControllerSend::SetSdpBitrateParameters(
450     const BitrateConstraints& constraints) {
451   absl::optional<BitrateConstraints> updated =
452       bitrate_configurator_.UpdateWithSdpParameters(constraints);
453   if (updated.has_value()) {
454     UpdateBitrateConstraints(*updated);
455   } else {
456     RTC_LOG(LS_VERBOSE)
457         << "WebRTC.RtpTransportControllerSend.SetSdpBitrateParameters: "
458            "nothing to update";
459   }
460 }
461 
SetClientBitratePreferences(const BitrateSettings & preferences)462 void RtpTransportControllerSend::SetClientBitratePreferences(
463     const BitrateSettings& preferences) {
464   absl::optional<BitrateConstraints> updated =
465       bitrate_configurator_.UpdateWithClientPreferences(preferences);
466   if (updated.has_value()) {
467     UpdateBitrateConstraints(*updated);
468   } else {
469     RTC_LOG(LS_VERBOSE)
470         << "WebRTC.RtpTransportControllerSend.SetClientBitratePreferences: "
471            "nothing to update";
472   }
473 }
474 
475 absl::optional<BitrateConstraints>
ApplyOrLiftRelayCap(bool is_relayed)476 RtpTransportControllerSend::ApplyOrLiftRelayCap(bool is_relayed) {
477   DataRate cap = is_relayed ? relay_bandwidth_cap_ : DataRate::PlusInfinity();
478   return bitrate_configurator_.UpdateWithRelayCap(cap);
479 }
480 
OnTransportOverheadChanged(size_t transport_overhead_bytes_per_packet)481 void RtpTransportControllerSend::OnTransportOverheadChanged(
482     size_t transport_overhead_bytes_per_packet) {
483   RTC_DCHECK_RUN_ON(&main_thread_);
484   if (transport_overhead_bytes_per_packet >= kMaxOverheadBytes) {
485     RTC_LOG(LS_ERROR) << "Transport overhead exceeds " << kMaxOverheadBytes;
486     return;
487   }
488 
489   pacer_.SetTransportOverhead(
490       DataSize::Bytes(transport_overhead_bytes_per_packet));
491 
492   // TODO(holmer): Call AudioRtpSenders when they have been moved to
493   // RtpTransportControllerSend.
494   for (auto& rtp_video_sender : video_rtp_senders_) {
495     rtp_video_sender->OnTransportOverheadChanged(
496         transport_overhead_bytes_per_packet);
497   }
498 }
499 
AccountForAudioPacketsInPacedSender(bool account_for_audio)500 void RtpTransportControllerSend::AccountForAudioPacketsInPacedSender(
501     bool account_for_audio) {
502   pacer_.SetAccountForAudioPackets(account_for_audio);
503 }
504 
IncludeOverheadInPacedSender()505 void RtpTransportControllerSend::IncludeOverheadInPacedSender() {
506   pacer_.SetIncludeOverhead();
507 }
508 
EnsureStarted()509 void RtpTransportControllerSend::EnsureStarted() {
510   if (!pacer_started_) {
511     pacer_started_ = true;
512     pacer_.EnsureStarted();
513   }
514 }
515 
OnReceivedEstimatedBitrate(uint32_t bitrate)516 void RtpTransportControllerSend::OnReceivedEstimatedBitrate(uint32_t bitrate) {
517   RemoteBitrateReport msg;
518   msg.receive_time = Timestamp::Millis(clock_->TimeInMilliseconds());
519   msg.bandwidth = DataRate::BitsPerSec(bitrate);
520   task_queue_.RunOrPost([this, msg]() {
521     RTC_DCHECK_RUN_ON(&task_queue_);
522     if (controller_)
523       PostUpdates(controller_->OnRemoteBitrateReport(msg));
524   });
525 }
526 
OnReceivedRtcpReceiverReport(const ReportBlockList & report_blocks,int64_t rtt_ms,int64_t now_ms)527 void RtpTransportControllerSend::OnReceivedRtcpReceiverReport(
528     const ReportBlockList& report_blocks,
529     int64_t rtt_ms,
530     int64_t now_ms) {
531   task_queue_.RunOrPost([this, report_blocks, now_ms, rtt_ms]() {
532     RTC_DCHECK_RUN_ON(&task_queue_);
533     OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms);
534     RoundTripTimeUpdate report;
535     report.receive_time = Timestamp::Millis(now_ms);
536     report.round_trip_time = TimeDelta::Millis(rtt_ms);
537     report.smoothed = false;
538     if (controller_ && !report.round_trip_time.IsZero())
539       PostUpdates(controller_->OnRoundTripTimeUpdate(report));
540   });
541 }
542 
OnAddPacket(const RtpPacketSendInfo & packet_info)543 void RtpTransportControllerSend::OnAddPacket(
544     const RtpPacketSendInfo& packet_info) {
545   Timestamp creation_time = Timestamp::Millis(clock_->TimeInMilliseconds());
546 
547   task_queue_.RunOrPost([this, packet_info, creation_time]() {
548     RTC_DCHECK_RUN_ON(&task_queue_);
549     feedback_demuxer_.AddPacket(packet_info);
550     transport_feedback_adapter_.AddPacket(
551         packet_info, transport_overhead_bytes_per_packet_, creation_time);
552   });
553 }
554 
OnTransportFeedback(const rtcp::TransportFeedback & feedback)555 void RtpTransportControllerSend::OnTransportFeedback(
556     const rtcp::TransportFeedback& feedback) {
557   auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds());
558   task_queue_.RunOrPost([this, feedback, feedback_time]() {
559     RTC_DCHECK_RUN_ON(&task_queue_);
560     feedback_demuxer_.OnTransportFeedback(feedback);
561     absl::optional<TransportPacketsFeedback> feedback_msg =
562         transport_feedback_adapter_.ProcessTransportFeedback(feedback,
563                                                              feedback_time);
564     if (feedback_msg) {
565       if (controller_)
566         PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg));
567 
568       // Only update outstanding data if any packet is first time acked.
569       UpdateCongestedState();
570     }
571   });
572 }
573 
OnRemoteNetworkEstimate(NetworkStateEstimate estimate)574 void RtpTransportControllerSend::OnRemoteNetworkEstimate(
575     NetworkStateEstimate estimate) {
576   if (event_log_) {
577     event_log_->Log(std::make_unique<RtcEventRemoteEstimate>(
578         estimate.link_capacity_lower, estimate.link_capacity_upper));
579   }
580   estimate.update_time = Timestamp::Millis(clock_->TimeInMilliseconds());
581   task_queue_.RunOrPost([this, estimate] {
582     RTC_DCHECK_RUN_ON(&task_queue_);
583     if (controller_)
584       PostUpdates(controller_->OnNetworkStateEstimate(estimate));
585   });
586 }
587 
MaybeCreateControllers()588 void RtpTransportControllerSend::MaybeCreateControllers() {
589   RTC_DCHECK(!controller_);
590   RTC_DCHECK(!control_handler_);
591 
592   if (!network_available_ || !observer_)
593     return;
594   control_handler_ = std::make_unique<CongestionControlHandler>();
595 
596   initial_config_.constraints.at_time =
597       Timestamp::Millis(clock_->TimeInMilliseconds());
598   initial_config_.stream_based_config = streams_config_;
599 
600   // TODO(srte): Use fallback controller if no feedback is available.
601   if (controller_factory_override_) {
602     RTC_LOG(LS_INFO) << "Creating overridden congestion controller";
603     controller_ = controller_factory_override_->Create(initial_config_);
604     process_interval_ = controller_factory_override_->GetProcessInterval();
605   } else {
606     RTC_LOG(LS_INFO) << "Creating fallback congestion controller";
607     controller_ = controller_factory_fallback_->Create(initial_config_);
608     process_interval_ = controller_factory_fallback_->GetProcessInterval();
609   }
610   UpdateControllerWithTimeInterval();
611   StartProcessPeriodicTasks();
612 }
613 
UpdateInitialConstraints(TargetRateConstraints new_contraints)614 void RtpTransportControllerSend::UpdateInitialConstraints(
615     TargetRateConstraints new_contraints) {
616   if (!new_contraints.starting_rate)
617     new_contraints.starting_rate = initial_config_.constraints.starting_rate;
618   RTC_DCHECK(new_contraints.starting_rate);
619   initial_config_.constraints = new_contraints;
620 }
621 
StartProcessPeriodicTasks()622 void RtpTransportControllerSend::StartProcessPeriodicTasks() {
623   RTC_DCHECK_RUN_ON(&task_queue_);
624   if (!pacer_queue_update_task_.Running()) {
625     pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart(
626         task_queue_.TaskQueueForDelayedTasks(), kPacerQueueUpdateInterval,
627         [this]() {
628           RTC_DCHECK_RUN_ON(&task_queue_);
629           TimeDelta expected_queue_time = pacer_.ExpectedQueueTime();
630           control_handler_->SetPacerQueue(expected_queue_time);
631           UpdateControlState();
632           return kPacerQueueUpdateInterval;
633         });
634   }
635   controller_task_.Stop();
636   if (process_interval_.IsFinite()) {
637     controller_task_ = RepeatingTaskHandle::DelayedStart(
638         task_queue_.TaskQueueForDelayedTasks(), process_interval_, [this]() {
639           RTC_DCHECK_RUN_ON(&task_queue_);
640           UpdateControllerWithTimeInterval();
641           return process_interval_;
642         });
643   }
644 }
645 
UpdateControllerWithTimeInterval()646 void RtpTransportControllerSend::UpdateControllerWithTimeInterval() {
647   RTC_DCHECK(controller_);
648   ProcessInterval msg;
649   msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
650   if (add_pacing_to_cwin_)
651     msg.pacer_queue = pacer_.QueueSizeData();
652   PostUpdates(controller_->OnProcessInterval(msg));
653 }
654 
UpdateStreamsConfig()655 void RtpTransportControllerSend::UpdateStreamsConfig() {
656   streams_config_.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
657   if (controller_)
658     PostUpdates(controller_->OnStreamsConfig(streams_config_));
659 }
660 
PostUpdates(NetworkControlUpdate update)661 void RtpTransportControllerSend::PostUpdates(NetworkControlUpdate update) {
662   if (update.congestion_window) {
663     congestion_window_size_ = *update.congestion_window;
664     UpdateCongestedState();
665   }
666   if (update.pacer_config) {
667     pacer_.SetPacingRates(update.pacer_config->data_rate(),
668                           update.pacer_config->pad_rate());
669   }
670   if (!update.probe_cluster_configs.empty()) {
671     pacer_.CreateProbeClusters(std::move(update.probe_cluster_configs));
672   }
673   if (update.target_rate) {
674     control_handler_->SetTargetRate(*update.target_rate);
675     UpdateControlState();
676   }
677 }
678 
OnReceivedRtcpReceiverReportBlocks(const ReportBlockList & report_blocks,int64_t now_ms)679 void RtpTransportControllerSend::OnReceivedRtcpReceiverReportBlocks(
680     const ReportBlockList& report_blocks,
681     int64_t now_ms) {
682   if (report_blocks.empty())
683     return;
684 
685   int total_packets_lost_delta = 0;
686   int total_packets_delta = 0;
687 
688   // Compute the packet loss from all report blocks.
689   for (const RTCPReportBlock& report_block : report_blocks) {
690     auto it = last_report_blocks_.find(report_block.source_ssrc);
691     if (it != last_report_blocks_.end()) {
692       auto number_of_packets = report_block.extended_highest_sequence_number -
693                                it->second.extended_highest_sequence_number;
694       total_packets_delta += number_of_packets;
695       auto lost_delta = report_block.packets_lost - it->second.packets_lost;
696       total_packets_lost_delta += lost_delta;
697     }
698     last_report_blocks_[report_block.source_ssrc] = report_block;
699   }
700   // Can only compute delta if there has been previous blocks to compare to. If
701   // not, total_packets_delta will be unchanged and there's nothing more to do.
702   if (!total_packets_delta)
703     return;
704   int packets_received_delta = total_packets_delta - total_packets_lost_delta;
705   // To detect lost packets, at least one packet has to be received. This check
706   // is needed to avoid bandwith detection update in
707   // VideoSendStreamTest.SuspendBelowMinBitrate
708 
709   if (packets_received_delta < 1)
710     return;
711   Timestamp now = Timestamp::Millis(now_ms);
712   TransportLossReport msg;
713   msg.packets_lost_delta = total_packets_lost_delta;
714   msg.packets_received_delta = packets_received_delta;
715   msg.receive_time = now;
716   msg.start_time = last_report_block_time_;
717   msg.end_time = now;
718   if (controller_)
719     PostUpdates(controller_->OnTransportLossReport(msg));
720   last_report_block_time_ = now;
721 }
722 
723 }  // namespace webrtc
724