• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2014 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/sequence_manager/task_queue_selector.h"
6 
7 #include <bit>
8 #include <optional>
9 #include <utility>
10 
11 #include "base/check_op.h"
12 #include "base/task/sequence_manager/associated_thread_id.h"
13 #include "base/task/sequence_manager/task_queue_impl.h"
14 #include "base/task/sequence_manager/work_queue.h"
15 #include "base/task/task_features.h"
16 #include "base/threading/thread_checker.h"
17 #include "base/trace_event/base_tracing.h"
18 
19 namespace base {
20 namespace sequence_manager {
21 namespace internal {
22 
TaskQueueSelector(scoped_refptr<const AssociatedThreadId> associated_thread,const SequenceManager::Settings & settings)23 TaskQueueSelector::TaskQueueSelector(
24     scoped_refptr<const AssociatedThreadId> associated_thread,
25     const SequenceManager::Settings& settings)
26     : associated_thread_(std::move(associated_thread)),
27 #if DCHECK_IS_ON()
28       random_task_selection_(settings.random_task_selection_seed != 0),
29 #endif
30       non_empty_set_counts_(
31           std::vector<int>(settings.priority_settings.priority_count(), 0)),
32       delayed_work_queue_sets_("delayed", this, settings),
33       immediate_work_queue_sets_("immediate", this, settings) {
34 }
35 
36 TaskQueueSelector::~TaskQueueSelector() = default;
37 
AddQueue(internal::TaskQueueImpl * queue,TaskQueue::QueuePriority priority)38 void TaskQueueSelector::AddQueue(internal::TaskQueueImpl* queue,
39                                  TaskQueue::QueuePriority priority) {
40   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
41   DCHECK(queue->IsQueueEnabled());
42   AddQueueImpl(queue, priority);
43 }
44 
RemoveQueue(internal::TaskQueueImpl * queue)45 void TaskQueueSelector::RemoveQueue(internal::TaskQueueImpl* queue) {
46   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
47   if (queue->IsQueueEnabled()) {
48     RemoveQueueImpl(queue);
49   }
50 }
51 
EnableQueue(internal::TaskQueueImpl * queue)52 void TaskQueueSelector::EnableQueue(internal::TaskQueueImpl* queue) {
53   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
54   DCHECK(queue->IsQueueEnabled());
55   AddQueueImpl(queue, queue->GetQueuePriority());
56   if (task_queue_selector_observer_)
57     task_queue_selector_observer_->OnTaskQueueEnabled(queue);
58 }
59 
DisableQueue(internal::TaskQueueImpl * queue)60 void TaskQueueSelector::DisableQueue(internal::TaskQueueImpl* queue) {
61   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
62   DCHECK(!queue->IsQueueEnabled());
63   RemoveQueueImpl(queue);
64 }
65 
SetQueuePriority(internal::TaskQueueImpl * queue,TaskQueue::QueuePriority priority)66 void TaskQueueSelector::SetQueuePriority(internal::TaskQueueImpl* queue,
67                                          TaskQueue::QueuePriority priority) {
68   DCHECK_LT(priority, priority_count());
69   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
70   if (queue->IsQueueEnabled()) {
71     ChangeSetIndex(queue, priority);
72   } else {
73     // Disabled queue is not in any set so we can't use ChangeSetIndex here
74     // and have to assign priority for the queue itself.
75     queue->delayed_work_queue()->AssignSetIndex(priority);
76     queue->immediate_work_queue()->AssignSetIndex(priority);
77   }
78   DCHECK_EQ(priority, queue->GetQueuePriority());
79 }
80 
AddQueueImpl(internal::TaskQueueImpl * queue,TaskQueue::QueuePriority priority)81 void TaskQueueSelector::AddQueueImpl(internal::TaskQueueImpl* queue,
82                                      TaskQueue::QueuePriority priority) {
83 #if DCHECK_IS_ON()
84   DCHECK(!CheckContainsQueueForTest(queue));
85 #endif
86   delayed_work_queue_sets_.AddQueue(queue->delayed_work_queue(), priority);
87   immediate_work_queue_sets_.AddQueue(queue->immediate_work_queue(), priority);
88 #if DCHECK_IS_ON()
89   DCHECK(CheckContainsQueueForTest(queue));
90 #endif
91 }
92 
ChangeSetIndex(internal::TaskQueueImpl * queue,TaskQueue::QueuePriority priority)93 void TaskQueueSelector::ChangeSetIndex(internal::TaskQueueImpl* queue,
94                                        TaskQueue::QueuePriority priority) {
95 #if DCHECK_IS_ON()
96   DCHECK(CheckContainsQueueForTest(queue));
97 #endif
98   delayed_work_queue_sets_.ChangeSetIndex(queue->delayed_work_queue(),
99                                           priority);
100   immediate_work_queue_sets_.ChangeSetIndex(queue->immediate_work_queue(),
101                                             priority);
102 #if DCHECK_IS_ON()
103   DCHECK(CheckContainsQueueForTest(queue));
104 #endif
105 }
106 
RemoveQueueImpl(internal::TaskQueueImpl * queue)107 void TaskQueueSelector::RemoveQueueImpl(internal::TaskQueueImpl* queue) {
108 #if DCHECK_IS_ON()
109   DCHECK(CheckContainsQueueForTest(queue));
110 #endif
111   delayed_work_queue_sets_.RemoveQueue(queue->delayed_work_queue());
112   immediate_work_queue_sets_.RemoveQueue(queue->immediate_work_queue());
113 
114 #if DCHECK_IS_ON()
115   DCHECK(!CheckContainsQueueForTest(queue));
116 #endif
117 }
118 
WorkQueueSetBecameEmpty(size_t set_index)119 void TaskQueueSelector::WorkQueueSetBecameEmpty(size_t set_index) {
120   non_empty_set_counts_[set_index]--;
121   DCHECK_GE(non_empty_set_counts_[set_index], 0);
122 
123   // There are no delayed or immediate tasks for |set_index| so remove from
124   // |active_priority_tracker_|.
125   if (non_empty_set_counts_[set_index] == 0) {
126     active_priority_tracker_.SetActive(
127         static_cast<TaskQueue::QueuePriority>(set_index), false);
128   }
129 }
130 
WorkQueueSetBecameNonEmpty(size_t set_index)131 void TaskQueueSelector::WorkQueueSetBecameNonEmpty(size_t set_index) {
132   non_empty_set_counts_[set_index]++;
133   DCHECK_LE(non_empty_set_counts_[set_index], kMaxNonEmptySetCount);
134 
135   // There is now a delayed or an immediate task for |set_index|, so add to
136   // |active_priority_tracker_|.
137   if (non_empty_set_counts_[set_index] == 1) {
138     bool had_active_priority = active_priority_tracker_.HasActivePriority();
139     TaskQueue::QueuePriority priority =
140         static_cast<TaskQueue::QueuePriority>(set_index);
141     active_priority_tracker_.SetActive(priority, true);
142     if (!had_active_priority && task_queue_selector_observer_) {
143       task_queue_selector_observer_->OnWorkAvailable();
144     }
145   }
146 }
147 
CollectSkippedOverLowerPriorityTasks(const internal::WorkQueue * selected_work_queue,std::vector<const Task * > * result) const148 void TaskQueueSelector::CollectSkippedOverLowerPriorityTasks(
149     const internal::WorkQueue* selected_work_queue,
150     std::vector<const Task*>* result) const {
151   delayed_work_queue_sets_.CollectSkippedOverLowerPriorityTasks(
152       selected_work_queue, result);
153   immediate_work_queue_sets_.CollectSkippedOverLowerPriorityTasks(
154       selected_work_queue, result);
155 }
156 
157 #if DCHECK_IS_ON() || !defined(NDEBUG)
CheckContainsQueueForTest(const internal::TaskQueueImpl * queue) const158 bool TaskQueueSelector::CheckContainsQueueForTest(
159     const internal::TaskQueueImpl* queue) const {
160   bool contains_delayed_work_queue =
161       delayed_work_queue_sets_.ContainsWorkQueueForTest(
162           queue->delayed_work_queue());
163 
164   bool contains_immediate_work_queue =
165       immediate_work_queue_sets_.ContainsWorkQueueForTest(
166           queue->immediate_work_queue());
167 
168   DCHECK_EQ(contains_delayed_work_queue, contains_immediate_work_queue);
169   return contains_delayed_work_queue;
170 }
171 #endif
172 
SelectWorkQueueToService(SelectTaskOption option)173 WorkQueue* TaskQueueSelector::SelectWorkQueueToService(
174     SelectTaskOption option) {
175   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
176 
177   auto highest_priority = GetHighestPendingPriority(option);
178   if (!highest_priority.has_value())
179     return nullptr;
180 
181   // Select the priority from which we will select a task. Usually this is
182   // the highest priority for which we have work, unless we are starving a lower
183   // priority.
184   TaskQueue::QueuePriority priority = highest_priority.value();
185 
186   // For selecting an immediate queue only, the highest priority can be used as
187   // a starting priority, but it is required to check work at other priorities.
188   // For the case where a delayed task is at a higher priority than an immediate
189   // task, HighestActivePriority(...) returns the priority of the delayed task
190   // but the resulting queue must be the lower one.
191   if (option == SelectTaskOption::kSkipDelayedTask) {
192     WorkQueue* queue =
193 #if DCHECK_IS_ON()
194         random_task_selection_
195             ? ChooseImmediateOnlyWithPriority<SetOperationRandom>(priority)
196             :
197 #endif
198             ChooseImmediateOnlyWithPriority<SetOperationOldest>(priority);
199     return queue;
200   }
201 
202   WorkQueue* queue =
203 #if DCHECK_IS_ON()
204       random_task_selection_ ? ChooseWithPriority<SetOperationRandom>(priority)
205                              :
206 #endif
207                              ChooseWithPriority<SetOperationOldest>(priority);
208 
209   // If we have selected a delayed task while having an immediate task of the
210   // same priority, increase the starvation count.
211   if (queue->queue_type() == WorkQueue::QueueType::kDelayed &&
212       !immediate_work_queue_sets_.IsSetEmpty(priority)) {
213     immediate_starvation_count_++;
214   } else {
215     immediate_starvation_count_ = 0;
216   }
217   return queue;
218 }
219 
AsValue() const220 Value::Dict TaskQueueSelector::AsValue() const {
221   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
222   Value::Dict state;
223   state.Set("immediate_starvation_count", immediate_starvation_count_);
224   return state;
225 }
226 
SetTaskQueueSelectorObserver(Observer * observer)227 void TaskQueueSelector::SetTaskQueueSelectorObserver(Observer* observer) {
228   task_queue_selector_observer_ = observer;
229 }
230 
231 std::optional<TaskQueue::QueuePriority>
GetHighestPendingPriority(SelectTaskOption option) const232 TaskQueueSelector::GetHighestPendingPriority(SelectTaskOption option) const {
233   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
234   if (!active_priority_tracker_.HasActivePriority())
235     return std::nullopt;
236 
237   TaskQueue::QueuePriority highest_priority =
238       active_priority_tracker_.HighestActivePriority();
239   DCHECK_LT(highest_priority, priority_count());
240   if (option != SelectTaskOption::kSkipDelayedTask)
241     return highest_priority;
242 
243   for (; highest_priority != priority_count(); ++highest_priority) {
244     if (active_priority_tracker_.IsActive(highest_priority) &&
245         !immediate_work_queue_sets_.IsSetEmpty(highest_priority)) {
246       return highest_priority;
247     }
248   }
249 
250   return std::nullopt;
251 }
252 
SetImmediateStarvationCountForTest(int immediate_starvation_count)253 void TaskQueueSelector::SetImmediateStarvationCountForTest(
254     int immediate_starvation_count) {
255   immediate_starvation_count_ = immediate_starvation_count;
256 }
257 
HasTasksWithPriority(TaskQueue::QueuePriority priority) const258 bool TaskQueueSelector::HasTasksWithPriority(
259     TaskQueue::QueuePriority priority) const {
260   return !delayed_work_queue_sets_.IsSetEmpty(priority) ||
261          !immediate_work_queue_sets_.IsSetEmpty(priority);
262 }
263 
264 TaskQueueSelector::ActivePriorityTracker::ActivePriorityTracker() = default;
265 
SetActive(TaskQueue::QueuePriority priority,bool is_active)266 void TaskQueueSelector::ActivePriorityTracker::SetActive(
267     TaskQueue::QueuePriority priority,
268     bool is_active) {
269   DCHECK_LT(priority, SequenceManager::PrioritySettings::kMaxPriorities);
270   DCHECK_NE(IsActive(priority), is_active);
271   if (is_active) {
272     active_priorities_ |= (size_t{1} << static_cast<size_t>(priority));
273   } else {
274     active_priorities_ &= ~(size_t{1} << static_cast<size_t>(priority));
275   }
276 }
277 
278 TaskQueue::QueuePriority
HighestActivePriority() const279 TaskQueueSelector::ActivePriorityTracker::HighestActivePriority() const {
280   DCHECK_NE(active_priorities_, 0u);
281   return static_cast<TaskQueue::QueuePriority>(
282       std::countr_zero(active_priorities_));
283 }
284 
285 }  // namespace internal
286 }  // namespace sequence_manager
287 }  // namespace base
288