1 // Copyright 2018 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_ 6 #define BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_ 7 8 #include <stddef.h> 9 10 #include <memory> 11 #include <set> 12 13 #include "base/callback.h" 14 #include "base/containers/circular_deque.h" 15 #include "base/macros.h" 16 #include "base/memory/weak_ptr.h" 17 #include "base/message_loop/message_loop.h" 18 #include "base/pending_task.h" 19 #include "base/task/sequence_manager/enqueue_order.h" 20 #include "base/task/sequence_manager/intrusive_heap.h" 21 #include "base/task/sequence_manager/lazily_deallocated_deque.h" 22 #include "base/task/sequence_manager/sequenced_task_source.h" 23 #include "base/task/sequence_manager/task_queue.h" 24 #include "base/threading/thread_checker.h" 25 #include "base/trace_event/trace_event.h" 26 #include "base/trace_event/trace_event_argument.h" 27 28 namespace base { 29 namespace sequence_manager { 30 31 class LazyNow; 32 class TimeDomain; 33 34 namespace internal { 35 36 class SequenceManagerImpl; 37 class WorkQueue; 38 class WorkQueueSets; 39 40 struct IncomingImmediateWorkList { 41 IncomingImmediateWorkList* next = nullptr; 42 TaskQueueImpl* queue = nullptr; 43 internal::EnqueueOrder order; 44 }; 45 46 // TaskQueueImpl has four main queues: 47 // 48 // Immediate (non-delayed) tasks: 49 // |immediate_incoming_queue| - PostTask enqueues tasks here. 50 // |immediate_work_queue| - SequenceManager takes immediate tasks here. 51 // 52 // Delayed tasks 53 // |delayed_incoming_queue| - PostDelayedTask enqueues tasks here. 54 // |delayed_work_queue| - SequenceManager takes delayed tasks here. 55 // 56 // The |immediate_incoming_queue| can be accessed from any thread, the other 57 // queues are main-thread only. To reduce the overhead of locking, 58 // |immediate_work_queue| is swapped with |immediate_incoming_queue| when 59 // |immediate_work_queue| becomes empty. 60 // 61 // Delayed tasks are initially posted to |delayed_incoming_queue| and a wake-up 62 // is scheduled with the TimeDomain. When the delay has elapsed, the TimeDomain 63 // calls UpdateDelayedWorkQueue and ready delayed tasks are moved into the 64 // |delayed_work_queue|. Note the EnqueueOrder (used for ordering) for a delayed 65 // task is not set until it's moved into the |delayed_work_queue|. 66 // 67 // TaskQueueImpl uses the WorkQueueSets and the TaskQueueSelector to implement 68 // prioritization. Task selection is done by the TaskQueueSelector and when a 69 // queue is selected, it round-robins between the |immediate_work_queue| and 70 // |delayed_work_queue|. The reason for this is we want to make sure delayed 71 // tasks (normally the most common type) don't starve out immediate work. 72 class BASE_EXPORT TaskQueueImpl { 73 public: 74 TaskQueueImpl(SequenceManagerImpl* sequence_manager, 75 TimeDomain* time_domain, 76 const TaskQueue::Spec& spec); 77 78 ~TaskQueueImpl(); 79 80 // Represents a time at which a task wants to run. Tasks scheduled for the 81 // same point in time will be ordered by their sequence numbers. 82 struct DelayedWakeUp { 83 TimeTicks time; 84 int sequence_num; 85 86 bool operator!=(const DelayedWakeUp& other) const { 87 return time != other.time || other.sequence_num != sequence_num; 88 } 89 90 bool operator==(const DelayedWakeUp& other) const { 91 return !(*this != other); 92 } 93 94 bool operator<=(const DelayedWakeUp& other) const { 95 if (time == other.time) { 96 // Debug gcc builds can compare an element against itself. 97 DCHECK(sequence_num != other.sequence_num || this == &other); 98 // |PostedTask::sequence_num| is int and might wrap around to 99 // a negative number when casted from EnqueueOrder. 100 // This way of comparison handles that properly. 101 return (sequence_num - other.sequence_num) <= 0; 102 } 103 return time < other.time; 104 } 105 }; 106 107 class BASE_EXPORT Task : public TaskQueue::Task { 108 public: 109 Task(TaskQueue::PostedTask task, 110 TimeTicks desired_run_time, 111 EnqueueOrder sequence_number); 112 113 Task(TaskQueue::PostedTask task, 114 TimeTicks desired_run_time, 115 EnqueueOrder sequence_number, 116 EnqueueOrder enqueue_order); 117 delayed_wake_up()118 DelayedWakeUp delayed_wake_up() const { 119 // Since we use |sequence_num| in DelayedWakeUp for ordering purposes 120 // and integer overflow handling is type-sensitive it's worth to protect 121 // it from an unnoticed potential change in the PendingTask base class. 122 static_assert(std::is_same<decltype(sequence_num), int>::value, ""); 123 return DelayedWakeUp{delayed_run_time, sequence_num}; 124 } 125 enqueue_order()126 EnqueueOrder enqueue_order() const { 127 DCHECK(enqueue_order_); 128 return enqueue_order_; 129 } 130 set_enqueue_order(EnqueueOrder enqueue_order)131 void set_enqueue_order(EnqueueOrder enqueue_order) { 132 DCHECK(!enqueue_order_); 133 enqueue_order_ = enqueue_order; 134 } 135 enqueue_order_set()136 bool enqueue_order_set() const { return enqueue_order_; } 137 138 private: 139 // Similar to sequence number, but ultimately the |enqueue_order_| is what 140 // the scheduler uses for task ordering. For immediate tasks |enqueue_order| 141 // is set when posted, but for delayed tasks it's not defined until they are 142 // enqueued on the |delayed_work_queue_|. This is because otherwise delayed 143 // tasks could run before an immediate task posted after the delayed task. 144 EnqueueOrder enqueue_order_; 145 }; 146 147 // A result retuned by PostDelayedTask. When scheduler failed to post a task 148 // due to being shutdown a task is returned to be destroyed outside the lock. 149 struct PostTaskResult { 150 PostTaskResult(); 151 PostTaskResult(bool success, TaskQueue::PostedTask task); 152 PostTaskResult(PostTaskResult&& move_from); 153 PostTaskResult(const PostTaskResult& copy_from) = delete; 154 ~PostTaskResult(); 155 156 static PostTaskResult Success(); 157 static PostTaskResult Fail(TaskQueue::PostedTask task); 158 159 bool success; 160 TaskQueue::PostedTask task; 161 }; 162 163 // Types of queues TaskQueueImpl is maintaining internally. 164 enum class WorkQueueType { kImmediate, kDelayed }; 165 166 // Non-nestable tasks may get deferred but such queue is being maintained on 167 // SequenceManager side, so we need to keep information how to requeue it. 168 struct DeferredNonNestableTask { 169 internal::TaskQueueImpl::Task task; 170 internal::TaskQueueImpl* task_queue; 171 WorkQueueType work_queue_type; 172 }; 173 174 using OnNextWakeUpChangedCallback = RepeatingCallback<void(TimeTicks)>; 175 using OnTaskStartedHandler = 176 RepeatingCallback<void(const TaskQueue::Task&, 177 const TaskQueue::TaskTiming&)>; 178 using OnTaskCompletedHandler = 179 RepeatingCallback<void(const TaskQueue::Task&, 180 const TaskQueue::TaskTiming&)>; 181 182 // TaskQueue implementation. 183 const char* GetName() const; 184 bool RunsTasksInCurrentSequence() const; 185 PostTaskResult PostDelayedTask(TaskQueue::PostedTask task); 186 // Require a reference to enclosing task queue for lifetime control. 187 std::unique_ptr<TaskQueue::QueueEnabledVoter> CreateQueueEnabledVoter( 188 scoped_refptr<TaskQueue> owning_task_queue); 189 bool IsQueueEnabled() const; 190 bool IsEmpty() const; 191 size_t GetNumberOfPendingTasks() const; 192 bool HasTaskToRunImmediately() const; 193 Optional<TimeTicks> GetNextScheduledWakeUp(); 194 Optional<DelayedWakeUp> GetNextScheduledWakeUpImpl(); 195 void SetQueuePriority(TaskQueue::QueuePriority priority); 196 TaskQueue::QueuePriority GetQueuePriority() const; 197 void AddTaskObserver(MessageLoop::TaskObserver* task_observer); 198 void RemoveTaskObserver(MessageLoop::TaskObserver* task_observer); 199 void SetTimeDomain(TimeDomain* time_domain); 200 TimeDomain* GetTimeDomain() const; 201 void SetBlameContext(trace_event::BlameContext* blame_context); 202 void InsertFence(TaskQueue::InsertFencePosition position); 203 void InsertFenceAt(TimeTicks time); 204 void RemoveFence(); 205 bool HasActiveFence(); 206 bool BlockedByFence() const; 207 // Implementation of TaskQueue::SetObserver. 208 void SetOnNextWakeUpChangedCallback(OnNextWakeUpChangedCallback callback); 209 210 void UnregisterTaskQueue(); 211 212 // Returns true if a (potentially hypothetical) task with the specified 213 // |enqueue_order| could run on the queue. Must be called from the main 214 // thread. 215 bool CouldTaskRun(EnqueueOrder enqueue_order) const; 216 217 // Must only be called from the thread this task queue was created on. 218 void ReloadImmediateWorkQueueIfEmpty(); 219 220 void AsValueInto(TimeTicks now, trace_event::TracedValue* state) const; 221 GetQuiescenceMonitored()222 bool GetQuiescenceMonitored() const { return should_monitor_quiescence_; } GetShouldNotifyObservers()223 bool GetShouldNotifyObservers() const { return should_notify_observers_; } 224 225 void NotifyWillProcessTask(const PendingTask& pending_task); 226 void NotifyDidProcessTask(const PendingTask& pending_task); 227 228 // Check for available tasks in immediate work queues. 229 // Used to check if we need to generate notifications about delayed work. 230 bool HasPendingImmediateWork(); 231 delayed_work_queue()232 WorkQueue* delayed_work_queue() { 233 return main_thread_only().delayed_work_queue.get(); 234 } 235 delayed_work_queue()236 const WorkQueue* delayed_work_queue() const { 237 return main_thread_only().delayed_work_queue.get(); 238 } 239 immediate_work_queue()240 WorkQueue* immediate_work_queue() { 241 return main_thread_only().immediate_work_queue.get(); 242 } 243 immediate_work_queue()244 const WorkQueue* immediate_work_queue() const { 245 return main_thread_only().immediate_work_queue.get(); 246 } 247 248 // Protected by SequenceManagerImpl's AnyThread lock. immediate_work_list_storage()249 IncomingImmediateWorkList* immediate_work_list_storage() { 250 return &immediate_work_list_storage_; 251 } 252 253 // Enqueues any delayed tasks which should be run now on the 254 // |delayed_work_queue|. 255 // Must be called from the main thread. 256 void WakeUpForDelayedWork(LazyNow* lazy_now); 257 heap_handle()258 HeapHandle heap_handle() const { return main_thread_only().heap_handle; } 259 set_heap_handle(HeapHandle heap_handle)260 void set_heap_handle(HeapHandle heap_handle) { 261 main_thread_only().heap_handle = heap_handle; 262 } 263 264 // Pushes |task| onto the front of the specified work queue. Caution must be 265 // taken with this API because you could easily starve out other work. 266 // TODO(kraynov): Simplify non-nestable task logic https://crbug.com/845437. 267 void RequeueDeferredNonNestableTask(DeferredNonNestableTask task); 268 269 void PushImmediateIncomingTaskForTest(TaskQueueImpl::Task&& task); 270 271 class QueueEnabledVoterImpl : public TaskQueue::QueueEnabledVoter { 272 public: 273 explicit QueueEnabledVoterImpl(scoped_refptr<TaskQueue> task_queue); 274 ~QueueEnabledVoterImpl() override; 275 276 // QueueEnabledVoter implementation. 277 void SetQueueEnabled(bool enabled) override; 278 GetTaskQueueForTest()279 TaskQueueImpl* GetTaskQueueForTest() const { 280 return task_queue_->GetTaskQueueImpl(); 281 } 282 283 private: 284 friend class TaskQueueImpl; 285 286 scoped_refptr<TaskQueue> task_queue_; 287 bool enabled_; 288 }; 289 290 // Iterates over |delayed_incoming_queue| removing canceled tasks. 291 void SweepCanceledDelayedTasks(TimeTicks now); 292 293 // Allows wrapping TaskQueue to set a handler to subscribe for notifications 294 // about started and completed tasks. 295 void SetOnTaskStartedHandler(OnTaskStartedHandler handler); 296 void OnTaskStarted(const TaskQueue::Task& task, 297 const TaskQueue::TaskTiming& task_timing); 298 void SetOnTaskCompletedHandler(OnTaskCompletedHandler handler); 299 void OnTaskCompleted(const TaskQueue::Task& task, 300 const TaskQueue::TaskTiming& task_timing); 301 bool RequiresTaskTiming() const; 302 303 WeakPtr<SequenceManagerImpl> GetSequenceManagerWeakPtr(); 304 305 scoped_refptr<GracefulQueueShutdownHelper> GetGracefulQueueShutdownHelper(); 306 307 // Returns true if this queue is unregistered or task queue manager is deleted 308 // and this queue can be safely deleted on any thread. 309 bool IsUnregistered() const; 310 311 // Disables queue for testing purposes, when a QueueEnabledVoter can't be 312 // constructed due to not having TaskQueue. 313 void SetQueueEnabledForTest(bool enabled); 314 315 protected: 316 void SetDelayedWakeUpForTesting(Optional<DelayedWakeUp> wake_up); 317 318 private: 319 friend class WorkQueue; 320 friend class WorkQueueTest; 321 322 struct AnyThread { 323 AnyThread(SequenceManagerImpl* sequence_manager, TimeDomain* time_domain); 324 ~AnyThread(); 325 326 // SequenceManagerImpl, TimeDomain and Observer are maintained in two 327 // copies: inside AnyThread and inside MainThreadOnly. They can be changed 328 // only from main thread, so it should be locked before accessing from other 329 // threads. 330 SequenceManagerImpl* sequence_manager; 331 TimeDomain* time_domain; 332 // Callback corresponding to TaskQueue::Observer::OnQueueNextChanged. 333 OnNextWakeUpChangedCallback on_next_wake_up_changed_callback; 334 }; 335 336 struct MainThreadOnly { 337 MainThreadOnly(SequenceManagerImpl* sequence_manager, 338 TaskQueueImpl* task_queue, 339 TimeDomain* time_domain); 340 ~MainThreadOnly(); 341 342 // Another copy of SequenceManagerImpl, TimeDomain and Observer 343 // for lock-free access from the main thread. 344 // See description inside struct AnyThread for details. 345 SequenceManagerImpl* sequence_manager; 346 TimeDomain* time_domain; 347 // Callback corresponding to TaskQueue::Observer::OnQueueNextChanged. 348 OnNextWakeUpChangedCallback on_next_wake_up_changed_callback; 349 350 std::unique_ptr<WorkQueue> delayed_work_queue; 351 std::unique_ptr<WorkQueue> immediate_work_queue; 352 std::priority_queue<TaskQueueImpl::Task> delayed_incoming_queue; 353 ObserverList<MessageLoop::TaskObserver> task_observers; 354 size_t set_index; 355 HeapHandle heap_handle; 356 int is_enabled_refcount; 357 int voter_refcount; 358 trace_event::BlameContext* blame_context; // Not owned. 359 EnqueueOrder current_fence; 360 Optional<TimeTicks> delayed_fence; 361 OnTaskStartedHandler on_task_started_handler; 362 OnTaskCompletedHandler on_task_completed_handler; 363 // Last reported wake up, used only in UpdateWakeUp to avoid 364 // excessive calls. 365 Optional<DelayedWakeUp> scheduled_wake_up; 366 // If false, queue will be disabled. Used only for tests. 367 bool is_enabled_for_test; 368 }; 369 370 PostTaskResult PostImmediateTaskImpl(TaskQueue::PostedTask task); 371 PostTaskResult PostDelayedTaskImpl(TaskQueue::PostedTask task); 372 373 // Push the task onto the |delayed_incoming_queue|. Lock-free main thread 374 // only fast path. 375 void PushOntoDelayedIncomingQueueFromMainThread(Task pending_task, 376 TimeTicks now); 377 378 // Push the task onto the |delayed_incoming_queue|. Slow path from other 379 // threads. 380 void PushOntoDelayedIncomingQueueLocked(Task pending_task); 381 382 void ScheduleDelayedWorkTask(Task pending_task); 383 384 void MoveReadyImmediateTasksToImmediateWorkQueueLocked(); 385 386 // Push the task onto the |immediate_incoming_queue| and for auto pumped 387 // queues it calls MaybePostDoWorkOnMainRunner if the Incoming queue was 388 // empty. 389 void PushOntoImmediateIncomingQueueLocked(Task task); 390 391 using TaskDeque = circular_deque<Task>; 392 393 // Extracts all the tasks from the immediate incoming queue and swaps it with 394 // |queue| which must be empty. 395 // Can be called from any thread. 396 void ReloadEmptyImmediateQueue(TaskDeque* queue); 397 398 void TraceQueueSize() const; 399 static void QueueAsValueInto(const TaskDeque& queue, 400 TimeTicks now, 401 trace_event::TracedValue* state); 402 static void QueueAsValueInto(const std::priority_queue<Task>& queue, 403 TimeTicks now, 404 trace_event::TracedValue* state); 405 static void TaskAsValueInto(const Task& task, 406 TimeTicks now, 407 trace_event::TracedValue* state); 408 409 void RemoveQueueEnabledVoter(const QueueEnabledVoterImpl* voter); 410 void OnQueueEnabledVoteChanged(bool enabled); 411 void EnableOrDisableWithSelector(bool enable); 412 413 // Schedules delayed work on time domain and calls the observer. 414 void UpdateDelayedWakeUp(LazyNow* lazy_now); 415 void UpdateDelayedWakeUpImpl(LazyNow* lazy_now, 416 Optional<DelayedWakeUp> wake_up); 417 418 // Activate a delayed fence if a time has come. 419 void ActivateDelayedFenceIfNeeded(TimeTicks now); 420 421 const char* name_; 422 423 const PlatformThreadId thread_id_; 424 425 mutable Lock any_thread_lock_; 426 AnyThread any_thread_; any_thread()427 struct AnyThread& any_thread() { 428 any_thread_lock_.AssertAcquired(); 429 return any_thread_; 430 } any_thread()431 const struct AnyThread& any_thread() const { 432 any_thread_lock_.AssertAcquired(); 433 return any_thread_; 434 } 435 436 ThreadChecker main_thread_checker_; 437 MainThreadOnly main_thread_only_; main_thread_only()438 MainThreadOnly& main_thread_only() { 439 DCHECK(main_thread_checker_.CalledOnValidThread()); 440 return main_thread_only_; 441 } main_thread_only()442 const MainThreadOnly& main_thread_only() const { 443 DCHECK(main_thread_checker_.CalledOnValidThread()); 444 return main_thread_only_; 445 } 446 447 mutable Lock immediate_incoming_queue_lock_; 448 TaskDeque immediate_incoming_queue_; immediate_incoming_queue()449 TaskDeque& immediate_incoming_queue() { 450 immediate_incoming_queue_lock_.AssertAcquired(); 451 return immediate_incoming_queue_; 452 } immediate_incoming_queue()453 const TaskDeque& immediate_incoming_queue() const { 454 immediate_incoming_queue_lock_.AssertAcquired(); 455 return immediate_incoming_queue_; 456 } 457 458 // Protected by SequenceManagerImpl's AnyThread lock. 459 IncomingImmediateWorkList immediate_work_list_storage_; 460 461 const bool should_monitor_quiescence_; 462 const bool should_notify_observers_; 463 464 DISALLOW_COPY_AND_ASSIGN(TaskQueueImpl); 465 }; 466 467 } // namespace internal 468 } // namespace sequence_manager 469 } // namespace base 470 471 #endif // BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_ 472