// Copyright 2018 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_
#define BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_

#include <deque>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>

#include "base/atomic_sequence_num.h"
#include "base/base_export.h"
#include "base/cancelable_callback.h"
#include "base/containers/circular_deque.h"
#include "base/debug/crash_logging.h"
#include "base/feature_list.h"
#include "base/functional/callback_forward.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_pump_type.h"
#include "base/observer_list.h"
#include "base/pending_task.h"
#include "base/rand_util.h"
#include "base/run_loop.h"
#include "base/synchronization/lock.h"
#include "base/task/current_thread.h"
#include "base/task/sequence_manager/associated_thread_id.h"
#include "base/task/sequence_manager/enqueue_order.h"
#include "base/task/sequence_manager/enqueue_order_generator.h"
#include "base/task/sequence_manager/sequence_manager.h"
#include "base/task/sequence_manager/task_queue_impl.h"
#include "base/task/sequence_manager/task_queue_selector.h"
#include "base/task/sequence_manager/thread_controller.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/single_thread_task_runner.h"
#include "base/threading/thread_checker.h"
#include "base/time/default_tick_clock.h"
#include "base/types/pass_key.h"
#include "base/values.h"
#include "build/build_config.h"
#include "third_party/abseil-cpp/absl/types/optional.h"

namespace base {

namespace internal {
class SequenceManagerThreadDelegate;
}

namespace trace_event {
class ConvertableToTraceFormat;
}  // namespace trace_event

namespace sequence_manager {

class SequenceManagerForTest;
class TaskQueue;
class TaskTimeObserver;
class TimeDomain;

namespace internal {

class TaskQueueImpl;
class DefaultWakeUpQueue;
class SequenceManagerImpl;
class ThreadControllerImpl;

// A private factory method for SequenceManagerThreadDelegate which is
// equivalent to sequence_manager::CreateUnboundSequenceManager() but returns
// the underlying impl.
std::unique_ptr<SequenceManagerImpl> CreateUnboundSequenceManagerImpl(
    PassKey<base::internal::SequenceManagerThreadDelegate>,
    SequenceManager::Settings settings);

// The task queue manager provides N task queues and a selector interface for
// choosing which task queue to service next. Each task queue consists of two
// sub queues:
//
// 1. Incoming task queue. Tasks that are posted get immediately appended here.
//    When a task is appended into an empty incoming queue, the task manager
//    work function (DoWork()) is scheduled to run on the main task runner.
//
// 2. Work queue. If a work queue is empty when DoWork() is entered, tasks from
//    the incoming task queue (if any) are moved here. The work queues are
//    registered with the selector as input to the scheduling decision.
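//
// Rough usage sketch (illustrative only, not part of this header: the public
// factories, the TaskQueue::Spec constructor, and the queue name below come
// from sequence_manager.h / task_queue.h and may differ; SomeTask is a
// placeholder):
//
//   std::unique_ptr<SequenceManager> manager =
//       CreateSequenceManagerOnCurrentThreadWithPump(
//           MessagePump::Create(MessagePumpType::DEFAULT));
//   scoped_refptr<TaskQueue> queue =
//       manager->CreateTaskQueue(TaskQueue::Spec(QueueName::TEST_TQ));
//   manager->SetDefaultTaskRunner(queue->task_runner());
//   queue->task_runner()->PostTask(FROM_HERE, BindOnce(&SomeTask));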
//
class BASE_EXPORT SequenceManagerImpl
    : public SequenceManager,
      public internal::SequencedTaskSource,
      public internal::TaskQueueSelector::Observer,
      public RunLoop::NestingObserver {
 public:
  using Observer = SequenceManager::Observer;

  SequenceManagerImpl(const SequenceManagerImpl&) = delete;
  SequenceManagerImpl& operator=(const SequenceManagerImpl&) = delete;
  ~SequenceManagerImpl() override;

  // Initializes the state of all the sequence manager features. Must be
  // invoked after FeatureList initialization.
  static void InitializeFeatures();

  // Sets the global cached state of the NoWakeUpsForCanceledTasks feature
  // according to its enabled state. Must be invoked after FeatureList
  // initialization.
  static void ApplyNoWakeUpsForCanceledTasks();

  // Resets the global cached state of the NoWakeUpsForCanceledTasks feature
  // according to its default state.
  static void ResetNoWakeUpsForCanceledTasksForTesting();

  // SequenceManager implementation:
  void BindToCurrentThread() override;
  scoped_refptr<SequencedTaskRunner> GetTaskRunnerForCurrentTask() override;
  void BindToMessagePump(std::unique_ptr<MessagePump> message_pump) override;
  void SetObserver(Observer* observer) override;
  void AddTaskTimeObserver(TaskTimeObserver* task_time_observer) override;
  void RemoveTaskTimeObserver(TaskTimeObserver* task_time_observer) override;
  void SetTimeDomain(TimeDomain* time_domain) override;
  void ResetTimeDomain() override;
  const TickClock* GetTickClock() const override;
  TimeTicks NowTicks() const override;
  void SetDefaultTaskRunner(
      scoped_refptr<SingleThreadTaskRunner> task_runner) override;
  void ReclaimMemory() override;
  bool GetAndClearSystemIsQuiescentBit() override;
  void SetWorkBatchSize(int work_batch_size) override;
  void SetTimerSlack(TimerSlack timer_slack) override;
  void EnableCrashKeys(const char* async_stack_crash_key) override;
  const MetricRecordingSettings& GetMetricRecordingSettings() const override;
  size_t GetPendingTaskCountForTesting() const override;
  scoped_refptr<TaskQueue> CreateTaskQueue(
      const TaskQueue::Spec& spec) override;
  std::string DescribeAllPendingTasks() const override;
  void PrioritizeYieldingToNative(base::TimeTicks prioritize_until) override;
  void AddTaskObserver(TaskObserver* task_observer) override;
  void RemoveTaskObserver(TaskObserver* task_observer) override;
  absl::optional<WakeUp> GetNextDelayedWakeUp() const override;
  TaskQueue::QueuePriority GetPriorityCount() const override;

  // SequencedTaskSource implementation:
  absl::optional<SelectedTask> SelectNextTask(
      LazyNow& lazy_now,
      SelectTaskOption option = SelectTaskOption::kDefault) override;
  void DidRunTask(LazyNow& lazy_now) override;
  void RemoveAllCanceledDelayedTasksFromFront(LazyNow* lazy_now) override;
  absl::optional<WakeUp> GetPendingWakeUp(
      LazyNow* lazy_now,
      SelectTaskOption option = SelectTaskOption::kDefault) const override;
  bool HasPendingHighResolutionTasks() override;
  bool OnSystemIdle() override;
  void MaybeEmitTaskDetails(
      perfetto::EventContext& ctx,
      const SequencedTaskSource::SelectedTask& selected_task) const override;

  void AddDestructionObserver(
      CurrentThread::DestructionObserver* destruction_observer);
  void RemoveDestructionObserver(
      CurrentThread::DestructionObserver* destruction_observer);
  void RegisterOnNextIdleCallback(OnceClosure on_next_idle_callback);
  // TODO(alexclarke): Remove this as part of https://crbug.com/825327.
  void SetTaskRunner(scoped_refptr<SingleThreadTaskRunner> task_runner);
  // TODO(alexclarke): Remove this as part of https://crbug.com/825327.
  scoped_refptr<SingleThreadTaskRunner> GetTaskRunner();
  bool IsBoundToCurrentThread() const;
  MessagePump* GetMessagePump() const;
  bool IsType(MessagePumpType type) const;
  void SetAddQueueTimeToTasks(bool enable);
  void SetTaskExecutionAllowed(bool allowed);
  bool IsTaskExecutionAllowed() const;
#if BUILDFLAG(IS_IOS)
  void AttachToMessagePump();
#endif
  bool IsIdleForTesting() override;
  void EnableMessagePumpTimeKeeperMetrics(const char* thread_name);

  // Requests that a task be scheduled to process work.
  void ScheduleWork();

  // Returns the currently executing TaskQueue, if any. Must be called on the
  // thread this class was created on.
  internal::TaskQueueImpl* currently_executing_task_queue() const;

  // Unregisters a TaskQueue previously created by |CreateTaskQueue()|.
  // No tasks will run on this queue after this call.
  void UnregisterTaskQueueImpl(
      std::unique_ptr<internal::TaskQueueImpl> task_queue);

  // Schedules a call to UnregisterTaskQueueImpl as soon as it's safe to do so.
  void ShutdownTaskQueueGracefully(
      std::unique_ptr<internal::TaskQueueImpl> task_queue);

  scoped_refptr<const AssociatedThreadId> associated_thread() const {
    return associated_thread_;
  }

  const Settings& settings() const { return settings_; }

  WeakPtr<SequenceManagerImpl> GetWeakPtr();

  // How frequently to perform housekeeping tasks (sweeping canceled tasks,
  // etc.).
  static constexpr TimeDelta kReclaimMemoryInterval = Seconds(30);

 protected:
  static std::unique_ptr<ThreadControllerImpl>
  CreateThreadControllerImplForCurrentThread(const TickClock* clock);

  // Create a task queue manager where |controller| controls the thread
  // on which the tasks are eventually run.
  SequenceManagerImpl(std::unique_ptr<internal::ThreadController> controller,
                      SequenceManager::Settings settings = Settings());

  friend class internal::TaskQueueImpl;
  friend class internal::DefaultWakeUpQueue;
  friend class ::base::sequence_manager::SequenceManagerForTest;

 private:
  // Returns the SequenceManager running on the current thread. Must only be
  // used on the thread on which it was obtained. Only to be used by
  // CurrentThread for the moment.
  static SequenceManagerImpl* GetCurrent();
  friend class ::base::CurrentThread;

  // Factory friends to call into private creation methods.
  friend std::unique_ptr<SequenceManager>
  sequence_manager::CreateSequenceManagerOnCurrentThread(
      SequenceManager::Settings);
  friend std::unique_ptr<SequenceManager>
  sequence_manager::CreateSequenceManagerOnCurrentThreadWithPump(
      std::unique_ptr<MessagePump> message_pump,
      SequenceManager::Settings);
  friend std::unique_ptr<SequenceManager>
  sequence_manager::CreateUnboundSequenceManager(SequenceManager::Settings);
  friend std::unique_ptr<SequenceManagerImpl>
  sequence_manager::internal::CreateUnboundSequenceManagerImpl(
      PassKey<base::internal::SequenceManagerThreadDelegate>,
      SequenceManager::Settings);

  // Assume direct control over the current thread and create a
  // SequenceManager.
  // This function should be called only once per thread.
  // This function assumes that a task execution environment is already
  // initialized for the current thread.
  static std::unique_ptr<SequenceManagerImpl> CreateOnCurrentThread(
      SequenceManager::Settings settings);

  // Create an unbound SequenceManager (typically for a future thread). The
  // SequenceManager can be initialized on the current thread and then needs
  // to be bound and initialized on the target thread by calling one of the
  // Bind*() methods.
  static std::unique_ptr<SequenceManagerImpl> CreateUnbound(
      SequenceManager::Settings settings);

  enum class ProcessTaskResult {
    kDeferred,
    kExecuted,
    kSequenceManagerDeleted,
  };

  // SequenceManager maintains a queue of non-nestable tasks since they're
  // uncommon and allocating an extra deque per TaskQueue would waste memory.
  using NonNestableTaskDeque =
      circular_deque<internal::TaskQueueImpl::DeferredNonNestableTask>;

  // We have to track reentrancy because we support nested run loops but the
  // selector interface is unaware of those. This struct keeps track of all
  // task-related state needed to make pairs of SelectNextTask() /
  // DidRunTask() work.
  struct ExecutingTask {
    ExecutingTask(Task&& task,
                  internal::TaskQueueImpl* task_queue,
                  TaskQueue::TaskTiming task_timing)
        : pending_task(std::move(task)),
          task_queue(task_queue),
          task_queue_name(task_queue->GetProtoName()),
          task_timing(task_timing),
          priority(task_queue->GetQueuePriority()),
          task_type(pending_task.task_type) {}

    Task pending_task;

    // `task_queue` is not a raw_ptr<...> for performance reasons (based on
    // analysis of sampling profiler data and tab_search:top100:2020).
    RAW_PTR_EXCLUSION internal::TaskQueueImpl* task_queue = nullptr;
    // Save task_queue_name as the task queue can be deleted within the task.
    QueueName task_queue_name;
    TaskQueue::TaskTiming task_timing;
    // Save priority as it might change after running a task.
    TaskQueue::QueuePriority priority;
    // Save task metadata to use after running a task as |pending_task|
    // won't be available then.
    int task_type;
  };

  struct MainThreadOnly {
    explicit MainThreadOnly(
        SequenceManagerImpl* sequence_manager,
        const scoped_refptr<AssociatedThreadId>& associated_thread,
        const SequenceManager::Settings& settings,
        const base::TickClock* clock);
    ~MainThreadOnly();

    int nesting_depth = 0;
    NonNestableTaskDeque non_nestable_task_queue;
    // TODO(altimin): Switch to instruction pointer crash key when it's
    // available.
    raw_ptr<debug::CrashKeyString> file_name_crash_key = nullptr;
    raw_ptr<debug::CrashKeyString> function_name_crash_key = nullptr;
    raw_ptr<debug::CrashKeyString> async_stack_crash_key = nullptr;
    std::array<char, static_cast<size_t>(debug::CrashKeySize::Size64)>
        async_stack_buffer = {};

    absl::optional<base::MetricsSubSampler> metrics_subsampler;

    internal::TaskQueueSelector selector;
    ObserverList<TaskObserver>::Unchecked task_observers;
    ObserverList<TaskTimeObserver>::Unchecked task_time_observers;
    const raw_ptr<const base::TickClock> default_clock;
    raw_ptr<TimeDomain> time_domain = nullptr;

    std::unique_ptr<WakeUpQueue> wake_up_queue;
    std::unique_ptr<WakeUpQueue> non_waking_wake_up_queue;

    // If true, MaybeReclaimMemory() will attempt to reclaim memory.
    bool memory_reclaim_scheduled = false;

    // Used to ensure we don't perform expensive housekeeping too frequently.
    TimeTicks next_time_to_reclaim_memory;

    // List of task queues managed by this SequenceManager.
    // - active_queues contains queues that are still running tasks.
    //   Most often they are owned by relevant TaskQueues, but
    //   queues_to_gracefully_shutdown are included here too.
    // - queues_to_gracefully_shutdown contains queues which should be deleted
    //   when they become empty.
    // - queues_to_delete contains soon-to-be-deleted queues, because some
    //   internal scheduling code does not expect queues to be pulled
    //   from underneath.

    std::set<internal::TaskQueueImpl*> active_queues;

    std::map<internal::TaskQueueImpl*,
             std::unique_ptr<internal::TaskQueueImpl>>
        queues_to_gracefully_shutdown;
    std::map<internal::TaskQueueImpl*,
             std::unique_ptr<internal::TaskQueueImpl>>
        queues_to_delete;

    bool task_was_run_on_quiescence_monitored_queue = false;
    bool nesting_observer_registered_ = false;

    // Use std::deque() so that references returned by SelectNextTask() remain
    // valid until the matching call to DidRunTask(), even when nested RunLoops
    // cause tasks to be pushed on the stack in-between. This is needed because
    // references are kept in local variables by calling code between
    // SelectNextTask()/DidRunTask().
    std::deque<ExecutingTask> task_execution_stack;

    raw_ptr<Observer> observer = nullptr;  // NOT OWNED

    ObserverList<CurrentThread::DestructionObserver>::Unchecked
        destruction_observers;

    // If non-null, invoked the next time OnSystemIdle() completes without
    // scheduling additional work.
    OnceClosure on_next_idle_callback;
  };

  void CompleteInitializationOnBoundThread();

  // TaskQueueSelector::Observer:
  void OnTaskQueueEnabled(internal::TaskQueueImpl* queue) override;

  // RunLoop::NestingObserver:
  void OnBeginNestedRunLoop() override;
  void OnExitNestedRunLoop() override;

  // Schedules next wake-up at the given time, canceling any previous requests.
  // Use absl::nullopt to cancel a wake-up. Must be called on the thread this
  // class was created on.
  void SetNextWakeUp(LazyNow* lazy_now, absl::optional<WakeUp> wake_up);

  // Called by the task queue to inform this SequenceManager of a task that's
  // about to be queued. This SequenceManager may use this opportunity to add
  // metadata to |pending_task| before it is moved into the queue.
  void WillQueueTask(Task* pending_task);

  // Enqueues onto delayed WorkQueues all delayed tasks which must run now
  // (cannot be postponed) and possibly some delayed tasks which can run now
  // but could be postponed (due to how tasks are stored, it is not possible to
  // retrieve all such tasks efficiently), and reloads any empty work queues.
  void MoveReadyDelayedTasksToWorkQueues(LazyNow* lazy_now);

  void NotifyWillProcessTask(ExecutingTask* task, LazyNow* time_before_task);
  void NotifyDidProcessTask(ExecutingTask* task, LazyNow* time_after_task);

  EnqueueOrder GetNextSequenceNumber();

  bool GetAddQueueTimeToTasks();

  std::unique_ptr<trace_event::ConvertableToTraceFormat>
  AsValueWithSelectorResultForTracing(internal::WorkQueue* selected_work_queue,
                                      bool force_verbose) const;
  Value::Dict AsValueWithSelectorResult(
      internal::WorkQueue* selected_work_queue,
      bool force_verbose) const;

  // Used in construction of TaskQueueImpl to obtain an AtomicFlag which it can
  // use to request reload by ReloadEmptyWorkQueues. The lifetime of
  // TaskQueueImpl is managed by this class and the handle will be released by
  // TaskQueueImpl::UnregisterTaskQueue which is always called before the
  // queue's destruction.
  AtomicFlagSet::AtomicFlag GetFlagToRequestReloadForEmptyQueue(
      TaskQueueImpl* task_queue);

  // Calls |TakeImmediateIncomingQueueTasks| on all queues with their reload
  // flag set in |empty_queues_to_reload_|.
  void ReloadEmptyWorkQueues() const;

  std::unique_ptr<internal::TaskQueueImpl> CreateTaskQueueImpl(
      const TaskQueue::Spec& spec) override;

  // Periodically reclaims memory by sweeping away canceled tasks and shrinking
  // buffers.
  void MaybeReclaimMemory();

  // Deletes queues marked for deletion and empty queues marked for shutdown.
  void CleanUpQueues();

  void RemoveAllCanceledTasksFromFrontOfWorkQueues();

  TaskQueue::TaskTiming::TimeRecordingPolicy ShouldRecordTaskTiming(
      const internal::TaskQueueImpl* task_queue);
  bool ShouldRecordCPUTimeForTask();

  // Write the async stack trace onto a crash key as whitespace-delimited hex
  // addresses.
  void RecordCrashKeys(const PendingTask&);

  // Helper to terminate all scoped trace events to allow starting new ones
  // in SelectNextTask().
  absl::optional<SelectedTask> SelectNextTaskImpl(LazyNow& lazy_now,
                                                  SelectTaskOption option);

  // Returns a wake-up for the next delayed task which is not ripe for
  // execution, or nullopt if `option` is `kSkipDelayedTask` or there
  // are no such tasks (immediate tasks don't count).
  absl::optional<WakeUp> GetNextDelayedWakeUpWithOption(
      SelectTaskOption option) const;

  // Given a `wake_up` describing when the next delayed task should run,
  // returns a wake up that should be scheduled on the thread. `is_immediate()`
  // if the wake up should run immediately. `nullopt` if no wake up is required
  // because `wake_up` is `nullopt` or a `time_domain` is used.
  absl::optional<WakeUp> AdjustWakeUp(absl::optional<WakeUp> wake_up,
                                      LazyNow* lazy_now) const;

  void MaybeAddLeewayToTask(Task& task) const;

#if DCHECK_IS_ON()
  void LogTaskDebugInfo(const internal::WorkQueue* work_queue) const;
#endif

  // Determines if wall time or thread time should be recorded for the next
  // task.
  TaskQueue::TaskTiming InitializeTaskTiming(
      internal::TaskQueueImpl* task_queue);

  const scoped_refptr<AssociatedThreadId> associated_thread_;

  EnqueueOrderGenerator enqueue_order_generator_;

  const std::unique_ptr<internal::ThreadController> controller_;
  const Settings settings_;

  const MetricRecordingSettings metric_recording_settings_;

  // Whether to add the queue time to tasks.
  base::subtle::Atomic32 add_queue_time_to_tasks_;

  AtomicFlagSet empty_queues_to_reload_;

  MainThreadOnly main_thread_only_;
  MainThreadOnly& main_thread_only() {
    DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
    return main_thread_only_;
  }
  const MainThreadOnly& main_thread_only() const {
    DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
    return main_thread_only_;
  }

  // |clock_| either refers to the TickClock representation of |time_domain|
  // (same object) if any, or to |default_clock| otherwise. It is maintained as
  // an atomic pointer here for multi-threaded usage.
  std::atomic<const base::TickClock*> clock_;
  const base::TickClock* main_thread_clock() const {
    DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
    return clock_.load(std::memory_order_relaxed);
  }
  const base::TickClock* any_thread_clock() const {
    // |memory_order_acquire| matched by |memory_order_release| in
    // SetTimeDomain() to ensure all data used by |clock_| is visible when read
    // from the current thread. A thread might try to access a stale |clock_|
    // but that's not an issue since |time_domain| contractually outlives
    // SequenceManagerImpl even if it's reset.
    return clock_.load(std::memory_order_acquire);
  }

  WeakPtrFactory<SequenceManagerImpl> weak_factory_{this};
};

}  // namespace internal
}  // namespace sequence_manager
}  // namespace base

#endif  // BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_