1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/sequence_manager/work_queue_sets.h"
6
7 #include "base/check_op.h"
8 #include "base/task/sequence_manager/task_order.h"
9 #include "base/task/sequence_manager/work_queue.h"
10 #include "third_party/abseil-cpp/absl/types/optional.h"
11
12 namespace base {
13 namespace sequence_manager {
14 namespace internal {
15
WorkQueueSets(const char * name,Observer * observer,const SequenceManager::Settings & settings)16 WorkQueueSets::WorkQueueSets(const char* name,
17 Observer* observer,
18 const SequenceManager::Settings& settings)
19 : name_(name),
20 work_queue_heaps_(settings.priority_settings.priority_count()),
21 #if DCHECK_IS_ON()
22 last_rand_(settings.random_task_selection_seed),
23 #endif
24 observer_(observer) {
25 }
26
27 WorkQueueSets::~WorkQueueSets() = default;
28
AddQueue(WorkQueue * work_queue,size_t set_index)29 void WorkQueueSets::AddQueue(WorkQueue* work_queue, size_t set_index) {
30 DCHECK(!work_queue->work_queue_sets());
31 DCHECK_LT(set_index, work_queue_heaps_.size());
32 DCHECK(!work_queue->heap_handle().IsValid());
33 absl::optional<TaskOrder> key = work_queue->GetFrontTaskOrder();
34 work_queue->AssignToWorkQueueSets(this);
35 work_queue->AssignSetIndex(set_index);
36 if (!key)
37 return;
38 bool was_empty = work_queue_heaps_[set_index].empty();
39 work_queue_heaps_[set_index].insert({*key, work_queue});
40 if (was_empty)
41 observer_->WorkQueueSetBecameNonEmpty(set_index);
42 }
43
RemoveQueue(WorkQueue * work_queue)44 void WorkQueueSets::RemoveQueue(WorkQueue* work_queue) {
45 DCHECK_EQ(this, work_queue->work_queue_sets());
46 work_queue->AssignToWorkQueueSets(nullptr);
47 if (!work_queue->heap_handle().IsValid())
48 return;
49 size_t set_index = work_queue->work_queue_set_index();
50 DCHECK_LT(set_index, work_queue_heaps_.size());
51 work_queue_heaps_[set_index].erase(work_queue->heap_handle());
52 if (work_queue_heaps_[set_index].empty())
53 observer_->WorkQueueSetBecameEmpty(set_index);
54 DCHECK(!work_queue->heap_handle().IsValid());
55 }
56
ChangeSetIndex(WorkQueue * work_queue,size_t set_index)57 void WorkQueueSets::ChangeSetIndex(WorkQueue* work_queue, size_t set_index) {
58 DCHECK_EQ(this, work_queue->work_queue_sets());
59 DCHECK_LT(set_index, work_queue_heaps_.size());
60 absl::optional<TaskOrder> key = work_queue->GetFrontTaskOrder();
61 size_t old_set = work_queue->work_queue_set_index();
62 DCHECK_LT(old_set, work_queue_heaps_.size());
63 DCHECK_NE(old_set, set_index);
64 work_queue->AssignSetIndex(set_index);
65 DCHECK_EQ(key.has_value(), work_queue->heap_handle().IsValid());
66 if (!key)
67 return;
68 work_queue_heaps_[old_set].erase(work_queue->heap_handle());
69 bool was_empty = work_queue_heaps_[set_index].empty();
70 work_queue_heaps_[set_index].insert({*key, work_queue});
71 if (work_queue_heaps_[old_set].empty())
72 observer_->WorkQueueSetBecameEmpty(old_set);
73 if (was_empty)
74 observer_->WorkQueueSetBecameNonEmpty(set_index);
75 }
76
OnQueuesFrontTaskChanged(WorkQueue * work_queue)77 void WorkQueueSets::OnQueuesFrontTaskChanged(WorkQueue* work_queue) {
78 size_t set_index = work_queue->work_queue_set_index();
79 DCHECK_EQ(this, work_queue->work_queue_sets());
80 DCHECK_LT(set_index, work_queue_heaps_.size());
81 DCHECK(work_queue->heap_handle().IsValid());
82 DCHECK(!work_queue_heaps_[set_index].empty()) << " set_index = " << set_index;
83 if (auto key = work_queue->GetFrontTaskOrder()) {
84 // O(log n)
85 work_queue_heaps_[set_index].Replace(work_queue->heap_handle(),
86 {*key, work_queue});
87 } else {
88 // O(log n)
89 work_queue_heaps_[set_index].erase(work_queue->heap_handle());
90 DCHECK(!work_queue->heap_handle().IsValid());
91 if (work_queue_heaps_[set_index].empty())
92 observer_->WorkQueueSetBecameEmpty(set_index);
93 }
94 }
95
OnTaskPushedToEmptyQueue(WorkQueue * work_queue)96 void WorkQueueSets::OnTaskPushedToEmptyQueue(WorkQueue* work_queue) {
97 // NOTE if this function changes, we need to keep |WorkQueueSets::AddQueue| in
98 // sync.
99 DCHECK_EQ(this, work_queue->work_queue_sets());
100 absl::optional<TaskOrder> key = work_queue->GetFrontTaskOrder();
101 DCHECK(key);
102 size_t set_index = work_queue->work_queue_set_index();
103 DCHECK_LT(set_index, work_queue_heaps_.size())
104 << " set_index = " << set_index;
105 // |work_queue| should not be in work_queue_heaps_[set_index].
106 DCHECK(!work_queue->heap_handle().IsValid());
107 bool was_empty = work_queue_heaps_[set_index].empty();
108 work_queue_heaps_[set_index].insert({*key, work_queue});
109 if (was_empty)
110 observer_->WorkQueueSetBecameNonEmpty(set_index);
111 }
112
OnPopMinQueueInSet(WorkQueue * work_queue)113 void WorkQueueSets::OnPopMinQueueInSet(WorkQueue* work_queue) {
114 // Assume that `work_queue` contains the lowest `TaskOrder`.
115 size_t set_index = work_queue->work_queue_set_index();
116 DCHECK_EQ(this, work_queue->work_queue_sets());
117 DCHECK_LT(set_index, work_queue_heaps_.size());
118 DCHECK(!work_queue_heaps_[set_index].empty()) << " set_index = " << set_index;
119 DCHECK_EQ(work_queue_heaps_[set_index].top().value, work_queue)
120 << " set_index = " << set_index;
121 DCHECK(work_queue->heap_handle().IsValid());
122 if (auto key = work_queue->GetFrontTaskOrder()) {
123 // O(log n)
124 work_queue_heaps_[set_index].ReplaceTop({*key, work_queue});
125 } else {
126 // O(log n)
127 work_queue_heaps_[set_index].pop();
128 DCHECK(!work_queue->heap_handle().IsValid());
129 DCHECK(work_queue_heaps_[set_index].empty() ||
130 work_queue_heaps_[set_index].top().value != work_queue);
131 if (work_queue_heaps_[set_index].empty()) {
132 observer_->WorkQueueSetBecameEmpty(set_index);
133 }
134 }
135 }
136
OnQueueBlocked(WorkQueue * work_queue)137 void WorkQueueSets::OnQueueBlocked(WorkQueue* work_queue) {
138 DCHECK_EQ(this, work_queue->work_queue_sets());
139 HeapHandle heap_handle = work_queue->heap_handle();
140 if (!heap_handle.IsValid())
141 return;
142 size_t set_index = work_queue->work_queue_set_index();
143 DCHECK_LT(set_index, work_queue_heaps_.size());
144 work_queue_heaps_[set_index].erase(heap_handle);
145 if (work_queue_heaps_[set_index].empty())
146 observer_->WorkQueueSetBecameEmpty(set_index);
147 }
148
149 absl::optional<WorkQueueAndTaskOrder>
GetOldestQueueAndTaskOrderInSet(size_t set_index) const150 WorkQueueSets::GetOldestQueueAndTaskOrderInSet(size_t set_index) const {
151 DCHECK_LT(set_index, work_queue_heaps_.size());
152 if (work_queue_heaps_[set_index].empty())
153 return absl::nullopt;
154 const OldestTaskOrder& oldest = work_queue_heaps_[set_index].top();
155 DCHECK(oldest.value->heap_handle().IsValid());
156 #if DCHECK_IS_ON()
157 absl::optional<TaskOrder> order = oldest.value->GetFrontTaskOrder();
158 DCHECK(order && oldest.key == *order);
159 #endif
160 return WorkQueueAndTaskOrder(*oldest.value, oldest.key);
161 }
162
163 #if DCHECK_IS_ON()
164 absl::optional<WorkQueueAndTaskOrder>
GetRandomQueueAndTaskOrderInSet(size_t set_index) const165 WorkQueueSets::GetRandomQueueAndTaskOrderInSet(size_t set_index) const {
166 DCHECK_LT(set_index, work_queue_heaps_.size());
167 if (work_queue_heaps_[set_index].empty())
168 return absl::nullopt;
169 const OldestTaskOrder& chosen =
170 work_queue_heaps_[set_index].begin()[static_cast<long>(
171 Random() % work_queue_heaps_[set_index].size())];
172 #if DCHECK_IS_ON()
173 absl::optional<TaskOrder> key = chosen.value->GetFrontTaskOrder();
174 DCHECK(key && chosen.key == *key);
175 #endif
176 return WorkQueueAndTaskOrder(*chosen.value, chosen.key);
177 }
178 #endif
179
IsSetEmpty(size_t set_index) const180 bool WorkQueueSets::IsSetEmpty(size_t set_index) const {
181 DCHECK_LT(set_index, work_queue_heaps_.size())
182 << " set_index = " << set_index;
183 return work_queue_heaps_[set_index].empty();
184 }
185
186 #if DCHECK_IS_ON() || !defined(NDEBUG)
ContainsWorkQueueForTest(const WorkQueue * work_queue) const187 bool WorkQueueSets::ContainsWorkQueueForTest(
188 const WorkQueue* work_queue) const {
189 absl::optional<TaskOrder> task_order = work_queue->GetFrontTaskOrder();
190
191 for (const auto& heap : work_queue_heaps_) {
192 for (const OldestTaskOrder& heap_value_pair : heap) {
193 if (heap_value_pair.value == work_queue) {
194 DCHECK(task_order);
195 DCHECK(heap_value_pair.key == *task_order);
196 DCHECK_EQ(this, work_queue->work_queue_sets());
197 return true;
198 }
199 }
200 }
201
202 if (work_queue->work_queue_sets() == this) {
203 DCHECK(!task_order);
204 return true;
205 }
206
207 return false;
208 }
209 #endif
210
CollectSkippedOverLowerPriorityTasks(const internal::WorkQueue * selected_work_queue,std::vector<const Task * > * result) const211 void WorkQueueSets::CollectSkippedOverLowerPriorityTasks(
212 const internal::WorkQueue* selected_work_queue,
213 std::vector<const Task*>* result) const {
214 absl::optional<TaskOrder> task_order =
215 selected_work_queue->GetFrontTaskOrder();
216 CHECK(task_order);
217 for (size_t priority = selected_work_queue->work_queue_set_index() + 1;
218 priority < work_queue_heaps_.size(); priority++) {
219 for (const OldestTaskOrder& pair : work_queue_heaps_[priority]) {
220 pair.value->CollectTasksOlderThan(*task_order, result);
221 }
222 }
223 }
224
225 } // namespace internal
226 } // namespace sequence_manager
227 } // namespace base
228