• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/sequence_manager/work_queue_sets.h"
6 
7 #include "base/check_op.h"
8 #include "base/task/sequence_manager/task_order.h"
9 #include "base/task/sequence_manager/work_queue.h"
10 #include "third_party/abseil-cpp/absl/types/optional.h"
11 
12 namespace base {
13 namespace sequence_manager {
14 namespace internal {
15 
WorkQueueSets(const char * name,Observer * observer,const SequenceManager::Settings & settings)16 WorkQueueSets::WorkQueueSets(const char* name,
17                              Observer* observer,
18                              const SequenceManager::Settings& settings)
19     : name_(name),
20       work_queue_heaps_(settings.priority_settings.priority_count()),
21 #if DCHECK_IS_ON()
22       last_rand_(settings.random_task_selection_seed),
23 #endif
24       observer_(observer) {
25 }
26 
27 WorkQueueSets::~WorkQueueSets() = default;
28 
AddQueue(WorkQueue * work_queue,size_t set_index)29 void WorkQueueSets::AddQueue(WorkQueue* work_queue, size_t set_index) {
30   DCHECK(!work_queue->work_queue_sets());
31   DCHECK_LT(set_index, work_queue_heaps_.size());
32   DCHECK(!work_queue->heap_handle().IsValid());
33   absl::optional<TaskOrder> key = work_queue->GetFrontTaskOrder();
34   work_queue->AssignToWorkQueueSets(this);
35   work_queue->AssignSetIndex(set_index);
36   if (!key)
37     return;
38   bool was_empty = work_queue_heaps_[set_index].empty();
39   work_queue_heaps_[set_index].insert({*key, work_queue});
40   if (was_empty)
41     observer_->WorkQueueSetBecameNonEmpty(set_index);
42 }
43 
RemoveQueue(WorkQueue * work_queue)44 void WorkQueueSets::RemoveQueue(WorkQueue* work_queue) {
45   DCHECK_EQ(this, work_queue->work_queue_sets());
46   work_queue->AssignToWorkQueueSets(nullptr);
47   if (!work_queue->heap_handle().IsValid())
48     return;
49   size_t set_index = work_queue->work_queue_set_index();
50   DCHECK_LT(set_index, work_queue_heaps_.size());
51   work_queue_heaps_[set_index].erase(work_queue->heap_handle());
52   if (work_queue_heaps_[set_index].empty())
53     observer_->WorkQueueSetBecameEmpty(set_index);
54   DCHECK(!work_queue->heap_handle().IsValid());
55 }
56 
ChangeSetIndex(WorkQueue * work_queue,size_t set_index)57 void WorkQueueSets::ChangeSetIndex(WorkQueue* work_queue, size_t set_index) {
58   DCHECK_EQ(this, work_queue->work_queue_sets());
59   DCHECK_LT(set_index, work_queue_heaps_.size());
60   absl::optional<TaskOrder> key = work_queue->GetFrontTaskOrder();
61   size_t old_set = work_queue->work_queue_set_index();
62   DCHECK_LT(old_set, work_queue_heaps_.size());
63   DCHECK_NE(old_set, set_index);
64   work_queue->AssignSetIndex(set_index);
65   DCHECK_EQ(key.has_value(), work_queue->heap_handle().IsValid());
66   if (!key)
67     return;
68   work_queue_heaps_[old_set].erase(work_queue->heap_handle());
69   bool was_empty = work_queue_heaps_[set_index].empty();
70   work_queue_heaps_[set_index].insert({*key, work_queue});
71   if (work_queue_heaps_[old_set].empty())
72     observer_->WorkQueueSetBecameEmpty(old_set);
73   if (was_empty)
74     observer_->WorkQueueSetBecameNonEmpty(set_index);
75 }
76 
OnQueuesFrontTaskChanged(WorkQueue * work_queue)77 void WorkQueueSets::OnQueuesFrontTaskChanged(WorkQueue* work_queue) {
78   size_t set_index = work_queue->work_queue_set_index();
79   DCHECK_EQ(this, work_queue->work_queue_sets());
80   DCHECK_LT(set_index, work_queue_heaps_.size());
81   DCHECK(work_queue->heap_handle().IsValid());
82   DCHECK(!work_queue_heaps_[set_index].empty()) << " set_index = " << set_index;
83   if (auto key = work_queue->GetFrontTaskOrder()) {
84     // O(log n)
85     work_queue_heaps_[set_index].Replace(work_queue->heap_handle(),
86                                          {*key, work_queue});
87   } else {
88     // O(log n)
89     work_queue_heaps_[set_index].erase(work_queue->heap_handle());
90     DCHECK(!work_queue->heap_handle().IsValid());
91     if (work_queue_heaps_[set_index].empty())
92       observer_->WorkQueueSetBecameEmpty(set_index);
93   }
94 }
95 
OnTaskPushedToEmptyQueue(WorkQueue * work_queue)96 void WorkQueueSets::OnTaskPushedToEmptyQueue(WorkQueue* work_queue) {
97   // NOTE if this function changes, we need to keep |WorkQueueSets::AddQueue| in
98   // sync.
99   DCHECK_EQ(this, work_queue->work_queue_sets());
100   absl::optional<TaskOrder> key = work_queue->GetFrontTaskOrder();
101   DCHECK(key);
102   size_t set_index = work_queue->work_queue_set_index();
103   DCHECK_LT(set_index, work_queue_heaps_.size())
104       << " set_index = " << set_index;
105   // |work_queue| should not be in work_queue_heaps_[set_index].
106   DCHECK(!work_queue->heap_handle().IsValid());
107   bool was_empty = work_queue_heaps_[set_index].empty();
108   work_queue_heaps_[set_index].insert({*key, work_queue});
109   if (was_empty)
110     observer_->WorkQueueSetBecameNonEmpty(set_index);
111 }
112 
OnPopMinQueueInSet(WorkQueue * work_queue)113 void WorkQueueSets::OnPopMinQueueInSet(WorkQueue* work_queue) {
114   // Assume that `work_queue` contains the lowest `TaskOrder`.
115   size_t set_index = work_queue->work_queue_set_index();
116   DCHECK_EQ(this, work_queue->work_queue_sets());
117   DCHECK_LT(set_index, work_queue_heaps_.size());
118   DCHECK(!work_queue_heaps_[set_index].empty()) << " set_index = " << set_index;
119   DCHECK_EQ(work_queue_heaps_[set_index].top().value, work_queue)
120       << " set_index = " << set_index;
121   DCHECK(work_queue->heap_handle().IsValid());
122   if (auto key = work_queue->GetFrontTaskOrder()) {
123     // O(log n)
124     work_queue_heaps_[set_index].ReplaceTop({*key, work_queue});
125   } else {
126     // O(log n)
127     work_queue_heaps_[set_index].pop();
128     DCHECK(!work_queue->heap_handle().IsValid());
129     DCHECK(work_queue_heaps_[set_index].empty() ||
130            work_queue_heaps_[set_index].top().value != work_queue);
131     if (work_queue_heaps_[set_index].empty()) {
132       observer_->WorkQueueSetBecameEmpty(set_index);
133     }
134   }
135 }
136 
OnQueueBlocked(WorkQueue * work_queue)137 void WorkQueueSets::OnQueueBlocked(WorkQueue* work_queue) {
138   DCHECK_EQ(this, work_queue->work_queue_sets());
139   HeapHandle heap_handle = work_queue->heap_handle();
140   if (!heap_handle.IsValid())
141     return;
142   size_t set_index = work_queue->work_queue_set_index();
143   DCHECK_LT(set_index, work_queue_heaps_.size());
144   work_queue_heaps_[set_index].erase(heap_handle);
145   if (work_queue_heaps_[set_index].empty())
146     observer_->WorkQueueSetBecameEmpty(set_index);
147 }
148 
149 absl::optional<WorkQueueAndTaskOrder>
GetOldestQueueAndTaskOrderInSet(size_t set_index) const150 WorkQueueSets::GetOldestQueueAndTaskOrderInSet(size_t set_index) const {
151   DCHECK_LT(set_index, work_queue_heaps_.size());
152   if (work_queue_heaps_[set_index].empty())
153     return absl::nullopt;
154   const OldestTaskOrder& oldest = work_queue_heaps_[set_index].top();
155   DCHECK(oldest.value->heap_handle().IsValid());
156 #if DCHECK_IS_ON()
157   absl::optional<TaskOrder> order = oldest.value->GetFrontTaskOrder();
158   DCHECK(order && oldest.key == *order);
159 #endif
160   return WorkQueueAndTaskOrder(*oldest.value, oldest.key);
161 }
162 
163 #if DCHECK_IS_ON()
164 absl::optional<WorkQueueAndTaskOrder>
GetRandomQueueAndTaskOrderInSet(size_t set_index) const165 WorkQueueSets::GetRandomQueueAndTaskOrderInSet(size_t set_index) const {
166   DCHECK_LT(set_index, work_queue_heaps_.size());
167   if (work_queue_heaps_[set_index].empty())
168     return absl::nullopt;
169   const OldestTaskOrder& chosen =
170       work_queue_heaps_[set_index].begin()[static_cast<long>(
171           Random() % work_queue_heaps_[set_index].size())];
172 #if DCHECK_IS_ON()
173   absl::optional<TaskOrder> key = chosen.value->GetFrontTaskOrder();
174   DCHECK(key && chosen.key == *key);
175 #endif
176   return WorkQueueAndTaskOrder(*chosen.value, chosen.key);
177 }
178 #endif
179 
IsSetEmpty(size_t set_index) const180 bool WorkQueueSets::IsSetEmpty(size_t set_index) const {
181   DCHECK_LT(set_index, work_queue_heaps_.size())
182       << " set_index = " << set_index;
183   return work_queue_heaps_[set_index].empty();
184 }
185 
186 #if DCHECK_IS_ON() || !defined(NDEBUG)
ContainsWorkQueueForTest(const WorkQueue * work_queue) const187 bool WorkQueueSets::ContainsWorkQueueForTest(
188     const WorkQueue* work_queue) const {
189   absl::optional<TaskOrder> task_order = work_queue->GetFrontTaskOrder();
190 
191   for (const auto& heap : work_queue_heaps_) {
192     for (const OldestTaskOrder& heap_value_pair : heap) {
193       if (heap_value_pair.value == work_queue) {
194         DCHECK(task_order);
195         DCHECK(heap_value_pair.key == *task_order);
196         DCHECK_EQ(this, work_queue->work_queue_sets());
197         return true;
198       }
199     }
200   }
201 
202   if (work_queue->work_queue_sets() == this) {
203     DCHECK(!task_order);
204     return true;
205   }
206 
207   return false;
208 }
209 #endif
210 
CollectSkippedOverLowerPriorityTasks(const internal::WorkQueue * selected_work_queue,std::vector<const Task * > * result) const211 void WorkQueueSets::CollectSkippedOverLowerPriorityTasks(
212     const internal::WorkQueue* selected_work_queue,
213     std::vector<const Task*>* result) const {
214   absl::optional<TaskOrder> task_order =
215       selected_work_queue->GetFrontTaskOrder();
216   CHECK(task_order);
217   for (size_t priority = selected_work_queue->work_queue_set_index() + 1;
218        priority < work_queue_heaps_.size(); priority++) {
219     for (const OldestTaskOrder& pair : work_queue_heaps_[priority]) {
220       pair.value->CollectTasksOlderThan(*task_order, result);
221     }
222   }
223 }
224 
225 }  // namespace internal
226 }  // namespace sequence_manager
227 }  // namespace base
228