1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/scheduler_worker_pool.h"
6
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/lazy_instance.h"
10 #include "base/task_scheduler/delayed_task_manager.h"
11 #include "base/task_scheduler/task_tracker.h"
12 #include "base/threading/thread_local.h"
13
14 namespace base {
15 namespace internal {
16
17 namespace {
18
19 // The number of SchedulerWorkerPool that are alive in this process. This
20 // variable should only be incremented when the SchedulerWorkerPool instances
21 // are brought up (on the main thread; before any tasks are posted) and
22 // decremented when the same instances are brought down (i.e., only when unit
23 // tests tear down the task environment and never in production). This makes the
24 // variable const while worker threads are up and as such it doesn't need to be
25 // atomic. It is used to tell when a task is posted from the main thread after
26 // the task environment was brought down in unit tests so that
27 // SchedulerWorkerPool bound TaskRunners can return false on PostTask, letting
28 // such callers know they should complete necessary work synchronously. Note:
29 // |!g_active_pools_count| is generally equivalent to
30 // |!TaskScheduler::GetInstance()| but has the advantage of being valid in
31 // task_scheduler unit tests that don't instantiate a full TaskScheduler.
32 int g_active_pools_count = 0;
33
34 // SchedulerWorkerPool that owns the current thread, if any.
35 LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky
36 tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER;
37
GetCurrentWorkerPool()38 const SchedulerWorkerPool* GetCurrentWorkerPool() {
39 return tls_current_worker_pool.Get().Get();
40 }
41
42 } // namespace
43
44 // A task runner that runs tasks in parallel.
45 class SchedulerParallelTaskRunner : public TaskRunner {
46 public:
47 // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so
48 // long as |worker_pool| is alive.
49 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
SchedulerParallelTaskRunner(const TaskTraits & traits,SchedulerWorkerPool * worker_pool)50 SchedulerParallelTaskRunner(const TaskTraits& traits,
51 SchedulerWorkerPool* worker_pool)
52 : traits_(traits), worker_pool_(worker_pool) {
53 DCHECK(worker_pool_);
54 }
55
56 // TaskRunner:
PostDelayedTask(const Location & from_here,OnceClosure closure,TimeDelta delay)57 bool PostDelayedTask(const Location& from_here,
58 OnceClosure closure,
59 TimeDelta delay) override {
60 if (!g_active_pools_count)
61 return false;
62
63 // Post the task as part of a one-off single-task Sequence.
64 return worker_pool_->PostTaskWithSequence(
65 Task(from_here, std::move(closure), traits_, delay),
66 MakeRefCounted<Sequence>());
67 }
68
RunsTasksInCurrentSequence() const69 bool RunsTasksInCurrentSequence() const override {
70 return GetCurrentWorkerPool() == worker_pool_;
71 }
72
73 private:
74 ~SchedulerParallelTaskRunner() override = default;
75
76 const TaskTraits traits_;
77 SchedulerWorkerPool* const worker_pool_;
78
79 DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner);
80 };
81
82 // A task runner that runs tasks in sequence.
83 class SchedulerSequencedTaskRunner : public SequencedTaskRunner {
84 public:
85 // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks
86 // so long as |worker_pool| is alive.
87 // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory.
SchedulerSequencedTaskRunner(const TaskTraits & traits,SchedulerWorkerPool * worker_pool)88 SchedulerSequencedTaskRunner(const TaskTraits& traits,
89 SchedulerWorkerPool* worker_pool)
90 : traits_(traits), worker_pool_(worker_pool) {
91 DCHECK(worker_pool_);
92 }
93
94 // SequencedTaskRunner:
PostDelayedTask(const Location & from_here,OnceClosure closure,TimeDelta delay)95 bool PostDelayedTask(const Location& from_here,
96 OnceClosure closure,
97 TimeDelta delay) override {
98 if (!g_active_pools_count)
99 return false;
100
101 Task task(from_here, std::move(closure), traits_, delay);
102 task.sequenced_task_runner_ref = this;
103
104 // Post the task as part of |sequence_|.
105 return worker_pool_->PostTaskWithSequence(std::move(task), sequence_);
106 }
107
PostNonNestableDelayedTask(const Location & from_here,OnceClosure closure,base::TimeDelta delay)108 bool PostNonNestableDelayedTask(const Location& from_here,
109 OnceClosure closure,
110 base::TimeDelta delay) override {
111 // Tasks are never nested within the task scheduler.
112 return PostDelayedTask(from_here, std::move(closure), delay);
113 }
114
RunsTasksInCurrentSequence() const115 bool RunsTasksInCurrentSequence() const override {
116 return sequence_->token() == SequenceToken::GetForCurrentThread();
117 }
118
119 private:
120 ~SchedulerSequencedTaskRunner() override = default;
121
122 // Sequence for all Tasks posted through this TaskRunner.
123 const scoped_refptr<Sequence> sequence_ = MakeRefCounted<Sequence>();
124
125 const TaskTraits traits_;
126 SchedulerWorkerPool* const worker_pool_;
127
128 DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner);
129 };
130
CreateTaskRunnerWithTraits(const TaskTraits & traits)131 scoped_refptr<TaskRunner> SchedulerWorkerPool::CreateTaskRunnerWithTraits(
132 const TaskTraits& traits) {
133 return MakeRefCounted<SchedulerParallelTaskRunner>(traits, this);
134 }
135
136 scoped_refptr<SequencedTaskRunner>
CreateSequencedTaskRunnerWithTraits(const TaskTraits & traits)137 SchedulerWorkerPool::CreateSequencedTaskRunnerWithTraits(
138 const TaskTraits& traits) {
139 return MakeRefCounted<SchedulerSequencedTaskRunner>(traits, this);
140 }
141
PostTaskWithSequence(Task task,scoped_refptr<Sequence> sequence)142 bool SchedulerWorkerPool::PostTaskWithSequence(
143 Task task,
144 scoped_refptr<Sequence> sequence) {
145 DCHECK(task.task);
146 DCHECK(sequence);
147
148 if (!task_tracker_->WillPostTask(&task))
149 return false;
150
151 if (task.delayed_run_time.is_null()) {
152 PostTaskWithSequenceNow(std::move(task), std::move(sequence));
153 } else {
154 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
155 // for details.
156 CHECK(task.task);
157 delayed_task_manager_->AddDelayedTask(
158 std::move(task), BindOnce(
159 [](scoped_refptr<Sequence> sequence,
160 SchedulerWorkerPool* worker_pool, Task task) {
161 worker_pool->PostTaskWithSequenceNow(
162 std::move(task), std::move(sequence));
163 },
164 std::move(sequence), Unretained(this)));
165 }
166
167 return true;
168 }
169
SchedulerWorkerPool(TrackedRef<TaskTracker> task_tracker,DelayedTaskManager * delayed_task_manager)170 SchedulerWorkerPool::SchedulerWorkerPool(
171 TrackedRef<TaskTracker> task_tracker,
172 DelayedTaskManager* delayed_task_manager)
173 : task_tracker_(std::move(task_tracker)),
174 delayed_task_manager_(delayed_task_manager) {
175 DCHECK(task_tracker_);
176 DCHECK(delayed_task_manager_);
177 ++g_active_pools_count;
178 }
179
~SchedulerWorkerPool()180 SchedulerWorkerPool::~SchedulerWorkerPool() {
181 --g_active_pools_count;
182 DCHECK_GE(g_active_pools_count, 0);
183 }
184
BindToCurrentThread()185 void SchedulerWorkerPool::BindToCurrentThread() {
186 DCHECK(!GetCurrentWorkerPool());
187 tls_current_worker_pool.Get().Set(this);
188 }
189
UnbindFromCurrentThread()190 void SchedulerWorkerPool::UnbindFromCurrentThread() {
191 DCHECK(GetCurrentWorkerPool());
192 tls_current_worker_pool.Get().Set(nullptr);
193 }
194
PostTaskWithSequenceNow(Task task,scoped_refptr<Sequence> sequence)195 void SchedulerWorkerPool::PostTaskWithSequenceNow(
196 Task task,
197 scoped_refptr<Sequence> sequence) {
198 DCHECK(task.task);
199 DCHECK(sequence);
200
201 // Confirm that |task| is ready to run (its delayed run time is either null or
202 // in the past).
203 DCHECK_LE(task.delayed_run_time, TimeTicks::Now());
204
205 const bool sequence_was_empty = sequence->PushTask(std::move(task));
206 if (sequence_was_empty) {
207 // Try to schedule |sequence| if it was empty before |task| was inserted
208 // into it. Otherwise, one of these must be true:
209 // - |sequence| is already scheduled, or,
210 // - The pool is running a Task from |sequence|. The pool is expected to
211 // reschedule |sequence| once it's done running the Task.
212 sequence = task_tracker_->WillScheduleSequence(std::move(sequence), this);
213 if (sequence)
214 OnCanScheduleSequence(std::move(sequence));
215 }
216 }
217
218 } // namespace internal
219 } // namespace base
220