• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task_scheduler/scheduler_worker_pool.h"
6 
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/lazy_instance.h"
10 #include "base/task_scheduler/delayed_task_manager.h"
11 #include "base/task_scheduler/task_tracker.h"
12 #include "base/threading/thread_local.h"
13 
14 namespace base {
15 namespace internal {
16 
17 namespace {
18 
19 // The number of SchedulerWorkerPool that are alive in this process. This
20 // variable should only be incremented when the SchedulerWorkerPool instances
21 // are brought up (on the main thread; before any tasks are posted) and
22 // decremented when the same instances are brought down (i.e., only when unit
23 // tests tear down the task environment and never in production). This makes the
24 // variable const while worker threads are up and as such it doesn't need to be
25 // atomic. It is used to tell when a task is posted from the main thread after
26 // the task environment was brought down in unit tests so that
27 // SchedulerWorkerPool bound TaskRunners can return false on PostTask, letting
28 // such callers know they should complete necessary work synchronously. Note:
29 // |!g_active_pools_count| is generally equivalent to
30 // |!TaskScheduler::GetInstance()| but has the advantage of being valid in
31 // task_scheduler unit tests that don't instantiate a full TaskScheduler.
32 int g_active_pools_count = 0;
33 
34 // SchedulerWorkerPool that owns the current thread, if any.
35 LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky
36     tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER;
37 
GetCurrentWorkerPool()38 const SchedulerWorkerPool* GetCurrentWorkerPool() {
39   return tls_current_worker_pool.Get().Get();
40 }
41 
42 }  // namespace
43 
44 // A task runner that runs tasks in parallel.
45 class SchedulerParallelTaskRunner : public TaskRunner {
46  public:
47   // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
48   // long as |worker_pool| is alive.
49   // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
SchedulerParallelTaskRunner(const TaskTraits & traits,SchedulerWorkerPool * worker_pool)50   SchedulerParallelTaskRunner(const TaskTraits& traits,
51                               SchedulerWorkerPool* worker_pool)
52       : traits_(traits), worker_pool_(worker_pool) {
53     DCHECK(worker_pool_);
54   }
55 
56   // TaskRunner:
PostDelayedTask(const Location & from_here,OnceClosure closure,TimeDelta delay)57   bool PostDelayedTask(const Location& from_here,
58                        OnceClosure closure,
59                        TimeDelta delay) override {
60     if (!g_active_pools_count)
61       return false;
62 
63     // Post the task as part of a one-off single-task Sequence.
64     return worker_pool_->PostTaskWithSequence(
65         Task(from_here, std::move(closure), traits_, delay),
66         MakeRefCounted<Sequence>());
67   }
68 
RunsTasksInCurrentSequence() const69   bool RunsTasksInCurrentSequence() const override {
70     return GetCurrentWorkerPool() == worker_pool_;
71   }
72 
73  private:
74   ~SchedulerParallelTaskRunner() override = default;
75 
76   const TaskTraits traits_;
77   SchedulerWorkerPool* const worker_pool_;
78 
79   DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
80 };
81 
82 // A task runner that runs tasks in sequence.
83 class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
84  public:
85   // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks
86   // so long as |worker_pool| is alive.
87   // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
SchedulerSequencedTaskRunner(const TaskTraits & traits,SchedulerWorkerPool * worker_pool)88   SchedulerSequencedTaskRunner(const TaskTraits& traits,
89                                SchedulerWorkerPool* worker_pool)
90       : traits_(traits), worker_pool_(worker_pool) {
91     DCHECK(worker_pool_);
92   }
93 
94   // SequencedTaskRunner:
PostDelayedTask(const Location & from_here,OnceClosure closure,TimeDelta delay)95   bool PostDelayedTask(const Location& from_here,
96                        OnceClosure closure,
97                        TimeDelta delay) override {
98     if (!g_active_pools_count)
99       return false;
100 
101     Task task(from_here, std::move(closure), traits_, delay);
102     task.sequenced_task_runner_ref = this;
103 
104     // Post the task as part of |sequence_|.
105     return worker_pool_->PostTaskWithSequence(std::move(task), sequence_);
106   }
107 
PostNonNestableDelayedTask(const Location & from_here,OnceClosure closure,base::TimeDelta delay)108   bool PostNonNestableDelayedTask(const Location& from_here,
109                                   OnceClosure closure,
110                                   base::TimeDelta delay) override {
111     // Tasks are never nested within the task scheduler.
112     return PostDelayedTask(from_here, std::move(closure), delay);
113   }
114 
RunsTasksInCurrentSequence() const115   bool RunsTasksInCurrentSequence() const override {
116     return sequence_->token() == SequenceToken::GetForCurrentThread();
117   }
118 
119  private:
120   ~SchedulerSequencedTaskRunner() override = default;
121 
122   // Sequence for all Tasks posted through this TaskRunner.
123   const scoped_refptr<Sequence> sequence_ = MakeRefCounted<Sequence>();
124 
125   const TaskTraits traits_;
126   SchedulerWorkerPool* const worker_pool_;
127 
128   DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
129 };
130 
CreateTaskRunnerWithTraits(const TaskTraits & traits)131 scoped_refptr<TaskRunner> SchedulerWorkerPool::CreateTaskRunnerWithTraits(
132     const TaskTraits& traits) {
133   return MakeRefCounted<SchedulerParallelTaskRunner>(traits, this);
134 }
135 
136 scoped_refptr<SequencedTaskRunner>
CreateSequencedTaskRunnerWithTraits(const TaskTraits & traits)137 SchedulerWorkerPool::CreateSequencedTaskRunnerWithTraits(
138     const TaskTraits& traits) {
139   return MakeRefCounted<SchedulerSequencedTaskRunner>(traits, this);
140 }
141 
PostTaskWithSequence(Task task,scoped_refptr<Sequence> sequence)142 bool SchedulerWorkerPool::PostTaskWithSequence(
143     Task task,
144     scoped_refptr<Sequence> sequence) {
145   DCHECK(task.task);
146   DCHECK(sequence);
147 
148   if (!task_tracker_->WillPostTask(&task))
149     return false;
150 
151   if (task.delayed_run_time.is_null()) {
152     PostTaskWithSequenceNow(std::move(task), std::move(sequence));
153   } else {
154     // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
155     // for details.
156     CHECK(task.task);
157     delayed_task_manager_->AddDelayedTask(
158         std::move(task), BindOnce(
159                              [](scoped_refptr<Sequence> sequence,
160                                 SchedulerWorkerPool* worker_pool, Task task) {
161                                worker_pool->PostTaskWithSequenceNow(
162                                    std::move(task), std::move(sequence));
163                              },
164                              std::move(sequence), Unretained(this)));
165   }
166 
167   return true;
168 }
169 
SchedulerWorkerPool(TrackedRef<TaskTracker> task_tracker,DelayedTaskManager * delayed_task_manager)170 SchedulerWorkerPool::SchedulerWorkerPool(
171     TrackedRef<TaskTracker> task_tracker,
172     DelayedTaskManager* delayed_task_manager)
173     : task_tracker_(std::move(task_tracker)),
174       delayed_task_manager_(delayed_task_manager) {
175   DCHECK(task_tracker_);
176   DCHECK(delayed_task_manager_);
177   ++g_active_pools_count;
178 }
179 
~SchedulerWorkerPool()180 SchedulerWorkerPool::~SchedulerWorkerPool() {
181   --g_active_pools_count;
182   DCHECK_GE(g_active_pools_count, 0);
183 }
184 
BindToCurrentThread()185 void SchedulerWorkerPool::BindToCurrentThread() {
186   DCHECK(!GetCurrentWorkerPool());
187   tls_current_worker_pool.Get().Set(this);
188 }
189 
UnbindFromCurrentThread()190 void SchedulerWorkerPool::UnbindFromCurrentThread() {
191   DCHECK(GetCurrentWorkerPool());
192   tls_current_worker_pool.Get().Set(nullptr);
193 }
194 
PostTaskWithSequenceNow(Task task,scoped_refptr<Sequence> sequence)195 void SchedulerWorkerPool::PostTaskWithSequenceNow(
196     Task task,
197     scoped_refptr<Sequence> sequence) {
198   DCHECK(task.task);
199   DCHECK(sequence);
200 
201   // Confirm that |task| is ready to run (its delayed run time is either null or
202   // in the past).
203   DCHECK_LE(task.delayed_run_time, TimeTicks::Now());
204 
205   const bool sequence_was_empty = sequence->PushTask(std::move(task));
206   if (sequence_was_empty) {
207     // Try to schedule |sequence| if it was empty before |task| was inserted
208     // into it. Otherwise, one of these must be true:
209     // - |sequence| is already scheduled, or,
210     // - The pool is running a Task from |sequence|. The pool is expected to
211     //   reschedule |sequence| once it's done running the Task.
212     sequence = task_tracker_->WillScheduleSequence(std::move(sequence), this);
213     if (sequence)
214       OnCanScheduleSequence(std::move(sequence));
215   }
216 }
217 
218 }  // namespace internal
219 }  // namespace base
220