1 // Copyright 2014 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/sequence_manager/task_queue_selector.h"
6
7 #include <bit>
8 #include <optional>
9 #include <utility>
10
11 #include "base/check_op.h"
12 #include "base/task/sequence_manager/associated_thread_id.h"
13 #include "base/task/sequence_manager/task_queue_impl.h"
14 #include "base/task/sequence_manager/work_queue.h"
15 #include "base/task/task_features.h"
16 #include "base/threading/thread_checker.h"
17 #include "base/trace_event/base_tracing.h"
18
19 namespace base {
20 namespace sequence_manager {
21 namespace internal {
22
TaskQueueSelector(scoped_refptr<const AssociatedThreadId> associated_thread,const SequenceManager::Settings & settings)23 TaskQueueSelector::TaskQueueSelector(
24 scoped_refptr<const AssociatedThreadId> associated_thread,
25 const SequenceManager::Settings& settings)
26 : associated_thread_(std::move(associated_thread)),
27 #if DCHECK_IS_ON()
28 random_task_selection_(settings.random_task_selection_seed != 0),
29 #endif
30 non_empty_set_counts_(
31 std::vector<int>(settings.priority_settings.priority_count(), 0)),
32 delayed_work_queue_sets_("delayed", this, settings),
33 immediate_work_queue_sets_("immediate", this, settings) {
34 }
35
36 TaskQueueSelector::~TaskQueueSelector() = default;
37
AddQueue(internal::TaskQueueImpl * queue,TaskQueue::QueuePriority priority)38 void TaskQueueSelector::AddQueue(internal::TaskQueueImpl* queue,
39 TaskQueue::QueuePriority priority) {
40 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
41 DCHECK(queue->IsQueueEnabled());
42 AddQueueImpl(queue, priority);
43 }
44
RemoveQueue(internal::TaskQueueImpl * queue)45 void TaskQueueSelector::RemoveQueue(internal::TaskQueueImpl* queue) {
46 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
47 if (queue->IsQueueEnabled()) {
48 RemoveQueueImpl(queue);
49 }
50 }
51
EnableQueue(internal::TaskQueueImpl * queue)52 void TaskQueueSelector::EnableQueue(internal::TaskQueueImpl* queue) {
53 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
54 DCHECK(queue->IsQueueEnabled());
55 AddQueueImpl(queue, queue->GetQueuePriority());
56 if (task_queue_selector_observer_)
57 task_queue_selector_observer_->OnTaskQueueEnabled(queue);
58 }
59
DisableQueue(internal::TaskQueueImpl * queue)60 void TaskQueueSelector::DisableQueue(internal::TaskQueueImpl* queue) {
61 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
62 DCHECK(!queue->IsQueueEnabled());
63 RemoveQueueImpl(queue);
64 }
65
SetQueuePriority(internal::TaskQueueImpl * queue,TaskQueue::QueuePriority priority)66 void TaskQueueSelector::SetQueuePriority(internal::TaskQueueImpl* queue,
67 TaskQueue::QueuePriority priority) {
68 DCHECK_LT(priority, priority_count());
69 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
70 if (queue->IsQueueEnabled()) {
71 ChangeSetIndex(queue, priority);
72 } else {
73 // Disabled queue is not in any set so we can't use ChangeSetIndex here
74 // and have to assign priority for the queue itself.
75 queue->delayed_work_queue()->AssignSetIndex(priority);
76 queue->immediate_work_queue()->AssignSetIndex(priority);
77 }
78 DCHECK_EQ(priority, queue->GetQueuePriority());
79 }
80
AddQueueImpl(internal::TaskQueueImpl * queue,TaskQueue::QueuePriority priority)81 void TaskQueueSelector::AddQueueImpl(internal::TaskQueueImpl* queue,
82 TaskQueue::QueuePriority priority) {
83 #if DCHECK_IS_ON()
84 DCHECK(!CheckContainsQueueForTest(queue));
85 #endif
86 delayed_work_queue_sets_.AddQueue(queue->delayed_work_queue(), priority);
87 immediate_work_queue_sets_.AddQueue(queue->immediate_work_queue(), priority);
88 #if DCHECK_IS_ON()
89 DCHECK(CheckContainsQueueForTest(queue));
90 #endif
91 }
92
ChangeSetIndex(internal::TaskQueueImpl * queue,TaskQueue::QueuePriority priority)93 void TaskQueueSelector::ChangeSetIndex(internal::TaskQueueImpl* queue,
94 TaskQueue::QueuePriority priority) {
95 #if DCHECK_IS_ON()
96 DCHECK(CheckContainsQueueForTest(queue));
97 #endif
98 delayed_work_queue_sets_.ChangeSetIndex(queue->delayed_work_queue(),
99 priority);
100 immediate_work_queue_sets_.ChangeSetIndex(queue->immediate_work_queue(),
101 priority);
102 #if DCHECK_IS_ON()
103 DCHECK(CheckContainsQueueForTest(queue));
104 #endif
105 }
106
RemoveQueueImpl(internal::TaskQueueImpl * queue)107 void TaskQueueSelector::RemoveQueueImpl(internal::TaskQueueImpl* queue) {
108 #if DCHECK_IS_ON()
109 DCHECK(CheckContainsQueueForTest(queue));
110 #endif
111 delayed_work_queue_sets_.RemoveQueue(queue->delayed_work_queue());
112 immediate_work_queue_sets_.RemoveQueue(queue->immediate_work_queue());
113
114 #if DCHECK_IS_ON()
115 DCHECK(!CheckContainsQueueForTest(queue));
116 #endif
117 }
118
WorkQueueSetBecameEmpty(size_t set_index)119 void TaskQueueSelector::WorkQueueSetBecameEmpty(size_t set_index) {
120 non_empty_set_counts_[set_index]--;
121 DCHECK_GE(non_empty_set_counts_[set_index], 0);
122
123 // There are no delayed or immediate tasks for |set_index| so remove from
124 // |active_priority_tracker_|.
125 if (non_empty_set_counts_[set_index] == 0) {
126 active_priority_tracker_.SetActive(
127 static_cast<TaskQueue::QueuePriority>(set_index), false);
128 }
129 }
130
WorkQueueSetBecameNonEmpty(size_t set_index)131 void TaskQueueSelector::WorkQueueSetBecameNonEmpty(size_t set_index) {
132 non_empty_set_counts_[set_index]++;
133 DCHECK_LE(non_empty_set_counts_[set_index], kMaxNonEmptySetCount);
134
135 // There is now a delayed or an immediate task for |set_index|, so add to
136 // |active_priority_tracker_|.
137 if (non_empty_set_counts_[set_index] == 1) {
138 bool had_active_priority = active_priority_tracker_.HasActivePriority();
139 TaskQueue::QueuePriority priority =
140 static_cast<TaskQueue::QueuePriority>(set_index);
141 active_priority_tracker_.SetActive(priority, true);
142 if (!had_active_priority && task_queue_selector_observer_) {
143 task_queue_selector_observer_->OnWorkAvailable();
144 }
145 }
146 }
147
CollectSkippedOverLowerPriorityTasks(const internal::WorkQueue * selected_work_queue,std::vector<const Task * > * result) const148 void TaskQueueSelector::CollectSkippedOverLowerPriorityTasks(
149 const internal::WorkQueue* selected_work_queue,
150 std::vector<const Task*>* result) const {
151 delayed_work_queue_sets_.CollectSkippedOverLowerPriorityTasks(
152 selected_work_queue, result);
153 immediate_work_queue_sets_.CollectSkippedOverLowerPriorityTasks(
154 selected_work_queue, result);
155 }
156
157 #if DCHECK_IS_ON() || !defined(NDEBUG)
CheckContainsQueueForTest(const internal::TaskQueueImpl * queue) const158 bool TaskQueueSelector::CheckContainsQueueForTest(
159 const internal::TaskQueueImpl* queue) const {
160 bool contains_delayed_work_queue =
161 delayed_work_queue_sets_.ContainsWorkQueueForTest(
162 queue->delayed_work_queue());
163
164 bool contains_immediate_work_queue =
165 immediate_work_queue_sets_.ContainsWorkQueueForTest(
166 queue->immediate_work_queue());
167
168 DCHECK_EQ(contains_delayed_work_queue, contains_immediate_work_queue);
169 return contains_delayed_work_queue;
170 }
171 #endif
172
SelectWorkQueueToService(SelectTaskOption option)173 WorkQueue* TaskQueueSelector::SelectWorkQueueToService(
174 SelectTaskOption option) {
175 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
176
177 auto highest_priority = GetHighestPendingPriority(option);
178 if (!highest_priority.has_value())
179 return nullptr;
180
181 // Select the priority from which we will select a task. Usually this is
182 // the highest priority for which we have work, unless we are starving a lower
183 // priority.
184 TaskQueue::QueuePriority priority = highest_priority.value();
185
186 // For selecting an immediate queue only, the highest priority can be used as
187 // a starting priority, but it is required to check work at other priorities.
188 // For the case where a delayed task is at a higher priority than an immediate
189 // task, HighestActivePriority(...) returns the priority of the delayed task
190 // but the resulting queue must be the lower one.
191 if (option == SelectTaskOption::kSkipDelayedTask) {
192 WorkQueue* queue =
193 #if DCHECK_IS_ON()
194 random_task_selection_
195 ? ChooseImmediateOnlyWithPriority<SetOperationRandom>(priority)
196 :
197 #endif
198 ChooseImmediateOnlyWithPriority<SetOperationOldest>(priority);
199 return queue;
200 }
201
202 WorkQueue* queue =
203 #if DCHECK_IS_ON()
204 random_task_selection_ ? ChooseWithPriority<SetOperationRandom>(priority)
205 :
206 #endif
207 ChooseWithPriority<SetOperationOldest>(priority);
208
209 // If we have selected a delayed task while having an immediate task of the
210 // same priority, increase the starvation count.
211 if (queue->queue_type() == WorkQueue::QueueType::kDelayed &&
212 !immediate_work_queue_sets_.IsSetEmpty(priority)) {
213 immediate_starvation_count_++;
214 } else {
215 immediate_starvation_count_ = 0;
216 }
217 return queue;
218 }
219
AsValue() const220 Value::Dict TaskQueueSelector::AsValue() const {
221 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
222 Value::Dict state;
223 state.Set("immediate_starvation_count", immediate_starvation_count_);
224 return state;
225 }
226
SetTaskQueueSelectorObserver(Observer * observer)227 void TaskQueueSelector::SetTaskQueueSelectorObserver(Observer* observer) {
228 task_queue_selector_observer_ = observer;
229 }
230
231 std::optional<TaskQueue::QueuePriority>
GetHighestPendingPriority(SelectTaskOption option) const232 TaskQueueSelector::GetHighestPendingPriority(SelectTaskOption option) const {
233 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
234 if (!active_priority_tracker_.HasActivePriority())
235 return std::nullopt;
236
237 TaskQueue::QueuePriority highest_priority =
238 active_priority_tracker_.HighestActivePriority();
239 DCHECK_LT(highest_priority, priority_count());
240 if (option != SelectTaskOption::kSkipDelayedTask)
241 return highest_priority;
242
243 for (; highest_priority != priority_count(); ++highest_priority) {
244 if (active_priority_tracker_.IsActive(highest_priority) &&
245 !immediate_work_queue_sets_.IsSetEmpty(highest_priority)) {
246 return highest_priority;
247 }
248 }
249
250 return std::nullopt;
251 }
252
SetImmediateStarvationCountForTest(int immediate_starvation_count)253 void TaskQueueSelector::SetImmediateStarvationCountForTest(
254 int immediate_starvation_count) {
255 immediate_starvation_count_ = immediate_starvation_count;
256 }
257
HasTasksWithPriority(TaskQueue::QueuePriority priority) const258 bool TaskQueueSelector::HasTasksWithPriority(
259 TaskQueue::QueuePriority priority) const {
260 return !delayed_work_queue_sets_.IsSetEmpty(priority) ||
261 !immediate_work_queue_sets_.IsSetEmpty(priority);
262 }
263
264 TaskQueueSelector::ActivePriorityTracker::ActivePriorityTracker() = default;
265
SetActive(TaskQueue::QueuePriority priority,bool is_active)266 void TaskQueueSelector::ActivePriorityTracker::SetActive(
267 TaskQueue::QueuePriority priority,
268 bool is_active) {
269 DCHECK_LT(priority, SequenceManager::PrioritySettings::kMaxPriorities);
270 DCHECK_NE(IsActive(priority), is_active);
271 if (is_active) {
272 active_priorities_ |= (size_t{1} << static_cast<size_t>(priority));
273 } else {
274 active_priorities_ &= ~(size_t{1} << static_cast<size_t>(priority));
275 }
276 }
277
278 TaskQueue::QueuePriority
HighestActivePriority() const279 TaskQueueSelector::ActivePriorityTracker::HighestActivePriority() const {
280 DCHECK_NE(active_priorities_, 0u);
281 return static_cast<TaskQueue::QueuePriority>(
282 std::countr_zero(active_priorities_));
283 }
284
285 } // namespace internal
286 } // namespace sequence_manager
287 } // namespace base
288