• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/sequence_manager/work_queue_sets.h"
6 
7 #include <optional>
8 
9 #include "base/check_op.h"
10 #include "base/task/sequence_manager/task_order.h"
11 #include "base/task/sequence_manager/work_queue.h"
12 
13 namespace base {
14 namespace sequence_manager {
15 namespace internal {
16 
WorkQueueSets(const char * name,Observer * observer,const SequenceManager::Settings & settings)17 WorkQueueSets::WorkQueueSets(const char* name,
18                              Observer* observer,
19                              const SequenceManager::Settings& settings)
20     : name_(name),
21       work_queue_heaps_(settings.priority_settings.priority_count()),
22 #if DCHECK_IS_ON()
23       last_rand_(settings.random_task_selection_seed),
24 #endif
25       observer_(observer) {
26 }
27 
28 WorkQueueSets::~WorkQueueSets() = default;
29 
AddQueue(WorkQueue * work_queue,size_t set_index)30 void WorkQueueSets::AddQueue(WorkQueue* work_queue, size_t set_index) {
31   DCHECK(!work_queue->work_queue_sets());
32   DCHECK_LT(set_index, work_queue_heaps_.size());
33   DCHECK(!work_queue->heap_handle().IsValid());
34   std::optional<TaskOrder> key = work_queue->GetFrontTaskOrder();
35   work_queue->AssignToWorkQueueSets(this);
36   work_queue->AssignSetIndex(set_index);
37   if (!key)
38     return;
39   bool was_empty = work_queue_heaps_[set_index].empty();
40   work_queue_heaps_[set_index].insert({*key, work_queue});
41   if (was_empty)
42     observer_->WorkQueueSetBecameNonEmpty(set_index);
43 }
44 
RemoveQueue(WorkQueue * work_queue)45 void WorkQueueSets::RemoveQueue(WorkQueue* work_queue) {
46   DCHECK_EQ(this, work_queue->work_queue_sets());
47   work_queue->AssignToWorkQueueSets(nullptr);
48   if (!work_queue->heap_handle().IsValid())
49     return;
50   size_t set_index = work_queue->work_queue_set_index();
51   DCHECK_LT(set_index, work_queue_heaps_.size());
52   work_queue_heaps_[set_index].erase(work_queue->heap_handle());
53   if (work_queue_heaps_[set_index].empty())
54     observer_->WorkQueueSetBecameEmpty(set_index);
55   DCHECK(!work_queue->heap_handle().IsValid());
56 }
57 
ChangeSetIndex(WorkQueue * work_queue,size_t set_index)58 void WorkQueueSets::ChangeSetIndex(WorkQueue* work_queue, size_t set_index) {
59   DCHECK_EQ(this, work_queue->work_queue_sets());
60   DCHECK_LT(set_index, work_queue_heaps_.size());
61   std::optional<TaskOrder> key = work_queue->GetFrontTaskOrder();
62   size_t old_set = work_queue->work_queue_set_index();
63   DCHECK_LT(old_set, work_queue_heaps_.size());
64   DCHECK_NE(old_set, set_index);
65   work_queue->AssignSetIndex(set_index);
66   DCHECK_EQ(key.has_value(), work_queue->heap_handle().IsValid());
67   if (!key)
68     return;
69   work_queue_heaps_[old_set].erase(work_queue->heap_handle());
70   bool was_empty = work_queue_heaps_[set_index].empty();
71   work_queue_heaps_[set_index].insert({*key, work_queue});
72   // Invoke `WorkQueueSetBecameNonEmpty()` before `WorkQueueSetBecameEmpty()` so
73   // `observer_` doesn't momentarily observe that all work queue sets are empty.
74   // TaskQueueSelectorTest.TestDisableEnable will fail if the order changes.
75   if (was_empty)
76     observer_->WorkQueueSetBecameNonEmpty(set_index);
77   if (work_queue_heaps_[old_set].empty()) {
78     observer_->WorkQueueSetBecameEmpty(old_set);
79   }
80 }
81 
OnQueuesFrontTaskChanged(WorkQueue * work_queue)82 void WorkQueueSets::OnQueuesFrontTaskChanged(WorkQueue* work_queue) {
83   size_t set_index = work_queue->work_queue_set_index();
84   DCHECK_EQ(this, work_queue->work_queue_sets());
85   DCHECK_LT(set_index, work_queue_heaps_.size());
86   DCHECK(work_queue->heap_handle().IsValid());
87   DCHECK(!work_queue_heaps_[set_index].empty()) << " set_index = " << set_index;
88   if (auto key = work_queue->GetFrontTaskOrder()) {
89     // O(log n)
90     work_queue_heaps_[set_index].Replace(work_queue->heap_handle(),
91                                          {*key, work_queue});
92   } else {
93     // O(log n)
94     work_queue_heaps_[set_index].erase(work_queue->heap_handle());
95     DCHECK(!work_queue->heap_handle().IsValid());
96     if (work_queue_heaps_[set_index].empty())
97       observer_->WorkQueueSetBecameEmpty(set_index);
98   }
99 }
100 
OnTaskPushedToEmptyQueue(WorkQueue * work_queue)101 void WorkQueueSets::OnTaskPushedToEmptyQueue(WorkQueue* work_queue) {
102   // NOTE if this function changes, we need to keep |WorkQueueSets::AddQueue| in
103   // sync.
104   DCHECK_EQ(this, work_queue->work_queue_sets());
105   std::optional<TaskOrder> key = work_queue->GetFrontTaskOrder();
106   DCHECK(key);
107   size_t set_index = work_queue->work_queue_set_index();
108   DCHECK_LT(set_index, work_queue_heaps_.size())
109       << " set_index = " << set_index;
110   // |work_queue| should not be in work_queue_heaps_[set_index].
111   DCHECK(!work_queue->heap_handle().IsValid());
112   bool was_empty = work_queue_heaps_[set_index].empty();
113   work_queue_heaps_[set_index].insert({*key, work_queue});
114   if (was_empty)
115     observer_->WorkQueueSetBecameNonEmpty(set_index);
116 }
117 
OnPopMinQueueInSet(WorkQueue * work_queue)118 void WorkQueueSets::OnPopMinQueueInSet(WorkQueue* work_queue) {
119   // Assume that `work_queue` contains the lowest `TaskOrder`.
120   size_t set_index = work_queue->work_queue_set_index();
121   DCHECK_EQ(this, work_queue->work_queue_sets());
122   DCHECK_LT(set_index, work_queue_heaps_.size());
123   DCHECK(!work_queue_heaps_[set_index].empty()) << " set_index = " << set_index;
124   DCHECK_EQ(work_queue_heaps_[set_index].top().value, work_queue)
125       << " set_index = " << set_index;
126   DCHECK(work_queue->heap_handle().IsValid());
127   if (auto key = work_queue->GetFrontTaskOrder()) {
128     // O(log n)
129     work_queue_heaps_[set_index].ReplaceTop({*key, work_queue});
130   } else {
131     // O(log n)
132     work_queue_heaps_[set_index].pop();
133     DCHECK(!work_queue->heap_handle().IsValid());
134     DCHECK(work_queue_heaps_[set_index].empty() ||
135            work_queue_heaps_[set_index].top().value != work_queue);
136     if (work_queue_heaps_[set_index].empty()) {
137       observer_->WorkQueueSetBecameEmpty(set_index);
138     }
139   }
140 }
141 
OnQueueBlocked(WorkQueue * work_queue)142 void WorkQueueSets::OnQueueBlocked(WorkQueue* work_queue) {
143   DCHECK_EQ(this, work_queue->work_queue_sets());
144   HeapHandle heap_handle = work_queue->heap_handle();
145   if (!heap_handle.IsValid())
146     return;
147   size_t set_index = work_queue->work_queue_set_index();
148   DCHECK_LT(set_index, work_queue_heaps_.size());
149   work_queue_heaps_[set_index].erase(heap_handle);
150   if (work_queue_heaps_[set_index].empty())
151     observer_->WorkQueueSetBecameEmpty(set_index);
152 }
153 
154 std::optional<WorkQueueAndTaskOrder>
GetOldestQueueAndTaskOrderInSet(size_t set_index) const155 WorkQueueSets::GetOldestQueueAndTaskOrderInSet(size_t set_index) const {
156   DCHECK_LT(set_index, work_queue_heaps_.size());
157   if (work_queue_heaps_[set_index].empty())
158     return std::nullopt;
159   const OldestTaskOrder& oldest = work_queue_heaps_[set_index].top();
160   DCHECK(oldest.value->heap_handle().IsValid());
161 #if DCHECK_IS_ON()
162   std::optional<TaskOrder> order = oldest.value->GetFrontTaskOrder();
163   DCHECK(order && oldest.key == *order);
164 #endif
165   return WorkQueueAndTaskOrder(*oldest.value, oldest.key);
166 }
167 
168 #if DCHECK_IS_ON()
169 std::optional<WorkQueueAndTaskOrder>
GetRandomQueueAndTaskOrderInSet(size_t set_index) const170 WorkQueueSets::GetRandomQueueAndTaskOrderInSet(size_t set_index) const {
171   DCHECK_LT(set_index, work_queue_heaps_.size());
172   if (work_queue_heaps_[set_index].empty())
173     return std::nullopt;
174   const OldestTaskOrder& chosen =
175       work_queue_heaps_[set_index].begin()[static_cast<long>(
176           Random() % work_queue_heaps_[set_index].size())];
177 #if DCHECK_IS_ON()
178   std::optional<TaskOrder> key = chosen.value->GetFrontTaskOrder();
179   DCHECK(key && chosen.key == *key);
180 #endif
181   return WorkQueueAndTaskOrder(*chosen.value, chosen.key);
182 }
183 #endif
184 
IsSetEmpty(size_t set_index) const185 bool WorkQueueSets::IsSetEmpty(size_t set_index) const {
186   DCHECK_LT(set_index, work_queue_heaps_.size())
187       << " set_index = " << set_index;
188   return work_queue_heaps_[set_index].empty();
189 }
190 
191 #if DCHECK_IS_ON() || !defined(NDEBUG)
ContainsWorkQueueForTest(const WorkQueue * work_queue) const192 bool WorkQueueSets::ContainsWorkQueueForTest(
193     const WorkQueue* work_queue) const {
194   std::optional<TaskOrder> task_order = work_queue->GetFrontTaskOrder();
195 
196   for (const auto& heap : work_queue_heaps_) {
197     for (const OldestTaskOrder& heap_value_pair : heap) {
198       if (heap_value_pair.value == work_queue) {
199         DCHECK(task_order);
200         DCHECK(heap_value_pair.key == *task_order);
201         DCHECK_EQ(this, work_queue->work_queue_sets());
202         return true;
203       }
204     }
205   }
206 
207   if (work_queue->work_queue_sets() == this) {
208     DCHECK(!task_order);
209     return true;
210   }
211 
212   return false;
213 }
214 #endif
215 
CollectSkippedOverLowerPriorityTasks(const internal::WorkQueue * selected_work_queue,std::vector<const Task * > * result) const216 void WorkQueueSets::CollectSkippedOverLowerPriorityTasks(
217     const internal::WorkQueue* selected_work_queue,
218     std::vector<const Task*>* result) const {
219   std::optional<TaskOrder> task_order =
220       selected_work_queue->GetFrontTaskOrder();
221   CHECK(task_order);
222   for (size_t priority = selected_work_queue->work_queue_set_index() + 1;
223        priority < work_queue_heaps_.size(); priority++) {
224     for (const OldestTaskOrder& pair : work_queue_heaps_[priority]) {
225       pair.value->CollectTasksOlderThan(*task_order, result);
226     }
227   }
228 }
229 
230 }  // namespace internal
231 }  // namespace sequence_manager
232 }  // namespace base
233