1 // Copyright 2013 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_ 6 #define BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_ 7 8 #include "base/base_export.h" 9 #include "base/callback.h" 10 #include "base/macros.h" 11 #include "base/memory/ref_counted.h" 12 #include "base/pending_task.h" 13 #include "base/sequence_checker.h" 14 #include "base/synchronization/lock.h" 15 #include "base/time/time.h" 16 17 namespace base { 18 19 class BasicPostTaskPerfTest; 20 21 namespace internal { 22 23 // Implements a queue of tasks posted to the message loop running on the current 24 // thread. This class takes care of synchronizing posting tasks from different 25 // threads and together with MessageLoop ensures clean shutdown. 26 class BASE_EXPORT IncomingTaskQueue 27 : public RefCountedThreadSafe<IncomingTaskQueue> { 28 public: 29 // TODO(gab): Move this to SequencedTaskSource::Observer in 30 // https://chromium-review.googlesource.com/c/chromium/src/+/1088762. 31 class Observer { 32 public: 33 virtual ~Observer() = default; 34 35 // Notifies this Observer that it is about to enqueue |task|. The Observer 36 // may alter |task| as a result (e.g. add metadata to the PendingTask 37 // struct). This may be called while holding a lock and shouldn't perform 38 // logic requiring synchronization (override DidQueueTask() for that). 39 virtual void WillQueueTask(PendingTask* task) = 0; 40 41 // Notifies this Observer that a task was queued in the IncomingTaskQueue it 42 // observes. |was_empty| is true if the task source was empty (i.e. 43 // |!HasTasks()|) before this task was posted. DidQueueTask() can be invoked 44 // from any thread. 45 virtual void DidQueueTask(bool was_empty) = 0; 46 }; 47 48 // Provides a read and remove only view into a task queue. 49 class ReadAndRemoveOnlyQueue { 50 public: 51 ReadAndRemoveOnlyQueue() = default; 52 virtual ~ReadAndRemoveOnlyQueue() = default; 53 54 // Returns the next task. HasTasks() is assumed to be true. 55 virtual const PendingTask& Peek() = 0; 56 57 // Removes and returns the next task. HasTasks() is assumed to be true. 58 virtual PendingTask Pop() = 0; 59 60 // Whether this queue has tasks. 61 virtual bool HasTasks() = 0; 62 63 // Removes all tasks. 64 virtual void Clear() = 0; 65 66 private: 67 DISALLOW_COPY_AND_ASSIGN(ReadAndRemoveOnlyQueue); 68 }; 69 70 // Provides a read-write task queue. 71 class Queue : public ReadAndRemoveOnlyQueue { 72 public: 73 Queue() = default; 74 ~Queue() override = default; 75 76 // Adds the task to the end of the queue. 77 virtual void Push(PendingTask pending_task) = 0; 78 79 private: 80 DISALLOW_COPY_AND_ASSIGN(Queue); 81 }; 82 83 // Constructs an IncomingTaskQueue which will invoke |task_queue_observer| 84 // when tasks are queued. |task_queue_observer| will be bound to this 85 // IncomingTaskQueue's lifetime. Ownership is required as opposed to a raw 86 // pointer since IncomingTaskQueue is ref-counted. For the same reasons, 87 // |task_queue_observer| needs to support being invoked racily during 88 // shutdown). 89 explicit IncomingTaskQueue(std::unique_ptr<Observer> task_queue_observer); 90 91 // Appends a task to the incoming queue. Posting of all tasks is routed though 92 // AddToIncomingQueue() or TryAddToIncomingQueue() to make sure that posting 93 // task is properly synchronized between different threads. 94 // 95 // Returns true if the task was successfully added to the queue, otherwise 96 // returns false. In all cases, the ownership of |task| is transferred to the 97 // called method. 98 bool AddToIncomingQueue(const Location& from_here, 99 OnceClosure task, 100 TimeDelta delay, 101 Nestable nestable); 102 103 // Instructs this IncomingTaskQueue to stop accepting tasks, this cannot be 104 // undone. Note that the registered IncomingTaskQueue::Observer may still 105 // racily receive a few DidQueueTask() calls while the Shutdown() signal 106 // propagates to other threads and it needs to support that. 107 void Shutdown(); 108 triage_tasks()109 ReadAndRemoveOnlyQueue& triage_tasks() { return triage_tasks_; } 110 delayed_tasks()111 Queue& delayed_tasks() { return delayed_tasks_; } 112 deferred_tasks()113 Queue& deferred_tasks() { return deferred_tasks_; } 114 HasPendingHighResolutionTasks()115 bool HasPendingHighResolutionTasks() const { 116 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); 117 return delayed_tasks_.HasPendingHighResolutionTasks(); 118 } 119 120 // Reports UMA metrics about its queues before the MessageLoop goes to sleep 121 // per being idle. 122 void ReportMetricsOnIdle() const; 123 124 private: 125 friend class base::BasicPostTaskPerfTest; 126 friend class RefCountedThreadSafe<IncomingTaskQueue>; 127 128 // These queues below support the previous MessageLoop behavior of 129 // maintaining three queue queues to process tasks: 130 // 131 // TriageQueue 132 // The first queue to receive all tasks for the processing sequence (when 133 // reloading from the thread-safe |incoming_queue_|). Tasks are generally 134 // either dispatched immediately or sent to the queues below. 135 // 136 // DelayedQueue 137 // The queue for holding tasks that should be run later and sorted by expected 138 // run time. 139 // 140 // DeferredQueue 141 // The queue for holding tasks that couldn't be run while the MessageLoop was 142 // nested. These are generally processed during the idle stage. 143 // 144 // Many of these do not share implementations even though they look like they 145 // could because of small quirks (reloading semantics) or differing underlying 146 // data strucutre (TaskQueue vs DelayedTaskQueue). 147 148 // The starting point for all tasks on the sequence processing the tasks. 149 class TriageQueue : public ReadAndRemoveOnlyQueue { 150 public: 151 TriageQueue(IncomingTaskQueue* outer); 152 ~TriageQueue() override; 153 154 // ReadAndRemoveOnlyQueue: 155 // The methods below will attempt to reload from the incoming queue if the 156 // queue itself is empty (Clear() has special logic to reload only once 157 // should destructors post more tasks). 158 const PendingTask& Peek() override; 159 PendingTask Pop() override; 160 // Whether this queue has tasks after reloading from the incoming queue. 161 bool HasTasks() override; 162 void Clear() override; 163 164 private: 165 void ReloadFromIncomingQueueIfEmpty(); 166 167 IncomingTaskQueue* const outer_; 168 TaskQueue queue_; 169 170 DISALLOW_COPY_AND_ASSIGN(TriageQueue); 171 }; 172 173 class DelayedQueue : public Queue { 174 public: 175 DelayedQueue(); 176 ~DelayedQueue() override; 177 178 // Queue: 179 const PendingTask& Peek() override; 180 PendingTask Pop() override; 181 // Whether this queue has tasks after sweeping the cancelled ones in front. 182 bool HasTasks() override; 183 void Clear() override; 184 void Push(PendingTask pending_task) override; 185 186 size_t Size() const; HasPendingHighResolutionTasks()187 bool HasPendingHighResolutionTasks() const { 188 return pending_high_res_tasks_ > 0; 189 } 190 191 private: 192 DelayedTaskQueue queue_; 193 194 // Number of high resolution tasks in |queue_|. 195 int pending_high_res_tasks_ = 0; 196 197 SEQUENCE_CHECKER(sequence_checker_); 198 199 DISALLOW_COPY_AND_ASSIGN(DelayedQueue); 200 }; 201 202 class DeferredQueue : public Queue { 203 public: 204 DeferredQueue(); 205 ~DeferredQueue() override; 206 207 // Queue: 208 const PendingTask& Peek() override; 209 PendingTask Pop() override; 210 bool HasTasks() override; 211 void Clear() override; 212 void Push(PendingTask pending_task) override; 213 214 private: 215 TaskQueue queue_; 216 217 SEQUENCE_CHECKER(sequence_checker_); 218 219 DISALLOW_COPY_AND_ASSIGN(DeferredQueue); 220 }; 221 222 virtual ~IncomingTaskQueue(); 223 224 // Adds a task to |incoming_queue_|. The caller retains ownership of 225 // |pending_task|, but this function will reset the value of 226 // |pending_task->task|. This is needed to ensure that the posting call stack 227 // does not retain |pending_task->task| beyond this function call. 228 bool PostPendingTask(PendingTask* pending_task); 229 230 // Does the real work of posting a pending task. Returns true if 231 // |incoming_queue_| was empty before |pending_task| was posted. 232 bool PostPendingTaskLockRequired(PendingTask* pending_task); 233 234 // Loads tasks from the |incoming_queue_| into |*work_queue|. Must be called 235 // from the sequence processing the tasks. 236 void ReloadWorkQueue(TaskQueue* work_queue); 237 238 // Checks calls made only on the MessageLoop thread. 239 SEQUENCE_CHECKER(sequence_checker_); 240 241 const std::unique_ptr<Observer> task_queue_observer_; 242 243 // Queue for initial triaging of tasks on the |sequence_checker_| sequence. 244 TriageQueue triage_tasks_; 245 246 // Queue for delayed tasks on the |sequence_checker_| sequence. 247 DelayedQueue delayed_tasks_; 248 249 // Queue for non-nestable deferred tasks on the |sequence_checker_| sequence. 250 DeferredQueue deferred_tasks_; 251 252 // Synchronizes access to all members below this line. 253 base::Lock incoming_queue_lock_; 254 255 // An incoming queue of tasks that are acquired under a mutex for processing 256 // on this instance's thread. These tasks have not yet been been pushed to 257 // |triage_tasks_|. 258 TaskQueue incoming_queue_; 259 260 // True if new tasks should be accepted. 261 bool accept_new_tasks_ = true; 262 263 // The next sequence number to use for delayed tasks. 264 int next_sequence_num_ = 0; 265 266 // True if the outgoing queue (|triage_tasks_|) is empty. Toggled under 267 // |incoming_queue_lock_| in ReloadWorkQueue() so that 268 // PostPendingTaskLockRequired() can tell, without accessing the thread unsafe 269 // |triage_tasks_|, if the IncomingTaskQueue has been made non-empty by a 270 // PostTask() (and needs to inform its Observer). 271 bool triage_queue_empty_ = true; 272 273 DISALLOW_COPY_AND_ASSIGN(IncomingTaskQueue); 274 }; 275 276 } // namespace internal 277 } // namespace base 278 279 #endif // BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_ 280