• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_SELECTOR_H_
6 #define BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_SELECTOR_H_
7 
8 #include <stddef.h>
9 
10 #include <atomic>
11 #include <vector>
12 
13 #include "base/base_export.h"
14 #include "base/dcheck_is_on.h"
15 #include "base/memory/raw_ptr.h"
16 #include "base/pending_task.h"
17 #include "base/task/sequence_manager/sequence_manager.h"
18 #include "base/task/sequence_manager/sequenced_task_source.h"
19 #include "base/task/sequence_manager/task_order.h"
20 #include "base/task/sequence_manager/work_queue_sets.h"
21 #include "base/values.h"
22 #include "third_party/abseil-cpp/absl/types/optional.h"
23 
24 namespace base {
25 namespace sequence_manager {
26 namespace internal {
27 
28 class AssociatedThreadId;
29 
30 // TaskQueueSelector is used by the SchedulerHelper to enable prioritization
31 // of particular task queues.
32 class BASE_EXPORT TaskQueueSelector : public WorkQueueSets::Observer {
33  public:
34   using SelectTaskOption = SequencedTaskSource::SelectTaskOption;
35 
36   TaskQueueSelector(scoped_refptr<const AssociatedThreadId> associated_thread,
37                     const SequenceManager::Settings& settings);
38 
39   TaskQueueSelector(const TaskQueueSelector&) = delete;
40   TaskQueueSelector& operator=(const TaskQueueSelector&) = delete;
41   ~TaskQueueSelector() override;
42 
43   static void InitializeFeatures();
44 
45   // Called to register a queue that can be selected. This function is called
46   // on the main thread.
47   void AddQueue(internal::TaskQueueImpl* queue,
48                 TaskQueue::QueuePriority priority);
49 
50   // The specified work will no longer be considered for selection. This
51   // function is called on the main thread.
52   void RemoveQueue(internal::TaskQueueImpl* queue);
53 
54   // Make |queue| eligible for selection. This function is called on the main
55   // thread. Must only be called if |queue| is disabled.
56   void EnableQueue(internal::TaskQueueImpl* queue);
57 
58   // Disable selection from |queue|. Must only be called if |queue| is enabled.
59   void DisableQueue(internal::TaskQueueImpl* queue);
60 
61   // Called get or set the priority of |queue|.
62   void SetQueuePriority(internal::TaskQueueImpl* queue,
63                         TaskQueue::QueuePriority priority);
64 
65   // Called to choose the work queue from which the next task should be taken
66   // and run. Return the queue to service if there is one or null otherwise.
67   // This function is called on the main thread.
68   WorkQueue* SelectWorkQueueToService(
69       SelectTaskOption option = SelectTaskOption::kDefault);
70 
71   // Serialize the selector state for tracing/debugging.
72   Value::Dict AsValue() const;
73 
74   class BASE_EXPORT Observer {
75    public:
76     virtual ~Observer() = default;
77 
78     // Called when |queue| transitions from disabled to enabled.
79     virtual void OnTaskQueueEnabled(internal::TaskQueueImpl* queue) = 0;
80   };
81 
82   // Called once to set the Observer. This function is called
83   // on the main thread. If |observer| is null, then no callbacks will occur.
84   void SetTaskQueueSelectorObserver(Observer* observer);
85 
86   // Returns the priority of the most important pending task if one exists.
87   // O(1).
88   absl::optional<TaskQueue::QueuePriority> GetHighestPendingPriority(
89       SelectTaskOption option = SelectTaskOption::kDefault) const;
90 
91   // WorkQueueSets::Observer implementation:
92   void WorkQueueSetBecameEmpty(size_t set_index) override;
93   void WorkQueueSetBecameNonEmpty(size_t set_index) override;
94 
95   // Populates |result| with tasks with lower priority than the first task from
96   // |selected_work_queue| which could otherwise run now.
97   void CollectSkippedOverLowerPriorityTasks(
98       const internal::WorkQueue* selected_work_queue,
99       std::vector<const Task*>* result) const;
100 
101  protected:
delayed_work_queue_sets()102   WorkQueueSets* delayed_work_queue_sets() { return &delayed_work_queue_sets_; }
103 
immediate_work_queue_sets()104   WorkQueueSets* immediate_work_queue_sets() {
105     return &immediate_work_queue_sets_;
106   }
107 
108   // This method will force select an immediate task if those are being
109   // starved by delayed tasks.
110   void SetImmediateStarvationCountForTest(int immediate_starvation_count);
111 
112   // Maximum number of delayed tasks tasks which can be run while there's a
113   // waiting non-delayed task.
114   static const int kDefaultMaxDelayedStarvationTasks = 3;
115 
116   // Tracks which priorities are currently active, meaning there are pending
117   // runnable tasks with that priority. Because there are only a handful of
118   // priorities, and because we always run tasks in order from highest to lowest
119   // priority, we can use a single integer to represent enabled priorities,
120   // using a bit per priority.
121   class BASE_EXPORT ActivePriorityTracker {
122    public:
123     ActivePriorityTracker();
124 
HasActivePriority()125     bool HasActivePriority() const { return active_priorities_ != 0; }
126 
IsActive(TaskQueue::QueuePriority priority)127     bool IsActive(TaskQueue::QueuePriority priority) const {
128       return active_priorities_ & (size_t{1} << static_cast<size_t>(priority));
129     }
130 
131     void SetActive(TaskQueue::QueuePriority priority, bool is_active);
132 
133     TaskQueue::QueuePriority HighestActivePriority() const;
134 
135    private:
136     static_assert(SequenceManager::PrioritySettings::kMaxPriorities <
137                       sizeof(size_t) * 8,
138                   "The number of priorities must be strictly less than the "
139                   "number of bits of |active_priorities_|!");
140     size_t active_priorities_ = 0;
141   };
142 
143   /*
144    * SetOperation is used to configure ChooseWithPriority() and must have:
145    *
146    * static absl::optional<WorkQueueAndTaskOrder>
147    * GetWithPriority(const WorkQueueSets& sets,
148    *                 TaskQueue::QueuePriority priority);
149    */
150 
151   // The default
152   struct SetOperationOldest {
GetWithPrioritySetOperationOldest153     static absl::optional<WorkQueueAndTaskOrder> GetWithPriority(
154         const WorkQueueSets& sets,
155         TaskQueue::QueuePriority priority) {
156       return sets.GetOldestQueueAndTaskOrderInSet(priority);
157     }
158   };
159 
160 #if DCHECK_IS_ON()
161   struct SetOperationRandom {
GetWithPrioritySetOperationRandom162     static absl::optional<WorkQueueAndTaskOrder> GetWithPriority(
163         const WorkQueueSets& sets,
164         TaskQueue::QueuePriority priority) {
165       return sets.GetRandomQueueAndTaskOrderInSet(priority);
166     }
167   };
168 #endif  // DCHECK_IS_ON()
169 
170   template <typename SetOperation>
ChooseWithPriority(TaskQueue::QueuePriority priority)171   WorkQueue* ChooseWithPriority(TaskQueue::QueuePriority priority) const {
172     // Select an immediate work queue if we are starving immediate tasks.
173     if (immediate_starvation_count_ >= g_max_delayed_starvation_tasks) {
174       WorkQueue* queue =
175           ChooseImmediateOnlyWithPriority<SetOperation>(priority);
176       if (queue)
177         return queue;
178       return ChooseDelayedOnlyWithPriority<SetOperation>(priority);
179     }
180     return ChooseImmediateOrDelayedTaskWithPriority<SetOperation>(priority);
181   }
182 
183   template <typename SetOperation>
ChooseImmediateOnlyWithPriority(TaskQueue::QueuePriority priority)184   WorkQueue* ChooseImmediateOnlyWithPriority(
185       TaskQueue::QueuePriority priority) const {
186     if (auto queue_and_order = SetOperation::GetWithPriority(
187             immediate_work_queue_sets_, priority)) {
188       return queue_and_order->queue;
189     }
190     return nullptr;
191   }
192 
193   template <typename SetOperation>
ChooseDelayedOnlyWithPriority(TaskQueue::QueuePriority priority)194   WorkQueue* ChooseDelayedOnlyWithPriority(
195       TaskQueue::QueuePriority priority) const {
196     if (auto queue_and_order =
197             SetOperation::GetWithPriority(delayed_work_queue_sets_, priority)) {
198       return queue_and_order->queue;
199     }
200     return nullptr;
201   }
202 
203  private:
priority_count()204   size_t priority_count() const { return non_empty_set_counts_.size(); }
205 
206   void ChangeSetIndex(internal::TaskQueueImpl* queue,
207                       TaskQueue::QueuePriority priority);
208   void AddQueueImpl(internal::TaskQueueImpl* queue,
209                     TaskQueue::QueuePriority priority);
210   void RemoveQueueImpl(internal::TaskQueueImpl* queue);
211 
212 #if DCHECK_IS_ON() || !defined(NDEBUG)
213   bool CheckContainsQueueForTest(const internal::TaskQueueImpl* queue) const;
214 #endif
215 
216   template <typename SetOperation>
ChooseImmediateOrDelayedTaskWithPriority(TaskQueue::QueuePriority priority)217   WorkQueue* ChooseImmediateOrDelayedTaskWithPriority(
218       TaskQueue::QueuePriority priority) const {
219     if (auto immediate_queue_and_order = SetOperation::GetWithPriority(
220             immediate_work_queue_sets_, priority)) {
221       if (auto delayed_queue_and_order = SetOperation::GetWithPriority(
222               delayed_work_queue_sets_, priority)) {
223         return immediate_queue_and_order->order < delayed_queue_and_order->order
224                    ? immediate_queue_and_order->queue
225                    : delayed_queue_and_order->queue;
226       }
227       return immediate_queue_and_order->queue;
228     }
229     return ChooseDelayedOnlyWithPriority<SetOperation>(priority);
230   }
231 
232   // Returns true if there are pending tasks with priority |priority|.
233   bool HasTasksWithPriority(TaskQueue::QueuePriority priority) const;
234 
235   const scoped_refptr<const AssociatedThreadId> associated_thread_;
236 
237 #if DCHECK_IS_ON()
238   const bool random_task_selection_ = false;
239 #endif
240 
241   // Count of the number of sets (delayed or immediate) for each priority.
242   // Should only contain 0, 1 or 2.
243   std::vector<int> non_empty_set_counts_;
244 
245   static constexpr const int kMaxNonEmptySetCount = 2;
246   // An atomic is used here because InitializeFeatures() can race with
247   // SequenceManager reading this.
248   static std::atomic_int g_max_delayed_starvation_tasks;
249 
250   // List of active priorities, which is used to work out which priority to run
251   // next.
252   ActivePriorityTracker active_priority_tracker_;
253 
254   WorkQueueSets delayed_work_queue_sets_;
255   WorkQueueSets immediate_work_queue_sets_;
256   int immediate_starvation_count_ = 0;
257 
258   raw_ptr<Observer> task_queue_selector_observer_ = nullptr;  // Not owned.
259 };
260 
261 }  // namespace internal
262 }  // namespace sequence_manager
263 }  // namespace base
264 
265 #endif  // BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_SELECTOR_H_
266