1 // Copyright 2018 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_H_ 6 #define BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_H_ 7 8 #include <cstdint> 9 #include <memory> 10 #include <type_traits> 11 12 #include "base/base_export.h" 13 #include "base/check.h" 14 #include "base/memory/weak_ptr.h" 15 #include "base/task/common/checked_lock.h" 16 #include "base/task/common/lazy_now.h" 17 #include "base/task/sequence_manager/tasks.h" 18 #include "base/task/single_thread_task_runner.h" 19 #include "base/task/task_observer.h" 20 #include "base/threading/platform_thread.h" 21 #include "base/time/time.h" 22 #include "base/trace_event/base_tracing.h" 23 #include "base/trace_event/base_tracing_forward.h" 24 #include "third_party/abseil-cpp/absl/types/optional.h" 25 26 namespace perfetto { 27 class EventContext; 28 } 29 30 namespace base { 31 32 class TaskObserver; 33 34 namespace sequence_manager { 35 36 using QueueName = ::perfetto::protos::pbzero::SequenceManagerTask::QueueName; 37 38 namespace internal { 39 class AssociatedThreadId; 40 class SequenceManagerImpl; 41 class TaskQueueImpl; 42 } // namespace internal 43 44 // TODO(kraynov): Make TaskQueue to actually be an interface for TaskQueueImpl 45 // and stop using ref-counting because we're no longer tied to task runner 46 // lifecycle and there's no other need for ref-counting either. 47 // NOTE: When TaskQueue gets automatically deleted on zero ref-count, 48 // TaskQueueImpl gets gracefully shutdown. It means that it doesn't get 49 // unregistered immediately and might accept some last minute tasks until 50 // SequenceManager will unregister it at some point. It's done to ensure that 51 // task queue always gets unregistered on the main thread. 52 class BASE_EXPORT TaskQueue : public RefCountedThreadSafe<TaskQueue> { 53 public: 54 // Interface that lets a task queue be throttled by changing the wake up time 55 // and optionally, by inserting fences. A wake up in this context is a 56 // notification at a given time that lets this TaskQueue know of newly ripe 57 // delayed tasks if it's enabled. By delaying the desired wake up time to a 58 // different allowed wake up time, the Throttler can hold off delayed tasks 59 // that would otherwise by allowed to run sooner. 60 class BASE_EXPORT Throttler { 61 public: 62 // Invoked when the TaskQueue's next allowed wake up time is reached and is 63 // enabled, even if blocked by a fence. That wake up is defined by the last 64 // value returned from GetNextAllowedWakeUp(). 65 // This is always called on the thread this TaskQueue is associated with. 66 virtual void OnWakeUp(LazyNow* lazy_now) = 0; 67 68 // Invoked when the TaskQueue newly gets a pending immediate task and is 69 // enabled, even if blocked by a fence. Redundant calls are possible when 70 // the TaskQueue already had a pending immediate task. 71 // The implementation may use this to: 72 // - Restrict task execution by inserting/updating a fence. 73 // - Update the TaskQueue's next delayed wake up via UpdateWakeUp(). 74 // This allows the Throttler to perform additional operations later from 75 // OnWakeUp(). 76 // This is always called on the thread this TaskQueue is associated with. 77 virtual void OnHasImmediateTask() = 0; 78 79 // Invoked when the TaskQueue is enabled and wants to know when to schedule 80 // the next delayed wake-up (which happens at least every time this queue is 81 // about to cause the next wake up) provided |next_desired_wake_up|, the 82 // wake-up for the next pending delayed task in this queue (pending delayed 83 // tasks that are ripe may be ignored), or nullopt if there's no pending 84 // delayed task. |has_ready_task| indicates whether there are immediate 85 // tasks or ripe delayed tasks. The implementation should return the next 86 // allowed wake up, or nullopt if no future wake-up is necessary. 87 // This is always called on the thread this TaskQueue is associated with. 88 virtual absl::optional<WakeUp> GetNextAllowedWakeUp( 89 LazyNow* lazy_now, 90 absl::optional<WakeUp> next_desired_wake_up, 91 bool has_ready_task) = 0; 92 93 protected: 94 ~Throttler() = default; 95 }; 96 97 // Shuts down the queue. All tasks currently queued will be discarded. 98 virtual void ShutdownTaskQueue(); 99 100 // Queues with higher priority (smaller number) are selected to run before 101 // queues of lower priority. Note that there is no starvation protection, 102 // i.e., a constant stream of high priority work can mean that tasks in lower 103 // priority queues won't get to run. 104 using QueuePriority = uint8_t; 105 106 // By default there is only a single priority. Sequences making use of 107 // priorities should parameterize the `SequenceManager` with the appropriate 108 // `SequenceManager::PrioritySettings`. 109 enum class DefaultQueuePriority : QueuePriority { 110 kNormalPriority = 0, 111 112 // Must be the last entry. 113 kQueuePriorityCount = 1, 114 }; 115 116 // Options for constructing a TaskQueue. 117 struct Spec { SpecSpec118 explicit Spec(QueueName name) : name(name) {} 119 SetShouldMonitorQuiescenceSpec120 Spec SetShouldMonitorQuiescence(bool should_monitor) { 121 should_monitor_quiescence = should_monitor; 122 return *this; 123 } 124 SetShouldNotifyObserversSpec125 Spec SetShouldNotifyObservers(bool run_observers) { 126 should_notify_observers = run_observers; 127 return *this; 128 } 129 130 // Delayed fences require Now() to be sampled when posting immediate tasks 131 // which is not free. SetDelayedFencesAllowedSpec132 Spec SetDelayedFencesAllowed(bool allow_delayed_fences) { 133 delayed_fence_allowed = allow_delayed_fences; 134 return *this; 135 } 136 SetNonWakingSpec137 Spec SetNonWaking(bool non_waking_in) { 138 non_waking = non_waking_in; 139 return *this; 140 } 141 142 QueueName name; 143 bool should_monitor_quiescence = false; 144 bool should_notify_observers = true; 145 bool delayed_fence_allowed = false; 146 bool non_waking = false; 147 }; 148 149 // TODO(altimin): Make this private after TaskQueue/TaskQueueImpl refactoring. 150 TaskQueue(std::unique_ptr<internal::TaskQueueImpl> impl, 151 const TaskQueue::Spec& spec); 152 TaskQueue(const TaskQueue&) = delete; 153 TaskQueue& operator=(const TaskQueue&) = delete; 154 155 // Information about task execution. 156 // 157 // Wall-time related methods (start_time, end_time, wall_duration) can be 158 // called only when |has_wall_time()| is true. 159 // Thread-time related mehtods (start_thread_time, end_thread_time, 160 // thread_duration) can be called only when |has_thread_time()| is true. 161 // 162 // start_* should be called after RecordTaskStart. 163 // end_* and *_duration should be called after RecordTaskEnd. 164 class BASE_EXPORT TaskTiming { 165 public: 166 enum class State { NotStarted, Running, Finished }; 167 enum class TimeRecordingPolicy { DoRecord, DoNotRecord }; 168 169 TaskTiming(bool has_wall_time, bool has_thread_time); 170 has_wall_time()171 bool has_wall_time() const { return has_wall_time_; } has_thread_time()172 bool has_thread_time() const { return has_thread_time_; } 173 start_time()174 base::TimeTicks start_time() const { 175 DCHECK(has_wall_time()); 176 return start_time_; 177 } end_time()178 base::TimeTicks end_time() const { 179 DCHECK(has_wall_time()); 180 return end_time_; 181 } wall_duration()182 base::TimeDelta wall_duration() const { 183 DCHECK(has_wall_time()); 184 return end_time_ - start_time_; 185 } start_thread_time()186 base::ThreadTicks start_thread_time() const { 187 DCHECK(has_thread_time()); 188 return start_thread_time_; 189 } end_thread_time()190 base::ThreadTicks end_thread_time() const { 191 DCHECK(has_thread_time()); 192 return end_thread_time_; 193 } thread_duration()194 base::TimeDelta thread_duration() const { 195 DCHECK(has_thread_time()); 196 return end_thread_time_ - start_thread_time_; 197 } 198 state()199 State state() const { return state_; } 200 201 void RecordTaskStart(LazyNow* now); 202 void RecordTaskEnd(LazyNow* now); 203 204 // Protected for tests. 205 protected: 206 State state_ = State::NotStarted; 207 208 bool has_wall_time_; 209 bool has_thread_time_; 210 211 base::TimeTicks start_time_; 212 base::TimeTicks end_time_; 213 base::ThreadTicks start_thread_time_; 214 base::ThreadTicks end_thread_time_; 215 }; 216 217 // An interface that lets the owner vote on whether or not the associated 218 // TaskQueue should be enabled. 219 class BASE_EXPORT QueueEnabledVoter { 220 public: 221 ~QueueEnabledVoter(); 222 223 QueueEnabledVoter(const QueueEnabledVoter&) = delete; 224 const QueueEnabledVoter& operator=(const QueueEnabledVoter&) = delete; 225 226 // Votes to enable or disable the associated TaskQueue. The TaskQueue will 227 // only be enabled if all the voters agree it should be enabled, or if there 228 // are no voters. 229 // NOTE this must be called on the thread the associated TaskQueue was 230 // created on. 231 void SetVoteToEnable(bool enabled); 232 IsVotingToEnable()233 bool IsVotingToEnable() const { return enabled_; } 234 235 private: 236 friend class TaskQueue; 237 explicit QueueEnabledVoter(scoped_refptr<TaskQueue> task_queue); 238 239 scoped_refptr<TaskQueue> const task_queue_; 240 bool enabled_; 241 }; 242 243 // Returns an interface that allows the caller to vote on whether or not this 244 // TaskQueue is enabled. The TaskQueue will be enabled if there are no voters 245 // or if all agree it should be enabled. 246 // NOTE this must be called on the thread this TaskQueue was created by. 247 std::unique_ptr<QueueEnabledVoter> CreateQueueEnabledVoter(); 248 249 // NOTE this must be called on the thread this TaskQueue was created by. 250 bool IsQueueEnabled() const; 251 252 // Returns true if the queue is completely empty. 253 bool IsEmpty() const; 254 255 // Returns the number of pending tasks in the queue. 256 size_t GetNumberOfPendingTasks() const; 257 258 // Returns true iff this queue has immediate tasks or delayed tasks that are 259 // ripe for execution. Ignores the queue's enabled state and fences. 260 // NOTE: this must be called on the thread this TaskQueue was created by. 261 // TODO(etiennep): Rename to HasReadyTask() and add LazyNow parameter. 262 bool HasTaskToRunImmediatelyOrReadyDelayedTask() const; 263 264 // Returns a wake-up for the next pending delayed task (pending delayed tasks 265 // that are ripe may be ignored), ignoring Throttler is any. If there are no 266 // such tasks (immediate tasks don't count) or the queue is disabled it 267 // returns nullopt. 268 // NOTE: this must be called on the thread this TaskQueue was created by. 269 absl::optional<WakeUp> GetNextDesiredWakeUp(); 270 271 // Can be called on any thread. 272 virtual const char* GetName() const; 273 274 // Serialise this object into a trace. 275 void WriteIntoTrace(perfetto::TracedValue context) const; 276 277 // Set the priority of the queue to |priority|. NOTE this must be called on 278 // the thread this TaskQueue was created by. 279 void SetQueuePriority(QueuePriority priority); 280 281 // Same as above but with an enum value as the priority. 282 template <typename T, typename = typename std::enable_if_t<std::is_enum_v<T>>> SetQueuePriority(T priority)283 void SetQueuePriority(T priority) { 284 static_assert(std::is_same_v<std::underlying_type_t<T>, QueuePriority>, 285 "Enumerated priorites must have the same underlying type as " 286 "TaskQueue::QueuePriority"); 287 SetQueuePriority(static_cast<QueuePriority>(priority)); 288 } 289 290 // Returns the current queue priority. 291 QueuePriority GetQueuePriority() const; 292 293 // These functions can only be called on the same thread that the task queue 294 // manager executes its tasks on. 295 void AddTaskObserver(TaskObserver* task_observer); 296 void RemoveTaskObserver(TaskObserver* task_observer); 297 298 enum class InsertFencePosition { 299 kNow, // Tasks posted on the queue up till this point further may run. 300 // All further tasks are blocked. 301 kBeginningOfTime, // No tasks posted on this queue may run. 302 }; 303 304 // Inserts a barrier into the task queue which prevents tasks with an enqueue 305 // order greater than the fence from running until either the fence has been 306 // removed or a subsequent fence has unblocked some tasks within the queue. 307 // Note: delayed tasks get their enqueue order set once their delay has 308 // expired, and non-delayed tasks get their enqueue order set when posted. 309 // 310 // Fences come in three flavours: 311 // - Regular (InsertFence(NOW)) - all tasks posted after this moment 312 // are blocked. 313 // - Fully blocking (InsertFence(kBeginningOfTime)) - all tasks including 314 // already posted are blocked. 315 // - Delayed (InsertFenceAt(timestamp)) - blocks all tasks posted after given 316 // point in time (must be in the future). 317 // 318 // Only one fence can be scheduled at a time. Inserting a new fence 319 // will automatically remove the previous one, regardless of fence type. 320 void InsertFence(InsertFencePosition position); 321 322 // Delayed fences are only allowed for queues created with 323 // SetDelayedFencesAllowed(true) because this feature implies sampling Now() 324 // (which isn't free) for every PostTask, even those with zero delay. 325 void InsertFenceAt(TimeTicks time); 326 327 // Removes any previously added fence and unblocks execution of any tasks 328 // blocked by it. 329 void RemoveFence(); 330 331 // Returns true if the queue has a fence but it isn't necessarily blocking 332 // execution of tasks (it may be the case if tasks enqueue order hasn't 333 // reached the number set for a fence). 334 bool HasActiveFence(); 335 336 // Returns true if the queue has a fence which is blocking execution of tasks. 337 bool BlockedByFence() const; 338 339 // Associates |throttler| to this queue. Only one throttler can be associated 340 // with this queue. |throttler| must outlive this TaskQueue, or remain valid 341 // until ResetThrottler(). 342 void SetThrottler(Throttler* throttler); 343 // Disassociates the current throttler from this queue, if any. 344 void ResetThrottler(); 345 346 // Updates the task queue's next wake up time in its time domain, taking into 347 // account the desired run time of queued tasks and policies enforced by the 348 // throttler if any. 349 void UpdateWakeUp(LazyNow* lazy_now); 350 351 // Controls whether or not the queue will emit traces events when tasks are 352 // posted to it while disabled. This only applies for the current or next 353 // period during which the queue is disabled. When the queue is re-enabled 354 // this will revert back to the default value of false. 355 void SetShouldReportPostedTasksWhenDisabled(bool should_report); 356 357 // Create a task runner for this TaskQueue which will annotate all 358 // posted tasks with the given task type. 359 // May be called on any thread. 360 // NOTE: Task runners don't hold a reference to a TaskQueue, hence, 361 // it's required to retain that reference to prevent automatic graceful 362 // shutdown. Unique ownership of task queues will fix this issue soon. 363 scoped_refptr<SingleThreadTaskRunner> CreateTaskRunner(TaskType task_type); 364 365 // Default task runner which doesn't annotate tasks with a task type. task_runner()366 const scoped_refptr<SingleThreadTaskRunner>& task_runner() const { 367 return default_task_runner_; 368 } 369 370 // Checks whether or not this TaskQueue has a TaskQueueImpl. 371 // TODO(crbug.com/1143007): Remove this method when TaskQueueImpl inherits 372 // from TaskQueue and TaskQueue no longer owns an Impl. HasImpl()373 bool HasImpl() { return !!impl_; } 374 375 using OnTaskStartedHandler = 376 RepeatingCallback<void(const Task&, const TaskQueue::TaskTiming&)>; 377 using OnTaskCompletedHandler = 378 RepeatingCallback<void(const Task&, TaskQueue::TaskTiming*, LazyNow*)>; 379 using OnTaskPostedHandler = RepeatingCallback<void(const Task&)>; 380 using TaskExecutionTraceLogger = 381 RepeatingCallback<void(perfetto::EventContext&, const Task&)>; 382 383 // Sets a handler to subscribe for notifications about started and completed 384 // tasks. 385 void SetOnTaskStartedHandler(OnTaskStartedHandler handler); 386 387 // |task_timing| may be passed in Running state and may not have the end time, 388 // so that the handler can run an additional task that is counted as a part of 389 // the main task. 390 // The handler can call TaskTiming::RecordTaskEnd, which is optional, to 391 // finalize the task, and use the resulting timing. 392 void SetOnTaskCompletedHandler(OnTaskCompletedHandler handler); 393 394 // RAII handle associated with an OnTaskPostedHandler. Unregisters the handler 395 // upon destruction. 396 class OnTaskPostedCallbackHandle { 397 public: 398 OnTaskPostedCallbackHandle(const OnTaskPostedCallbackHandle&) = delete; 399 OnTaskPostedCallbackHandle& operator=(const OnTaskPostedCallbackHandle&) = 400 delete; 401 virtual ~OnTaskPostedCallbackHandle() = default; 402 403 protected: 404 OnTaskPostedCallbackHandle() = default; 405 }; 406 407 // Add a callback for adding custom functionality for processing posted task. 408 // Callback will be dispatched while holding a scheduler lock. As a result, 409 // callback should not call scheduler APIs directly, as this can lead to 410 // deadlocks. For example, PostTask should not be called directly and 411 // ScopedDeferTaskPosting::PostOrDefer should be used instead. `handler` must 412 // not be a null callback. Must be called on the thread this task queue is 413 // associated with, and the handle returned must be destroyed on the same 414 // thread. 415 [[nodiscard]] std::unique_ptr<OnTaskPostedCallbackHandle> 416 AddOnTaskPostedHandler(OnTaskPostedHandler handler); 417 418 // Set a callback to fill trace event arguments associated with the task 419 // execution. 420 void SetTaskExecutionTraceLogger(TaskExecutionTraceLogger logger); 421 AsWeakPtr()422 base::WeakPtr<TaskQueue> AsWeakPtr() { 423 return weak_ptr_factory_.GetWeakPtr(); 424 } 425 426 protected: 427 virtual ~TaskQueue(); 428 GetTaskQueueImpl()429 internal::TaskQueueImpl* GetTaskQueueImpl() const { return impl_.get(); } 430 431 private: 432 friend class RefCountedThreadSafe<TaskQueue>; 433 friend class internal::SequenceManagerImpl; 434 friend class internal::TaskQueueImpl; 435 436 void AddQueueEnabledVoter(bool voter_is_enabled); 437 void RemoveQueueEnabledVoter(bool voter_is_enabled); 438 bool AreAllQueueEnabledVotersEnabled() const; 439 void OnQueueEnabledVoteChanged(bool enabled); 440 441 bool IsOnMainThread() const; 442 443 // Shuts down the queue when there are no more tasks queued. 444 void ShutdownTaskQueueGracefully(); 445 446 // TaskQueue has ownership of an underlying implementation but in certain 447 // cases (e.g. detached frames) their lifetime may diverge. 448 // This method should be used to take away the impl for graceful shutdown. 449 // TaskQueue will disregard any calls or posting tasks thereafter. 450 std::unique_ptr<internal::TaskQueueImpl> TakeTaskQueueImpl(); 451 452 // |impl_| can be written to on the main thread but can be read from 453 // any thread. 454 // |impl_lock_| must be acquired when writing to |impl_| or when accessing 455 // it from non-main thread. Reading from the main thread does not require 456 // a lock. 457 mutable base::internal::CheckedLock impl_lock_{ 458 base::internal::UniversalPredecessor{}}; 459 std::unique_ptr<internal::TaskQueueImpl> impl_; 460 461 const WeakPtr<internal::SequenceManagerImpl> sequence_manager_; 462 463 const scoped_refptr<const internal::AssociatedThreadId> associated_thread_; 464 const scoped_refptr<SingleThreadTaskRunner> default_task_runner_; 465 466 int enabled_voter_count_ = 0; 467 int voter_count_ = 0; 468 QueueName name_; 469 470 base::WeakPtrFactory<TaskQueue> weak_ptr_factory_{this}; 471 }; 472 473 } // namespace sequence_manager 474 } // namespace base 475 476 #endif // BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_H_ 477