1 // Copyright 2018 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/sequence_manager/thread_controller_with_message_pump_impl.h"
6
7 #include "base/auto_reset.h"
8 #include "base/message_loop/message_pump_default.h"
9 #include "base/time/tick_clock.h"
10 #include "base/trace_event/trace_event.h"
11
12 namespace base {
13 namespace sequence_manager {
14 namespace internal {
15
ThreadControllerWithMessagePumpImpl(TickClock * time_source)16 ThreadControllerWithMessagePumpImpl::ThreadControllerWithMessagePumpImpl(
17 TickClock* time_source)
18 : main_thread_id_(PlatformThread::CurrentId()),
19 pump_(new MessagePumpDefault()),
20 time_source_(time_source) {
21 RunLoop::RegisterDelegateForCurrentThread(this);
22 }
23
~ThreadControllerWithMessagePumpImpl()24 ThreadControllerWithMessagePumpImpl::~ThreadControllerWithMessagePumpImpl() {
25 // Destructors of RunLoop::Delegate and ThreadTaskRunnerHandle
26 // will do all the clean-up.
27 }
28
29 ThreadControllerWithMessagePumpImpl::MainThreadOnly::MainThreadOnly() = default;
30
31 ThreadControllerWithMessagePumpImpl::MainThreadOnly::~MainThreadOnly() =
32 default;
33
SetSequencedTaskSource(SequencedTaskSource * task_source)34 void ThreadControllerWithMessagePumpImpl::SetSequencedTaskSource(
35 SequencedTaskSource* task_source) {
36 DCHECK(task_source);
37 DCHECK(!main_thread_only().task_source);
38 main_thread_only().task_source = task_source;
39 }
40
SetWorkBatchSize(int work_batch_size)41 void ThreadControllerWithMessagePumpImpl::SetWorkBatchSize(
42 int work_batch_size) {
43 DCHECK_GE(work_batch_size, 1);
44 main_thread_only().batch_size = work_batch_size;
45 }
46
WillQueueTask(PendingTask * pending_task)47 void ThreadControllerWithMessagePumpImpl::WillQueueTask(
48 PendingTask* pending_task) {
49 task_annotator_.WillQueueTask("ThreadController::Task", pending_task);
50 }
51
ScheduleWork()52 void ThreadControllerWithMessagePumpImpl::ScheduleWork() {
53 // Continuation will be posted if necessary.
54 if (RunsTasksInCurrentSequence() && is_doing_work())
55 return;
56
57 pump_->ScheduleWork();
58 }
59
SetNextDelayedDoWork(LazyNow * lazy_now,TimeTicks run_time)60 void ThreadControllerWithMessagePumpImpl::SetNextDelayedDoWork(
61 LazyNow* lazy_now,
62 TimeTicks run_time) {
63 if (main_thread_only().next_delayed_work == run_time)
64 return;
65 main_thread_only().next_delayed_work = run_time;
66
67 if (run_time == TimeTicks::Max())
68 return;
69
70 // Continuation will be posted if necessary.
71 if (is_doing_work())
72 return;
73
74 // |lazy_now| will be removed in this method soon.
75 DCHECK_LT(time_source_->NowTicks(), run_time);
76 pump_->ScheduleDelayedWork(run_time);
77 }
78
GetClock()79 const TickClock* ThreadControllerWithMessagePumpImpl::GetClock() {
80 return time_source_;
81 }
82
RunsTasksInCurrentSequence()83 bool ThreadControllerWithMessagePumpImpl::RunsTasksInCurrentSequence() {
84 return main_thread_id_ == PlatformThread::CurrentId();
85 }
86
SetDefaultTaskRunner(scoped_refptr<SingleThreadTaskRunner> task_runner)87 void ThreadControllerWithMessagePumpImpl::SetDefaultTaskRunner(
88 scoped_refptr<SingleThreadTaskRunner> task_runner) {
89 main_thread_only().thread_task_runner_handle =
90 std::make_unique<ThreadTaskRunnerHandle>(task_runner);
91 }
92
RestoreDefaultTaskRunner()93 void ThreadControllerWithMessagePumpImpl::RestoreDefaultTaskRunner() {
94 // There's no default task runner unlike with the MessageLoop.
95 main_thread_only().thread_task_runner_handle.reset();
96 }
97
AddNestingObserver(RunLoop::NestingObserver * observer)98 void ThreadControllerWithMessagePumpImpl::AddNestingObserver(
99 RunLoop::NestingObserver* observer) {
100 DCHECK_LE(main_thread_only().run_depth, 1);
101 DCHECK(!main_thread_only().nesting_observer);
102 DCHECK(observer);
103 main_thread_only().nesting_observer = observer;
104 }
105
RemoveNestingObserver(RunLoop::NestingObserver * observer)106 void ThreadControllerWithMessagePumpImpl::RemoveNestingObserver(
107 RunLoop::NestingObserver* observer) {
108 DCHECK_EQ(main_thread_only().nesting_observer, observer);
109 main_thread_only().nesting_observer = nullptr;
110 }
111
DoWork()112 bool ThreadControllerWithMessagePumpImpl::DoWork() {
113 DCHECK(main_thread_only().task_source);
114 bool task_ran = false;
115
116 {
117 AutoReset<int> do_work_scope(&main_thread_only().do_work_depth,
118 main_thread_only().do_work_depth + 1);
119
120 for (int i = 0; i < main_thread_only().batch_size; i++) {
121 Optional<PendingTask> task = main_thread_only().task_source->TakeTask();
122 if (!task)
123 break;
124
125 TRACE_TASK_EXECUTION("ThreadController::Task", *task);
126 task_annotator_.RunTask("ThreadController::Task", &*task);
127 task_ran = true;
128
129 main_thread_only().task_source->DidRunTask();
130
131 if (main_thread_only().quit_do_work) {
132 // When Quit() is called we must stop running the batch because
133 // caller expects per-task granularity.
134 main_thread_only().quit_do_work = false;
135 return true;
136 }
137 }
138 } // DoWorkScope.
139
140 LazyNow lazy_now(time_source_);
141 TimeDelta do_work_delay =
142 main_thread_only().task_source->DelayTillNextTask(&lazy_now);
143 DCHECK_GE(do_work_delay, TimeDelta());
144 // Schedule a continuation.
145 if (do_work_delay.is_zero()) {
146 // Need to run new work immediately.
147 pump_->ScheduleWork();
148 } else if (do_work_delay != TimeDelta::Max()) {
149 SetNextDelayedDoWork(&lazy_now, lazy_now.Now() + do_work_delay);
150 } else {
151 SetNextDelayedDoWork(&lazy_now, TimeTicks::Max());
152 }
153
154 return task_ran;
155 }
156
DoDelayedWork(TimeTicks * next_run_time)157 bool ThreadControllerWithMessagePumpImpl::DoDelayedWork(
158 TimeTicks* next_run_time) {
159 // Delayed work is getting processed in DoWork().
160 return false;
161 }
162
DoIdleWork()163 bool ThreadControllerWithMessagePumpImpl::DoIdleWork() {
164 // RunLoop::Delegate knows whether we called Run() or RunUntilIdle().
165 if (ShouldQuitWhenIdle())
166 Quit();
167 return false;
168 }
169
Run(bool application_tasks_allowed)170 void ThreadControllerWithMessagePumpImpl::Run(bool application_tasks_allowed) {
171 // No system messages are being processed by this class.
172 DCHECK(application_tasks_allowed);
173
174 // We already have a MessagePump::Run() running, so we're in a nested RunLoop.
175 if (main_thread_only().run_depth > 0 && main_thread_only().nesting_observer)
176 main_thread_only().nesting_observer->OnBeginNestedRunLoop();
177
178 {
179 AutoReset<int> run_scope(&main_thread_only().run_depth,
180 main_thread_only().run_depth + 1);
181 // MessagePump::Run() blocks until Quit() called, but previously started
182 // Run() calls continue to block.
183 pump_->Run(this);
184 }
185
186 // We'll soon continue to run an outer MessagePump::Run() loop.
187 if (main_thread_only().run_depth > 0 && main_thread_only().nesting_observer)
188 main_thread_only().nesting_observer->OnExitNestedRunLoop();
189 }
190
Quit()191 void ThreadControllerWithMessagePumpImpl::Quit() {
192 // Interrupt a batch of work.
193 if (is_doing_work())
194 main_thread_only().quit_do_work = true;
195 // If we're in a nested RunLoop, continuation will be posted if necessary.
196 pump_->Quit();
197 }
198
EnsureWorkScheduled()199 void ThreadControllerWithMessagePumpImpl::EnsureWorkScheduled() {
200 ScheduleWork();
201 }
202
203 } // namespace internal
204 } // namespace sequence_manager
205 } // namespace base
206