// Copyright 2018 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_
#define BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_

#include <atomic>
#include <deque>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <utility>

#include "base/atomic_sequence_num.h"
#include "base/base_export.h"
#include "base/callback_list.h"
#include "base/compiler_specific.h"
#include "base/containers/circular_deque.h"
#include "base/debug/crash_logging.h"
#include "base/feature_list.h"
#include "base/functional/callback_forward.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/raw_ptr_exclusion.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_pump_type.h"
#include "base/observer_list.h"
#include "base/pending_task.h"
#include "base/rand_util.h"
#include "base/run_loop.h"
#include "base/synchronization/lock.h"
#include "base/task/current_thread.h"
#include "base/task/sequence_manager/associated_thread_id.h"
#include "base/task/sequence_manager/enqueue_order.h"
#include "base/task/sequence_manager/enqueue_order_generator.h"
#include "base/task/sequence_manager/sequence_manager.h"
#include "base/task/sequence_manager/task_queue.h"
#include "base/task/sequence_manager/task_queue_impl.h"
#include "base/task/sequence_manager/task_queue_selector.h"
#include "base/task/sequence_manager/thread_controller.h"
#include "base/task/sequence_manager/work_tracker.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/single_thread_task_runner.h"
#include "base/threading/thread_checker.h"
#include "base/time/default_tick_clock.h"
#include "base/types/pass_key.h"
#include "base/values.h"
#include "build/build_config.h"

namespace base {

namespace internal {
class SequenceManagerThreadDelegate;
}

namespace trace_event {
class ConvertableToTraceFormat;
}  // namespace trace_event

namespace sequence_manager {

class SequenceManagerForTest;
class TaskQueue;
class TaskTimeObserver;
class TimeDomain;

namespace internal {

class TaskQueueImpl;
class DefaultWakeUpQueue;
class SequenceManagerImpl;
class ThreadControllerImpl;

// A private factory method for SequenceManagerThreadDelegate which is
// equivalent to sequence_manager::CreateUnboundSequenceManager() but returns
// the underlying impl.
std::unique_ptr<SequenceManagerImpl> CreateUnboundSequenceManagerImpl(
    PassKey<base::internal::SequenceManagerThreadDelegate>,
    SequenceManager::Settings settings);
// The task queue manager provides N task queues and a selector interface for
// choosing which task queue to service next. Each task queue consists of two
// sub queues:
//
// 1. Incoming task queue. Tasks that are posted get immediately appended here.
//    When a task is appended to an empty incoming queue, the task manager
//    work function (DoWork()) is scheduled to run on the main task runner.
//
// 2. Work queue. If a work queue is empty when DoWork() is entered, tasks from
//    the incoming task queue (if any) are moved here. The work queues are
//    registered with the selector as input to the scheduling decision.
//
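// For orientation, a minimal usage sketch (illustrative only; the exact
// Spec/queue-name arguments and binding steps may differ in real callers):
//
//   std::unique_ptr<SequenceManager> manager =
//       CreateSequenceManagerOnCurrentThread(SequenceManager::Settings());
//   TaskQueue::Handle queue = manager->CreateTaskQueue(TaskQueue::Spec(...));
//   manager->SetDefaultTaskRunner(queue->task_runner());
//   queue->task_runner()->PostTask(FROM_HERE, base::BindOnce(...));
//
// Posting appends the task to the queue's incoming task queue; DoWork() later
// moves it into the corresponding work queue, where the selector may pick it
// for execution.
//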
class BASE_EXPORT SequenceManagerImpl
    : public SequenceManager,
      public internal::SequencedTaskSource,
      public internal::TaskQueueSelector::Observer,
      public RunLoop::NestingObserver {
 public:
  using Observer = SequenceManager::Observer;

  SequenceManagerImpl(const SequenceManagerImpl&) = delete;
  SequenceManagerImpl& operator=(const SequenceManagerImpl&) = delete;
  ~SequenceManagerImpl() override;

  // Initializes features for this class. See `base::features::Init()`.
  static void InitializeFeatures();

  // SequenceManager implementation:
  void BindToCurrentThread() override;
  scoped_refptr<SequencedTaskRunner> GetTaskRunnerForCurrentTask() override;
  void BindToMessagePump(std::unique_ptr<MessagePump> message_pump) override;
  void SetObserver(Observer* observer) override;
  void AddTaskTimeObserver(TaskTimeObserver* task_time_observer) override;
  void RemoveTaskTimeObserver(TaskTimeObserver* task_time_observer) override;
  void SetTimeDomain(TimeDomain* time_domain) override;
  void ResetTimeDomain() override;
  const TickClock* GetTickClock() const override;
  TimeTicks NowTicks() const override;
  void SetDefaultTaskRunner(
      scoped_refptr<SingleThreadTaskRunner> task_runner) override;
  void ReclaimMemory() override;
  bool GetAndClearSystemIsQuiescentBit() override;
  void SetWorkBatchSize(int work_batch_size) override;
  void EnableCrashKeys(const char* async_stack_crash_key) override;
  const MetricRecordingSettings& GetMetricRecordingSettings() const override;
  size_t GetPendingTaskCountForTesting() const override;
  TaskQueue::Handle CreateTaskQueue(const TaskQueue::Spec& spec) override;
  std::string DescribeAllPendingTasks() const override;
  void PrioritizeYieldingToNative(base::TimeTicks prioritize_until) override;
  void AddTaskObserver(TaskObserver* task_observer) override;
  void RemoveTaskObserver(TaskObserver* task_observer) override;
  std::optional<WakeUp> GetNextDelayedWakeUp() const override;
  TaskQueue::QueuePriority GetPriorityCount() const override;

  // SequencedTaskSource implementation:
  void SetRunTaskSynchronouslyAllowed(
      bool can_run_tasks_synchronously) override;
  std::optional<SelectedTask> SelectNextTask(
      LazyNow& lazy_now,
      SelectTaskOption option = SelectTaskOption::kDefault) override;
  void DidRunTask(LazyNow& lazy_now) override;
  std::optional<WakeUp> GetPendingWakeUp(
      LazyNow* lazy_now,
      SelectTaskOption option = SelectTaskOption::kDefault) override;
  bool HasPendingHighResolutionTasks() override;
  void OnBeginWork() override;
  bool OnIdle() override;
  void MaybeEmitTaskDetails(
      perfetto::EventContext& ctx,
      const SequencedTaskSource::SelectedTask& selected_task) const override;

  void AddDestructionObserver(
      CurrentThread::DestructionObserver* destruction_observer);
  void RemoveDestructionObserver(
      CurrentThread::DestructionObserver* destruction_observer);
  [[nodiscard]] CallbackListSubscription RegisterOnNextIdleCallback(
      OnceClosure on_next_idle_callback);

  // Sets / returns the default TaskRunner. Thread-safe.
  void SetTaskRunner(scoped_refptr<SingleThreadTaskRunner> task_runner);
  scoped_refptr<SingleThreadTaskRunner> GetTaskRunner();

  bool IsBoundToCurrentThread() const;
  MessagePump* GetMessagePump() const;
  bool IsType(MessagePumpType type) const;
  void SetAddQueueTimeToTasks(bool enable);
  void SetTaskExecutionAllowedInNativeNestedLoop(bool allowed);
  bool IsTaskExecutionAllowedInNativeNestedLoop() const;
#if BUILDFLAG(IS_IOS)
  void AttachToMessagePump();
#endif
  bool IsIdleForTesting() override;
  void EnableMessagePumpTimeKeeperMetrics(
      const char* thread_name,
      bool wall_time_based_metrics_enabled_for_testing = false);

  // Requests that a task to process work be scheduled.
  void ScheduleWork();

  // Returns the currently executing TaskQueue if any. Must be called on the
  // thread this class was created on.
  internal::TaskQueueImpl* currently_executing_task_queue() const;

  // Unregisters a TaskQueue previously created by |NewTaskQueue()|.
  // No tasks will run on this queue after this call.
  void UnregisterTaskQueueImpl(
      std::unique_ptr<internal::TaskQueueImpl> task_queue);

  scoped_refptr<AssociatedThreadId> associated_thread() const {
    return associated_thread_;
  }

  const Settings& settings() const LIFETIME_BOUND { return settings_; }

  WeakPtr<SequenceManagerImpl> GetWeakPtr();

  // How frequently to perform housekeeping tasks (sweeping canceled tasks,
  // etc.).
  static constexpr TimeDelta kReclaimMemoryInterval = Seconds(30);

 protected:
  static std::unique_ptr<ThreadControllerImpl>
  CreateThreadControllerImplForCurrentThread(const TickClock* clock);

  // Create a task queue manager where |controller| controls the thread
  // on which the tasks are eventually run.
  SequenceManagerImpl(std::unique_ptr<internal::ThreadController> controller,
                      SequenceManager::Settings settings = Settings());

  friend class internal::TaskQueueImpl;
  friend class internal::DefaultWakeUpQueue;
  friend class ::base::sequence_manager::SequenceManagerForTest;

 private:
  // Returns the SequenceManager running the current thread. It must only be
  // used on the thread on which it was obtained. Only to be used by
  // CurrentThread for the moment.
  static SequenceManagerImpl* GetCurrent();
  friend class ::base::CurrentThread;

  // Factory friends to call into private creation methods.
  friend std::unique_ptr<SequenceManager>
      sequence_manager::CreateSequenceManagerOnCurrentThread(
          SequenceManager::Settings);
  friend std::unique_ptr<SequenceManager>
  sequence_manager::CreateSequenceManagerOnCurrentThreadWithPump(
      std::unique_ptr<MessagePump> message_pump,
      SequenceManager::Settings);
  friend std::unique_ptr<SequenceManager>
      sequence_manager::CreateUnboundSequenceManager(SequenceManager::Settings);
  friend std::unique_ptr<SequenceManagerImpl>
      sequence_manager::internal::CreateUnboundSequenceManagerImpl(
          PassKey<base::internal::SequenceManagerThreadDelegate>,
          SequenceManager::Settings);

  // Assume direct control over the current thread and create a
  // SequenceManager. This function should be called only once per thread.
  // This function assumes that a task execution environment is already
  // initialized for the current thread.
  static std::unique_ptr<SequenceManagerImpl> CreateOnCurrentThread(
      SequenceManager::Settings settings);

  // Create an unbound SequenceManager (typically for a future thread). The
  // SequenceManager can be initialized on the current thread and then needs to
  // be bound and initialized on the target thread by calling one of the Bind*()
  // methods.
  static std::unique_ptr<SequenceManagerImpl> CreateUnbound(
      SequenceManager::Settings settings);

  enum class ProcessTaskResult {
    kDeferred,
    kExecuted,
    kSequenceManagerDeleted,
  };

  // SequenceManager maintains a queue of non-nestable tasks since they're
  // uncommon and allocating an extra deque per TaskQueue would waste memory.
  using NonNestableTaskDeque =
      circular_deque<internal::TaskQueueImpl::DeferredNonNestableTask>;

  // We have to track reentrancy because we support nested RunLoops but the
  // selector interface is unaware of those. This struct keeps track of all
  // task-related state needed to make pairs of SelectNextTask() / DidRunTask()
  // work.
  struct ExecutingTask {
    ExecutingTask(Task&& task,
                  internal::TaskQueueImpl* task_queue,
                  TaskQueue::TaskTiming task_timing)
        : pending_task(std::move(task)),
          task_queue(task_queue),
          task_queue_name(task_queue->GetProtoName()),
          task_timing(task_timing),
          priority(task_queue->GetQueuePriority()),
          task_type(pending_task.task_type) {}

    Task pending_task;

    // `task_queue` is not a raw_ptr<...> for performance reasons (based on
    // analysis of sampling profiler data and tab_search:top100:2020).
    RAW_PTR_EXCLUSION internal::TaskQueueImpl* task_queue = nullptr;
    // Save task_queue_name as the task queue can be deleted within the task.
    QueueName task_queue_name;
    TaskQueue::TaskTiming task_timing;
    // Save priority as it might change after running a task.
    TaskQueue::QueuePriority priority;
    // Save task metadata to use after running a task as |pending_task|
    // won't be available then.
    int task_type;
  };

  struct MainThreadOnly {
    explicit MainThreadOnly(
        SequenceManagerImpl* sequence_manager,
        const scoped_refptr<AssociatedThreadId>& associated_thread,
        const SequenceManager::Settings& settings,
        const base::TickClock* clock);
    ~MainThreadOnly();

    int nesting_depth = 0;
    NonNestableTaskDeque non_nestable_task_queue;
    // TODO(altimin): Switch to instruction pointer crash key when it's
    // available.
    raw_ptr<debug::CrashKeyString> file_name_crash_key = nullptr;
    raw_ptr<debug::CrashKeyString> function_name_crash_key = nullptr;
    raw_ptr<debug::CrashKeyString> async_stack_crash_key = nullptr;
    std::array<char, static_cast<size_t>(debug::CrashKeySize::Size64)>
        async_stack_buffer = {};

    std::optional<base::MetricsSubSampler> metrics_subsampler;

    internal::TaskQueueSelector selector;
    // RAW_PTR_EXCLUSION: Performance reasons (based on analysis of
    // speedometer3).
    ObserverList<TaskObserver>::UncheckedAndRawPtrExcluded task_observers;
    ObserverList<TaskTimeObserver> task_time_observers;
    const raw_ptr<const base::TickClock> default_clock;
    raw_ptr<TimeDomain> time_domain = nullptr;

    std::unique_ptr<WakeUpQueue> wake_up_queue;
    std::unique_ptr<WakeUpQueue> non_waking_wake_up_queue;

    // If true, MaybeReclaimMemory will attempt to reclaim memory.
    bool memory_reclaim_scheduled = false;

    // Used to ensure we don't perform expensive housekeeping too frequently.
    TimeTicks next_time_to_reclaim_memory;

    // List of task queues managed by this SequenceManager.
    // - active_queues contains queues that are still running tasks, which are
    //   owned by the relevant TaskQueues.
    // - queues_to_delete contains soon-to-be-deleted queues, because some
    //   internal scheduling code does not expect queues to be pulled
    //   from underneath.

    // RAW_PTR_EXCLUSION: Performance reasons (based on analysis of
    // speedometer3).
    RAW_PTR_EXCLUSION std::set<internal::TaskQueueImpl*> active_queues;

    std::map<internal::TaskQueueImpl*, std::unique_ptr<internal::TaskQueueImpl>>
        queues_to_delete;

    bool task_was_run_on_quiescence_monitored_queue = false;
    bool nesting_observer_registered_ = false;

    // Use std::deque() so that references returned by SelectNextTask() remain
    // valid until the matching call to DidRunTask(), even when nested RunLoops
    // cause tasks to be pushed on the stack in-between. This is needed because
    // references are kept in local variables by calling code between
    // SelectNextTask()/DidRunTask().
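    //
    // Illustrative nesting pattern (sketch only):
    //
    //   SelectNextTask()      -> returns a reference to entry A on this stack
    //     [nested RunLoop]
    //       SelectNextTask()  -> pushes entry B on top of A
    //       DidRunTask()      -> pops B
    //   DidRunTask()          -> the reference to A is still valid because
    //                            std::deque does not invalidate references to
    //                            existing elements when pushing/popping at the
    //                            ends.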
    std::deque<ExecutingTask> task_execution_stack;

    raw_ptr<Observer> observer = nullptr;  // NOT OWNED

    ObserverList<CurrentThread::DestructionObserver>::
        UncheckedAndDanglingUntriaged destruction_observers;

    // Notified the next time `OnIdle()` completes without scheduling additional
    // work.
    OnceClosureList on_next_idle_callbacks;
  };

  void CompleteInitializationOnBoundThread();

  // TaskQueueSelector::Observer:
  void OnTaskQueueEnabled(internal::TaskQueueImpl* queue) override;
  void OnWorkAvailable() override;

  // RunLoop::NestingObserver:
  void OnBeginNestedRunLoop() override;
  void OnExitNestedRunLoop() override;

  // Schedules next wake-up at the given time, canceling any previous requests.
  // Use std::nullopt to cancel a wake-up. Must be called on the thread this
  // class was created on.
  void SetNextWakeUp(LazyNow* lazy_now, std::optional<WakeUp> wake_up);

  // Called before TaskQueue requests to reload its empty immediate work queue.
  void WillRequestReloadImmediateWorkQueue();

  // Returns a valid `SyncWorkAuthorization` if a call to `RunOrPostTask` on a
  // `SequencedTaskRunner` bound to this `SequenceManager` may run its task
  // synchronously.
  SyncWorkAuthorization TryAcquireSyncWorkAuthorization();

  // Called when a task is about to be queued. May add metadata to the task and
  // emit trace events.
  void WillQueueTask(Task* pending_task);

  // Enqueues onto delayed WorkQueues all delayed tasks which must run now
  // (cannot be postponed) and possibly some delayed tasks which can run now but
  // could be postponed (due to how tasks are stored, it is not possible to
  // retrieve all such tasks efficiently) and reloads any empty work queues.
  void MoveReadyDelayedTasksToWorkQueues(LazyNow* lazy_now);

  void NotifyWillProcessTask(ExecutingTask* task, LazyNow* time_before_task);
  void NotifyDidProcessTask(ExecutingTask* task, LazyNow* time_after_task);

  EnqueueOrder GetNextSequenceNumber();

  bool GetAddQueueTimeToTasks();

  std::unique_ptr<trace_event::ConvertableToTraceFormat>
  AsValueWithSelectorResultForTracing(internal::WorkQueue* selected_work_queue,
                                      bool force_verbose) const;
  Value::Dict AsValueWithSelectorResult(
      internal::WorkQueue* selected_work_queue,
      bool force_verbose) const;

  // Used in construction of TaskQueueImpl to obtain an AtomicFlag which it can
  // use to request reload by ReloadEmptyWorkQueues. The lifetime of
  // TaskQueueImpl is managed by this class and the handle will be released by
  // TaskQueueImpl::UnregisterTaskQueue which is always called before the
  // queue's destruction.
  AtomicFlagSet::AtomicFlag GetFlagToRequestReloadForEmptyQueue(
      TaskQueueImpl* task_queue);

  // Calls |TakeImmediateIncomingQueueTasks| on all queues with their reload
  // flag set in |empty_queues_to_reload_|.
  void ReloadEmptyWorkQueues();

  std::unique_ptr<internal::TaskQueueImpl> CreateTaskQueueImpl(
      const TaskQueue::Spec& spec);

  // Periodically reclaims memory by sweeping away canceled tasks and shrinking
  // buffers.
  void MaybeReclaimMemory();

  // Deletes queues marked for deletion and empty queues marked for shutdown.
  void CleanUpQueues();

  // Removes canceled delayed tasks from the front of the wake up queue.
  void RemoveAllCanceledDelayedTasksFromFront(LazyNow* lazy_now);

  TaskQueue::TaskTiming::TimeRecordingPolicy ShouldRecordTaskTiming(
      const internal::TaskQueueImpl* task_queue);
  bool ShouldRecordCPUTimeForTask();

  // Write the async stack trace onto a crash key as whitespace-delimited hex
  // addresses.
  void RecordCrashKeys(const PendingTask&);

  // Helper to terminate all scoped trace events to allow starting new ones
  // in SelectNextTask().
  std::optional<SelectedTask> SelectNextTaskImpl(LazyNow& lazy_now,
                                                 SelectTaskOption option);

  // Returns a wake-up for the next delayed task which is not ripe for
  // execution, or nullopt if `option` is `kSkipDelayedTask` or there
  // are no such tasks (immediate tasks don't count).
  std::optional<WakeUp> GetNextDelayedWakeUpWithOption(
      SelectTaskOption option) const;

  // Given a `wake_up` describing when the next delayed task should run, returns
  // a wake up that should be scheduled on the thread. `is_immediate()` if the
  // wake up should run immediately. `nullopt` if no wake up is required because
  // `wake_up` is `nullopt` or a `time_domain` is used.
  std::optional<WakeUp> AdjustWakeUp(std::optional<WakeUp> wake_up,
                                     LazyNow* lazy_now) const;

  void MaybeAddLeewayToTask(Task& task) const;

#if DCHECK_IS_ON()
  void LogTaskDebugInfo(const internal::WorkQueue* work_queue) const;
#endif

  // Determines if wall time or thread time should be recorded for the next
  // task.
  TaskQueue::TaskTiming InitializeTaskTiming(
      internal::TaskQueueImpl* task_queue);

  const scoped_refptr<AssociatedThreadId> associated_thread_;

  EnqueueOrderGenerator enqueue_order_generator_;

  const std::unique_ptr<internal::ThreadController> controller_;
  const Settings settings_;

  const MetricRecordingSettings metric_recording_settings_;

  WorkTracker work_tracker_;

  // Whether to add the queue time to tasks.
  std::atomic<bool> add_queue_time_to_tasks_;

  AtomicFlagSet empty_queues_to_reload_;

  MainThreadOnly main_thread_only_;
  MainThreadOnly& main_thread_only() {
    DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
    return main_thread_only_;
  }
  const MainThreadOnly& main_thread_only() const LIFETIME_BOUND {
    DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
    return main_thread_only_;
  }

  // |clock_| either refers to the TickClock representation of |time_domain|
  // (same object) if any, or to |default_clock| otherwise. It is maintained as
  // an atomic pointer here for multi-threaded usage.
  std::atomic<const base::TickClock*> clock_;
  const base::TickClock* main_thread_clock() const {
    DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
    return clock_.load(std::memory_order_relaxed);
  }
  const base::TickClock* any_thread_clock() const {
    // |memory_order_acquire| matched by |memory_order_release| in
    // SetTimeDomain() to ensure all data used by |clock_| is visible when read
    // from the current thread. A thread might try to access a stale |clock_|
    // but that's not an issue since |time_domain| contractually outlives
    // SequenceManagerImpl even if it's reset.
    return clock_.load(std::memory_order_acquire);
  }
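  // The intended acquire/release pairing, sketched (illustrative only; the
  // release store lives in SetTimeDomain(), which is implemented in the .cc
  // file, and `new_clock` is a placeholder for the clock being published):
  //
  //   // Main thread (writer): publish the clock after its state is set up.
  //   clock_.store(new_clock, std::memory_order_release);
  //
  //   // Any thread (reader): all writes made before the release store are
  //   // visible after this acquire load.
  //   const base::TickClock* clock = clock_.load(std::memory_order_acquire);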

  WeakPtrFactory<SequenceManagerImpl> weak_factory_{this};
};

}  // namespace internal
}  // namespace sequence_manager
}  // namespace base

#endif  // BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_