// Copyright 2018 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_
#define BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_

#include <list>
#include <map>
#include <memory>
#include <random>
#include <set>
#include <unordered_map>
#include <utility>

#include "base/atomic_sequence_num.h"
#include "base/cancelable_callback.h"
#include "base/containers/circular_deque.h"
#include "base/debug/task_annotator.h"
#include "base/macros.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/pending_task.h"
#include "base/run_loop.h"
#include "base/single_thread_task_runner.h"
#include "base/synchronization/lock.h"
#include "base/task/sequence_manager/enqueue_order.h"
#include "base/task/sequence_manager/graceful_queue_shutdown_helper.h"
#include "base/task/sequence_manager/moveable_auto_lock.h"
#include "base/task/sequence_manager/sequence_manager.h"
#include "base/task/sequence_manager/task_queue_impl.h"
#include "base/task/sequence_manager/task_queue_selector.h"
#include "base/task/sequence_manager/thread_controller.h"
#include "base/threading/thread_checker.h"

namespace base {

namespace debug {
struct CrashKeyString;
}  // namespace debug

namespace trace_event {
class ConvertableToTraceFormat;
}  // namespace trace_event

namespace sequence_manager {

class SequenceManagerForTest;
class TaskQueue;
class TaskTimeObserver;
class TimeDomain;

namespace internal {

class RealTimeDomain;
class TaskQueueImpl;

// The task queue manager provides N task queues and a selector interface for
// choosing which task queue to service next. Each task queue consists of two
// sub queues:
//
// 1. Incoming task queue. Tasks that are posted get immediately appended here.
//    When a task is appended into an empty incoming queue, the task manager
//    work function (DoWork()) is scheduled to run on the main task runner.
//
// 2. Work queue. If a work queue is empty when DoWork() is entered, tasks from
//    the incoming task queue (if any) are moved here. The work queues are
//    registered with the selector as input to the scheduling decision.
//
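// Example usage (illustrative sketch only; the CreateTaskQueue<> helper and
// TaskQueue::Spec come from the public SequenceManager/TaskQueue headers and
// are assumptions here, not declarations of this file):
//
//   // On a thread that already has a MessageLoop:
//   std::unique_ptr<SequenceManagerImpl> manager =
//       SequenceManagerImpl::CreateOnCurrentThread();
//   scoped_refptr<TaskQueue> queue =
//       manager->CreateTaskQueue<TaskQueue>(TaskQueue::Spec("example_tq"));
//
// Tasks posted to |queue| land on its incoming queue and are later moved to
// its work queue by DoWork() as described above.
//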
class BASE_EXPORT SequenceManagerImpl
    : public SequenceManager,
      public internal::SequencedTaskSource,
      public internal::TaskQueueSelector::Observer,
      public RunLoop::NestingObserver {
 public:
  using Observer = SequenceManager::Observer;

  ~SequenceManagerImpl() override;

  // Assumes direct control over the current thread and creates a
  // SequenceManager. This function should be called only once per thread.
  // It assumes that a MessageLoop is already initialized for the current
  // thread.
  static std::unique_ptr<SequenceManagerImpl> CreateOnCurrentThread();

  // SequenceManager implementation:
  void SetObserver(Observer* observer) override;
  void AddTaskObserver(MessageLoop::TaskObserver* task_observer) override;
  void RemoveTaskObserver(MessageLoop::TaskObserver* task_observer) override;
  void AddTaskTimeObserver(TaskTimeObserver* task_time_observer) override;
  void RemoveTaskTimeObserver(TaskTimeObserver* task_time_observer) override;
  void RegisterTimeDomain(TimeDomain* time_domain) override;
  void UnregisterTimeDomain(TimeDomain* time_domain) override;
  TimeDomain* GetRealTimeDomain() const override;
  const TickClock* GetTickClock() const override;
  TimeTicks NowTicks() const override;
  void SetDefaultTaskRunner(
      scoped_refptr<SingleThreadTaskRunner> task_runner) override;
  void SweepCanceledDelayedTasks() override;
  bool GetAndClearSystemIsQuiescentBit() override;
  void SetWorkBatchSize(int work_batch_size) override;
  void EnableCrashKeys(const char* file_name_crash_key,
                       const char* function_name_crash_key) override;
  const MetricRecordingSettings& GetMetricRecordingSettings() const override;

  // SequencedTaskSource implementation:
  Optional<PendingTask> TakeTask() override;
  void DidRunTask() override;
  TimeDelta DelayTillNextTask(LazyNow* lazy_now) override;
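
  // Illustrative sketch (an assumption about how a ThreadController-style
  // driver uses this interface, not a declaration of this header; |source|
  // and |lazy_now| are hypothetical locals). Every successful TakeTask() is
  // expected to be paired with a DidRunTask():
  //
  //   Optional<PendingTask> task = source->TakeTask();
  //   if (task) {
  //     std::move(task->task).Run();
  //     source->DidRunTask();
  //   } else {
  //     TimeDelta delay = source->DelayTillNextTask(&lazy_now);
  //     // Schedule the next DoWork() after |delay|, if any.
  //   }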

  // Requests that a task to process work is posted on the main task runner.
  // These tasks are de-duplicated in two buckets: main-thread and all other
  // threads. This distinction is done to reduce the overhead from locks; we
  // assume the main-thread path will be hot.
  void MaybeScheduleImmediateWork(const Location& from_here);

  // Requests that a delayed task to process work is posted on the main task
  // runner. These delayed tasks are de-duplicated. Schedules the next wake-up
  // at the given time and cancels any previous request; use TimeTicks::Max()
  // to cancel a wake-up. Must be called on the thread this class was created
  // on, and only from a TimeDomain.
  void SetNextDelayedDoWork(LazyNow* lazy_now, TimeTicks run_time);

  // Returns the currently executing TaskQueue if any. Must be called on the
  // thread this class was created on.
  internal::TaskQueueImpl* currently_executing_task_queue() const;

  // Unregisters a TaskQueue previously created by |CreateTaskQueueImpl()|.
  // No tasks will run on this queue after this call.
  void UnregisterTaskQueueImpl(
      std::unique_ptr<internal::TaskQueueImpl> task_queue);

  scoped_refptr<internal::GracefulQueueShutdownHelper>
  GetGracefulQueueShutdownHelper() const;

  WeakPtr<SequenceManagerImpl> GetWeakPtr();

 protected:
  // Creates a task queue manager where |controller| controls the thread
  // on which the tasks are eventually run.
  explicit SequenceManagerImpl(
      std::unique_ptr<internal::ThreadController> controller);

  friend class internal::TaskQueueImpl;
  friend class ::base::sequence_manager::SequenceManagerForTest;

 private:
  enum class ProcessTaskResult {
    kDeferred,
    kExecuted,
    kSequenceManagerDeleted,
  };

  struct AnyThread {
    AnyThread();
    ~AnyThread();

    // Task queues with newly available work on the incoming queue.
    internal::IncomingImmediateWorkList* incoming_immediate_work_list = nullptr;
  };

  // SequenceManager maintains a queue of non-nestable tasks since they're
  // uncommon and allocating an extra deque per TaskQueue would waste memory.
  using NonNestableTaskDeque =
      circular_deque<internal::TaskQueueImpl::DeferredNonNestableTask>;

  // We have to track reentrancy because we support nested run loops but the
  // selector interface is unaware of those. This struct keeps track of all
  // task-related state needed to make pairs of TakeTask() / DidRunTask() work.
  struct ExecutingTask {
    ExecutingTask(internal::TaskQueueImpl::Task&& pending_task,
                  internal::TaskQueueImpl* task_queue,
                  TaskQueue::TaskTiming task_timing)
        : pending_task(std::move(pending_task)),
          task_queue(task_queue),
          task_timing(task_timing) {}

    internal::TaskQueueImpl::Task pending_task;
    internal::TaskQueueImpl* task_queue = nullptr;
    TaskQueue::TaskTiming task_timing;
  };
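
  // Illustrative interleaving under a nested run loop (sketch only): the
  // task_execution_stack below is intended to hold one ExecutingTask per
  // TakeTask() that has not yet been matched by a DidRunTask():
  //
  //   TakeTask()      // outer task pushed
  //     (outer task spins a nested RunLoop)
  //     TakeTask()    // nested task pushed
  //     DidRunTask()  // nested task popped
  //   DidRunTask()    // outer task popped once the nested loop exits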

  struct MainThreadOnly {
    MainThreadOnly();
    ~MainThreadOnly();

    int nesting_depth = 0;
    NonNestableTaskDeque non_nestable_task_queue;
    // TODO(altimin): Switch to instruction pointer crash key when it's
    // available.
    debug::CrashKeyString* file_name_crash_key = nullptr;
    debug::CrashKeyString* function_name_crash_key = nullptr;

    std::mt19937_64 random_generator;
    std::uniform_real_distribution<double> uniform_distribution;

    internal::TaskQueueSelector selector;
    ObserverList<MessageLoop::TaskObserver> task_observers;
    ObserverList<TaskTimeObserver> task_time_observers;
    std::set<TimeDomain*> time_domains;
    std::unique_ptr<internal::RealTimeDomain> real_time_domain;

    // Task queues managed by this SequenceManager.
    // - active_queues contains queues that are still running tasks.
    //   Most often they are owned by the relevant TaskQueues, but
    //   queues_to_gracefully_shutdown are included here too.
    // - queues_to_gracefully_shutdown contains queues which should be deleted
    //   when they become empty.
    // - queues_to_delete contains soon-to-be-deleted queues, because some
    //   internal scheduling code does not expect queues to be pulled
    //   from underneath it.
    std::set<internal::TaskQueueImpl*> active_queues;
    std::map<internal::TaskQueueImpl*, std::unique_ptr<internal::TaskQueueImpl>>
        queues_to_gracefully_shutdown;
    std::map<internal::TaskQueueImpl*, std::unique_ptr<internal::TaskQueueImpl>>
        queues_to_delete;

    // Scratch space used to store the contents of
    // any_thread().incoming_immediate_work_list for use by
    // ReloadEmptyWorkQueues. We keep hold of this vector to avoid unnecessary
    // memory allocations.
    std::vector<internal::TaskQueueImpl*> queues_to_reload;

    bool task_was_run_on_quiescence_monitored_queue = false;

    // Due to nested run loops more than one task can be executing
    // concurrently.
    std::list<ExecutingTask> task_execution_stack;

    Observer* observer = nullptr;  // NOT OWNED
  };

  // TaskQueueSelector::Observer:
  void OnTaskQueueEnabled(internal::TaskQueueImpl* queue) override;

  // RunLoop::NestingObserver:
  void OnBeginNestedRunLoop() override;
  void OnExitNestedRunLoop() override;

  // Called by the task queue to inform this SequenceManager of a task that's
  // about to be queued. This SequenceManager may use this opportunity to add
  // metadata to |pending_task| before it is moved into the queue.
  void WillQueueTask(internal::TaskQueueImpl::Task* pending_task);

  // Enqueues onto the work queues all delayed tasks whose run_time <= Now()
  // and reloads any empty work queues.
  void WakeUpReadyDelayedQueues(LazyNow* lazy_now);

  void NotifyWillProcessTask(ExecutingTask* task, LazyNow* time_before_task);
  void NotifyDidProcessTask(ExecutingTask* task, LazyNow* time_after_task);

  internal::EnqueueOrder GetNextSequenceNumber();

  std::unique_ptr<trace_event::ConvertableToTraceFormat>
  AsValueWithSelectorResult(bool should_run,
                            internal::WorkQueue* selected_work_queue) const;

  // Adds |queue| to |any_thread().incoming_immediate_work_list| and, if
  // |queue_is_blocked| is false, makes sure a DoWork is posted.
  // Can be called from any thread.
  void OnQueueHasIncomingImmediateWork(internal::TaskQueueImpl* queue,
                                       internal::EnqueueOrder enqueue_order,
                                       bool queue_is_blocked);

  // Returns true if |task_queue| was added to the list, or false if it was
  // already in the list. If |task_queue| was inserted, the entry's |order| is
  // set to |enqueue_order|.
  bool AddToIncomingImmediateWorkList(internal::TaskQueueImpl* task_queue,
                                      internal::EnqueueOrder enqueue_order);
  void RemoveFromIncomingImmediateWorkList(internal::TaskQueueImpl* task_queue);

  // Calls |ReloadImmediateWorkQueueIfEmpty| on all queues in
  // |main_thread_only().queues_to_reload|.
  void ReloadEmptyWorkQueues();

  std::unique_ptr<internal::TaskQueueImpl> CreateTaskQueueImpl(
      const TaskQueue::Spec& spec) override;

  void TakeQueuesToGracefullyShutdownFromHelper();

  // Deletes queues marked for deletion and empty queues marked for shutdown.
  void CleanUpQueues();

  bool ShouldRecordCPUTimeForTask();

  // Determines if wall time or thread time should be recorded for the next
  // task.
  TaskQueue::TaskTiming InitializeTaskTiming(
      internal::TaskQueueImpl* task_queue);

  const scoped_refptr<internal::GracefulQueueShutdownHelper>
      graceful_shutdown_helper_;

  internal::EnqueueOrder::Generator enqueue_order_generator_;

  std::unique_ptr<internal::ThreadController> controller_;

  mutable Lock any_thread_lock_;
  AnyThread any_thread_;

  struct AnyThread& any_thread() {
    any_thread_lock_.AssertAcquired();
    return any_thread_;
  }
  const struct AnyThread& any_thread() const {
    any_thread_lock_.AssertAcquired();
    return any_thread_;
  }
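
  // Accessing |any_thread_| requires holding |any_thread_lock_|, as enforced
  // by the AssertAcquired() calls above. Illustrative sketch:
  //
  //   {
  //     AutoLock lock(any_thread_lock_);
  //     internal::IncomingImmediateWorkList* list =
  //         any_thread().incoming_immediate_work_list;
  //     // ... read or update the list while the lock is held ...
  //   }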

  const MetricRecordingSettings metric_recording_settings_;

  // A check to bail out early during memory corruption.
  // https://crbug.com/757940
  bool Validate();

  int32_t memory_corruption_sentinel_;

  THREAD_CHECKER(main_thread_checker_);
  MainThreadOnly main_thread_only_;
  MainThreadOnly& main_thread_only() {
    DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
    return main_thread_only_;
  }
  const MainThreadOnly& main_thread_only() const {
    DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
    return main_thread_only_;
  }

  WeakPtrFactory<SequenceManagerImpl> weak_factory_;

  DISALLOW_COPY_AND_ASSIGN(SequenceManagerImpl);
};

}  // namespace internal
}  // namespace sequence_manager
}  // namespace base

#endif  // BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_