// Copyright 2018 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_
#define BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_

#include <array>
#include <atomic>
#include <deque>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>

#include "base/atomic_sequence_num.h"
#include "base/base_export.h"
#include "base/cancelable_callback.h"
#include "base/containers/circular_deque.h"
#include "base/debug/crash_logging.h"
#include "base/feature_list.h"
#include "base/functional/callback_forward.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_pump_type.h"
#include "base/observer_list.h"
#include "base/pending_task.h"
#include "base/rand_util.h"
#include "base/run_loop.h"
#include "base/synchronization/lock.h"
#include "base/task/current_thread.h"
#include "base/task/sequence_manager/associated_thread_id.h"
#include "base/task/sequence_manager/enqueue_order.h"
#include "base/task/sequence_manager/enqueue_order_generator.h"
#include "base/task/sequence_manager/sequence_manager.h"
#include "base/task/sequence_manager/task_queue.h"
#include "base/task/sequence_manager/task_queue_impl.h"
#include "base/task/sequence_manager/task_queue_selector.h"
#include "base/task/sequence_manager/thread_controller.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/single_thread_task_runner.h"
#include "base/threading/thread_checker.h"
#include "base/time/default_tick_clock.h"
#include "base/types/pass_key.h"
#include "base/values.h"
#include "build/build_config.h"
#include "third_party/abseil-cpp/absl/types/optional.h"

