• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *  Copyright 2020 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "call/adaptation/resource_adaptation_processor.h"
12 
13 #include <algorithm>
14 #include <string>
15 #include <utility>
16 
17 #include "absl/algorithm/container.h"
18 #include "api/video/video_adaptation_counters.h"
19 #include "call/adaptation/video_stream_adapter.h"
20 #include "rtc_base/logging.h"
21 #include "rtc_base/ref_counted_object.h"
22 #include "rtc_base/strings/string_builder.h"
23 #include "rtc_base/synchronization/sequence_checker.h"
24 #include "rtc_base/task_utils/to_queued_task.h"
25 
26 namespace webrtc {
27 
ResourceListenerDelegate(ResourceAdaptationProcessor * processor)28 ResourceAdaptationProcessor::ResourceListenerDelegate::ResourceListenerDelegate(
29     ResourceAdaptationProcessor* processor)
30     : resource_adaptation_queue_(nullptr), processor_(processor) {}
31 
32 void ResourceAdaptationProcessor::ResourceListenerDelegate::
SetResourceAdaptationQueue(TaskQueueBase * resource_adaptation_queue)33     SetResourceAdaptationQueue(TaskQueueBase* resource_adaptation_queue) {
34   RTC_DCHECK(!resource_adaptation_queue_);
35   RTC_DCHECK(resource_adaptation_queue);
36   resource_adaptation_queue_ = resource_adaptation_queue;
37   RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
38 }
39 
40 void ResourceAdaptationProcessor::ResourceListenerDelegate::
OnProcessorDestroyed()41     OnProcessorDestroyed() {
42   RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
43   processor_ = nullptr;
44 }
45 
46 void ResourceAdaptationProcessor::ResourceListenerDelegate::
OnResourceUsageStateMeasured(rtc::scoped_refptr<Resource> resource,ResourceUsageState usage_state)47     OnResourceUsageStateMeasured(rtc::scoped_refptr<Resource> resource,
48                                  ResourceUsageState usage_state) {
49   if (!resource_adaptation_queue_->IsCurrent()) {
50     resource_adaptation_queue_->PostTask(ToQueuedTask(
51         [this_ref = rtc::scoped_refptr<ResourceListenerDelegate>(this),
52          resource, usage_state] {
53           this_ref->OnResourceUsageStateMeasured(resource, usage_state);
54         }));
55     return;
56   }
57   RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
58   if (processor_) {
59     processor_->OnResourceUsageStateMeasured(resource, usage_state);
60   }
61 }
62 
63 ResourceAdaptationProcessor::MitigationResultAndLogMessage::
MitigationResultAndLogMessage()64     MitigationResultAndLogMessage()
65     : result(MitigationResult::kAdaptationApplied), message() {}
66 
67 ResourceAdaptationProcessor::MitigationResultAndLogMessage::
MitigationResultAndLogMessage(MitigationResult result,std::string message)68     MitigationResultAndLogMessage(MitigationResult result, std::string message)
69     : result(result), message(std::move(message)) {}
70 
ResourceAdaptationProcessor(VideoStreamEncoderObserver * encoder_stats_observer,VideoStreamAdapter * stream_adapter)71 ResourceAdaptationProcessor::ResourceAdaptationProcessor(
72     VideoStreamEncoderObserver* encoder_stats_observer,
73     VideoStreamAdapter* stream_adapter)
74     : resource_adaptation_queue_(nullptr),
75       resource_listener_delegate_(
76           new rtc::RefCountedObject<ResourceListenerDelegate>(this)),
77       encoder_stats_observer_(encoder_stats_observer),
78       resources_(),
79       stream_adapter_(stream_adapter),
80       last_reported_source_restrictions_(),
81       previous_mitigation_results_(),
82       processing_in_progress_(false) {
83   RTC_DCHECK(stream_adapter_);
84 }
85 
~ResourceAdaptationProcessor()86 ResourceAdaptationProcessor::~ResourceAdaptationProcessor() {
87   RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
88   RTC_DCHECK(resources_.empty())
89       << "There are resource(s) attached to a ResourceAdaptationProcessor "
90       << "being destroyed.";
91   stream_adapter_->RemoveRestrictionsListener(this);
92   resource_listener_delegate_->OnProcessorDestroyed();
93 }
94 
SetResourceAdaptationQueue(TaskQueueBase * resource_adaptation_queue)95 void ResourceAdaptationProcessor::SetResourceAdaptationQueue(
96     TaskQueueBase* resource_adaptation_queue) {
97   RTC_DCHECK(!resource_adaptation_queue_);
98   RTC_DCHECK(resource_adaptation_queue);
99   resource_adaptation_queue_ = resource_adaptation_queue;
100   resource_listener_delegate_->SetResourceAdaptationQueue(
101       resource_adaptation_queue);
102   RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
103   // Now that we have the adaptation queue we can attach as adaptation listener.
104   stream_adapter_->AddRestrictionsListener(this);
105 }
106 
AddResourceLimitationsListener(ResourceLimitationsListener * limitations_listener)107 void ResourceAdaptationProcessor::AddResourceLimitationsListener(
108     ResourceLimitationsListener* limitations_listener) {
109   RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
110   RTC_DCHECK(std::find(resource_limitations_listeners_.begin(),
111                        resource_limitations_listeners_.end(),
112                        limitations_listener) ==
113              resource_limitations_listeners_.end());
114   resource_limitations_listeners_.push_back(limitations_listener);
115 }
RemoveResourceLimitationsListener(ResourceLimitationsListener * limitations_listener)116 void ResourceAdaptationProcessor::RemoveResourceLimitationsListener(
117     ResourceLimitationsListener* limitations_listener) {
118   RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
119   auto it =
120       std::find(resource_limitations_listeners_.begin(),
121                 resource_limitations_listeners_.end(), limitations_listener);
122   RTC_DCHECK(it != resource_limitations_listeners_.end());
123   resource_limitations_listeners_.erase(it);
124 }
125 
AddResource(rtc::scoped_refptr<Resource> resource)126 void ResourceAdaptationProcessor::AddResource(
127     rtc::scoped_refptr<Resource> resource) {
128   RTC_DCHECK(resource);
129   {
130     MutexLock crit(&resources_lock_);
131     RTC_DCHECK(absl::c_find(resources_, resource) == resources_.end())
132         << "Resource \"" << resource->Name() << "\" was already registered.";
133     resources_.push_back(resource);
134   }
135   resource->SetResourceListener(resource_listener_delegate_);
136 }
137 
138 std::vector<rtc::scoped_refptr<Resource>>
GetResources() const139 ResourceAdaptationProcessor::GetResources() const {
140   MutexLock crit(&resources_lock_);
141   return resources_;
142 }
143 
RemoveResource(rtc::scoped_refptr<Resource> resource)144 void ResourceAdaptationProcessor::RemoveResource(
145     rtc::scoped_refptr<Resource> resource) {
146   RTC_DCHECK(resource);
147   RTC_LOG(INFO) << "Removing resource \"" << resource->Name() << "\".";
148   resource->SetResourceListener(nullptr);
149   {
150     MutexLock crit(&resources_lock_);
151     auto it = absl::c_find(resources_, resource);
152     RTC_DCHECK(it != resources_.end()) << "Resource \"" << resource->Name()
153                                        << "\" was not a registered resource.";
154     resources_.erase(it);
155   }
156   RemoveLimitationsImposedByResource(std::move(resource));
157 }
158 
RemoveLimitationsImposedByResource(rtc::scoped_refptr<Resource> resource)159 void ResourceAdaptationProcessor::RemoveLimitationsImposedByResource(
160     rtc::scoped_refptr<Resource> resource) {
161   if (!resource_adaptation_queue_->IsCurrent()) {
162     resource_adaptation_queue_->PostTask(ToQueuedTask(
163         [this, resource]() { RemoveLimitationsImposedByResource(resource); }));
164     return;
165   }
166   RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
167   auto resource_adaptation_limits =
168       adaptation_limits_by_resources_.find(resource);
169   if (resource_adaptation_limits != adaptation_limits_by_resources_.end()) {
170     VideoStreamAdapter::RestrictionsWithCounters adaptation_limits =
171         resource_adaptation_limits->second;
172     adaptation_limits_by_resources_.erase(resource_adaptation_limits);
173     if (adaptation_limits_by_resources_.empty()) {
174       // Only the resource being removed was adapted so clear restrictions.
175       stream_adapter_->ClearRestrictions();
176       return;
177     }
178 
179     VideoStreamAdapter::RestrictionsWithCounters most_limited =
180         FindMostLimitedResources().second;
181 
182     if (adaptation_limits.counters.Total() <= most_limited.counters.Total()) {
183       // The removed limitations were less limited than the most limited
184       // resource. Don't change the current restrictions.
185       return;
186     }
187 
188     // Apply the new most limited resource as the next restrictions.
189     Adaptation adapt_to = stream_adapter_->GetAdaptationTo(
190         most_limited.counters, most_limited.restrictions);
191     RTC_DCHECK_EQ(adapt_to.status(), Adaptation::Status::kValid);
192     stream_adapter_->ApplyAdaptation(adapt_to, nullptr);
193 
194     RTC_LOG(INFO) << "Most limited resource removed. Restoring restrictions to "
195                      "next most limited restrictions: "
196                   << most_limited.restrictions.ToString() << " with counters "
197                   << most_limited.counters.ToString();
198   }
199 }
200 
OnResourceUsageStateMeasured(rtc::scoped_refptr<Resource> resource,ResourceUsageState usage_state)201 void ResourceAdaptationProcessor::OnResourceUsageStateMeasured(
202     rtc::scoped_refptr<Resource> resource,
203     ResourceUsageState usage_state) {
204   RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
205   RTC_DCHECK(resource);
206   // |resource| could have been removed after signalling.
207   {
208     MutexLock crit(&resources_lock_);
209     if (absl::c_find(resources_, resource) == resources_.end()) {
210       RTC_LOG(INFO) << "Ignoring signal from removed resource \""
211                     << resource->Name() << "\".";
212       return;
213     }
214   }
215   MitigationResultAndLogMessage result_and_message;
216   switch (usage_state) {
217     case ResourceUsageState::kOveruse:
218       result_and_message = OnResourceOveruse(resource);
219       break;
220     case ResourceUsageState::kUnderuse:
221       result_and_message = OnResourceUnderuse(resource);
222       break;
223   }
224   // Maybe log the result of the operation.
225   auto it = previous_mitigation_results_.find(resource.get());
226   if (it != previous_mitigation_results_.end() &&
227       it->second == result_and_message.result) {
228     // This resource has previously reported the same result and we haven't
229     // successfully adapted since - don't log to avoid spam.
230     return;
231   }
232   RTC_LOG(INFO) << "Resource \"" << resource->Name() << "\" signalled "
233                 << ResourceUsageStateToString(usage_state) << ". "
234                 << result_and_message.message;
235   if (result_and_message.result == MitigationResult::kAdaptationApplied) {
236     previous_mitigation_results_.clear();
237   } else {
238     previous_mitigation_results_.insert(
239         std::make_pair(resource.get(), result_and_message.result));
240   }
241 }
242 
243 ResourceAdaptationProcessor::MitigationResultAndLogMessage
OnResourceUnderuse(rtc::scoped_refptr<Resource> reason_resource)244 ResourceAdaptationProcessor::OnResourceUnderuse(
245     rtc::scoped_refptr<Resource> reason_resource) {
246   RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
247   RTC_DCHECK(!processing_in_progress_);
248   processing_in_progress_ = true;
249   // How can this stream be adapted up?
250   Adaptation adaptation = stream_adapter_->GetAdaptationUp(reason_resource);
251   if (adaptation.status() != Adaptation::Status::kValid) {
252     processing_in_progress_ = false;
253     rtc::StringBuilder message;
254     message << "Not adapting up because VideoStreamAdapter returned "
255             << Adaptation::StatusToString(adaptation.status());
256     return MitigationResultAndLogMessage(MitigationResult::kRejectedByAdapter,
257                                          message.Release());
258   }
259   // Check that resource is most limited.
260   std::vector<rtc::scoped_refptr<Resource>> most_limited_resources;
261   VideoStreamAdapter::RestrictionsWithCounters most_limited_restrictions;
262   std::tie(most_limited_resources, most_limited_restrictions) =
263       FindMostLimitedResources();
264 
265   // If the most restricted resource is less limited than current restrictions
266   // then proceed with adapting up.
267   if (!most_limited_resources.empty() &&
268       most_limited_restrictions.counters.Total() >=
269           stream_adapter_->adaptation_counters().Total()) {
270     // If |reason_resource| is not one of the most limiting resources then abort
271     // adaptation.
272     if (absl::c_find(most_limited_resources, reason_resource) ==
273         most_limited_resources.end()) {
274       processing_in_progress_ = false;
275       rtc::StringBuilder message;
276       message << "Resource \"" << reason_resource->Name()
277               << "\" was not the most limited resource.";
278       return MitigationResultAndLogMessage(
279           MitigationResult::kNotMostLimitedResource, message.Release());
280     }
281 
282     if (most_limited_resources.size() > 1) {
283       // If there are multiple most limited resources, all must signal underuse
284       // before the adaptation is applied.
285       UpdateResourceLimitations(reason_resource, adaptation.restrictions(),
286                                 adaptation.counters());
287       processing_in_progress_ = false;
288       rtc::StringBuilder message;
289       message << "Resource \"" << reason_resource->Name()
290               << "\" was not the only most limited resource.";
291       return MitigationResultAndLogMessage(
292           MitigationResult::kSharedMostLimitedResource, message.Release());
293     }
294   }
295   // Apply adaptation.
296   stream_adapter_->ApplyAdaptation(adaptation, reason_resource);
297   processing_in_progress_ = false;
298   rtc::StringBuilder message;
299   message << "Adapted up successfully. Unfiltered adaptations: "
300           << stream_adapter_->adaptation_counters().ToString();
301   return MitigationResultAndLogMessage(MitigationResult::kAdaptationApplied,
302                                        message.Release());
303 }
304 
305 ResourceAdaptationProcessor::MitigationResultAndLogMessage
OnResourceOveruse(rtc::scoped_refptr<Resource> reason_resource)306 ResourceAdaptationProcessor::OnResourceOveruse(
307     rtc::scoped_refptr<Resource> reason_resource) {
308   RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
309   RTC_DCHECK(!processing_in_progress_);
310   processing_in_progress_ = true;
311   // How can this stream be adapted up?
312   Adaptation adaptation = stream_adapter_->GetAdaptationDown();
313   if (adaptation.min_pixel_limit_reached()) {
314     encoder_stats_observer_->OnMinPixelLimitReached();
315   }
316   if (adaptation.status() != Adaptation::Status::kValid) {
317     processing_in_progress_ = false;
318     rtc::StringBuilder message;
319     message << "Not adapting down because VideoStreamAdapter returned "
320             << Adaptation::StatusToString(adaptation.status());
321     return MitigationResultAndLogMessage(MitigationResult::kRejectedByAdapter,
322                                          message.Release());
323   }
324   // Apply adaptation.
325   UpdateResourceLimitations(reason_resource, adaptation.restrictions(),
326                             adaptation.counters());
327   stream_adapter_->ApplyAdaptation(adaptation, reason_resource);
328   processing_in_progress_ = false;
329   rtc::StringBuilder message;
330   message << "Adapted down successfully. Unfiltered adaptations: "
331           << stream_adapter_->adaptation_counters().ToString();
332   return MitigationResultAndLogMessage(MitigationResult::kAdaptationApplied,
333                                        message.Release());
334 }
335 
336 std::pair<std::vector<rtc::scoped_refptr<Resource>>,
337           VideoStreamAdapter::RestrictionsWithCounters>
FindMostLimitedResources() const338 ResourceAdaptationProcessor::FindMostLimitedResources() const {
339   std::vector<rtc::scoped_refptr<Resource>> most_limited_resources;
340   VideoStreamAdapter::RestrictionsWithCounters most_limited_restrictions{
341       VideoSourceRestrictions(), VideoAdaptationCounters()};
342 
343   for (const auto& resource_and_adaptation_limit_ :
344        adaptation_limits_by_resources_) {
345     const auto& restrictions_with_counters =
346         resource_and_adaptation_limit_.second;
347     if (restrictions_with_counters.counters.Total() >
348         most_limited_restrictions.counters.Total()) {
349       most_limited_restrictions = restrictions_with_counters;
350       most_limited_resources.clear();
351       most_limited_resources.push_back(resource_and_adaptation_limit_.first);
352     } else if (most_limited_restrictions.counters ==
353                restrictions_with_counters.counters) {
354       most_limited_resources.push_back(resource_and_adaptation_limit_.first);
355     }
356   }
357   return std::make_pair(std::move(most_limited_resources),
358                         most_limited_restrictions);
359 }
360 
UpdateResourceLimitations(rtc::scoped_refptr<Resource> reason_resource,const VideoSourceRestrictions & restrictions,const VideoAdaptationCounters & counters)361 void ResourceAdaptationProcessor::UpdateResourceLimitations(
362     rtc::scoped_refptr<Resource> reason_resource,
363     const VideoSourceRestrictions& restrictions,
364     const VideoAdaptationCounters& counters) {
365   auto& adaptation_limits = adaptation_limits_by_resources_[reason_resource];
366   if (adaptation_limits.restrictions == restrictions &&
367       adaptation_limits.counters == counters) {
368     return;
369   }
370   adaptation_limits = {restrictions, counters};
371 
372   std::map<rtc::scoped_refptr<Resource>, VideoAdaptationCounters> limitations;
373   for (const auto& p : adaptation_limits_by_resources_) {
374     limitations.insert(std::make_pair(p.first, p.second.counters));
375   }
376   for (auto limitations_listener : resource_limitations_listeners_) {
377     limitations_listener->OnResourceLimitationChanged(reason_resource,
378                                                       limitations);
379   }
380 }
381 
OnVideoSourceRestrictionsUpdated(VideoSourceRestrictions restrictions,const VideoAdaptationCounters & adaptation_counters,rtc::scoped_refptr<Resource> reason,const VideoSourceRestrictions & unfiltered_restrictions)382 void ResourceAdaptationProcessor::OnVideoSourceRestrictionsUpdated(
383     VideoSourceRestrictions restrictions,
384     const VideoAdaptationCounters& adaptation_counters,
385     rtc::scoped_refptr<Resource> reason,
386     const VideoSourceRestrictions& unfiltered_restrictions) {
387   RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
388   if (reason) {
389     UpdateResourceLimitations(reason, unfiltered_restrictions,
390                               adaptation_counters);
391   } else if (adaptation_counters.Total() == 0) {
392     // Adaptations are cleared.
393     adaptation_limits_by_resources_.clear();
394     previous_mitigation_results_.clear();
395     for (auto limitations_listener : resource_limitations_listeners_) {
396       limitations_listener->OnResourceLimitationChanged(nullptr, {});
397     }
398   }
399 }
400 
401 }  // namespace webrtc
402