• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/sequence_manager/task_queue_selector.h"
6 
7 #include <bit>
8 #include <utility>
9 
10 #include "base/check_op.h"
11 #include "base/task/sequence_manager/associated_thread_id.h"
12 #include "base/task/sequence_manager/task_queue_impl.h"
13 #include "base/task/sequence_manager/work_queue.h"
14 #include "base/task/task_features.h"
15 #include "base/threading/thread_checker.h"
16 #include "base/trace_event/base_tracing.h"
17 #include "third_party/abseil-cpp/absl/types/optional.h"
18 
19 namespace base {
20 namespace sequence_manager {
21 namespace internal {
22 
23 std::atomic_int TaskQueueSelector::g_max_delayed_starvation_tasks =
24     TaskQueueSelector::kDefaultMaxDelayedStarvationTasks;
25 
TaskQueueSelector(scoped_refptr<const AssociatedThreadId> associated_thread,const SequenceManager::Settings & settings)26 TaskQueueSelector::TaskQueueSelector(
27     scoped_refptr<const AssociatedThreadId> associated_thread,
28     const SequenceManager::Settings& settings)
29     : associated_thread_(std::move(associated_thread)),
30 #if DCHECK_IS_ON()
31       random_task_selection_(settings.random_task_selection_seed != 0),
32 #endif
33       non_empty_set_counts_(
34           std::vector<int>(settings.priority_settings.priority_count(), 0)),
35       delayed_work_queue_sets_("delayed", this, settings),
36       immediate_work_queue_sets_("immediate", this, settings) {
37 }
38 
39 TaskQueueSelector::~TaskQueueSelector() = default;
40 
41 // static
InitializeFeatures()42 void TaskQueueSelector::InitializeFeatures() {
43   g_max_delayed_starvation_tasks.store(kMaxDelayedStarvationTasksParam.Get(),
44                                        std::memory_order_relaxed);
45 }
46 
AddQueue(internal::TaskQueueImpl * queue,TaskQueue::QueuePriority priority)47 void TaskQueueSelector::AddQueue(internal::TaskQueueImpl* queue,
48                                  TaskQueue::QueuePriority priority) {
49   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
50   DCHECK(queue->IsQueueEnabled());
51   AddQueueImpl(queue, priority);
52 }
53 
RemoveQueue(internal::TaskQueueImpl * queue)54 void TaskQueueSelector::RemoveQueue(internal::TaskQueueImpl* queue) {
55   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
56   if (queue->IsQueueEnabled()) {
57     RemoveQueueImpl(queue);
58   }
59 }
60 
EnableQueue(internal::TaskQueueImpl * queue)61 void TaskQueueSelector::EnableQueue(internal::TaskQueueImpl* queue) {
62   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
63   DCHECK(queue->IsQueueEnabled());
64   AddQueueImpl(queue, queue->GetQueuePriority());
65   if (task_queue_selector_observer_)
66     task_queue_selector_observer_->OnTaskQueueEnabled(queue);
67 }
68 
DisableQueue(internal::TaskQueueImpl * queue)69 void TaskQueueSelector::DisableQueue(internal::TaskQueueImpl* queue) {
70   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
71   DCHECK(!queue->IsQueueEnabled());
72   RemoveQueueImpl(queue);
73 }
74 
SetQueuePriority(internal::TaskQueueImpl * queue,TaskQueue::QueuePriority priority)75 void TaskQueueSelector::SetQueuePriority(internal::TaskQueueImpl* queue,
76                                          TaskQueue::QueuePriority priority) {
77   DCHECK_LT(priority, priority_count());
78   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
79   if (queue->IsQueueEnabled()) {
80     ChangeSetIndex(queue, priority);
81   } else {
82     // Disabled queue is not in any set so we can't use ChangeSetIndex here
83     // and have to assign priority for the queue itself.
84     queue->delayed_work_queue()->AssignSetIndex(priority);
85     queue->immediate_work_queue()->AssignSetIndex(priority);
86   }
87   DCHECK_EQ(priority, queue->GetQueuePriority());
88 }
89 
AddQueueImpl(internal::TaskQueueImpl * queue,TaskQueue::QueuePriority priority)90 void TaskQueueSelector::AddQueueImpl(internal::TaskQueueImpl* queue,
91                                      TaskQueue::QueuePriority priority) {
92 #if DCHECK_IS_ON()
93   DCHECK(!CheckContainsQueueForTest(queue));
94 #endif
95   delayed_work_queue_sets_.AddQueue(queue->delayed_work_queue(), priority);
96   immediate_work_queue_sets_.AddQueue(queue->immediate_work_queue(), priority);
97 #if DCHECK_IS_ON()
98   DCHECK(CheckContainsQueueForTest(queue));
99 #endif
100 }
101 
ChangeSetIndex(internal::TaskQueueImpl * queue,TaskQueue::QueuePriority priority)102 void TaskQueueSelector::ChangeSetIndex(internal::TaskQueueImpl* queue,
103                                        TaskQueue::QueuePriority priority) {
104 #if DCHECK_IS_ON()
105   DCHECK(CheckContainsQueueForTest(queue));
106 #endif
107   delayed_work_queue_sets_.ChangeSetIndex(queue->delayed_work_queue(),
108                                           priority);
109   immediate_work_queue_sets_.ChangeSetIndex(queue->immediate_work_queue(),
110                                             priority);
111 #if DCHECK_IS_ON()
112   DCHECK(CheckContainsQueueForTest(queue));
113 #endif
114 }
115 
RemoveQueueImpl(internal::TaskQueueImpl * queue)116 void TaskQueueSelector::RemoveQueueImpl(internal::TaskQueueImpl* queue) {
117 #if DCHECK_IS_ON()
118   DCHECK(CheckContainsQueueForTest(queue));
119 #endif
120   delayed_work_queue_sets_.RemoveQueue(queue->delayed_work_queue());
121   immediate_work_queue_sets_.RemoveQueue(queue->immediate_work_queue());
122 
123 #if DCHECK_IS_ON()
124   DCHECK(!CheckContainsQueueForTest(queue));
125 #endif
126 }
127 
WorkQueueSetBecameEmpty(size_t set_index)128 void TaskQueueSelector::WorkQueueSetBecameEmpty(size_t set_index) {
129   non_empty_set_counts_[set_index]--;
130   DCHECK_GE(non_empty_set_counts_[set_index], 0);
131 
132   // There are no delayed or immediate tasks for |set_index| so remove from
133   // |active_priority_tracker_|.
134   if (non_empty_set_counts_[set_index] == 0) {
135     active_priority_tracker_.SetActive(
136         static_cast<TaskQueue::QueuePriority>(set_index), false);
137   }
138 }
139 
WorkQueueSetBecameNonEmpty(size_t set_index)140 void TaskQueueSelector::WorkQueueSetBecameNonEmpty(size_t set_index) {
141   non_empty_set_counts_[set_index]++;
142   DCHECK_LE(non_empty_set_counts_[set_index], kMaxNonEmptySetCount);
143 
144   // There is now a delayed or an immediate task for |set_index|, so add to
145   // |active_priority_tracker_|.
146   if (non_empty_set_counts_[set_index] == 1) {
147     TaskQueue::QueuePriority priority =
148         static_cast<TaskQueue::QueuePriority>(set_index);
149     active_priority_tracker_.SetActive(priority, true);
150   }
151 }
152 
CollectSkippedOverLowerPriorityTasks(const internal::WorkQueue * selected_work_queue,std::vector<const Task * > * result) const153 void TaskQueueSelector::CollectSkippedOverLowerPriorityTasks(
154     const internal::WorkQueue* selected_work_queue,
155     std::vector<const Task*>* result) const {
156   delayed_work_queue_sets_.CollectSkippedOverLowerPriorityTasks(
157       selected_work_queue, result);
158   immediate_work_queue_sets_.CollectSkippedOverLowerPriorityTasks(
159       selected_work_queue, result);
160 }
161 
162 #if DCHECK_IS_ON() || !defined(NDEBUG)
CheckContainsQueueForTest(const internal::TaskQueueImpl * queue) const163 bool TaskQueueSelector::CheckContainsQueueForTest(
164     const internal::TaskQueueImpl* queue) const {
165   bool contains_delayed_work_queue =
166       delayed_work_queue_sets_.ContainsWorkQueueForTest(
167           queue->delayed_work_queue());
168 
169   bool contains_immediate_work_queue =
170       immediate_work_queue_sets_.ContainsWorkQueueForTest(
171           queue->immediate_work_queue());
172 
173   DCHECK_EQ(contains_delayed_work_queue, contains_immediate_work_queue);
174   return contains_delayed_work_queue;
175 }
176 #endif
177 
SelectWorkQueueToService(SelectTaskOption option)178 WorkQueue* TaskQueueSelector::SelectWorkQueueToService(
179     SelectTaskOption option) {
180   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
181 
182   auto highest_priority = GetHighestPendingPriority(option);
183   if (!highest_priority.has_value())
184     return nullptr;
185 
186   // Select the priority from which we will select a task. Usually this is
187   // the highest priority for which we have work, unless we are starving a lower
188   // priority.
189   TaskQueue::QueuePriority priority = highest_priority.value();
190 
191   // For selecting an immediate queue only, the highest priority can be used as
192   // a starting priority, but it is required to check work at other priorities.
193   // For the case where a delayed task is at a higher priority than an immediate
194   // task, HighestActivePriority(...) returns the priority of the delayed task
195   // but the resulting queue must be the lower one.
196   if (option == SelectTaskOption::kSkipDelayedTask) {
197     WorkQueue* queue =
198 #if DCHECK_IS_ON()
199         random_task_selection_
200             ? ChooseImmediateOnlyWithPriority<SetOperationRandom>(priority)
201             :
202 #endif
203             ChooseImmediateOnlyWithPriority<SetOperationOldest>(priority);
204     return queue;
205   }
206 
207   WorkQueue* queue =
208 #if DCHECK_IS_ON()
209       random_task_selection_ ? ChooseWithPriority<SetOperationRandom>(priority)
210                              :
211 #endif
212                              ChooseWithPriority<SetOperationOldest>(priority);
213 
214   // If we have selected a delayed task while having an immediate task of the
215   // same priority, increase the starvation count.
216   if (queue->queue_type() == WorkQueue::QueueType::kDelayed &&
217       !immediate_work_queue_sets_.IsSetEmpty(priority)) {
218     immediate_starvation_count_++;
219   } else {
220     immediate_starvation_count_ = 0;
221   }
222   return queue;
223 }
224 
AsValue() const225 Value::Dict TaskQueueSelector::AsValue() const {
226   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
227   Value::Dict state;
228   state.Set("immediate_starvation_count", immediate_starvation_count_);
229   return state;
230 }
231 
SetTaskQueueSelectorObserver(Observer * observer)232 void TaskQueueSelector::SetTaskQueueSelectorObserver(Observer* observer) {
233   task_queue_selector_observer_ = observer;
234 }
235 
236 absl::optional<TaskQueue::QueuePriority>
GetHighestPendingPriority(SelectTaskOption option) const237 TaskQueueSelector::GetHighestPendingPriority(SelectTaskOption option) const {
238   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
239   if (!active_priority_tracker_.HasActivePriority())
240     return absl::nullopt;
241 
242   TaskQueue::QueuePriority highest_priority =
243       active_priority_tracker_.HighestActivePriority();
244   DCHECK_LT(highest_priority, priority_count());
245   if (option != SelectTaskOption::kSkipDelayedTask)
246     return highest_priority;
247 
248   for (; highest_priority != priority_count(); ++highest_priority) {
249     if (active_priority_tracker_.IsActive(highest_priority) &&
250         !immediate_work_queue_sets_.IsSetEmpty(highest_priority)) {
251       return highest_priority;
252     }
253   }
254 
255   return absl::nullopt;
256 }
257 
SetImmediateStarvationCountForTest(int immediate_starvation_count)258 void TaskQueueSelector::SetImmediateStarvationCountForTest(
259     int immediate_starvation_count) {
260   immediate_starvation_count_ = immediate_starvation_count;
261 }
262 
HasTasksWithPriority(TaskQueue::QueuePriority priority) const263 bool TaskQueueSelector::HasTasksWithPriority(
264     TaskQueue::QueuePriority priority) const {
265   return !delayed_work_queue_sets_.IsSetEmpty(priority) ||
266          !immediate_work_queue_sets_.IsSetEmpty(priority);
267 }
268 
269 TaskQueueSelector::ActivePriorityTracker::ActivePriorityTracker() = default;
270 
SetActive(TaskQueue::QueuePriority priority,bool is_active)271 void TaskQueueSelector::ActivePriorityTracker::SetActive(
272     TaskQueue::QueuePriority priority,
273     bool is_active) {
274   DCHECK_LT(priority, SequenceManager::PrioritySettings::kMaxPriorities);
275   DCHECK_NE(IsActive(priority), is_active);
276   if (is_active) {
277     active_priorities_ |= (size_t{1} << static_cast<size_t>(priority));
278   } else {
279     active_priorities_ &= ~(size_t{1} << static_cast<size_t>(priority));
280   }
281 }
282 
283 TaskQueue::QueuePriority
HighestActivePriority() const284 TaskQueueSelector::ActivePriorityTracker::HighestActivePriority() const {
285   DCHECK_NE(active_priorities_, 0u);
286   return static_cast<TaskQueue::QueuePriority>(
287       std::countr_zero(active_priorities_));
288 }
289 
290 }  // namespace internal
291 }  // namespace sequence_manager
292 }  // namespace base
293