1 // Copyright 2018 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_ 6 #define BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_ 7 8 #include <atomic> 9 #include <deque> 10 #include <map> 11 #include <memory> 12 #include <optional> 13 #include <set> 14 #include <string> 15 #include <utility> 16 17 #include "base/atomic_sequence_num.h" 18 #include "base/base_export.h" 19 #include "base/callback_list.h" 20 #include "base/compiler_specific.h" 21 #include "base/containers/circular_deque.h" 22 #include "base/debug/crash_logging.h" 23 #include "base/feature_list.h" 24 #include "base/functional/callback_forward.h" 25 #include "base/memory/raw_ptr.h" 26 #include "base/memory/raw_ptr_exclusion.h" 27 #include "base/memory/scoped_refptr.h" 28 #include "base/memory/weak_ptr.h" 29 #include "base/message_loop/message_pump_type.h" 30 #include "base/observer_list.h" 31 #include "base/pending_task.h" 32 #include "base/rand_util.h" 33 #include "base/run_loop.h" 34 #include "base/synchronization/lock.h" 35 #include "base/task/current_thread.h" 36 #include "base/task/sequence_manager/associated_thread_id.h" 37 #include "base/task/sequence_manager/enqueue_order.h" 38 #include "base/task/sequence_manager/enqueue_order_generator.h" 39 #include "base/task/sequence_manager/sequence_manager.h" 40 #include "base/task/sequence_manager/task_queue.h" 41 #include "base/task/sequence_manager/task_queue_impl.h" 42 #include "base/task/sequence_manager/task_queue_selector.h" 43 #include "base/task/sequence_manager/thread_controller.h" 44 #include "base/task/sequence_manager/work_tracker.h" 45 #include "base/task/sequenced_task_runner.h" 46 #include "base/task/single_thread_task_runner.h" 47 #include "base/threading/thread_checker.h" 48 #include "base/time/default_tick_clock.h" 49 #include "base/types/pass_key.h" 50 #include "base/values.h" 51 #include "build/build_config.h" 52 53 namespace base { 54 55 namespace internal { 56 class SequenceManagerThreadDelegate; 57 } 58 59 namespace trace_event { 60 class ConvertableToTraceFormat; 61 } // namespace trace_event 62 63 namespace sequence_manager { 64 65 class SequenceManagerForTest; 66 class TaskQueue; 67 class TaskTimeObserver; 68 class TimeDomain; 69 70 namespace internal { 71 72 class TaskQueueImpl; 73 class DefaultWakeUpQueue; 74 class SequenceManagerImpl; 75 class ThreadControllerImpl; 76 77 // A private factory method for SequenceManagerThreadDelegate which is 78 // equivalent to sequence_manager::CreateUnboundSequenceManager() but returns 79 // the underlying impl. 80 std::unique_ptr<SequenceManagerImpl> CreateUnboundSequenceManagerImpl( 81 PassKey<base::internal::SequenceManagerThreadDelegate>, 82 SequenceManager::Settings settings); 83 84 // The task queue manager provides N task queues and a selector interface for 85 // choosing which task queue to service next. Each task queue consists of two 86 // sub queues: 87 // 88 // 1. Incoming task queue. Tasks that are posted get immediately appended here. 89 // When a task is appended into an empty incoming queue, the task manager 90 // work function (DoWork()) is scheduled to run on the main task runner. 91 // 92 // 2. Work queue. If a work queue is empty when DoWork() is entered, tasks from 93 // the incoming task queue (if any) are moved here. The work queues are 94 // registered with the selector as input to the scheduling decision. 95 // 96 class BASE_EXPORT SequenceManagerImpl 97 : public SequenceManager, 98 public internal::SequencedTaskSource, 99 public internal::TaskQueueSelector::Observer, 100 public RunLoop::NestingObserver { 101 public: 102 using Observer = SequenceManager::Observer; 103 104 SequenceManagerImpl(const SequenceManagerImpl&) = delete; 105 SequenceManagerImpl& operator=(const SequenceManagerImpl&) = delete; 106 ~SequenceManagerImpl() override; 107 108 // Initializes features for this class. See `base::features::Init()`. 109 static void InitializeFeatures(); 110 111 // SequenceManager implementation: 112 void BindToCurrentThread() override; 113 scoped_refptr<SequencedTaskRunner> GetTaskRunnerForCurrentTask() override; 114 void BindToMessagePump(std::unique_ptr<MessagePump> message_pump) override; 115 void SetObserver(Observer* observer) override; 116 void AddTaskTimeObserver(TaskTimeObserver* task_time_observer) override; 117 void RemoveTaskTimeObserver(TaskTimeObserver* task_time_observer) override; 118 void SetTimeDomain(TimeDomain* time_domain) override; 119 void ResetTimeDomain() override; 120 const TickClock* GetTickClock() const override; 121 TimeTicks NowTicks() const override; 122 void SetDefaultTaskRunner( 123 scoped_refptr<SingleThreadTaskRunner> task_runner) override; 124 void ReclaimMemory() override; 125 bool GetAndClearSystemIsQuiescentBit() override; 126 void SetWorkBatchSize(int work_batch_size) override; 127 void EnableCrashKeys(const char* async_stack_crash_key) override; 128 const MetricRecordingSettings& GetMetricRecordingSettings() const override; 129 size_t GetPendingTaskCountForTesting() const override; 130 TaskQueue::Handle CreateTaskQueue(const TaskQueue::Spec& spec) override; 131 std::string DescribeAllPendingTasks() const override; 132 void PrioritizeYieldingToNative(base::TimeTicks prioritize_until) override; 133 void AddTaskObserver(TaskObserver* task_observer) override; 134 void RemoveTaskObserver(TaskObserver* task_observer) override; 135 std::optional<WakeUp> GetNextDelayedWakeUp() const override; 136 TaskQueue::QueuePriority GetPriorityCount() const override; 137 138 // SequencedTaskSource implementation: 139 void SetRunTaskSynchronouslyAllowed( 140 bool can_run_tasks_synchronously) override; 141 std::optional<SelectedTask> SelectNextTask( 142 LazyNow& lazy_now, 143 SelectTaskOption option = SelectTaskOption::kDefault) override; 144 void DidRunTask(LazyNow& lazy_now) override; 145 std::optional<WakeUp> GetPendingWakeUp( 146 LazyNow* lazy_now, 147 SelectTaskOption option = SelectTaskOption::kDefault) override; 148 bool HasPendingHighResolutionTasks() override; 149 void OnBeginWork() override; 150 bool OnIdle() override; 151 void MaybeEmitTaskDetails( 152 perfetto::EventContext& ctx, 153 const SequencedTaskSource::SelectedTask& selected_task) const override; 154 155 void AddDestructionObserver( 156 CurrentThread::DestructionObserver* destruction_observer); 157 void RemoveDestructionObserver( 158 CurrentThread::DestructionObserver* destruction_observer); 159 [[nodiscard]] CallbackListSubscription RegisterOnNextIdleCallback( 160 OnceClosure on_next_idle_callback); 161 162 // Sets / returns the default TaskRunner. Thread-safe. 163 void SetTaskRunner(scoped_refptr<SingleThreadTaskRunner> task_runner); 164 scoped_refptr<SingleThreadTaskRunner> GetTaskRunner(); 165 166 bool IsBoundToCurrentThread() const; 167 MessagePump* GetMessagePump() const; 168 bool IsType(MessagePumpType type) const; 169 void SetAddQueueTimeToTasks(bool enable); 170 void SetTaskExecutionAllowedInNativeNestedLoop(bool allowed); 171 bool IsTaskExecutionAllowedInNativeNestedLoop() const; 172 #if BUILDFLAG(IS_IOS) 173 void AttachToMessagePump(); 174 #endif 175 bool IsIdleForTesting() override; 176 void EnableMessagePumpTimeKeeperMetrics( 177 const char* thread_name, 178 bool wall_time_based_metrics_enabled_for_testing = false); 179 180 // Requests that a task to process work is scheduled. 181 void ScheduleWork(); 182 183 // Returns the currently executing TaskQueue if any. Must be called on the 184 // thread this class was created on. 185 internal::TaskQueueImpl* currently_executing_task_queue() const; 186 187 // Unregisters a TaskQueue previously created by |NewTaskQueue()|. 188 // No tasks will run on this queue after this call. 189 void UnregisterTaskQueueImpl( 190 std::unique_ptr<internal::TaskQueueImpl> task_queue); 191 associated_thread()192 scoped_refptr<AssociatedThreadId> associated_thread() const { 193 return associated_thread_; 194 } 195 settings()196 const Settings& settings() const LIFETIME_BOUND { return settings_; } 197 198 WeakPtr<SequenceManagerImpl> GetWeakPtr(); 199 200 // How frequently to perform housekeeping tasks (sweeping canceled tasks etc). 201 static constexpr TimeDelta kReclaimMemoryInterval = Seconds(30); 202 203 protected: 204 static std::unique_ptr<ThreadControllerImpl> 205 CreateThreadControllerImplForCurrentThread(const TickClock* clock); 206 207 // Create a task queue manager where |controller| controls the thread 208 // on which the tasks are eventually run. 209 SequenceManagerImpl(std::unique_ptr<internal::ThreadController> controller, 210 SequenceManager::Settings settings = Settings()); 211 212 friend class internal::TaskQueueImpl; 213 friend class internal::DefaultWakeUpQueue; 214 friend class ::base::sequence_manager::SequenceManagerForTest; 215 216 private: 217 // Returns the SequenceManager running the 218 // current thread. It must only be used on the thread it was obtained. 219 // Only to be used by CurrentThread for the moment 220 static SequenceManagerImpl* GetCurrent(); 221 friend class ::base::CurrentThread; 222 223 // Factory friends to call into private creation methods. 224 friend std::unique_ptr<SequenceManager> 225 sequence_manager::CreateSequenceManagerOnCurrentThread( 226 SequenceManager::Settings); 227 friend std::unique_ptr<SequenceManager> 228 sequence_manager::CreateSequenceManagerOnCurrentThreadWithPump( 229 std::unique_ptr<MessagePump> message_pump, 230 SequenceManager::Settings); 231 friend std::unique_ptr<SequenceManager> 232 sequence_manager::CreateUnboundSequenceManager(SequenceManager::Settings); 233 friend std::unique_ptr<SequenceManagerImpl> 234 sequence_manager::internal::CreateUnboundSequenceManagerImpl( 235 PassKey<base::internal::SequenceManagerThreadDelegate>, 236 SequenceManager::Settings); 237 238 // Assume direct control over current thread and create a SequenceManager. 239 // This function should be called only once per thread. 240 // This function assumes that a task execution environment is already 241 // initialized for the current thread. 242 static std::unique_ptr<SequenceManagerImpl> CreateOnCurrentThread( 243 SequenceManager::Settings settings); 244 245 // Create an unbound SequenceManager (typically for a future thread). The 246 // SequenceManager can be initialized on the current thread and then needs to 247 // be bound and initialized on the target thread by calling one of the Bind*() 248 // methods. 249 static std::unique_ptr<SequenceManagerImpl> CreateUnbound( 250 SequenceManager::Settings settings); 251 252 enum class ProcessTaskResult { 253 kDeferred, 254 kExecuted, 255 kSequenceManagerDeleted, 256 }; 257 258 // SequenceManager maintains a queue of non-nestable tasks since they're 259 // uncommon and allocating an extra deque per TaskQueue will waste the memory. 260 using NonNestableTaskDeque = 261 circular_deque<internal::TaskQueueImpl::DeferredNonNestableTask>; 262 263 // We have to track rentrancy because we support nested runloops but the 264 // selector interface is unaware of those. This struct keeps track off all 265 // task related state needed to make pairs of SelectNextTask() / DidRunTask() 266 // work. 267 struct ExecutingTask { ExecutingTaskExecutingTask268 ExecutingTask(Task&& task, 269 internal::TaskQueueImpl* task_queue, 270 TaskQueue::TaskTiming task_timing) 271 : pending_task(std::move(task)), 272 task_queue(task_queue), 273 task_queue_name(task_queue->GetProtoName()), 274 task_timing(task_timing), 275 priority(task_queue->GetQueuePriority()), 276 task_type(pending_task.task_type) {} 277 278 Task pending_task; 279 280 // `task_queue` is not a raw_ptr<...> for performance reasons (based on 281 // analysis of sampling profiler data and tab_search:top100:2020). 282 RAW_PTR_EXCLUSION internal::TaskQueueImpl* task_queue = nullptr; 283 // Save task_queue_name as the task queue can be deleted within the task. 284 QueueName task_queue_name; 285 TaskQueue::TaskTiming task_timing; 286 // Save priority as it might change after running a task. 287 TaskQueue::QueuePriority priority; 288 // Save task metadata to use in after running a task as |pending_task| 289 // won't be available then. 290 int task_type; 291 }; 292 293 struct MainThreadOnly { 294 explicit MainThreadOnly( 295 SequenceManagerImpl* sequence_manager, 296 const scoped_refptr<AssociatedThreadId>& associated_thread, 297 const SequenceManager::Settings& settings, 298 const base::TickClock* clock); 299 ~MainThreadOnly(); 300 301 int nesting_depth = 0; 302 NonNestableTaskDeque non_nestable_task_queue; 303 // TODO(altimin): Switch to instruction pointer crash key when it's 304 // available. 305 raw_ptr<debug::CrashKeyString> file_name_crash_key = nullptr; 306 raw_ptr<debug::CrashKeyString> function_name_crash_key = nullptr; 307 raw_ptr<debug::CrashKeyString> async_stack_crash_key = nullptr; 308 std::array<char, static_cast<size_t>(debug::CrashKeySize::Size64)> 309 async_stack_buffer = {}; 310 311 std::optional<base::MetricsSubSampler> metrics_subsampler; 312 313 internal::TaskQueueSelector selector; 314 // RAW_PTR_EXCLUSION: Performance reasons(based on analysis of 315 // speedometer3). 316 ObserverList<TaskObserver>::UncheckedAndRawPtrExcluded task_observers; 317 ObserverList<TaskTimeObserver> task_time_observers; 318 const raw_ptr<const base::TickClock> default_clock; 319 raw_ptr<TimeDomain> time_domain = nullptr; 320 321 std::unique_ptr<WakeUpQueue> wake_up_queue; 322 std::unique_ptr<WakeUpQueue> non_waking_wake_up_queue; 323 324 // If true MaybeReclaimMemory will attempt to reclaim memory. 325 bool memory_reclaim_scheduled = false; 326 327 // Used to ensure we don't perform expensive housekeeping too frequently. 328 TimeTicks next_time_to_reclaim_memory; 329 330 // List of task queues managed by this SequenceManager. 331 // - active_queues contains queues that are still running tasks, which are 332 // are owned by relevant TaskQueues. 333 // - queues_to_delete contains soon-to-be-deleted queues, because some 334 // internal scheduling code does not expect queues to be pulled 335 // from underneath. 336 337 // RAW_PTR_EXCLUSION: Performance reasons (based on analysis of 338 // speedometer3). 339 RAW_PTR_EXCLUSION std::set<internal::TaskQueueImpl*> active_queues; 340 341 std::map<internal::TaskQueueImpl*, std::unique_ptr<internal::TaskQueueImpl>> 342 queues_to_delete; 343 344 bool task_was_run_on_quiescence_monitored_queue = false; 345 bool nesting_observer_registered_ = false; 346 347 // Use std::deque() so that references returned by SelectNextTask() remain 348 // valid until the matching call to DidRunTask(), even when nested RunLoops 349 // cause tasks to be pushed on the stack in-between. This is needed because 350 // references are kept in local variables by calling code between 351 // SelectNextTask()/DidRunTask(). 352 std::deque<ExecutingTask> task_execution_stack; 353 354 raw_ptr<Observer> observer = nullptr; // NOT OWNED 355 356 ObserverList<CurrentThread::DestructionObserver>:: 357 UncheckedAndDanglingUntriaged destruction_observers; 358 359 // Notified the next time `OnIdle()` completes without scheduling additional 360 // work. 361 OnceClosureList on_next_idle_callbacks; 362 }; 363 364 void CompleteInitializationOnBoundThread(); 365 366 // TaskQueueSelector::Observer: 367 void OnTaskQueueEnabled(internal::TaskQueueImpl* queue) override; 368 void OnWorkAvailable() override; 369 370 // RunLoop::NestingObserver: 371 void OnBeginNestedRunLoop() override; 372 void OnExitNestedRunLoop() override; 373 374 // Schedules next wake-up at the given time, canceling any previous requests. 375 // Use std::nullopt to cancel a wake-up. Must be called on the thread this 376 // class was created on. 377 void SetNextWakeUp(LazyNow* lazy_now, std::optional<WakeUp> wake_up); 378 379 // Called before TaskQueue requests to reload its empty immediate work queue. 380 void WillRequestReloadImmediateWorkQueue(); 381 382 // Returns a valid `SyncWorkAuthorization` if a call to `RunOrPostTask` on a 383 // `SequencedTaskRunner` bound to this `SequenceManager` may run its task 384 // synchronously. 385 SyncWorkAuthorization TryAcquireSyncWorkAuthorization(); 386 387 // Called when a task is about to be queued. May add metadata to the task and 388 // emit trace events. 389 void WillQueueTask(Task* pending_task); 390 391 // Enqueues onto delayed WorkQueues all delayed tasks which must run now 392 // (cannot be postponed) and possibly some delayed tasks which can run now but 393 // could be postponed (due to how tasks are stored, it is not possible to 394 // retrieve all such tasks efficiently) and reloads any empty work queues. 395 void MoveReadyDelayedTasksToWorkQueues(LazyNow* lazy_now); 396 397 void NotifyWillProcessTask(ExecutingTask* task, LazyNow* time_before_task); 398 void NotifyDidProcessTask(ExecutingTask* task, LazyNow* time_after_task); 399 400 EnqueueOrder GetNextSequenceNumber(); 401 402 bool GetAddQueueTimeToTasks(); 403 404 std::unique_ptr<trace_event::ConvertableToTraceFormat> 405 AsValueWithSelectorResultForTracing(internal::WorkQueue* selected_work_queue, 406 bool force_verbose) const; 407 Value::Dict AsValueWithSelectorResult( 408 internal::WorkQueue* selected_work_queue, 409 bool force_verbose) const; 410 411 // Used in construction of TaskQueueImpl to obtain an AtomicFlag which it can 412 // use to request reload by ReloadEmptyWorkQueues. The lifetime of 413 // TaskQueueImpl is managed by this class and the handle will be released by 414 // TaskQueueImpl::UnregisterTaskQueue which is always called before the 415 // queue's destruction. 416 AtomicFlagSet::AtomicFlag GetFlagToRequestReloadForEmptyQueue( 417 TaskQueueImpl* task_queue); 418 419 // Calls |TakeImmediateIncomingQueueTasks| on all queues with their reload 420 // flag set in |empty_queues_to_reload_|. 421 void ReloadEmptyWorkQueues(); 422 423 std::unique_ptr<internal::TaskQueueImpl> CreateTaskQueueImpl( 424 const TaskQueue::Spec& spec); 425 426 // Periodically reclaims memory by sweeping away canceled tasks and shrinking 427 // buffers. 428 void MaybeReclaimMemory(); 429 430 // Deletes queues marked for deletion and empty queues marked for shutdown. 431 void CleanUpQueues(); 432 433 // Removes canceled delayed tasks from the front of wake up queue. 434 void RemoveAllCanceledDelayedTasksFromFront(LazyNow* lazy_now); 435 436 TaskQueue::TaskTiming::TimeRecordingPolicy ShouldRecordTaskTiming( 437 const internal::TaskQueueImpl* task_queue); 438 bool ShouldRecordCPUTimeForTask(); 439 440 // Write the async stack trace onto a crash key as whitespace-delimited hex 441 // addresses. 442 void RecordCrashKeys(const PendingTask&); 443 444 // Helper to terminate all scoped trace events to allow starting new ones 445 // in SelectNextTask(). 446 std::optional<SelectedTask> SelectNextTaskImpl(LazyNow& lazy_now, 447 SelectTaskOption option); 448 449 // Returns a wake-up for the next delayed task which is not ripe for 450 // execution, or nullopt if `option` is `kSkipDelayedTask` or there 451 // are no such tasks (immediate tasks don't count). 452 std::optional<WakeUp> GetNextDelayedWakeUpWithOption( 453 SelectTaskOption option) const; 454 455 // Given a `wake_up` describing when the next delayed task should run, returns 456 // a wake up that should be scheduled on the thread. `is_immediate()` if the 457 // wake up should run immediately. `nullopt` if no wake up is required because 458 // `wake_up` is `nullopt` or a `time_domain` is used. 459 std::optional<WakeUp> AdjustWakeUp(std::optional<WakeUp> wake_up, 460 LazyNow* lazy_now) const; 461 462 void MaybeAddLeewayToTask(Task& task) const; 463 464 #if DCHECK_IS_ON() 465 void LogTaskDebugInfo(const internal::WorkQueue* work_queue) const; 466 #endif 467 468 // Determines if wall time or thread time should be recorded for the next 469 // task. 470 TaskQueue::TaskTiming InitializeTaskTiming( 471 internal::TaskQueueImpl* task_queue); 472 473 const scoped_refptr<AssociatedThreadId> associated_thread_; 474 475 EnqueueOrderGenerator enqueue_order_generator_; 476 477 const std::unique_ptr<internal::ThreadController> controller_; 478 const Settings settings_; 479 480 const MetricRecordingSettings metric_recording_settings_; 481 482 WorkTracker work_tracker_; 483 484 // Whether to add the queue time to tasks. 485 std::atomic<bool> add_queue_time_to_tasks_; 486 487 AtomicFlagSet empty_queues_to_reload_; 488 489 MainThreadOnly main_thread_only_; main_thread_only()490 MainThreadOnly& main_thread_only() { 491 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); 492 return main_thread_only_; 493 } main_thread_only()494 const MainThreadOnly& main_thread_only() const LIFETIME_BOUND { 495 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); 496 return main_thread_only_; 497 } 498 499 // |clock_| either refers to the TickClock representation of |time_domain| 500 // (same object) if any, or to |default_clock| otherwise. It is maintained as 501 // an atomic pointer here for multi-threaded usage. 502 std::atomic<const base::TickClock*> clock_; main_thread_clock()503 const base::TickClock* main_thread_clock() const { 504 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); 505 return clock_.load(std::memory_order_relaxed); 506 } any_thread_clock()507 const base::TickClock* any_thread_clock() const { 508 // |memory_order_acquire| matched by |memory_order_release| in 509 // SetTimeDomain() to ensure all data used by |clock_| is visible when read 510 // from the current thread. A thread might try to access a stale |clock_| 511 // but that's not an issue since |time_domain| contractually outlives 512 // SequenceManagerImpl even if it's reset. 513 return clock_.load(std::memory_order_acquire); 514 } 515 516 WeakPtrFactory<SequenceManagerImpl> weak_factory_{this}; 517 }; 518 519 } // namespace internal 520 } // namespace sequence_manager 521 } // namespace base 522 523 #endif // BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_ 524