• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/sequence_manager/thread_controller_impl.h"
6 
7 #include "base/bind.h"
8 #include "base/memory/ptr_util.h"
9 #include "base/message_loop/message_loop.h"
10 #include "base/run_loop.h"
11 #include "base/task/sequence_manager/lazy_now.h"
12 #include "base/task/sequence_manager/sequenced_task_source.h"
13 #include "base/trace_event/trace_event.h"
14 
15 namespace base {
16 namespace sequence_manager {
17 namespace internal {
18 
ThreadControllerImpl(MessageLoop * message_loop,scoped_refptr<SingleThreadTaskRunner> task_runner,const TickClock * time_source)19 ThreadControllerImpl::ThreadControllerImpl(
20     MessageLoop* message_loop,
21     scoped_refptr<SingleThreadTaskRunner> task_runner,
22     const TickClock* time_source)
23     : message_loop_(message_loop),
24       task_runner_(task_runner),
25       message_loop_task_runner_(message_loop ? message_loop->task_runner()
26                                              : nullptr),
27       time_source_(time_source),
28       weak_factory_(this) {
29   immediate_do_work_closure_ =
30       BindRepeating(&ThreadControllerImpl::DoWork, weak_factory_.GetWeakPtr(),
31                     WorkType::kImmediate);
32   delayed_do_work_closure_ =
33       BindRepeating(&ThreadControllerImpl::DoWork, weak_factory_.GetWeakPtr(),
34                     WorkType::kDelayed);
35 }
36 
// Destructor, defaulted out of line.
37 ThreadControllerImpl::~ThreadControllerImpl() = default;
38 
// AnySequence: defaulted out-of-line constructor. Elsewhere in this file the
// state reached through any_sequence() is only touched while
// |any_sequence_lock_| is held.
39 ThreadControllerImpl::AnySequence::AnySequence() = default;
40 
// AnySequence: defaulted out-of-line destructor.
41 ThreadControllerImpl::AnySequence::~AnySequence() = default;
42 
// MainSequenceOnly: defaulted out-of-line constructor. Elsewhere in this file
// the state reached through main_sequence_only() is accessed without taking
// |any_sequence_lock_|.
43 ThreadControllerImpl::MainSequenceOnly::MainSequenceOnly() = default;
44 
// MainSequenceOnly: defaulted out-of-line destructor.
45 ThreadControllerImpl::MainSequenceOnly::~MainSequenceOnly() = default;
46 
Create(MessageLoop * message_loop,const TickClock * time_source)47 std::unique_ptr<ThreadControllerImpl> ThreadControllerImpl::Create(
48     MessageLoop* message_loop,
49     const TickClock* time_source) {
50   return WrapUnique(new ThreadControllerImpl(
51       message_loop, message_loop->task_runner(), time_source));
52 }
53 
SetSequencedTaskSource(SequencedTaskSource * sequence)54 void ThreadControllerImpl::SetSequencedTaskSource(
55     SequencedTaskSource* sequence) {
56   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
57   DCHECK(sequence);
58   DCHECK(!sequence_);
59   sequence_ = sequence;
60 }
61 
ScheduleWork()62 void ThreadControllerImpl::ScheduleWork() {
63   DCHECK(sequence_);
64   AutoLock lock(any_sequence_lock_);
65   // Don't post a DoWork if there's an immediate DoWork in flight or if we're
66   // inside a top level DoWork. We can rely on a continuation being posted as
67   // needed.
68   if (any_sequence().immediate_do_work_posted ||
69       (any_sequence().do_work_running_count > any_sequence().nesting_depth)) {
70     return;
71   }
72   any_sequence().immediate_do_work_posted = true;
73 
74   TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),
75                "ThreadControllerImpl::ScheduleWork::PostTask");
76   task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_);
77 }
78 
SetNextDelayedDoWork(LazyNow * lazy_now,TimeTicks run_time)79 void ThreadControllerImpl::SetNextDelayedDoWork(LazyNow* lazy_now,
80                                                 TimeTicks run_time) {
81   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
82   DCHECK(sequence_);
83 
84   if (main_sequence_only().next_delayed_do_work == run_time)
85     return;
86 
87   // Cancel DoWork if it was scheduled and we set an "infinite" delay now.
88   if (run_time == TimeTicks::Max()) {
89     cancelable_delayed_do_work_closure_.Cancel();
90     main_sequence_only().next_delayed_do_work = TimeTicks::Max();
91     return;
92   }
93 
94   // If DoWork is running then we don't need to do anything because it will post
95   // a continuation as needed. Bailing out here is by far the most common case.
96   if (main_sequence_only().do_work_running_count >
97       main_sequence_only().nesting_depth) {
98     return;
99   }
100 
101   // If DoWork is about to run then we also don't need to do anything.
102   {
103     AutoLock lock(any_sequence_lock_);
104     if (any_sequence().immediate_do_work_posted)
105       return;
106   }
107 
108   base::TimeDelta delay = std::max(TimeDelta(), run_time - lazy_now->Now());
109   TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),
110                "ThreadControllerImpl::SetNextDelayedDoWork::PostDelayedTask",
111                "delay_ms", delay.InMillisecondsF());
112 
113   main_sequence_only().next_delayed_do_work = run_time;
114   // Reset also causes cancellation of the previous DoWork task.
115   cancelable_delayed_do_work_closure_.Reset(delayed_do_work_closure_);
116   task_runner_->PostDelayedTask(
117       FROM_HERE, cancelable_delayed_do_work_closure_.callback(), delay);
118 }
119 
RunsTasksInCurrentSequence()120 bool ThreadControllerImpl::RunsTasksInCurrentSequence() {
121   return task_runner_->RunsTasksInCurrentSequence();
122 }
123 
GetClock()124 const TickClock* ThreadControllerImpl::GetClock() {
125   return time_source_;
126 }
127 
SetDefaultTaskRunner(scoped_refptr<SingleThreadTaskRunner> task_runner)128 void ThreadControllerImpl::SetDefaultTaskRunner(
129     scoped_refptr<SingleThreadTaskRunner> task_runner) {
130   if (!message_loop_)
131     return;
132   message_loop_->SetTaskRunner(task_runner);
133 }
134 
RestoreDefaultTaskRunner()135 void ThreadControllerImpl::RestoreDefaultTaskRunner() {
136   if (!message_loop_)
137     return;
138   message_loop_->SetTaskRunner(message_loop_task_runner_);
139 }
140 
WillQueueTask(PendingTask * pending_task)141 void ThreadControllerImpl::WillQueueTask(PendingTask* pending_task) {
142   task_annotator_.WillQueueTask("SequenceManager::PostTask", pending_task);
143 }
144 
DoWork(WorkType work_type)145 void ThreadControllerImpl::DoWork(WorkType work_type) {
146   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
147   DCHECK(sequence_);
148 
149   {
150     AutoLock lock(any_sequence_lock_);
151     if (work_type == WorkType::kImmediate)
152       any_sequence().immediate_do_work_posted = false;
153     any_sequence().do_work_running_count++;
154   }
155 
156   main_sequence_only().do_work_running_count++;
157 
158   WeakPtr<ThreadControllerImpl> weak_ptr = weak_factory_.GetWeakPtr();
159   // TODO(scheduler-dev): Consider moving to a time based work batch instead.
160   for (int i = 0; i < main_sequence_only().work_batch_size_; i++) {
161     Optional<PendingTask> task = sequence_->TakeTask();
162     if (!task)
163       break;
164 
165     TRACE_TASK_EXECUTION("ThreadControllerImpl::DoWork", *task);
166     task_annotator_.RunTask("ThreadControllerImpl::DoWork", &*task);
167 
168     if (!weak_ptr)
169       return;
170 
171     sequence_->DidRunTask();
172 
173     // NOTE: https://crbug.com/828835.
174     // When we're running inside a nested RunLoop it may quit anytime, so any
175     // outstanding pending tasks must run in the outer RunLoop
176     // (see SequenceManagerTestWithMessageLoop.QuitWhileNested test).
177     // Unfortunately, it's MessageLoop who's receving that signal and we can't
178     // know it before we return from DoWork, hence, OnExitNestedRunLoop
179     // will be called later. Since we must implement ThreadController and
180     // SequenceManager in conformance with MessageLoop task runners, we need
181     // to disable this batching optimization while nested.
182     // Implementing RunLoop::Delegate ourselves will help to resolve this issue.
183     if (main_sequence_only().nesting_depth > 0)
184       break;
185   }
186 
187   main_sequence_only().do_work_running_count--;
188 
189   {
190     AutoLock lock(any_sequence_lock_);
191     any_sequence().do_work_running_count--;
192     DCHECK_GE(any_sequence().do_work_running_count, 0);
193     LazyNow lazy_now(time_source_);
194     TimeDelta delay_till_next_task = sequence_->DelayTillNextTask(&lazy_now);
195     if (delay_till_next_task <= TimeDelta()) {
196       // The next task needs to run immediately, post a continuation if needed.
197       if (!any_sequence().immediate_do_work_posted) {
198         any_sequence().immediate_do_work_posted = true;
199         task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_);
200       }
201     } else if (delay_till_next_task < TimeDelta::Max()) {
202       // The next task needs to run after a delay, post a continuation if
203       // needed.
204       TimeTicks next_task_at = lazy_now.Now() + delay_till_next_task;
205       if (next_task_at != main_sequence_only().next_delayed_do_work) {
206         main_sequence_only().next_delayed_do_work = next_task_at;
207         cancelable_delayed_do_work_closure_.Reset(delayed_do_work_closure_);
208         task_runner_->PostDelayedTask(
209             FROM_HERE, cancelable_delayed_do_work_closure_.callback(),
210             delay_till_next_task);
211       }
212     } else {
213       // There is no next task scheduled.
214       main_sequence_only().next_delayed_do_work = TimeTicks::Max();
215     }
216   }
217 }
218 
AddNestingObserver(RunLoop::NestingObserver * observer)219 void ThreadControllerImpl::AddNestingObserver(
220     RunLoop::NestingObserver* observer) {
221   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
222   nesting_observer_ = observer;
223   RunLoop::AddNestingObserverOnCurrentThread(this);
224 }
225 
RemoveNestingObserver(RunLoop::NestingObserver * observer)226 void ThreadControllerImpl::RemoveNestingObserver(
227     RunLoop::NestingObserver* observer) {
228   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
229   DCHECK_EQ(observer, nesting_observer_);
230   nesting_observer_ = nullptr;
231   RunLoop::RemoveNestingObserverOnCurrentThread(this);
232 }
233 
OnBeginNestedRunLoop()234 void ThreadControllerImpl::OnBeginNestedRunLoop() {
235   main_sequence_only().nesting_depth++;
236   {
237     // We just entered a nested run loop, make sure there's a DoWork posted or
238     // the system will grind to a halt.
239     AutoLock lock(any_sequence_lock_);
240     any_sequence().nesting_depth++;
241     if (!any_sequence().immediate_do_work_posted) {
242       any_sequence().immediate_do_work_posted = true;
243       TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),
244                    "ThreadControllerImpl::OnBeginNestedRunLoop::PostTask");
245       task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_);
246     }
247   }
248   if (nesting_observer_)
249     nesting_observer_->OnBeginNestedRunLoop();
250 }
251 
OnExitNestedRunLoop()252 void ThreadControllerImpl::OnExitNestedRunLoop() {
253   main_sequence_only().nesting_depth--;
254   {
255     AutoLock lock(any_sequence_lock_);
256     any_sequence().nesting_depth--;
257     DCHECK_GE(any_sequence().nesting_depth, 0);
258   }
259   if (nesting_observer_)
260     nesting_observer_->OnExitNestedRunLoop();
261 }
262 
SetWorkBatchSize(int work_batch_size)263 void ThreadControllerImpl::SetWorkBatchSize(int work_batch_size) {
264   main_sequence_only().work_batch_size_ = work_batch_size;
265 }
266 
267 }  // namespace internal
268 }  // namespace sequence_manager
269 }  // namespace base
270