1 /*
2 * Copyright 2020 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "call/adaptation/resource_adaptation_processor.h"
12
13 #include <algorithm>
14 #include <string>
15 #include <utility>
16
17 #include "absl/algorithm/container.h"
18 #include "api/video/video_adaptation_counters.h"
19 #include "call/adaptation/video_stream_adapter.h"
20 #include "rtc_base/logging.h"
21 #include "rtc_base/ref_counted_object.h"
22 #include "rtc_base/strings/string_builder.h"
23 #include "rtc_base/synchronization/sequence_checker.h"
24 #include "rtc_base/task_utils/to_queued_task.h"
25
26 namespace webrtc {
27
ResourceListenerDelegate(ResourceAdaptationProcessor * processor)28 ResourceAdaptationProcessor::ResourceListenerDelegate::ResourceListenerDelegate(
29 ResourceAdaptationProcessor* processor)
30 : resource_adaptation_queue_(nullptr), processor_(processor) {}
31
32 void ResourceAdaptationProcessor::ResourceListenerDelegate::
SetResourceAdaptationQueue(TaskQueueBase * resource_adaptation_queue)33 SetResourceAdaptationQueue(TaskQueueBase* resource_adaptation_queue) {
34 RTC_DCHECK(!resource_adaptation_queue_);
35 RTC_DCHECK(resource_adaptation_queue);
36 resource_adaptation_queue_ = resource_adaptation_queue;
37 RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
38 }
39
40 void ResourceAdaptationProcessor::ResourceListenerDelegate::
OnProcessorDestroyed()41 OnProcessorDestroyed() {
42 RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
43 processor_ = nullptr;
44 }
45
46 void ResourceAdaptationProcessor::ResourceListenerDelegate::
OnResourceUsageStateMeasured(rtc::scoped_refptr<Resource> resource,ResourceUsageState usage_state)47 OnResourceUsageStateMeasured(rtc::scoped_refptr<Resource> resource,
48 ResourceUsageState usage_state) {
49 if (!resource_adaptation_queue_->IsCurrent()) {
50 resource_adaptation_queue_->PostTask(ToQueuedTask(
51 [this_ref = rtc::scoped_refptr<ResourceListenerDelegate>(this),
52 resource, usage_state] {
53 this_ref->OnResourceUsageStateMeasured(resource, usage_state);
54 }));
55 return;
56 }
57 RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
58 if (processor_) {
59 processor_->OnResourceUsageStateMeasured(resource, usage_state);
60 }
61 }
62
63 ResourceAdaptationProcessor::MitigationResultAndLogMessage::
MitigationResultAndLogMessage()64 MitigationResultAndLogMessage()
65 : result(MitigationResult::kAdaptationApplied), message() {}
66
67 ResourceAdaptationProcessor::MitigationResultAndLogMessage::
MitigationResultAndLogMessage(MitigationResult result,std::string message)68 MitigationResultAndLogMessage(MitigationResult result, std::string message)
69 : result(result), message(std::move(message)) {}
70
ResourceAdaptationProcessor(VideoStreamEncoderObserver * encoder_stats_observer,VideoStreamAdapter * stream_adapter)71 ResourceAdaptationProcessor::ResourceAdaptationProcessor(
72 VideoStreamEncoderObserver* encoder_stats_observer,
73 VideoStreamAdapter* stream_adapter)
74 : resource_adaptation_queue_(nullptr),
75 resource_listener_delegate_(
76 new rtc::RefCountedObject<ResourceListenerDelegate>(this)),
77 encoder_stats_observer_(encoder_stats_observer),
78 resources_(),
79 stream_adapter_(stream_adapter),
80 last_reported_source_restrictions_(),
81 previous_mitigation_results_(),
82 processing_in_progress_(false) {
83 RTC_DCHECK(stream_adapter_);
84 }
85
~ResourceAdaptationProcessor()86 ResourceAdaptationProcessor::~ResourceAdaptationProcessor() {
87 RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
88 RTC_DCHECK(resources_.empty())
89 << "There are resource(s) attached to a ResourceAdaptationProcessor "
90 << "being destroyed.";
91 stream_adapter_->RemoveRestrictionsListener(this);
92 resource_listener_delegate_->OnProcessorDestroyed();
93 }
94
SetResourceAdaptationQueue(TaskQueueBase * resource_adaptation_queue)95 void ResourceAdaptationProcessor::SetResourceAdaptationQueue(
96 TaskQueueBase* resource_adaptation_queue) {
97 RTC_DCHECK(!resource_adaptation_queue_);
98 RTC_DCHECK(resource_adaptation_queue);
99 resource_adaptation_queue_ = resource_adaptation_queue;
100 resource_listener_delegate_->SetResourceAdaptationQueue(
101 resource_adaptation_queue);
102 RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
103 // Now that we have the adaptation queue we can attach as adaptation listener.
104 stream_adapter_->AddRestrictionsListener(this);
105 }
106
AddResourceLimitationsListener(ResourceLimitationsListener * limitations_listener)107 void ResourceAdaptationProcessor::AddResourceLimitationsListener(
108 ResourceLimitationsListener* limitations_listener) {
109 RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
110 RTC_DCHECK(std::find(resource_limitations_listeners_.begin(),
111 resource_limitations_listeners_.end(),
112 limitations_listener) ==
113 resource_limitations_listeners_.end());
114 resource_limitations_listeners_.push_back(limitations_listener);
115 }
RemoveResourceLimitationsListener(ResourceLimitationsListener * limitations_listener)116 void ResourceAdaptationProcessor::RemoveResourceLimitationsListener(
117 ResourceLimitationsListener* limitations_listener) {
118 RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
119 auto it =
120 std::find(resource_limitations_listeners_.begin(),
121 resource_limitations_listeners_.end(), limitations_listener);
122 RTC_DCHECK(it != resource_limitations_listeners_.end());
123 resource_limitations_listeners_.erase(it);
124 }
125
AddResource(rtc::scoped_refptr<Resource> resource)126 void ResourceAdaptationProcessor::AddResource(
127 rtc::scoped_refptr<Resource> resource) {
128 RTC_DCHECK(resource);
129 {
130 MutexLock crit(&resources_lock_);
131 RTC_DCHECK(absl::c_find(resources_, resource) == resources_.end())
132 << "Resource \"" << resource->Name() << "\" was already registered.";
133 resources_.push_back(resource);
134 }
135 resource->SetResourceListener(resource_listener_delegate_);
136 }
137
138 std::vector<rtc::scoped_refptr<Resource>>
GetResources() const139 ResourceAdaptationProcessor::GetResources() const {
140 MutexLock crit(&resources_lock_);
141 return resources_;
142 }
143
RemoveResource(rtc::scoped_refptr<Resource> resource)144 void ResourceAdaptationProcessor::RemoveResource(
145 rtc::scoped_refptr<Resource> resource) {
146 RTC_DCHECK(resource);
147 RTC_LOG(INFO) << "Removing resource \"" << resource->Name() << "\".";
148 resource->SetResourceListener(nullptr);
149 {
150 MutexLock crit(&resources_lock_);
151 auto it = absl::c_find(resources_, resource);
152 RTC_DCHECK(it != resources_.end()) << "Resource \"" << resource->Name()
153 << "\" was not a registered resource.";
154 resources_.erase(it);
155 }
156 RemoveLimitationsImposedByResource(std::move(resource));
157 }
158
RemoveLimitationsImposedByResource(rtc::scoped_refptr<Resource> resource)159 void ResourceAdaptationProcessor::RemoveLimitationsImposedByResource(
160 rtc::scoped_refptr<Resource> resource) {
161 if (!resource_adaptation_queue_->IsCurrent()) {
162 resource_adaptation_queue_->PostTask(ToQueuedTask(
163 [this, resource]() { RemoveLimitationsImposedByResource(resource); }));
164 return;
165 }
166 RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
167 auto resource_adaptation_limits =
168 adaptation_limits_by_resources_.find(resource);
169 if (resource_adaptation_limits != adaptation_limits_by_resources_.end()) {
170 VideoStreamAdapter::RestrictionsWithCounters adaptation_limits =
171 resource_adaptation_limits->second;
172 adaptation_limits_by_resources_.erase(resource_adaptation_limits);
173 if (adaptation_limits_by_resources_.empty()) {
174 // Only the resource being removed was adapted so clear restrictions.
175 stream_adapter_->ClearRestrictions();
176 return;
177 }
178
179 VideoStreamAdapter::RestrictionsWithCounters most_limited =
180 FindMostLimitedResources().second;
181
182 if (adaptation_limits.counters.Total() <= most_limited.counters.Total()) {
183 // The removed limitations were less limited than the most limited
184 // resource. Don't change the current restrictions.
185 return;
186 }
187
188 // Apply the new most limited resource as the next restrictions.
189 Adaptation adapt_to = stream_adapter_->GetAdaptationTo(
190 most_limited.counters, most_limited.restrictions);
191 RTC_DCHECK_EQ(adapt_to.status(), Adaptation::Status::kValid);
192 stream_adapter_->ApplyAdaptation(adapt_to, nullptr);
193
194 RTC_LOG(INFO) << "Most limited resource removed. Restoring restrictions to "
195 "next most limited restrictions: "
196 << most_limited.restrictions.ToString() << " with counters "
197 << most_limited.counters.ToString();
198 }
199 }
200
OnResourceUsageStateMeasured(rtc::scoped_refptr<Resource> resource,ResourceUsageState usage_state)201 void ResourceAdaptationProcessor::OnResourceUsageStateMeasured(
202 rtc::scoped_refptr<Resource> resource,
203 ResourceUsageState usage_state) {
204 RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
205 RTC_DCHECK(resource);
206 // |resource| could have been removed after signalling.
207 {
208 MutexLock crit(&resources_lock_);
209 if (absl::c_find(resources_, resource) == resources_.end()) {
210 RTC_LOG(INFO) << "Ignoring signal from removed resource \""
211 << resource->Name() << "\".";
212 return;
213 }
214 }
215 MitigationResultAndLogMessage result_and_message;
216 switch (usage_state) {
217 case ResourceUsageState::kOveruse:
218 result_and_message = OnResourceOveruse(resource);
219 break;
220 case ResourceUsageState::kUnderuse:
221 result_and_message = OnResourceUnderuse(resource);
222 break;
223 }
224 // Maybe log the result of the operation.
225 auto it = previous_mitigation_results_.find(resource.get());
226 if (it != previous_mitigation_results_.end() &&
227 it->second == result_and_message.result) {
228 // This resource has previously reported the same result and we haven't
229 // successfully adapted since - don't log to avoid spam.
230 return;
231 }
232 RTC_LOG(INFO) << "Resource \"" << resource->Name() << "\" signalled "
233 << ResourceUsageStateToString(usage_state) << ". "
234 << result_and_message.message;
235 if (result_and_message.result == MitigationResult::kAdaptationApplied) {
236 previous_mitigation_results_.clear();
237 } else {
238 previous_mitigation_results_.insert(
239 std::make_pair(resource.get(), result_and_message.result));
240 }
241 }
242
243 ResourceAdaptationProcessor::MitigationResultAndLogMessage
OnResourceUnderuse(rtc::scoped_refptr<Resource> reason_resource)244 ResourceAdaptationProcessor::OnResourceUnderuse(
245 rtc::scoped_refptr<Resource> reason_resource) {
246 RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
247 RTC_DCHECK(!processing_in_progress_);
248 processing_in_progress_ = true;
249 // How can this stream be adapted up?
250 Adaptation adaptation = stream_adapter_->GetAdaptationUp(reason_resource);
251 if (adaptation.status() != Adaptation::Status::kValid) {
252 processing_in_progress_ = false;
253 rtc::StringBuilder message;
254 message << "Not adapting up because VideoStreamAdapter returned "
255 << Adaptation::StatusToString(adaptation.status());
256 return MitigationResultAndLogMessage(MitigationResult::kRejectedByAdapter,
257 message.Release());
258 }
259 // Check that resource is most limited.
260 std::vector<rtc::scoped_refptr<Resource>> most_limited_resources;
261 VideoStreamAdapter::RestrictionsWithCounters most_limited_restrictions;
262 std::tie(most_limited_resources, most_limited_restrictions) =
263 FindMostLimitedResources();
264
265 // If the most restricted resource is less limited than current restrictions
266 // then proceed with adapting up.
267 if (!most_limited_resources.empty() &&
268 most_limited_restrictions.counters.Total() >=
269 stream_adapter_->adaptation_counters().Total()) {
270 // If |reason_resource| is not one of the most limiting resources then abort
271 // adaptation.
272 if (absl::c_find(most_limited_resources, reason_resource) ==
273 most_limited_resources.end()) {
274 processing_in_progress_ = false;
275 rtc::StringBuilder message;
276 message << "Resource \"" << reason_resource->Name()
277 << "\" was not the most limited resource.";
278 return MitigationResultAndLogMessage(
279 MitigationResult::kNotMostLimitedResource, message.Release());
280 }
281
282 if (most_limited_resources.size() > 1) {
283 // If there are multiple most limited resources, all must signal underuse
284 // before the adaptation is applied.
285 UpdateResourceLimitations(reason_resource, adaptation.restrictions(),
286 adaptation.counters());
287 processing_in_progress_ = false;
288 rtc::StringBuilder message;
289 message << "Resource \"" << reason_resource->Name()
290 << "\" was not the only most limited resource.";
291 return MitigationResultAndLogMessage(
292 MitigationResult::kSharedMostLimitedResource, message.Release());
293 }
294 }
295 // Apply adaptation.
296 stream_adapter_->ApplyAdaptation(adaptation, reason_resource);
297 processing_in_progress_ = false;
298 rtc::StringBuilder message;
299 message << "Adapted up successfully. Unfiltered adaptations: "
300 << stream_adapter_->adaptation_counters().ToString();
301 return MitigationResultAndLogMessage(MitigationResult::kAdaptationApplied,
302 message.Release());
303 }
304
305 ResourceAdaptationProcessor::MitigationResultAndLogMessage
OnResourceOveruse(rtc::scoped_refptr<Resource> reason_resource)306 ResourceAdaptationProcessor::OnResourceOveruse(
307 rtc::scoped_refptr<Resource> reason_resource) {
308 RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
309 RTC_DCHECK(!processing_in_progress_);
310 processing_in_progress_ = true;
311 // How can this stream be adapted up?
312 Adaptation adaptation = stream_adapter_->GetAdaptationDown();
313 if (adaptation.min_pixel_limit_reached()) {
314 encoder_stats_observer_->OnMinPixelLimitReached();
315 }
316 if (adaptation.status() != Adaptation::Status::kValid) {
317 processing_in_progress_ = false;
318 rtc::StringBuilder message;
319 message << "Not adapting down because VideoStreamAdapter returned "
320 << Adaptation::StatusToString(adaptation.status());
321 return MitigationResultAndLogMessage(MitigationResult::kRejectedByAdapter,
322 message.Release());
323 }
324 // Apply adaptation.
325 UpdateResourceLimitations(reason_resource, adaptation.restrictions(),
326 adaptation.counters());
327 stream_adapter_->ApplyAdaptation(adaptation, reason_resource);
328 processing_in_progress_ = false;
329 rtc::StringBuilder message;
330 message << "Adapted down successfully. Unfiltered adaptations: "
331 << stream_adapter_->adaptation_counters().ToString();
332 return MitigationResultAndLogMessage(MitigationResult::kAdaptationApplied,
333 message.Release());
334 }
335
336 std::pair<std::vector<rtc::scoped_refptr<Resource>>,
337 VideoStreamAdapter::RestrictionsWithCounters>
FindMostLimitedResources() const338 ResourceAdaptationProcessor::FindMostLimitedResources() const {
339 std::vector<rtc::scoped_refptr<Resource>> most_limited_resources;
340 VideoStreamAdapter::RestrictionsWithCounters most_limited_restrictions{
341 VideoSourceRestrictions(), VideoAdaptationCounters()};
342
343 for (const auto& resource_and_adaptation_limit_ :
344 adaptation_limits_by_resources_) {
345 const auto& restrictions_with_counters =
346 resource_and_adaptation_limit_.second;
347 if (restrictions_with_counters.counters.Total() >
348 most_limited_restrictions.counters.Total()) {
349 most_limited_restrictions = restrictions_with_counters;
350 most_limited_resources.clear();
351 most_limited_resources.push_back(resource_and_adaptation_limit_.first);
352 } else if (most_limited_restrictions.counters ==
353 restrictions_with_counters.counters) {
354 most_limited_resources.push_back(resource_and_adaptation_limit_.first);
355 }
356 }
357 return std::make_pair(std::move(most_limited_resources),
358 most_limited_restrictions);
359 }
360
UpdateResourceLimitations(rtc::scoped_refptr<Resource> reason_resource,const VideoSourceRestrictions & restrictions,const VideoAdaptationCounters & counters)361 void ResourceAdaptationProcessor::UpdateResourceLimitations(
362 rtc::scoped_refptr<Resource> reason_resource,
363 const VideoSourceRestrictions& restrictions,
364 const VideoAdaptationCounters& counters) {
365 auto& adaptation_limits = adaptation_limits_by_resources_[reason_resource];
366 if (adaptation_limits.restrictions == restrictions &&
367 adaptation_limits.counters == counters) {
368 return;
369 }
370 adaptation_limits = {restrictions, counters};
371
372 std::map<rtc::scoped_refptr<Resource>, VideoAdaptationCounters> limitations;
373 for (const auto& p : adaptation_limits_by_resources_) {
374 limitations.insert(std::make_pair(p.first, p.second.counters));
375 }
376 for (auto limitations_listener : resource_limitations_listeners_) {
377 limitations_listener->OnResourceLimitationChanged(reason_resource,
378 limitations);
379 }
380 }
381
OnVideoSourceRestrictionsUpdated(VideoSourceRestrictions restrictions,const VideoAdaptationCounters & adaptation_counters,rtc::scoped_refptr<Resource> reason,const VideoSourceRestrictions & unfiltered_restrictions)382 void ResourceAdaptationProcessor::OnVideoSourceRestrictionsUpdated(
383 VideoSourceRestrictions restrictions,
384 const VideoAdaptationCounters& adaptation_counters,
385 rtc::scoped_refptr<Resource> reason,
386 const VideoSourceRestrictions& unfiltered_restrictions) {
387 RTC_DCHECK_RUN_ON(resource_adaptation_queue_);
388 if (reason) {
389 UpdateResourceLimitations(reason, unfiltered_restrictions,
390 adaptation_counters);
391 } else if (adaptation_counters.Total() == 0) {
392 // Adaptations are cleared.
393 adaptation_limits_by_resources_.clear();
394 previous_mitigation_results_.clear();
395 for (auto limitations_listener : resource_limitations_listeners_) {
396 limitations_listener->OnResourceLimitationChanged(nullptr, {});
397 }
398 }
399 }
400
401 } // namespace webrtc
402