namespace base {

namespace internal {
class SequenceManagerThreadDelegate;
}

namespace trace_event { 56 class ConvertableToTraceFormat; 57 } // namespace trace_event 58 59 namespace sequence_manager { 60 61 class SequenceManagerForTest; 62 class TaskQueue; 63 class TaskTimeObserver; 64 class TimeDomain; 65 66 namespace internal { 67 68 class TaskQueueImpl; 69 class DefaultWakeUpQueue; 70 class SequenceManagerImpl; 71 class ThreadControllerImpl; 72 73 // A private factory method for SequenceManagerThreadDelegate which is 74 // equivalent to sequence_manager::CreateUnboundSequenceManager() but returns 75 // the underlying impl. 76 std::unique_ptr<SequenceManagerImpl> CreateUnboundSequenceManagerImpl( 77 PassKey<base::internal::SequenceManagerThreadDelegate>, 78 SequenceManager::Settings settings); 79 80 // The task queue manager provides N task queues and a selector interface for 81 // choosing which task queue to service next. Each task queue consists of two 82 // sub queues: 83 // 84 // 1. Incoming task queue. Tasks that are posted get immediately appended here. 85 // When a task is appended into an empty incoming queue, the task manager 86 // work function (DoWork()) is scheduled to run on the main task runner. 87 // 88 // 2. Work queue. If a work queue is empty when DoWork() is entered, tasks from 89 // the incoming task queue (if any) are moved here. The work queues are 90 // registered with the selector as input to the scheduling decision. 91 // 92 class BASE_EXPORT SequenceManagerImpl 93 : public SequenceManager, 94 public internal::SequencedTaskSource, 95 public internal::TaskQueueSelector::Observer, 96 public RunLoop::NestingObserver { 97 public: 98 using Observer = SequenceManager::Observer; 99 100 SequenceManagerImpl(const SequenceManagerImpl&) = delete; 101 SequenceManagerImpl& operator=(const SequenceManagerImpl&) = delete; 102 ~SequenceManagerImpl() override; 103 104 // Initializes the state of all the sequence manager features. Must be invoked 105 // after FeatureList initialization. 
106 static void InitializeFeatures(); 107 108 // SequenceManager implementation: 109 void BindToCurrentThread() override; 110 scoped_refptr<SequencedTaskRunner> GetTaskRunnerForCurrentTask() override; 111 void BindToMessagePump(std::unique_ptr<MessagePump> message_pump) override; 112 void SetObserver(Observer* observer) override; 113 void AddTaskTimeObserver(TaskTimeObserver* task_time_observer) override; 114 void RemoveTaskTimeObserver(TaskTimeObserver* task_time_observer) override; 115 void SetTimeDomain(TimeDomain* time_domain) override; 116 void ResetTimeDomain() override; 117 const TickClock* GetTickClock() const override; 118 TimeTicks NowTicks() const override; 119 void SetDefaultTaskRunner( 120 scoped_refptr<SingleThreadTaskRunner> task_runner) override; 121 void ReclaimMemory() override; 122 bool GetAndClearSystemIsQuiescentBit() override; 123 void SetWorkBatchSize(int work_batch_size) override; 124 void EnableCrashKeys(const char* async_stack_crash_key) override; 125 const MetricRecordingSettings& GetMetricRecordingSettings() const override; 126 size_t GetPendingTaskCountForTesting() const override; 127 TaskQueue::Handle CreateTaskQueue(const TaskQueue::Spec& spec) override; 128 std::string DescribeAllPendingTasks() const override; 129 void PrioritizeYieldingToNative(base::TimeTicks prioritize_until) override; 130 void AddTaskObserver(TaskObserver* task_observer) override; 131 void RemoveTaskObserver(TaskObserver* task_observer) override; 132 absl::optional<WakeUp> GetNextDelayedWakeUp() const override; 133 TaskQueue::QueuePriority GetPriorityCount() const override; 134 135 // SequencedTaskSource implementation: 136 absl::optional<SelectedTask> SelectNextTask( 137 LazyNow& lazy_now, 138 SelectTaskOption option = SelectTaskOption::kDefault) override; 139 void DidRunTask(LazyNow& lazy_now) override; 140 absl::optional<WakeUp> GetPendingWakeUp( 141 LazyNow* lazy_now, 142 SelectTaskOption option = SelectTaskOption::kDefault) override; 143 bool 
HasPendingHighResolutionTasks() override; 144 bool OnSystemIdle() override; 145 void MaybeEmitTaskDetails( 146 perfetto::EventContext& ctx, 147 const SequencedTaskSource::SelectedTask& selected_task) const override; 148 149 void AddDestructionObserver( 150 CurrentThread::DestructionObserver* destruction_observer); 151 void RemoveDestructionObserver( 152 CurrentThread::DestructionObserver* destruction_observer); 153 void RegisterOnNextIdleCallback(OnceClosure on_next_idle_callback); 154 // TODO(alexclarke): Remove this as part of https://crbug.com/825327. 155 void SetTaskRunner(scoped_refptr<SingleThreadTaskRunner> task_runner); 156 // TODO(alexclarke): Remove this as part of https://crbug.com/825327. 157 scoped_refptr<SingleThreadTaskRunner> GetTaskRunner(); 158 bool IsBoundToCurrentThread() const; 159 MessagePump* GetMessagePump() const; 160 bool IsType(MessagePumpType type) const; 161 void SetAddQueueTimeToTasks(bool enable); 162 void SetTaskExecutionAllowed(bool allowed); 163 bool IsTaskExecutionAllowed() const; 164 #if BUILDFLAG(IS_IOS) 165 void AttachToMessagePump(); 166 #endif 167 bool IsIdleForTesting() override; 168 void EnableMessagePumpTimeKeeperMetrics(const char* thread_name); 169 170 // Requests that a task to process work is scheduled. 171 void ScheduleWork(); 172 173 // Returns the currently executing TaskQueue if any. Must be called on the 174 // thread this class was created on. 175 internal::TaskQueueImpl* currently_executing_task_queue() const; 176 177 // Unregisters a TaskQueue previously created by |NewTaskQueue()|. 178 // No tasks will run on this queue after this call. 
179 void UnregisterTaskQueueImpl( 180 std::unique_ptr<internal::TaskQueueImpl> task_queue); 181 associated_thread()182 scoped_refptr<const AssociatedThreadId> associated_thread() const { 183 return associated_thread_; 184 } 185 settings()186 const Settings& settings() const { return settings_; } 187 188 WeakPtr<SequenceManagerImpl> GetWeakPtr(); 189 190 // How frequently to perform housekeeping tasks (sweeping canceled tasks etc). 191 static constexpr TimeDelta kReclaimMemoryInterval = Seconds(30); 192 193 protected: 194 static std::unique_ptr<ThreadControllerImpl> 195 CreateThreadControllerImplForCurrentThread(const TickClock* clock); 196 197 // Create a task queue manager where |controller| controls the thread 198 // on which the tasks are eventually run. 199 SequenceManagerImpl(std::unique_ptr<internal::ThreadController> controller, 200 SequenceManager::Settings settings = Settings()); 201 202 friend class internal::TaskQueueImpl; 203 friend class internal::DefaultWakeUpQueue; 204 friend class ::base::sequence_manager::SequenceManagerForTest; 205 206 private: 207 // Returns the SequenceManager running the 208 // current thread. It must only be used on the thread it was obtained. 209 // Only to be used by CurrentThread for the moment 210 static SequenceManagerImpl* GetCurrent(); 211 friend class ::base::CurrentThread; 212 213 // Factory friends to call into private creation methods. 
214 friend std::unique_ptr<SequenceManager> 215 sequence_manager::CreateSequenceManagerOnCurrentThread( 216 SequenceManager::Settings); 217 friend std::unique_ptr<SequenceManager> 218 sequence_manager::CreateSequenceManagerOnCurrentThreadWithPump( 219 std::unique_ptr<MessagePump> message_pump, 220 SequenceManager::Settings); 221 friend std::unique_ptr<SequenceManager> 222 sequence_manager::CreateUnboundSequenceManager(SequenceManager::Settings); 223 friend std::unique_ptr<SequenceManagerImpl> 224 sequence_manager::internal::CreateUnboundSequenceManagerImpl( 225 PassKey<base::internal::SequenceManagerThreadDelegate>, 226 SequenceManager::Settings); 227 228 // Assume direct control over current thread and create a SequenceManager. 229 // This function should be called only once per thread. 230 // This function assumes that a task execution environment is already 231 // initialized for the current thread. 232 static std::unique_ptr<SequenceManagerImpl> CreateOnCurrentThread( 233 SequenceManager::Settings settings); 234 235 // Create an unbound SequenceManager (typically for a future thread). The 236 // SequenceManager can be initialized on the current thread and then needs to 237 // be bound and initialized on the target thread by calling one of the Bind*() 238 // methods. 239 static std::unique_ptr<SequenceManagerImpl> CreateUnbound( 240 SequenceManager::Settings settings); 241 242 enum class ProcessTaskResult { 243 kDeferred, 244 kExecuted, 245 kSequenceManagerDeleted, 246 }; 247 248 // SequenceManager maintains a queue of non-nestable tasks since they're 249 // uncommon and allocating an extra deque per TaskQueue will waste the memory. 250 using NonNestableTaskDeque = 251 circular_deque<internal::TaskQueueImpl::DeferredNonNestableTask>; 252 253 // We have to track rentrancy because we support nested runloops but the 254 // selector interface is unaware of those. 
This struct keeps track off all 255 // task related state needed to make pairs of SelectNextTask() / DidRunTask() 256 // work. 257 struct ExecutingTask { ExecutingTaskExecutingTask258 ExecutingTask(Task&& task, 259 internal::TaskQueueImpl* task_queue, 260 TaskQueue::TaskTiming task_timing) 261 : pending_task(std::move(task)), 262 task_queue(task_queue), 263 task_queue_name(task_queue->GetProtoName()), 264 task_timing(task_timing), 265 priority(task_queue->GetQueuePriority()), 266 task_type(pending_task.task_type) {} 267 268 Task pending_task; 269 270 // `task_queue` is not a raw_ptr<...> for performance reasons (based on 271 // analysis of sampling profiler data and tab_search:top100:2020). 272 RAW_PTR_EXCLUSION internal::TaskQueueImpl* task_queue = nullptr; 273 // Save task_queue_name as the task queue can be deleted within the task. 274 QueueName task_queue_name; 275 TaskQueue::TaskTiming task_timing; 276 // Save priority as it might change after running a task. 277 TaskQueue::QueuePriority priority; 278 // Save task metadata to use in after running a task as |pending_task| 279 // won't be available then. 280 int task_type; 281 }; 282 283 struct MainThreadOnly { 284 explicit MainThreadOnly( 285 SequenceManagerImpl* sequence_manager, 286 const scoped_refptr<AssociatedThreadId>& associated_thread, 287 const SequenceManager::Settings& settings, 288 const base::TickClock* clock); 289 ~MainThreadOnly(); 290 291 int nesting_depth = 0; 292 NonNestableTaskDeque non_nestable_task_queue; 293 // TODO(altimin): Switch to instruction pointer crash key when it's 294 // available. 
295 raw_ptr<debug::CrashKeyString> file_name_crash_key = nullptr; 296 raw_ptr<debug::CrashKeyString> function_name_crash_key = nullptr; 297 raw_ptr<debug::CrashKeyString> async_stack_crash_key = nullptr; 298 std::array<char, static_cast<size_t>(debug::CrashKeySize::Size64)> 299 async_stack_buffer = {}; 300 301 absl::optional<base::MetricsSubSampler> metrics_subsampler; 302 303 internal::TaskQueueSelector selector; 304 ObserverList<TaskObserver>::Unchecked task_observers; 305 ObserverList<TaskTimeObserver>::Unchecked task_time_observers; 306 const raw_ptr<const base::TickClock> default_clock; 307 raw_ptr<TimeDomain> time_domain = nullptr; 308 309 std::unique_ptr<WakeUpQueue> wake_up_queue; 310 std::unique_ptr<WakeUpQueue> non_waking_wake_up_queue; 311 312 // If true MaybeReclaimMemory will attempt to reclaim memory. 313 bool memory_reclaim_scheduled = false; 314 315 // Used to ensure we don't perform expensive housekeeping too frequently. 316 TimeTicks next_time_to_reclaim_memory; 317 318 // List of task queues managed by this SequenceManager. 319 // - active_queues contains queues that are still running tasks, which are 320 // are owned by relevant TaskQueues. 321 // - queues_to_delete contains soon-to-be-deleted queues, because some 322 // internal scheduling code does not expect queues to be pulled 323 // from underneath. 324 325 std::set<internal::TaskQueueImpl*> active_queues; 326 327 std::map<internal::TaskQueueImpl*, std::unique_ptr<internal::TaskQueueImpl>> 328 queues_to_delete; 329 330 bool task_was_run_on_quiescence_monitored_queue = false; 331 bool nesting_observer_registered_ = false; 332 333 // Use std::deque() so that references returned by SelectNextTask() remain 334 // valid until the matching call to DidRunTask(), even when nested RunLoops 335 // cause tasks to be pushed on the stack in-between. This is needed because 336 // references are kept in local variables by calling code between 337 // SelectNextTask()/DidRunTask(). 
338 std::deque<ExecutingTask> task_execution_stack; 339 340 raw_ptr<Observer> observer = nullptr; // NOT OWNED 341 342 ObserverList<CurrentThread::DestructionObserver>::Unchecked 343 destruction_observers; 344 345 // If non-null, invoked the next time OnSystemIdle() completes without 346 // scheduling additional work. 347 OnceClosure on_next_idle_callback; 348 }; 349 350 void CompleteInitializationOnBoundThread(); 351 352 // TaskQueueSelector::Observer: 353 void OnTaskQueueEnabled(internal::TaskQueueImpl* queue) override; 354 355 // RunLoop::NestingObserver: 356 void OnBeginNestedRunLoop() override; 357 void OnExitNestedRunLoop() override; 358 359 // Schedules next wake-up at the given time, canceling any previous requests. 360 // Use absl::nullopt to cancel a wake-up. Must be called on the thread this 361 // class was created on. 362 void SetNextWakeUp(LazyNow* lazy_now, absl::optional<WakeUp> wake_up); 363 364 // Called by the task queue to inform this SequenceManager of a task that's 365 // about to be queued. This SequenceManager may use this opportunity to add 366 // metadata to |pending_task| before it is moved into the queue. 367 void WillQueueTask(Task* pending_task); 368 369 // Enqueues onto delayed WorkQueues all delayed tasks which must run now 370 // (cannot be postponed) and possibly some delayed tasks which can run now but 371 // could be postponed (due to how tasks are stored, it is not possible to 372 // retrieve all such tasks efficiently) and reloads any empty work queues. 
373 void MoveReadyDelayedTasksToWorkQueues(LazyNow* lazy_now); 374 375 void NotifyWillProcessTask(ExecutingTask* task, LazyNow* time_before_task); 376 void NotifyDidProcessTask(ExecutingTask* task, LazyNow* time_after_task); 377 378 EnqueueOrder GetNextSequenceNumber(); 379 380 bool GetAddQueueTimeToTasks(); 381 382 std::unique_ptr<trace_event::ConvertableToTraceFormat> 383 AsValueWithSelectorResultForTracing(internal::WorkQueue* selected_work_queue, 384 bool force_verbose) const; 385 Value::Dict AsValueWithSelectorResult( 386 internal::WorkQueue* selected_work_queue, 387 bool force_verbose) const; 388 389 // Used in construction of TaskQueueImpl to obtain an AtomicFlag which it can 390 // use to request reload by ReloadEmptyWorkQueues. The lifetime of 391 // TaskQueueImpl is managed by this class and the handle will be released by 392 // TaskQueueImpl::UnregisterTaskQueue which is always called before the 393 // queue's destruction. 394 AtomicFlagSet::AtomicFlag GetFlagToRequestReloadForEmptyQueue( 395 TaskQueueImpl* task_queue); 396 397 // Calls |TakeImmediateIncomingQueueTasks| on all queues with their reload 398 // flag set in |empty_queues_to_reload_|. 399 void ReloadEmptyWorkQueues() const; 400 401 std::unique_ptr<internal::TaskQueueImpl> CreateTaskQueueImpl( 402 const TaskQueue::Spec& spec); 403 404 // Periodically reclaims memory by sweeping away canceled tasks and shrinking 405 // buffers. 406 void MaybeReclaimMemory(); 407 408 // Deletes queues marked for deletion and empty queues marked for shutdown. 409 void CleanUpQueues(); 410 411 // Removes canceled delayed tasks from the front of wake up queue. 412 void RemoveAllCanceledDelayedTasksFromFront(LazyNow* lazy_now); 413 414 TaskQueue::TaskTiming::TimeRecordingPolicy ShouldRecordTaskTiming( 415 const internal::TaskQueueImpl* task_queue); 416 bool ShouldRecordCPUTimeForTask(); 417 418 // Write the async stack trace onto a crash key as whitespace-delimited hex 419 // addresses. 
420 void RecordCrashKeys(const PendingTask&); 421 422 // Helper to terminate all scoped trace events to allow starting new ones 423 // in SelectNextTask(). 424 absl::optional<SelectedTask> SelectNextTaskImpl(LazyNow& lazy_now, 425 SelectTaskOption option); 426 427 // Returns a wake-up for the next delayed task which is not ripe for 428 // execution, or nullopt if `option` is `kSkipDelayedTask` or there 429 // are no such tasks (immediate tasks don't count). 430 absl::optional<WakeUp> GetNextDelayedWakeUpWithOption( 431 SelectTaskOption option) const; 432 433 // Given a `wake_up` describing when the next delayed task should run, returns 434 // a wake up that should be scheduled on the thread. `is_immediate()` if the 435 // wake up should run immediately. `nullopt` if no wake up is required because 436 // `wake_up` is `nullopt` or a `time_domain` is used. 437 absl::optional<WakeUp> AdjustWakeUp(absl::optional<WakeUp> wake_up, 438 LazyNow* lazy_now) const; 439 440 void MaybeAddLeewayToTask(Task& task) const; 441 442 #if DCHECK_IS_ON() 443 void LogTaskDebugInfo(const internal::WorkQueue* work_queue) const; 444 #endif 445 446 // Determines if wall time or thread time should be recorded for the next 447 // task. 448 TaskQueue::TaskTiming InitializeTaskTiming( 449 internal::TaskQueueImpl* task_queue); 450 451 const scoped_refptr<AssociatedThreadId> associated_thread_; 452 453 EnqueueOrderGenerator enqueue_order_generator_; 454 455 const std::unique_ptr<internal::ThreadController> controller_; 456 const Settings settings_; 457 458 const MetricRecordingSettings metric_recording_settings_; 459 460 // Whether to add the queue time to tasks. 
461 base::subtle::Atomic32 add_queue_time_to_tasks_; 462 463 AtomicFlagSet empty_queues_to_reload_; 464 465 MainThreadOnly main_thread_only_; main_thread_only()466 MainThreadOnly& main_thread_only() { 467 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); 468 return main_thread_only_; 469 } main_thread_only()470 const MainThreadOnly& main_thread_only() const { 471 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); 472 return main_thread_only_; 473 } 474 475 // |clock_| either refers to the TickClock representation of |time_domain| 476 // (same object) if any, or to |default_clock| otherwise. It is maintained as 477 // an atomic pointer here for multi-threaded usage. 478 std::atomic<const base::TickClock*> clock_; main_thread_clock()479 const base::TickClock* main_thread_clock() const { 480 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); 481 return clock_.load(std::memory_order_relaxed); 482 } any_thread_clock()483 const base::TickClock* any_thread_clock() const { 484 // |memory_order_acquire| matched by |memory_order_release| in 485 // SetTimeDomain() to ensure all data used by |clock_| is visible when read 486 // from the current thread. A thread might try to access a stale |clock_| 487 // but that's not an issue since |time_domain| contractually outlives 488 // SequenceManagerImpl even if it's reset. 489 return clock_.load(std::memory_order_acquire); 490 } 491 492 WeakPtrFactory<SequenceManagerImpl> weak_factory_{this}; 493 }; 494 495 } // namespace internal 496 } // namespace sequence_manager 497 } // namespace base 498 499 #endif // BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_ 500