• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2018 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/sequence_manager/thread_controller_with_message_pump_impl.h"
6 
7 #include "base/auto_reset.h"
8 #include "base/message_loop/message_pump_default.h"
9 #include "base/time/tick_clock.h"
10 #include "base/trace_event/trace_event.h"
11 
12 namespace base {
13 namespace sequence_manager {
14 namespace internal {
15 
ThreadControllerWithMessagePumpImpl(TickClock * time_source)16 ThreadControllerWithMessagePumpImpl::ThreadControllerWithMessagePumpImpl(
17     TickClock* time_source)
18     : main_thread_id_(PlatformThread::CurrentId()),
19       pump_(new MessagePumpDefault()),
20       time_source_(time_source) {
21   RunLoop::RegisterDelegateForCurrentThread(this);
22 }
23 
~ThreadControllerWithMessagePumpImpl()24 ThreadControllerWithMessagePumpImpl::~ThreadControllerWithMessagePumpImpl() {
25   // Destructors of RunLoop::Delegate and ThreadTaskRunnerHandle
26   // will do all the clean-up.
27 }
28 
29 ThreadControllerWithMessagePumpImpl::MainThreadOnly::MainThreadOnly() = default;
30 
31 ThreadControllerWithMessagePumpImpl::MainThreadOnly::~MainThreadOnly() =
32     default;
33 
SetSequencedTaskSource(SequencedTaskSource * task_source)34 void ThreadControllerWithMessagePumpImpl::SetSequencedTaskSource(
35     SequencedTaskSource* task_source) {
36   DCHECK(task_source);
37   DCHECK(!main_thread_only().task_source);
38   main_thread_only().task_source = task_source;
39 }
40 
SetWorkBatchSize(int work_batch_size)41 void ThreadControllerWithMessagePumpImpl::SetWorkBatchSize(
42     int work_batch_size) {
43   DCHECK_GE(work_batch_size, 1);
44   main_thread_only().batch_size = work_batch_size;
45 }
46 
WillQueueTask(PendingTask * pending_task)47 void ThreadControllerWithMessagePumpImpl::WillQueueTask(
48     PendingTask* pending_task) {
49   task_annotator_.WillQueueTask("ThreadController::Task", pending_task);
50 }
51 
ScheduleWork()52 void ThreadControllerWithMessagePumpImpl::ScheduleWork() {
53   // Continuation will be posted if necessary.
54   if (RunsTasksInCurrentSequence() && is_doing_work())
55     return;
56 
57   pump_->ScheduleWork();
58 }
59 
SetNextDelayedDoWork(LazyNow * lazy_now,TimeTicks run_time)60 void ThreadControllerWithMessagePumpImpl::SetNextDelayedDoWork(
61     LazyNow* lazy_now,
62     TimeTicks run_time) {
63   if (main_thread_only().next_delayed_work == run_time)
64     return;
65   main_thread_only().next_delayed_work = run_time;
66 
67   if (run_time == TimeTicks::Max())
68     return;
69 
70   // Continuation will be posted if necessary.
71   if (is_doing_work())
72     return;
73 
74   // |lazy_now| will be removed in this method soon.
75   DCHECK_LT(time_source_->NowTicks(), run_time);
76   pump_->ScheduleDelayedWork(run_time);
77 }
78 
GetClock()79 const TickClock* ThreadControllerWithMessagePumpImpl::GetClock() {
80   return time_source_;
81 }
82 
RunsTasksInCurrentSequence()83 bool ThreadControllerWithMessagePumpImpl::RunsTasksInCurrentSequence() {
84   return main_thread_id_ == PlatformThread::CurrentId();
85 }
86 
SetDefaultTaskRunner(scoped_refptr<SingleThreadTaskRunner> task_runner)87 void ThreadControllerWithMessagePumpImpl::SetDefaultTaskRunner(
88     scoped_refptr<SingleThreadTaskRunner> task_runner) {
89   main_thread_only().thread_task_runner_handle =
90       std::make_unique<ThreadTaskRunnerHandle>(task_runner);
91 }
92 
RestoreDefaultTaskRunner()93 void ThreadControllerWithMessagePumpImpl::RestoreDefaultTaskRunner() {
94   // There's no default task runner unlike with the MessageLoop.
95   main_thread_only().thread_task_runner_handle.reset();
96 }
97 
AddNestingObserver(RunLoop::NestingObserver * observer)98 void ThreadControllerWithMessagePumpImpl::AddNestingObserver(
99     RunLoop::NestingObserver* observer) {
100   DCHECK_LE(main_thread_only().run_depth, 1);
101   DCHECK(!main_thread_only().nesting_observer);
102   DCHECK(observer);
103   main_thread_only().nesting_observer = observer;
104 }
105 
RemoveNestingObserver(RunLoop::NestingObserver * observer)106 void ThreadControllerWithMessagePumpImpl::RemoveNestingObserver(
107     RunLoop::NestingObserver* observer) {
108   DCHECK_EQ(main_thread_only().nesting_observer, observer);
109   main_thread_only().nesting_observer = nullptr;
110 }
111 
DoWork()112 bool ThreadControllerWithMessagePumpImpl::DoWork() {
113   DCHECK(main_thread_only().task_source);
114   bool task_ran = false;
115 
116   {
117     AutoReset<int> do_work_scope(&main_thread_only().do_work_depth,
118                                  main_thread_only().do_work_depth + 1);
119 
120     for (int i = 0; i < main_thread_only().batch_size; i++) {
121       Optional<PendingTask> task = main_thread_only().task_source->TakeTask();
122       if (!task)
123         break;
124 
125       TRACE_TASK_EXECUTION("ThreadController::Task", *task);
126       task_annotator_.RunTask("ThreadController::Task", &*task);
127       task_ran = true;
128 
129       main_thread_only().task_source->DidRunTask();
130 
131       if (main_thread_only().quit_do_work) {
132         // When Quit() is called we must stop running the batch because
133         // caller expects per-task granularity.
134         main_thread_only().quit_do_work = false;
135         return true;
136       }
137     }
138   }  // DoWorkScope.
139 
140   LazyNow lazy_now(time_source_);
141   TimeDelta do_work_delay =
142       main_thread_only().task_source->DelayTillNextTask(&lazy_now);
143   DCHECK_GE(do_work_delay, TimeDelta());
144   // Schedule a continuation.
145   if (do_work_delay.is_zero()) {
146     // Need to run new work immediately.
147     pump_->ScheduleWork();
148   } else if (do_work_delay != TimeDelta::Max()) {
149     SetNextDelayedDoWork(&lazy_now, lazy_now.Now() + do_work_delay);
150   } else {
151     SetNextDelayedDoWork(&lazy_now, TimeTicks::Max());
152   }
153 
154   return task_ran;
155 }
156 
DoDelayedWork(TimeTicks * next_run_time)157 bool ThreadControllerWithMessagePumpImpl::DoDelayedWork(
158     TimeTicks* next_run_time) {
159   // Delayed work is getting processed in DoWork().
160   return false;
161 }
162 
DoIdleWork()163 bool ThreadControllerWithMessagePumpImpl::DoIdleWork() {
164   // RunLoop::Delegate knows whether we called Run() or RunUntilIdle().
165   if (ShouldQuitWhenIdle())
166     Quit();
167   return false;
168 }
169 
Run(bool application_tasks_allowed)170 void ThreadControllerWithMessagePumpImpl::Run(bool application_tasks_allowed) {
171   // No system messages are being processed by this class.
172   DCHECK(application_tasks_allowed);
173 
174   // We already have a MessagePump::Run() running, so we're in a nested RunLoop.
175   if (main_thread_only().run_depth > 0 && main_thread_only().nesting_observer)
176     main_thread_only().nesting_observer->OnBeginNestedRunLoop();
177 
178   {
179     AutoReset<int> run_scope(&main_thread_only().run_depth,
180                              main_thread_only().run_depth + 1);
181     // MessagePump::Run() blocks until Quit() called, but previously started
182     // Run() calls continue to block.
183     pump_->Run(this);
184   }
185 
186   // We'll soon continue to run an outer MessagePump::Run() loop.
187   if (main_thread_only().run_depth > 0 && main_thread_only().nesting_observer)
188     main_thread_only().nesting_observer->OnExitNestedRunLoop();
189 }
190 
Quit()191 void ThreadControllerWithMessagePumpImpl::Quit() {
192   // Interrupt a batch of work.
193   if (is_doing_work())
194     main_thread_only().quit_do_work = true;
195   // If we're in a nested RunLoop, continuation will be posted if necessary.
196   pump_->Quit();
197 }
198 
EnsureWorkScheduled()199 void ThreadControllerWithMessagePumpImpl::EnsureWorkScheduled() {
200   ScheduleWork();
201 }
202 
203 }  // namespace internal
204 }  // namespace sequence_manager
205 }  // namespace base
206