• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2018 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_
6 #define BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_
7 
8 #include <deque>
9 #include <map>
10 #include <memory>
11 #include <set>
12 #include <string>
13 #include <utility>
14 
15 #include "base/atomic_sequence_num.h"
16 #include "base/base_export.h"
17 #include "base/cancelable_callback.h"
18 #include "base/containers/circular_deque.h"
19 #include "base/debug/crash_logging.h"
20 #include "base/feature_list.h"
21 #include "base/functional/callback_forward.h"
22 #include "base/memory/raw_ptr.h"
23 #include "base/memory/scoped_refptr.h"
24 #include "base/memory/weak_ptr.h"
25 #include "base/message_loop/message_pump_type.h"
26 #include "base/observer_list.h"
27 #include "base/pending_task.h"
28 #include "base/rand_util.h"
29 #include "base/run_loop.h"
30 #include "base/synchronization/lock.h"
31 #include "base/task/current_thread.h"
32 #include "base/task/sequence_manager/associated_thread_id.h"
33 #include "base/task/sequence_manager/enqueue_order.h"
34 #include "base/task/sequence_manager/enqueue_order_generator.h"
35 #include "base/task/sequence_manager/sequence_manager.h"
36 #include "base/task/sequence_manager/task_queue_impl.h"
37 #include "base/task/sequence_manager/task_queue_selector.h"
38 #include "base/task/sequence_manager/thread_controller.h"
39 #include "base/task/sequenced_task_runner.h"
40 #include "base/task/single_thread_task_runner.h"
41 #include "base/threading/thread_checker.h"
42 #include "base/time/default_tick_clock.h"
43 #include "base/types/pass_key.h"
44 #include "base/values.h"
45 #include "build/build_config.h"
46 #include "third_party/abseil-cpp/absl/types/optional.h"
47 
48 namespace base {
49 
50 namespace internal {
51 class SequenceManagerThreadDelegate;
52 }
53 
54 namespace trace_event {
55 class ConvertableToTraceFormat;
56 }  // namespace trace_event
57 
58 namespace sequence_manager {
59 
60 class SequenceManagerForTest;
61 class TaskQueue;
62 class TaskTimeObserver;
63 class TimeDomain;
64 
65 namespace internal {
66 
67 class TaskQueueImpl;
68 class DefaultWakeUpQueue;
69 class SequenceManagerImpl;
70 class ThreadControllerImpl;
71 
72 // Factory reserved for base::internal::SequenceManagerThreadDelegate (hence
73 // the PassKey). Same as sequence_manager::CreateUnboundSequenceManager(), but
74 // returns the concrete SequenceManagerImpl rather than the interface type.
75 std::unique_ptr<SequenceManagerImpl> CreateUnboundSequenceManagerImpl(
76     PassKey<base::internal::SequenceManagerThreadDelegate>,
77     SequenceManager::Settings settings);
78 
79 // The task queue manager provides N task queues and a selector interface for
80 // choosing which task queue to service next. Each task queue consists of two
81 // sub queues:
82 //
83 // 1. Incoming task queue. Tasks that are posted get immediately appended here.
84 //    When a task is appended into an empty incoming queue, the task manager
85 //    work function (DoWork()) is scheduled to run on the main task runner.
86 //
87 // 2. Work queue. If a work queue is empty when DoWork() is entered, tasks from
88 //    the incoming task queue (if any) are moved here. The work queues are
89 //    registered with the selector as input to the scheduling decision.
90 //
91 class BASE_EXPORT SequenceManagerImpl
92     : public SequenceManager,
93       public internal::SequencedTaskSource,
94       public internal::TaskQueueSelector::Observer,
95       public RunLoop::NestingObserver {
96  public:
97   using Observer = SequenceManager::Observer;
98 
99   SequenceManagerImpl(const SequenceManagerImpl&) = delete;
100   SequenceManagerImpl& operator=(const SequenceManagerImpl&) = delete;
101   ~SequenceManagerImpl() override;
102 
103   // Initializes the state of all the sequence manager features. Must be invoked
104   // after FeatureList initialization.
105   static void InitializeFeatures();
106 
107   // Sets the global cached state of the NoWakeUpsForCanceledTasks feature
108   // according to its enabled state. Must be invoked after FeatureList
109   // initialization.
110   static void ApplyNoWakeUpsForCanceledTasks();
111 
112   // Resets the global cached state of the NoWakeUpsForCanceledTasks feature
113   // according to its default state.
114   static void ResetNoWakeUpsForCanceledTasksForTesting();
115 
116   // SequenceManager implementation:
117   void BindToCurrentThread() override;
118   scoped_refptr<SequencedTaskRunner> GetTaskRunnerForCurrentTask() override;
119   void BindToMessagePump(std::unique_ptr<MessagePump> message_pump) override;
120   void SetObserver(Observer* observer) override;
121   void AddTaskTimeObserver(TaskTimeObserver* task_time_observer) override;
122   void RemoveTaskTimeObserver(TaskTimeObserver* task_time_observer) override;
123   void SetTimeDomain(TimeDomain* time_domain) override;
124   void ResetTimeDomain() override;
125   const TickClock* GetTickClock() const override;
126   TimeTicks NowTicks() const override;
127   void SetDefaultTaskRunner(
128       scoped_refptr<SingleThreadTaskRunner> task_runner) override;
129   void ReclaimMemory() override;
130   bool GetAndClearSystemIsQuiescentBit() override;
131   void SetWorkBatchSize(int work_batch_size) override;
132   void SetTimerSlack(TimerSlack timer_slack) override;
133   void EnableCrashKeys(const char* async_stack_crash_key) override;
134   const MetricRecordingSettings& GetMetricRecordingSettings() const override;
135   size_t GetPendingTaskCountForTesting() const override;
136   scoped_refptr<TaskQueue> CreateTaskQueue(
137       const TaskQueue::Spec& spec) override;
138   std::string DescribeAllPendingTasks() const override;
139   void PrioritizeYieldingToNative(base::TimeTicks prioritize_until) override;
140   void AddTaskObserver(TaskObserver* task_observer) override;
141   void RemoveTaskObserver(TaskObserver* task_observer) override;
142   absl::optional<WakeUp> GetNextDelayedWakeUp() const override;
143   TaskQueue::QueuePriority GetPriorityCount() const override;
144 
145   // SequencedTaskSource implementation:
146   absl::optional<SelectedTask> SelectNextTask(
147       LazyNow& lazy_now,
148       SelectTaskOption option = SelectTaskOption::kDefault) override;
149   void DidRunTask(LazyNow& lazy_now) override;
150   void RemoveAllCanceledDelayedTasksFromFront(LazyNow* lazy_now) override;
151   absl::optional<WakeUp> GetPendingWakeUp(
152       LazyNow* lazy_now,
153       SelectTaskOption option = SelectTaskOption::kDefault) const override;
154   bool HasPendingHighResolutionTasks() override;
155   bool OnSystemIdle() override;
156   void MaybeEmitTaskDetails(
157       perfetto::EventContext& ctx,
158       const SequencedTaskSource::SelectedTask& selected_task) const override;
159 
160   void AddDestructionObserver(
161       CurrentThread::DestructionObserver* destruction_observer);
162   void RemoveDestructionObserver(
163       CurrentThread::DestructionObserver* destruction_observer);
164   void RegisterOnNextIdleCallback(OnceClosure on_next_idle_callback);
165   // TODO(alexclarke): Remove this as part of https://crbug.com/825327.
166   void SetTaskRunner(scoped_refptr<SingleThreadTaskRunner> task_runner);
167   // TODO(alexclarke): Remove this as part of https://crbug.com/825327.
168   scoped_refptr<SingleThreadTaskRunner> GetTaskRunner();
169   bool IsBoundToCurrentThread() const;
170   MessagePump* GetMessagePump() const;
171   bool IsType(MessagePumpType type) const;
172   void SetAddQueueTimeToTasks(bool enable);
173   void SetTaskExecutionAllowed(bool allowed);
174   bool IsTaskExecutionAllowed() const;
175 #if BUILDFLAG(IS_IOS)
176   void AttachToMessagePump();
177 #endif
178   bool IsIdleForTesting() override;
179   void EnableMessagePumpTimeKeeperMetrics(const char* thread_name);
180 
181   // Requests that a task to process work is scheduled.
182   void ScheduleWork();
183 
184   // Returns the currently executing TaskQueue if any. Must be called on the
185   // thread this class was created on.
186   internal::TaskQueueImpl* currently_executing_task_queue() const;
187 
188   // Unregisters a TaskQueueImpl previously created by |CreateTaskQueueImpl()|.
189   // No tasks will run on this queue after this call.
190   void UnregisterTaskQueueImpl(
191       std::unique_ptr<internal::TaskQueueImpl> task_queue);
192 
193   // Schedule a call to UnregisterTaskQueueImpl as soon as it's safe to do so.
194   void ShutdownTaskQueueGracefully(
195       std::unique_ptr<internal::TaskQueueImpl> task_queue);
196 
associated_thread()197   scoped_refptr<const AssociatedThreadId> associated_thread() const {
198     return associated_thread_;
199   }
200 
settings()201   const Settings& settings() const { return settings_; }
202 
203   WeakPtr<SequenceManagerImpl> GetWeakPtr();
204 
205   // How frequently to perform housekeeping tasks (sweeping canceled tasks etc).
206   static constexpr TimeDelta kReclaimMemoryInterval = Seconds(30);
207 
208  protected:
209   static std::unique_ptr<ThreadControllerImpl>
210   CreateThreadControllerImplForCurrentThread(const TickClock* clock);
211 
212   // Create a task queue manager where |controller| controls the thread
213   // on which the tasks are eventually run.
214   SequenceManagerImpl(std::unique_ptr<internal::ThreadController> controller,
215                       SequenceManager::Settings settings = Settings());
216 
217   friend class internal::TaskQueueImpl;
218   friend class internal::DefaultWakeUpQueue;
219   friend class ::base::sequence_manager::SequenceManagerForTest;
220 
221  private:
222   // Returns the SequenceManager running the current thread. It must only be
223   // used on the thread on which it was obtained. Only to be used by
224   // CurrentThread for the moment.
225   static SequenceManagerImpl* GetCurrent();
226   friend class ::base::CurrentThread;
227 
228   // Factory friends to call into private creation methods.
229   friend std::unique_ptr<SequenceManager>
230       sequence_manager::CreateSequenceManagerOnCurrentThread(
231           SequenceManager::Settings);
232   friend std::unique_ptr<SequenceManager>
233   sequence_manager::CreateSequenceManagerOnCurrentThreadWithPump(
234       std::unique_ptr<MessagePump> message_pump,
235       SequenceManager::Settings);
236   friend std::unique_ptr<SequenceManager>
237       sequence_manager::CreateUnboundSequenceManager(SequenceManager::Settings);
238   friend std::unique_ptr<SequenceManagerImpl>
239       sequence_manager::internal::CreateUnboundSequenceManagerImpl(
240           PassKey<base::internal::SequenceManagerThreadDelegate>,
241           SequenceManager::Settings);
242 
243   // Assume direct control over current thread and create a SequenceManager.
244   // This function should be called only once per thread.
245   // This function assumes that a task execution environment is already
246   // initialized for the current thread.
247   static std::unique_ptr<SequenceManagerImpl> CreateOnCurrentThread(
248       SequenceManager::Settings settings);
249 
250   // Create an unbound SequenceManager (typically for a future thread). The
251   // SequenceManager can be initialized on the current thread and then needs to
252   // be bound and initialized on the target thread by calling one of the Bind*()
253   // methods.
254   static std::unique_ptr<SequenceManagerImpl> CreateUnbound(
255       SequenceManager::Settings settings);
256 
257   enum class ProcessTaskResult {
258     kDeferred,
259     kExecuted,
260     kSequenceManagerDeleted,
261   };
262 
263   // SequenceManager maintains a queue of non-nestable tasks since they're
264   // uncommon and allocating an extra deque per TaskQueue would waste memory.
265   using NonNestableTaskDeque =
266       circular_deque<internal::TaskQueueImpl::DeferredNonNestableTask>;
267 
268   // We have to track reentrancy because we support nested RunLoops but the
269   // selector interface is unaware of those. This struct keeps track of all
270   // task-related state needed to make pairs of SelectNextTask() / DidRunTask()
271   // work.
272   struct ExecutingTask {
273     ExecutingTask(Task&& task,
274                   internal::TaskQueueImpl* task_queue,
275                   TaskQueue::TaskTiming task_timing)
276         : pending_task(std::move(task)),
277           task_queue(task_queue),
278           task_queue_name(task_queue->GetProtoName()),
279           task_timing(task_timing),
280           priority(task_queue->GetQueuePriority()),
281           task_type(pending_task.task_type) {}
282 
283     Task pending_task;  // Moved in from the constructor's |task| argument.
284 
285     // `task_queue` is not a raw_ptr<...> for performance reasons (based on
286     // analysis of sampling profiler data and tab_search:top100:2020).
287     RAW_PTR_EXCLUSION internal::TaskQueueImpl* task_queue = nullptr;
288     // Save task_queue_name as the task queue can be deleted within the task.
289     QueueName task_queue_name;
290     TaskQueue::TaskTiming task_timing;
291     // Save priority as it might change after running a task.
292     TaskQueue::QueuePriority priority;
293     // Save the task type for use after running the task, as |pending_task|
294     // won't be available then.
295     int task_type;
296   };
297 
298   struct MainThreadOnly {  // Access via main_thread_only().
299     explicit MainThreadOnly(
300         SequenceManagerImpl* sequence_manager,
301         const scoped_refptr<AssociatedThreadId>& associated_thread,
302         const SequenceManager::Settings& settings,
303         const base::TickClock* clock);
304     ~MainThreadOnly();
305 
306     int nesting_depth = 0;
307     NonNestableTaskDeque non_nestable_task_queue;
308     // TODO(altimin): Switch to instruction pointer crash key when it's
309     // available.
310     raw_ptr<debug::CrashKeyString> file_name_crash_key = nullptr;
311     raw_ptr<debug::CrashKeyString> function_name_crash_key = nullptr;
312     raw_ptr<debug::CrashKeyString> async_stack_crash_key = nullptr;
313     std::array<char, static_cast<size_t>(debug::CrashKeySize::Size64)>
314         async_stack_buffer = {};  // NOTE(review): no direct #include <array>.
315 
316     absl::optional<base::MetricsSubSampler> metrics_subsampler;
317 
318     internal::TaskQueueSelector selector;
319     ObserverList<TaskObserver>::Unchecked task_observers;
320     ObserverList<TaskTimeObserver>::Unchecked task_time_observers;
321     const raw_ptr<const base::TickClock> default_clock;
322     raw_ptr<TimeDomain> time_domain = nullptr;
323 
324     std::unique_ptr<WakeUpQueue> wake_up_queue;
325     std::unique_ptr<WakeUpQueue> non_waking_wake_up_queue;
326 
327     // If true, MaybeReclaimMemory() will attempt to reclaim memory.
328     bool memory_reclaim_scheduled = false;
329 
330     // Used to ensure we don't perform expensive housekeeping too frequently.
331     TimeTicks next_time_to_reclaim_memory;
332 
333     // Task queues managed by this SequenceManager:
334     // - active_queues contains queues that are still running tasks.
335     //   Most often they are owned by relevant TaskQueues, but the queues in
336     //   queues_to_gracefully_shutdown are included here too.
337     // - queues_to_gracefully_shutdown contains queues which should be deleted
338     //   when they become empty.
339     // - queues_to_delete contains soon-to-be-deleted queues, because some
340     //   internal scheduling code does not expect queues to be pulled
341     //   from underneath.
342 
343     std::set<internal::TaskQueueImpl*> active_queues;
344 
345     std::map<internal::TaskQueueImpl*, std::unique_ptr<internal::TaskQueueImpl>>
346         queues_to_gracefully_shutdown;
347     std::map<internal::TaskQueueImpl*, std::unique_ptr<internal::TaskQueueImpl>>
348         queues_to_delete;
349 
350     bool task_was_run_on_quiescence_monitored_queue = false;
351     bool nesting_observer_registered_ = false;
352 
353     // Use std::deque so that references returned by SelectNextTask() remain
354     // valid until the matching call to DidRunTask(), even when nested RunLoops
355     // cause tasks to be pushed on the stack in-between. This is needed because
356     // references are kept in local variables by calling code between
357     // SelectNextTask()/DidRunTask().
358     std::deque<ExecutingTask> task_execution_stack;
359 
360     raw_ptr<Observer> observer = nullptr;  // NOT OWNED
361 
362     ObserverList<CurrentThread::DestructionObserver>::Unchecked
363         destruction_observers;
364 
365     // If non-null, invoked the next time OnSystemIdle() completes without
366     // scheduling additional work.
367     OnceClosure on_next_idle_callback;
368   };
369 
370   void CompleteInitializationOnBoundThread();
371 
372   // TaskQueueSelector::Observer:
373   void OnTaskQueueEnabled(internal::TaskQueueImpl* queue) override;
374 
375   // RunLoop::NestingObserver:
376   void OnBeginNestedRunLoop() override;
377   void OnExitNestedRunLoop() override;
378 
379   // Schedules next wake-up at the given time, canceling any previous requests.
380   // Use absl::nullopt to cancel a wake-up. Must be called on the thread this
381   // class was created on.
382   void SetNextWakeUp(LazyNow* lazy_now, absl::optional<WakeUp> wake_up);
383 
384   // Called by the task queue to inform this SequenceManager of a task that's
385   // about to be queued. This SequenceManager may use this opportunity to add
386   // metadata to |pending_task| before it is moved into the queue.
387   void WillQueueTask(Task* pending_task);
388 
389   // Enqueues onto delayed WorkQueues all delayed tasks which must run now
390   // (cannot be postponed) and possibly some delayed tasks which can run now but
391   // could be postponed (due to how tasks are stored, it is not possible to
392   // retrieve all such tasks efficiently) and reloads any empty work queues.
393   void MoveReadyDelayedTasksToWorkQueues(LazyNow* lazy_now);
394 
395   void NotifyWillProcessTask(ExecutingTask* task, LazyNow* time_before_task);
396   void NotifyDidProcessTask(ExecutingTask* task, LazyNow* time_after_task);
397 
398   EnqueueOrder GetNextSequenceNumber();
399 
400   bool GetAddQueueTimeToTasks();
401 
402   std::unique_ptr<trace_event::ConvertableToTraceFormat>
403   AsValueWithSelectorResultForTracing(internal::WorkQueue* selected_work_queue,
404                                       bool force_verbose) const;
405   Value::Dict AsValueWithSelectorResult(
406       internal::WorkQueue* selected_work_queue,
407       bool force_verbose) const;
408 
409   // Used in construction of TaskQueueImpl to obtain an AtomicFlag which it can
410   // use to request reload by ReloadEmptyWorkQueues. The lifetime of
411   // TaskQueueImpl is managed by this class and the handle will be released by
412   // TaskQueueImpl::UnregisterTaskQueue which is always called before the
413   // queue's destruction.
414   AtomicFlagSet::AtomicFlag GetFlagToRequestReloadForEmptyQueue(
415       TaskQueueImpl* task_queue);
416 
417   // Calls |TakeImmediateIncomingQueueTasks| on all queues with their reload
418   // flag set in |empty_queues_to_reload_|.
419   void ReloadEmptyWorkQueues() const;
420 
421   std::unique_ptr<internal::TaskQueueImpl> CreateTaskQueueImpl(
422       const TaskQueue::Spec& spec) override;
423 
424   // Periodically reclaims memory by sweeping away canceled tasks and shrinking
425   // buffers.
426   void MaybeReclaimMemory();
427 
428   // Deletes queues marked for deletion and empty queues marked for shutdown.
429   void CleanUpQueues();
430 
431   void RemoveAllCanceledTasksFromFrontOfWorkQueues();
432 
433   TaskQueue::TaskTiming::TimeRecordingPolicy ShouldRecordTaskTiming(
434       const internal::TaskQueueImpl* task_queue);
435   bool ShouldRecordCPUTimeForTask();
436 
437   // Write the async stack trace onto a crash key as whitespace-delimited hex
438   // addresses.
439   void RecordCrashKeys(const PendingTask&);
440 
441   // Helper to terminate all scoped trace events to allow starting new ones
442   // in SelectNextTask().
443   absl::optional<SelectedTask> SelectNextTaskImpl(LazyNow& lazy_now,
444                                                   SelectTaskOption option);
445 
446   // Returns a wake-up for the next delayed task which is not ripe for
447   // execution, or nullopt if `option` is `kSkipDelayedTask` or there
448   // are no such tasks (immediate tasks don't count).
449   absl::optional<WakeUp> GetNextDelayedWakeUpWithOption(
450       SelectTaskOption option) const;
451 
452   // Given a `wake_up` describing when the next delayed task should run, returns
453   // a wake up that should be scheduled on the thread. `is_immediate()` if the
454   // wake up should run immediately. `nullopt` if no wake up is required because
455   // `wake_up` is `nullopt` or a `time_domain` is used.
456   absl::optional<WakeUp> AdjustWakeUp(absl::optional<WakeUp> wake_up,
457                                       LazyNow* lazy_now) const;
458 
459   void MaybeAddLeewayToTask(Task& task) const;
460 
461 #if DCHECK_IS_ON()
462   void LogTaskDebugInfo(const internal::WorkQueue* work_queue) const;
463 #endif
464 
465   // Determines if wall time or thread time should be recorded for the next
466   // task.
467   TaskQueue::TaskTiming InitializeTaskTiming(
468       internal::TaskQueueImpl* task_queue);
469 
470   const scoped_refptr<AssociatedThreadId> associated_thread_;
471 
472   EnqueueOrderGenerator enqueue_order_generator_;
473 
474   const std::unique_ptr<internal::ThreadController> controller_;
475   const Settings settings_;
476 
477   const MetricRecordingSettings metric_recording_settings_;
478 
479   // Whether to add the queue time to tasks.
480   base::subtle::Atomic32 add_queue_time_to_tasks_;
481 
482   AtomicFlagSet empty_queues_to_reload_;
483 
484   MainThreadOnly main_thread_only_;
main_thread_only()485   MainThreadOnly& main_thread_only() {
486     DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
487     return main_thread_only_;
488   }
main_thread_only()489   const MainThreadOnly& main_thread_only() const {
490     DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
491     return main_thread_only_;
492   }
493 
494   // |clock_| either refers to the TickClock representation of |time_domain|
495   // (same object) if any, or to |default_clock| otherwise. It is maintained as
496   // an atomic pointer here for multi-threaded usage.
497   std::atomic<const base::TickClock*> clock_;
main_thread_clock()498   const base::TickClock* main_thread_clock() const {
499     DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
500     return clock_.load(std::memory_order_relaxed);
501   }
any_thread_clock()502   const base::TickClock* any_thread_clock() const {
503     // |memory_order_acquire| matched by |memory_order_release| in
504     // SetTimeDomain() to ensure all data used by |clock_| is visible when read
505     // from the current thread. A thread might try to access a stale |clock_|
506     // but that's not an issue since |time_domain| contractually outlives
507     // SequenceManagerImpl even if it's reset.
508     return clock_.load(std::memory_order_acquire);
509   }
510 
511   WeakPtrFactory<SequenceManagerImpl> weak_factory_{this};
512 };
513 
514 }  // namespace internal
515 }  // namespace sequence_manager
516 }  // namespace base
517 
518 #endif  // BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_
519