1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/sequence_manager/thread_controller_impl.h"
6
7 #include "base/bind.h"
8 #include "base/memory/ptr_util.h"
9 #include "base/message_loop/message_loop.h"
10 #include "base/run_loop.h"
11 #include "base/task/sequence_manager/lazy_now.h"
12 #include "base/task/sequence_manager/sequenced_task_source.h"
13 #include "base/trace_event/trace_event.h"
14
15 namespace base {
16 namespace sequence_manager {
17 namespace internal {
18
ThreadControllerImpl(MessageLoop * message_loop,scoped_refptr<SingleThreadTaskRunner> task_runner,const TickClock * time_source)19 ThreadControllerImpl::ThreadControllerImpl(
20 MessageLoop* message_loop,
21 scoped_refptr<SingleThreadTaskRunner> task_runner,
22 const TickClock* time_source)
23 : message_loop_(message_loop),
24 task_runner_(task_runner),
25 message_loop_task_runner_(message_loop ? message_loop->task_runner()
26 : nullptr),
27 time_source_(time_source),
28 weak_factory_(this) {
29 immediate_do_work_closure_ =
30 BindRepeating(&ThreadControllerImpl::DoWork, weak_factory_.GetWeakPtr(),
31 WorkType::kImmediate);
32 delayed_do_work_closure_ =
33 BindRepeating(&ThreadControllerImpl::DoWork, weak_factory_.GetWeakPtr(),
34 WorkType::kDelayed);
35 }
36
37 ThreadControllerImpl::~ThreadControllerImpl() = default;
38
39 ThreadControllerImpl::AnySequence::AnySequence() = default;
40
41 ThreadControllerImpl::AnySequence::~AnySequence() = default;
42
43 ThreadControllerImpl::MainSequenceOnly::MainSequenceOnly() = default;
44
45 ThreadControllerImpl::MainSequenceOnly::~MainSequenceOnly() = default;
46
Create(MessageLoop * message_loop,const TickClock * time_source)47 std::unique_ptr<ThreadControllerImpl> ThreadControllerImpl::Create(
48 MessageLoop* message_loop,
49 const TickClock* time_source) {
50 return WrapUnique(new ThreadControllerImpl(
51 message_loop, message_loop->task_runner(), time_source));
52 }
53
SetSequencedTaskSource(SequencedTaskSource * sequence)54 void ThreadControllerImpl::SetSequencedTaskSource(
55 SequencedTaskSource* sequence) {
56 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
57 DCHECK(sequence);
58 DCHECK(!sequence_);
59 sequence_ = sequence;
60 }
61
ScheduleWork()62 void ThreadControllerImpl::ScheduleWork() {
63 DCHECK(sequence_);
64 AutoLock lock(any_sequence_lock_);
65 // Don't post a DoWork if there's an immediate DoWork in flight or if we're
66 // inside a top level DoWork. We can rely on a continuation being posted as
67 // needed.
68 if (any_sequence().immediate_do_work_posted ||
69 (any_sequence().do_work_running_count > any_sequence().nesting_depth)) {
70 return;
71 }
72 any_sequence().immediate_do_work_posted = true;
73
74 TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),
75 "ThreadControllerImpl::ScheduleWork::PostTask");
76 task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_);
77 }
78
SetNextDelayedDoWork(LazyNow * lazy_now,TimeTicks run_time)79 void ThreadControllerImpl::SetNextDelayedDoWork(LazyNow* lazy_now,
80 TimeTicks run_time) {
81 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
82 DCHECK(sequence_);
83
84 if (main_sequence_only().next_delayed_do_work == run_time)
85 return;
86
87 // Cancel DoWork if it was scheduled and we set an "infinite" delay now.
88 if (run_time == TimeTicks::Max()) {
89 cancelable_delayed_do_work_closure_.Cancel();
90 main_sequence_only().next_delayed_do_work = TimeTicks::Max();
91 return;
92 }
93
94 // If DoWork is running then we don't need to do anything because it will post
95 // a continuation as needed. Bailing out here is by far the most common case.
96 if (main_sequence_only().do_work_running_count >
97 main_sequence_only().nesting_depth) {
98 return;
99 }
100
101 // If DoWork is about to run then we also don't need to do anything.
102 {
103 AutoLock lock(any_sequence_lock_);
104 if (any_sequence().immediate_do_work_posted)
105 return;
106 }
107
108 base::TimeDelta delay = std::max(TimeDelta(), run_time - lazy_now->Now());
109 TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),
110 "ThreadControllerImpl::SetNextDelayedDoWork::PostDelayedTask",
111 "delay_ms", delay.InMillisecondsF());
112
113 main_sequence_only().next_delayed_do_work = run_time;
114 // Reset also causes cancellation of the previous DoWork task.
115 cancelable_delayed_do_work_closure_.Reset(delayed_do_work_closure_);
116 task_runner_->PostDelayedTask(
117 FROM_HERE, cancelable_delayed_do_work_closure_.callback(), delay);
118 }
119
RunsTasksInCurrentSequence()120 bool ThreadControllerImpl::RunsTasksInCurrentSequence() {
121 return task_runner_->RunsTasksInCurrentSequence();
122 }
123
GetClock()124 const TickClock* ThreadControllerImpl::GetClock() {
125 return time_source_;
126 }
127
SetDefaultTaskRunner(scoped_refptr<SingleThreadTaskRunner> task_runner)128 void ThreadControllerImpl::SetDefaultTaskRunner(
129 scoped_refptr<SingleThreadTaskRunner> task_runner) {
130 if (!message_loop_)
131 return;
132 message_loop_->SetTaskRunner(task_runner);
133 }
134
RestoreDefaultTaskRunner()135 void ThreadControllerImpl::RestoreDefaultTaskRunner() {
136 if (!message_loop_)
137 return;
138 message_loop_->SetTaskRunner(message_loop_task_runner_);
139 }
140
WillQueueTask(PendingTask * pending_task)141 void ThreadControllerImpl::WillQueueTask(PendingTask* pending_task) {
142 task_annotator_.WillQueueTask("SequenceManager::PostTask", pending_task);
143 }
144
DoWork(WorkType work_type)145 void ThreadControllerImpl::DoWork(WorkType work_type) {
146 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
147 DCHECK(sequence_);
148
149 {
150 AutoLock lock(any_sequence_lock_);
151 if (work_type == WorkType::kImmediate)
152 any_sequence().immediate_do_work_posted = false;
153 any_sequence().do_work_running_count++;
154 }
155
156 main_sequence_only().do_work_running_count++;
157
158 WeakPtr<ThreadControllerImpl> weak_ptr = weak_factory_.GetWeakPtr();
159 // TODO(scheduler-dev): Consider moving to a time based work batch instead.
160 for (int i = 0; i < main_sequence_only().work_batch_size_; i++) {
161 Optional<PendingTask> task = sequence_->TakeTask();
162 if (!task)
163 break;
164
165 TRACE_TASK_EXECUTION("ThreadControllerImpl::DoWork", *task);
166 task_annotator_.RunTask("ThreadControllerImpl::DoWork", &*task);
167
168 if (!weak_ptr)
169 return;
170
171 sequence_->DidRunTask();
172
173 // NOTE: https://crbug.com/828835.
174 // When we're running inside a nested RunLoop it may quit anytime, so any
175 // outstanding pending tasks must run in the outer RunLoop
176 // (see SequenceManagerTestWithMessageLoop.QuitWhileNested test).
177 // Unfortunately, it's MessageLoop who's receving that signal and we can't
178 // know it before we return from DoWork, hence, OnExitNestedRunLoop
179 // will be called later. Since we must implement ThreadController and
180 // SequenceManager in conformance with MessageLoop task runners, we need
181 // to disable this batching optimization while nested.
182 // Implementing RunLoop::Delegate ourselves will help to resolve this issue.
183 if (main_sequence_only().nesting_depth > 0)
184 break;
185 }
186
187 main_sequence_only().do_work_running_count--;
188
189 {
190 AutoLock lock(any_sequence_lock_);
191 any_sequence().do_work_running_count--;
192 DCHECK_GE(any_sequence().do_work_running_count, 0);
193 LazyNow lazy_now(time_source_);
194 TimeDelta delay_till_next_task = sequence_->DelayTillNextTask(&lazy_now);
195 if (delay_till_next_task <= TimeDelta()) {
196 // The next task needs to run immediately, post a continuation if needed.
197 if (!any_sequence().immediate_do_work_posted) {
198 any_sequence().immediate_do_work_posted = true;
199 task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_);
200 }
201 } else if (delay_till_next_task < TimeDelta::Max()) {
202 // The next task needs to run after a delay, post a continuation if
203 // needed.
204 TimeTicks next_task_at = lazy_now.Now() + delay_till_next_task;
205 if (next_task_at != main_sequence_only().next_delayed_do_work) {
206 main_sequence_only().next_delayed_do_work = next_task_at;
207 cancelable_delayed_do_work_closure_.Reset(delayed_do_work_closure_);
208 task_runner_->PostDelayedTask(
209 FROM_HERE, cancelable_delayed_do_work_closure_.callback(),
210 delay_till_next_task);
211 }
212 } else {
213 // There is no next task scheduled.
214 main_sequence_only().next_delayed_do_work = TimeTicks::Max();
215 }
216 }
217 }
218
AddNestingObserver(RunLoop::NestingObserver * observer)219 void ThreadControllerImpl::AddNestingObserver(
220 RunLoop::NestingObserver* observer) {
221 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
222 nesting_observer_ = observer;
223 RunLoop::AddNestingObserverOnCurrentThread(this);
224 }
225
RemoveNestingObserver(RunLoop::NestingObserver * observer)226 void ThreadControllerImpl::RemoveNestingObserver(
227 RunLoop::NestingObserver* observer) {
228 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
229 DCHECK_EQ(observer, nesting_observer_);
230 nesting_observer_ = nullptr;
231 RunLoop::RemoveNestingObserverOnCurrentThread(this);
232 }
233
OnBeginNestedRunLoop()234 void ThreadControllerImpl::OnBeginNestedRunLoop() {
235 main_sequence_only().nesting_depth++;
236 {
237 // We just entered a nested run loop, make sure there's a DoWork posted or
238 // the system will grind to a halt.
239 AutoLock lock(any_sequence_lock_);
240 any_sequence().nesting_depth++;
241 if (!any_sequence().immediate_do_work_posted) {
242 any_sequence().immediate_do_work_posted = true;
243 TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),
244 "ThreadControllerImpl::OnBeginNestedRunLoop::PostTask");
245 task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_);
246 }
247 }
248 if (nesting_observer_)
249 nesting_observer_->OnBeginNestedRunLoop();
250 }
251
OnExitNestedRunLoop()252 void ThreadControllerImpl::OnExitNestedRunLoop() {
253 main_sequence_only().nesting_depth--;
254 {
255 AutoLock lock(any_sequence_lock_);
256 any_sequence().nesting_depth--;
257 DCHECK_GE(any_sequence().nesting_depth, 0);
258 }
259 if (nesting_observer_)
260 nesting_observer_->OnExitNestedRunLoop();
261 }
262
SetWorkBatchSize(int work_batch_size)263 void ThreadControllerImpl::SetWorkBatchSize(int work_batch_size) {
264 main_sequence_only().work_batch_size_ = work_batch_size;
265 }
266
267 } // namespace internal
268 } // namespace sequence_manager
269 } // namespace base
270