1 // Copyright 2016 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_TASK_THREAD_POOL_SEQUENCE_H_ 6 #define BASE_TASK_THREAD_POOL_SEQUENCE_H_ 7 8 #include <stddef.h> 9 10 #include "base/base_export.h" 11 #include "base/compiler_specific.h" 12 #include "base/containers/intrusive_heap.h" 13 #include "base/containers/queue.h" 14 #include "base/sequence_token.h" 15 #include "base/task/task_traits.h" 16 #include "base/task/thread_pool/pooled_parallel_task_runner.h" 17 #include "base/task/thread_pool/task.h" 18 #include "base/task/thread_pool/task_source.h" 19 #include "base/task/thread_pool/task_source_sort_key.h" 20 #include "base/thread_annotations.h" 21 #include "base/threading/sequence_local_storage_map.h" 22 23 namespace base { 24 namespace internal { 25 26 // A Sequence is intended to hold delayed tasks and immediate tasks. 27 // Delayed tasks are held in a prority_queue until they are ripe and 28 // immediate tasks in a simple fifo queue. 29 // When Sequence::TakeTask is called, we select the next appropriate task 30 // from both queues and return it. 31 // Each queue holds slots each containing up to a single Task that must be 32 // executed in posting/runtime order. 33 // 34 // In comments below, an "empty Sequence" is a Sequence with no slot. 35 // 36 // Note: there is a known refcounted-ownership cycle in the Scheduler 37 // architecture: Sequence -> Task -> TaskRunner -> Sequence -> ... 38 // This is okay so long as the other owners of Sequence (PriorityQueue and 39 // WorkerThread in alternation and 40 // ThreadGroup::WorkerThreadDelegateImpl::GetWork() 41 // temporarily) keep running it (and taking Tasks from it as a result). A 42 // dangling reference cycle would only occur should they release their reference 43 // to it while it's not empty. In other words, it is only correct for them to 44 // release it after PopTask() returns false to indicate it was made empty by 45 // that call (in which case the next PushImmediateTask() will return true to 46 // indicate to the caller that the Sequence should be re-enqueued for 47 // execution). This class is thread-safe. 48 class BASE_EXPORT Sequence : public TaskSource { 49 public: 50 // A Transaction can perform multiple operations atomically on a 51 // Sequence. While a Transaction is alive, it is guaranteed that nothing 52 // else will access the Sequence; the Sequence's lock is held for the 53 // lifetime of the Transaction. 54 class BASE_EXPORT Transaction : public TaskSource::Transaction { 55 public: 56 Transaction(Transaction&& other); 57 Transaction(const Transaction&) = delete; 58 Transaction& operator=(const Transaction&) = delete; 59 ~Transaction(); 60 61 // Returns true if the sequence must be added to the immediate queue after 62 // receiving a new immediate Task in order to be scheduled. If the caller 63 // doesn't want the sequence to be scheduled, it may not add the sequence to 64 // the immediate queue even if this returns true. 65 bool WillPushImmediateTask(); 66 67 // Adds immediate |task| to the end of this sequence. 68 void PushImmediateTask(Task task); 69 70 // Adds a delayed |task| in this sequence, and returns true if the sequence 71 // needs to be re-enqueued in the delayed queue as a result of this 72 // sequence's delayed sort key changing. 73 bool PushDelayedTask(Task task); 74 sequence()75 Sequence* sequence() const { return static_cast<Sequence*>(task_source()); } 76 77 private: 78 friend class Sequence; 79 80 explicit Transaction(Sequence* sequence); 81 }; 82 83 // |traits| is metadata that applies to all Tasks in the Sequence. 84 // |task_runner| is a reference to the TaskRunner feeding this TaskSource. 85 // |task_runner| can be nullptr only for tasks with no TaskRunner, in which 86 // case |execution_mode| must be kParallel. Otherwise, |execution_mode| is the 87 // execution mode of |task_runner|. 88 Sequence(const TaskTraits& traits, 89 SequencedTaskRunner* task_runner, 90 TaskSourceExecutionMode execution_mode); 91 Sequence(const Sequence&) = delete; 92 Sequence& operator=(const Sequence&) = delete; 93 94 // Begins a Transaction. This method cannot be called on a thread which has an 95 // active Sequence::Transaction. 96 [[nodiscard]] Transaction BeginTransaction(); 97 98 // TaskSource: 99 ExecutionEnvironment GetExecutionEnvironment() override; 100 size_t GetRemainingConcurrency() const override; 101 TaskSourceSortKey GetSortKey() const override; 102 TimeTicks GetDelayedSortKey() const override; 103 104 // Returns a token that uniquely identifies this Sequence. token()105 const SequenceToken& token() const LIFETIME_BOUND { return token_; } 106 sequence_local_storage()107 SequenceLocalStorageMap* sequence_local_storage() { 108 return &sequence_local_storage_; 109 } 110 111 bool OnBecomeReady() override; 112 has_worker_for_testing()113 bool has_worker_for_testing() const NO_THREAD_SAFETY_ANALYSIS { 114 return has_worker_; 115 } is_immediate_for_testing()116 bool is_immediate_for_testing() const { return is_immediate_; } IsEmptyForTesting()117 bool IsEmptyForTesting() const NO_THREAD_SAFETY_ANALYSIS { return IsEmpty(); } 118 119 // A reference to TaskRunner is only retained between 120 // PushImmediateTask()/PushDelayedTask() and when DidProcessTask() returns 121 // false, guaranteeing it is safe to dereference this pointer. Otherwise, the 122 // caller should guarantee such TaskRunner still exists before dereferencing. task_runner()123 SequencedTaskRunner* task_runner() const { return task_runner_; } 124 125 private: 126 ~Sequence() override; 127 128 struct DelayedTaskGreater { 129 bool operator()(const Task& lhs, const Task& rhs) const; 130 }; 131 132 // TaskSource: 133 RunStatus WillRunTask() override; 134 Task TakeTask(TaskSource::Transaction* transaction) override; 135 std::optional<Task> Clear(TaskSource::Transaction* transaction) override; 136 bool DidProcessTask(TaskSource::Transaction* transaction) override; 137 bool WillReEnqueue(TimeTicks now, 138 TaskSource::Transaction* transaction) override; 139 140 // Returns true if the delayed task to be posted will cause the delayed sort 141 // key to change. 142 bool DelayedSortKeyWillChange(const Task& delayed_task) const 143 EXCLUSIVE_LOCKS_REQUIRED(lock_); 144 145 // Selects the earliest task to run, either from immediate or 146 // delayed queue and return it. 147 // Expects this sequence to have at least one task that can run 148 // immediately. 149 Task TakeEarliestTask() EXCLUSIVE_LOCKS_REQUIRED(lock_); 150 151 // Get and return next task from immediate queue 152 Task TakeNextImmediateTask() EXCLUSIVE_LOCKS_REQUIRED(lock_); 153 154 // Update the next earliest/latest ready time. 155 void UpdateReadyTimes() EXCLUSIVE_LOCKS_REQUIRED(lock_); 156 157 // Returns true if there are immediate tasks 158 bool HasImmediateTasks() const EXCLUSIVE_LOCKS_REQUIRED(lock_); 159 160 // Returns true if tasks ready to be executed 161 bool HasReadyTasks(TimeTicks now) const override; 162 163 bool IsEmpty() const EXCLUSIVE_LOCKS_REQUIRED(lock_); 164 165 // Releases reference to TaskRunner. 166 void ReleaseTaskRunner(); 167 168 const SequenceToken token_ = SequenceToken::Create(); 169 170 // A pointer to the TaskRunner that posts to this TaskSource, if any. The 171 // derived class is responsible for calling AddRef() when a TaskSource from 172 // which no Task is executing becomes non-empty and Release() when 173 // it becomes empty again (e.g. when DidProcessTask() returns false). 174 // 175 // In practise, this pointer is going to become dangling. See task_runner() 176 // comment. 177 raw_ptr<SequencedTaskRunner, DisableDanglingPtrDetection> task_runner_; 178 179 // Queues of tasks to execute. 180 base::queue<Task> queue_ GUARDED_BY(lock_); 181 base::IntrusiveHeap<Task, DelayedTaskGreater> delayed_queue_ 182 GUARDED_BY(lock_); 183 184 // Caches the latest/earliest ready time for atomic access. Writes are 185 // protected by |lock_|, but allows atomic reads outside of |lock_|. If this 186 // sequence is empty, these are in an unknown state and shouldn't be read. 187 188 // Minimum of latest_delayed_run_time() of next delayed task if any, and 189 // |queue_time| of next immediate task if any. GUARDED_BY(lock_)190 std::atomic<TimeTicks> latest_ready_time_ GUARDED_BY(lock_){TimeTicks()}; 191 // is_null() if there is an immediate task, or earliest_delayed_run_time() of 192 // next delayed task otherwise. GUARDED_BY(lock_)193 std::atomic<TimeTicks> earliest_ready_time_ GUARDED_BY(lock_){TimeTicks()}; 194 195 // True if a worker is currently associated with a Task from this Sequence. 196 bool has_worker_ = false; 197 198 // True if the sequence has ready tasks and requested to be queued as such 199 // through WillPushImmediateTask() or OnBecomeReady(). Reset to false once all 200 // ready tasks are done being processed and either DidProcessTask() or 201 // WillReEnqueue() returned false. Normally, |is_immediate_| is protected by 202 // |lock_|, except in OnBecomeReady() hence the use of atomics. 203 std::atomic_bool is_immediate_{false}; 204 205 // Holds data stored through the SequenceLocalStorageSlot API. 206 SequenceLocalStorageMap sequence_local_storage_; 207 }; 208 209 } // namespace internal 210 } // namespace base 211 212 #endif // BASE_TASK_THREAD_POOL_SEQUENCE_H_ 213