// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 #ifndef BASE_TASK_THREAD_POOL_SEQUENCE_H_
6 #define BASE_TASK_THREAD_POOL_SEQUENCE_H_
7 
8 #include <stddef.h>
9 
10 #include "base/base_export.h"
11 #include "base/compiler_specific.h"
12 #include "base/containers/intrusive_heap.h"
13 #include "base/containers/queue.h"
14 #include "base/sequence_token.h"
15 #include "base/task/task_traits.h"
16 #include "base/task/thread_pool/pooled_parallel_task_runner.h"
17 #include "base/task/thread_pool/task.h"
18 #include "base/task/thread_pool/task_source.h"
19 #include "base/task/thread_pool/task_source_sort_key.h"
20 #include "base/thread_annotations.h"
21 #include "base/threading/sequence_local_storage_map.h"
22 
23 namespace base {
24 namespace internal {
25 
// A Sequence is intended to hold delayed tasks and immediate tasks.
// Delayed tasks are held in a priority queue until they are ripe and
// immediate tasks in a simple FIFO queue.
// When Sequence::TakeTask is called, we select the next appropriate task
// from both queues and return it.
// Each queue holds slots each containing up to a single Task that must be
// executed in posting/runtime order.
//
// In comments below, an "empty Sequence" is a Sequence with no slot.
//
// Note: there is a known refcounted-ownership cycle in the Scheduler
// architecture: Sequence -> Task -> TaskRunner -> Sequence -> ...
// This is okay so long as the other owners of Sequence (PriorityQueue and
// WorkerThread in alternation and
// ThreadGroup::WorkerThreadDelegateImpl::GetWork()
// temporarily) keep running it (and taking Tasks from it as a result). A
// dangling reference cycle would only occur should they release their reference
// to it while it's not empty. In other words, it is only correct for them to
// release it after DidProcessTask() returns false to indicate it was made
// empty by that call (in which case the next WillPushImmediateTask() will
// return true to indicate to the caller that the Sequence should be
// re-enqueued for execution). This class is thread-safe.
48 class BASE_EXPORT Sequence : public TaskSource {
49  public:
50   // A Transaction can perform multiple operations atomically on a
51   // Sequence. While a Transaction is alive, it is guaranteed that nothing
52   // else will access the Sequence; the Sequence's lock is held for the
53   // lifetime of the Transaction.
54   class BASE_EXPORT Transaction : public TaskSource::Transaction {
55    public:
56     Transaction(Transaction&& other);
57     Transaction(const Transaction&) = delete;
58     Transaction& operator=(const Transaction&) = delete;
59     ~Transaction();
60 
61     // Returns true if the sequence must be added to the immediate queue after
62     // receiving a new immediate Task in order to be scheduled. If the caller
63     // doesn't want the sequence to be scheduled, it may not add the sequence to
64     // the immediate queue even if this returns true.
65     bool WillPushImmediateTask();
66 
67     // Adds immediate |task| to the end of this sequence.
68     void PushImmediateTask(Task task);
69 
70     // Adds a delayed |task| in this sequence, and returns true if the sequence
71     // needs to be re-enqueued in the delayed queue as a result of this
72     // sequence's delayed sort key changing.
73     bool PushDelayedTask(Task task);
74 
sequence()75     Sequence* sequence() const { return static_cast<Sequence*>(task_source()); }
76 
77    private:
78     friend class Sequence;
79 
80     explicit Transaction(Sequence* sequence);
81   };
82 
83   // |traits| is metadata that applies to all Tasks in the Sequence.
84   // |task_runner| is a reference to the TaskRunner feeding this TaskSource.
85   // |task_runner| can be nullptr only for tasks with no TaskRunner, in which
86   // case |execution_mode| must be kParallel. Otherwise, |execution_mode| is the
87   // execution mode of |task_runner|.
88   Sequence(const TaskTraits& traits,
89            SequencedTaskRunner* task_runner,
90            TaskSourceExecutionMode execution_mode);
91   Sequence(const Sequence&) = delete;
92   Sequence& operator=(const Sequence&) = delete;
93 
94   // Begins a Transaction. This method cannot be called on a thread which has an
95   // active Sequence::Transaction.
96   [[nodiscard]] Transaction BeginTransaction();
97 
98   // TaskSource:
99   ExecutionEnvironment GetExecutionEnvironment() override;
100   size_t GetRemainingConcurrency() const override;
101   TaskSourceSortKey GetSortKey() const override;
102   TimeTicks GetDelayedSortKey() const override;
103 
104   // Returns a token that uniquely identifies this Sequence.
token()105   const SequenceToken& token() const LIFETIME_BOUND { return token_; }
106 
sequence_local_storage()107   SequenceLocalStorageMap* sequence_local_storage() {
108     return &sequence_local_storage_;
109   }
110 
111   bool OnBecomeReady() override;
112 
has_worker_for_testing()113   bool has_worker_for_testing() const NO_THREAD_SAFETY_ANALYSIS {
114     return has_worker_;
115   }
is_immediate_for_testing()116   bool is_immediate_for_testing() const { return is_immediate_; }
IsEmptyForTesting()117   bool IsEmptyForTesting() const NO_THREAD_SAFETY_ANALYSIS { return IsEmpty(); }
118 
119   // A reference to TaskRunner is only retained between
120   // PushImmediateTask()/PushDelayedTask() and when DidProcessTask() returns
121   // false, guaranteeing it is safe to dereference this pointer. Otherwise, the
122   // caller should guarantee such TaskRunner still exists before dereferencing.
task_runner()123   SequencedTaskRunner* task_runner() const { return task_runner_; }
124 
125  private:
126   ~Sequence() override;
127 
128   struct DelayedTaskGreater {
129     bool operator()(const Task& lhs, const Task& rhs) const;
130   };
131 
132   // TaskSource:
133   RunStatus WillRunTask() override;
134   Task TakeTask(TaskSource::Transaction* transaction) override;
135   std::optional<Task> Clear(TaskSource::Transaction* transaction) override;
136   bool DidProcessTask(TaskSource::Transaction* transaction) override;
137   bool WillReEnqueue(TimeTicks now,
138                      TaskSource::Transaction* transaction) override;
139 
140   // Returns true if the delayed task to be posted will cause the delayed sort
141   // key to change.
142   bool DelayedSortKeyWillChange(const Task& delayed_task) const
143       EXCLUSIVE_LOCKS_REQUIRED(lock_);
144 
145   // Selects the earliest task to run, either from immediate or
146   // delayed queue and return it.
147   // Expects this sequence to have at least one task that can run
148   // immediately.
149   Task TakeEarliestTask() EXCLUSIVE_LOCKS_REQUIRED(lock_);
150 
151   // Get and return next task from immediate queue
152   Task TakeNextImmediateTask() EXCLUSIVE_LOCKS_REQUIRED(lock_);
153 
154   // Update the next earliest/latest ready time.
155   void UpdateReadyTimes() EXCLUSIVE_LOCKS_REQUIRED(lock_);
156 
157   // Returns true if there are immediate tasks
158   bool HasImmediateTasks() const EXCLUSIVE_LOCKS_REQUIRED(lock_);
159 
160   // Returns true if tasks ready to be executed
161   bool HasReadyTasks(TimeTicks now) const override;
162 
163   bool IsEmpty() const EXCLUSIVE_LOCKS_REQUIRED(lock_);
164 
165   // Releases reference to TaskRunner.
166   void ReleaseTaskRunner();
167 
168   const SequenceToken token_ = SequenceToken::Create();
169 
170   // A pointer to the TaskRunner that posts to this TaskSource, if any. The
171   // derived class is responsible for calling AddRef() when a TaskSource from
172   // which no Task is executing becomes non-empty and Release() when
173   // it becomes empty again (e.g. when DidProcessTask() returns false).
174   //
175   // In practise, this pointer is going to become dangling. See task_runner()
176   // comment.
177   raw_ptr<SequencedTaskRunner, DisableDanglingPtrDetection> task_runner_;
178 
179   // Queues of tasks to execute.
180   base::queue<Task> queue_ GUARDED_BY(lock_);
181   base::IntrusiveHeap<Task, DelayedTaskGreater> delayed_queue_
182       GUARDED_BY(lock_);
183 
184   // Caches the latest/earliest ready time for atomic access. Writes are
185   // protected by |lock_|, but allows atomic reads outside of |lock_|. If this
186   // sequence is empty, these are in an unknown state and shouldn't be read.
187 
188   // Minimum of latest_delayed_run_time() of next delayed task if any, and
189   // |queue_time| of next immediate task if any.
GUARDED_BY(lock_)190   std::atomic<TimeTicks> latest_ready_time_ GUARDED_BY(lock_){TimeTicks()};
191   // is_null() if there is an immediate task, or earliest_delayed_run_time() of
192   // next delayed task otherwise.
GUARDED_BY(lock_)193   std::atomic<TimeTicks> earliest_ready_time_ GUARDED_BY(lock_){TimeTicks()};
194 
195   // True if a worker is currently associated with a Task from this Sequence.
196   bool has_worker_ = false;
197 
198   // True if the sequence has ready tasks and requested to be queued as such
199   // through WillPushImmediateTask() or OnBecomeReady(). Reset to false once all
200   // ready tasks are done being processed and either DidProcessTask() or
201   // WillReEnqueue() returned false. Normally, |is_immediate_| is protected by
202   // |lock_|, except in OnBecomeReady() hence the use of atomics.
203   std::atomic_bool is_immediate_{false};
204 
205   // Holds data stored through the SequenceLocalStorageSlot API.
206   SequenceLocalStorageMap sequence_local_storage_;
207 };
208 
209 }  // namespace internal
210 }  // namespace base
211 
212 #endif  // BASE_TASK_THREAD_POOL_SEQUENCE_H_
213