1 // Copyright 2014 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/sequence_manager/task_queue_selector.h"
6
7 #include <bit>
8 #include <utility>
9
10 #include "base/check_op.h"
11 #include "base/task/sequence_manager/associated_thread_id.h"
12 #include "base/task/sequence_manager/task_queue_impl.h"
13 #include "base/task/sequence_manager/work_queue.h"
14 #include "base/task/task_features.h"
15 #include "base/threading/thread_checker.h"
16 #include "base/trace_event/base_tracing.h"
17 #include "third_party/abseil-cpp/absl/types/optional.h"
18
19 namespace base {
20 namespace sequence_manager {
21 namespace internal {
22
23 std::atomic_int TaskQueueSelector::g_max_delayed_starvation_tasks =
24 TaskQueueSelector::kDefaultMaxDelayedStarvationTasks;
25
TaskQueueSelector(scoped_refptr<const AssociatedThreadId> associated_thread,const SequenceManager::Settings & settings)26 TaskQueueSelector::TaskQueueSelector(
27 scoped_refptr<const AssociatedThreadId> associated_thread,
28 const SequenceManager::Settings& settings)
29 : associated_thread_(std::move(associated_thread)),
30 #if DCHECK_IS_ON()
31 random_task_selection_(settings.random_task_selection_seed != 0),
32 #endif
33 non_empty_set_counts_(
34 std::vector<int>(settings.priority_settings.priority_count(), 0)),
35 delayed_work_queue_sets_("delayed", this, settings),
36 immediate_work_queue_sets_("immediate", this, settings) {
37 }
38
39 TaskQueueSelector::~TaskQueueSelector() = default;
40
41 // static
InitializeFeatures()42 void TaskQueueSelector::InitializeFeatures() {
43 g_max_delayed_starvation_tasks.store(kMaxDelayedStarvationTasksParam.Get(),
44 std::memory_order_relaxed);
45 }
46
AddQueue(internal::TaskQueueImpl * queue,TaskQueue::QueuePriority priority)47 void TaskQueueSelector::AddQueue(internal::TaskQueueImpl* queue,
48 TaskQueue::QueuePriority priority) {
49 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
50 DCHECK(queue->IsQueueEnabled());
51 AddQueueImpl(queue, priority);
52 }
53
RemoveQueue(internal::TaskQueueImpl * queue)54 void TaskQueueSelector::RemoveQueue(internal::TaskQueueImpl* queue) {
55 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
56 if (queue->IsQueueEnabled()) {
57 RemoveQueueImpl(queue);
58 }
59 }
60
EnableQueue(internal::TaskQueueImpl * queue)61 void TaskQueueSelector::EnableQueue(internal::TaskQueueImpl* queue) {
62 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
63 DCHECK(queue->IsQueueEnabled());
64 AddQueueImpl(queue, queue->GetQueuePriority());
65 if (task_queue_selector_observer_)
66 task_queue_selector_observer_->OnTaskQueueEnabled(queue);
67 }
68
DisableQueue(internal::TaskQueueImpl * queue)69 void TaskQueueSelector::DisableQueue(internal::TaskQueueImpl* queue) {
70 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
71 DCHECK(!queue->IsQueueEnabled());
72 RemoveQueueImpl(queue);
73 }
74
SetQueuePriority(internal::TaskQueueImpl * queue,TaskQueue::QueuePriority priority)75 void TaskQueueSelector::SetQueuePriority(internal::TaskQueueImpl* queue,
76 TaskQueue::QueuePriority priority) {
77 DCHECK_LT(priority, priority_count());
78 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
79 if (queue->IsQueueEnabled()) {
80 ChangeSetIndex(queue, priority);
81 } else {
82 // Disabled queue is not in any set so we can't use ChangeSetIndex here
83 // and have to assign priority for the queue itself.
84 queue->delayed_work_queue()->AssignSetIndex(priority);
85 queue->immediate_work_queue()->AssignSetIndex(priority);
86 }
87 DCHECK_EQ(priority, queue->GetQueuePriority());
88 }
89
AddQueueImpl(internal::TaskQueueImpl * queue,TaskQueue::QueuePriority priority)90 void TaskQueueSelector::AddQueueImpl(internal::TaskQueueImpl* queue,
91 TaskQueue::QueuePriority priority) {
92 #if DCHECK_IS_ON()
93 DCHECK(!CheckContainsQueueForTest(queue));
94 #endif
95 delayed_work_queue_sets_.AddQueue(queue->delayed_work_queue(), priority);
96 immediate_work_queue_sets_.AddQueue(queue->immediate_work_queue(), priority);
97 #if DCHECK_IS_ON()
98 DCHECK(CheckContainsQueueForTest(queue));
99 #endif
100 }
101
ChangeSetIndex(internal::TaskQueueImpl * queue,TaskQueue::QueuePriority priority)102 void TaskQueueSelector::ChangeSetIndex(internal::TaskQueueImpl* queue,
103 TaskQueue::QueuePriority priority) {
104 #if DCHECK_IS_ON()
105 DCHECK(CheckContainsQueueForTest(queue));
106 #endif
107 delayed_work_queue_sets_.ChangeSetIndex(queue->delayed_work_queue(),
108 priority);
109 immediate_work_queue_sets_.ChangeSetIndex(queue->immediate_work_queue(),
110 priority);
111 #if DCHECK_IS_ON()
112 DCHECK(CheckContainsQueueForTest(queue));
113 #endif
114 }
115
RemoveQueueImpl(internal::TaskQueueImpl * queue)116 void TaskQueueSelector::RemoveQueueImpl(internal::TaskQueueImpl* queue) {
117 #if DCHECK_IS_ON()
118 DCHECK(CheckContainsQueueForTest(queue));
119 #endif
120 delayed_work_queue_sets_.RemoveQueue(queue->delayed_work_queue());
121 immediate_work_queue_sets_.RemoveQueue(queue->immediate_work_queue());
122
123 #if DCHECK_IS_ON()
124 DCHECK(!CheckContainsQueueForTest(queue));
125 #endif
126 }
127
WorkQueueSetBecameEmpty(size_t set_index)128 void TaskQueueSelector::WorkQueueSetBecameEmpty(size_t set_index) {
129 non_empty_set_counts_[set_index]--;
130 DCHECK_GE(non_empty_set_counts_[set_index], 0);
131
132 // There are no delayed or immediate tasks for |set_index| so remove from
133 // |active_priority_tracker_|.
134 if (non_empty_set_counts_[set_index] == 0) {
135 active_priority_tracker_.SetActive(
136 static_cast<TaskQueue::QueuePriority>(set_index), false);
137 }
138 }
139
WorkQueueSetBecameNonEmpty(size_t set_index)140 void TaskQueueSelector::WorkQueueSetBecameNonEmpty(size_t set_index) {
141 non_empty_set_counts_[set_index]++;
142 DCHECK_LE(non_empty_set_counts_[set_index], kMaxNonEmptySetCount);
143
144 // There is now a delayed or an immediate task for |set_index|, so add to
145 // |active_priority_tracker_|.
146 if (non_empty_set_counts_[set_index] == 1) {
147 TaskQueue::QueuePriority priority =
148 static_cast<TaskQueue::QueuePriority>(set_index);
149 active_priority_tracker_.SetActive(priority, true);
150 }
151 }
152
CollectSkippedOverLowerPriorityTasks(const internal::WorkQueue * selected_work_queue,std::vector<const Task * > * result) const153 void TaskQueueSelector::CollectSkippedOverLowerPriorityTasks(
154 const internal::WorkQueue* selected_work_queue,
155 std::vector<const Task*>* result) const {
156 delayed_work_queue_sets_.CollectSkippedOverLowerPriorityTasks(
157 selected_work_queue, result);
158 immediate_work_queue_sets_.CollectSkippedOverLowerPriorityTasks(
159 selected_work_queue, result);
160 }
161
162 #if DCHECK_IS_ON() || !defined(NDEBUG)
CheckContainsQueueForTest(const internal::TaskQueueImpl * queue) const163 bool TaskQueueSelector::CheckContainsQueueForTest(
164 const internal::TaskQueueImpl* queue) const {
165 bool contains_delayed_work_queue =
166 delayed_work_queue_sets_.ContainsWorkQueueForTest(
167 queue->delayed_work_queue());
168
169 bool contains_immediate_work_queue =
170 immediate_work_queue_sets_.ContainsWorkQueueForTest(
171 queue->immediate_work_queue());
172
173 DCHECK_EQ(contains_delayed_work_queue, contains_immediate_work_queue);
174 return contains_delayed_work_queue;
175 }
176 #endif
177
SelectWorkQueueToService(SelectTaskOption option)178 WorkQueue* TaskQueueSelector::SelectWorkQueueToService(
179 SelectTaskOption option) {
180 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
181
182 auto highest_priority = GetHighestPendingPriority(option);
183 if (!highest_priority.has_value())
184 return nullptr;
185
186 // Select the priority from which we will select a task. Usually this is
187 // the highest priority for which we have work, unless we are starving a lower
188 // priority.
189 TaskQueue::QueuePriority priority = highest_priority.value();
190
191 // For selecting an immediate queue only, the highest priority can be used as
192 // a starting priority, but it is required to check work at other priorities.
193 // For the case where a delayed task is at a higher priority than an immediate
194 // task, HighestActivePriority(...) returns the priority of the delayed task
195 // but the resulting queue must be the lower one.
196 if (option == SelectTaskOption::kSkipDelayedTask) {
197 WorkQueue* queue =
198 #if DCHECK_IS_ON()
199 random_task_selection_
200 ? ChooseImmediateOnlyWithPriority<SetOperationRandom>(priority)
201 :
202 #endif
203 ChooseImmediateOnlyWithPriority<SetOperationOldest>(priority);
204 return queue;
205 }
206
207 WorkQueue* queue =
208 #if DCHECK_IS_ON()
209 random_task_selection_ ? ChooseWithPriority<SetOperationRandom>(priority)
210 :
211 #endif
212 ChooseWithPriority<SetOperationOldest>(priority);
213
214 // If we have selected a delayed task while having an immediate task of the
215 // same priority, increase the starvation count.
216 if (queue->queue_type() == WorkQueue::QueueType::kDelayed &&
217 !immediate_work_queue_sets_.IsSetEmpty(priority)) {
218 immediate_starvation_count_++;
219 } else {
220 immediate_starvation_count_ = 0;
221 }
222 return queue;
223 }
224
AsValue() const225 Value::Dict TaskQueueSelector::AsValue() const {
226 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
227 Value::Dict state;
228 state.Set("immediate_starvation_count", immediate_starvation_count_);
229 return state;
230 }
231
SetTaskQueueSelectorObserver(Observer * observer)232 void TaskQueueSelector::SetTaskQueueSelectorObserver(Observer* observer) {
233 task_queue_selector_observer_ = observer;
234 }
235
236 absl::optional<TaskQueue::QueuePriority>
GetHighestPendingPriority(SelectTaskOption option) const237 TaskQueueSelector::GetHighestPendingPriority(SelectTaskOption option) const {
238 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
239 if (!active_priority_tracker_.HasActivePriority())
240 return absl::nullopt;
241
242 TaskQueue::QueuePriority highest_priority =
243 active_priority_tracker_.HighestActivePriority();
244 DCHECK_LT(highest_priority, priority_count());
245 if (option != SelectTaskOption::kSkipDelayedTask)
246 return highest_priority;
247
248 for (; highest_priority != priority_count(); ++highest_priority) {
249 if (active_priority_tracker_.IsActive(highest_priority) &&
250 !immediate_work_queue_sets_.IsSetEmpty(highest_priority)) {
251 return highest_priority;
252 }
253 }
254
255 return absl::nullopt;
256 }
257
SetImmediateStarvationCountForTest(int immediate_starvation_count)258 void TaskQueueSelector::SetImmediateStarvationCountForTest(
259 int immediate_starvation_count) {
260 immediate_starvation_count_ = immediate_starvation_count;
261 }
262
HasTasksWithPriority(TaskQueue::QueuePriority priority) const263 bool TaskQueueSelector::HasTasksWithPriority(
264 TaskQueue::QueuePriority priority) const {
265 return !delayed_work_queue_sets_.IsSetEmpty(priority) ||
266 !immediate_work_queue_sets_.IsSetEmpty(priority);
267 }
268
269 TaskQueueSelector::ActivePriorityTracker::ActivePriorityTracker() = default;
270
SetActive(TaskQueue::QueuePriority priority,bool is_active)271 void TaskQueueSelector::ActivePriorityTracker::SetActive(
272 TaskQueue::QueuePriority priority,
273 bool is_active) {
274 DCHECK_LT(priority, SequenceManager::PrioritySettings::kMaxPriorities);
275 DCHECK_NE(IsActive(priority), is_active);
276 if (is_active) {
277 active_priorities_ |= (size_t{1} << static_cast<size_t>(priority));
278 } else {
279 active_priorities_ &= ~(size_t{1} << static_cast<size_t>(priority));
280 }
281 }
282
283 TaskQueue::QueuePriority
HighestActivePriority() const284 TaskQueueSelector::ActivePriorityTracker::HighestActivePriority() const {
285 DCHECK_NE(active_priorities_, 0u);
286 return static_cast<TaskQueue::QueuePriority>(
287 std::countr_zero(active_priorities_));
288 }
289
290 } // namespace internal
291 } // namespace sequence_manager
292 } // namespace base
293