1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/sequence_manager/task_queue.h"
6
7 #include <utility>
8
9 #include "base/functional/bind.h"
10 #include "base/memory/ptr_util.h"
11 #include "base/task/sequence_manager/associated_thread_id.h"
12 #include "base/task/sequence_manager/sequence_manager_impl.h"
13 #include "base/task/sequence_manager/task_queue_impl.h"
14 #include "base/threading/thread_checker.h"
15 #include "base/threading/thread_checker_impl.h"
16 #include "base/time/time.h"
17 #include "base/trace_event/base_tracing.h"
18 #include "third_party/abseil-cpp/absl/types/optional.h"
19
20 namespace base {
21 namespace sequence_manager {
22
QueueEnabledVoter(scoped_refptr<TaskQueue> task_queue)23 TaskQueue::QueueEnabledVoter::QueueEnabledVoter(
24 scoped_refptr<TaskQueue> task_queue)
25 : task_queue_(std::move(task_queue)), enabled_(true) {
26 task_queue_->AddQueueEnabledVoter(enabled_);
27 }
28
~QueueEnabledVoter()29 TaskQueue::QueueEnabledVoter::~QueueEnabledVoter() {
30 task_queue_->RemoveQueueEnabledVoter(enabled_);
31 }
32
SetVoteToEnable(bool enabled)33 void TaskQueue::QueueEnabledVoter::SetVoteToEnable(bool enabled) {
34 if (enabled == enabled_)
35 return;
36 enabled_ = enabled;
37 task_queue_->OnQueueEnabledVoteChanged(enabled_);
38 }
39
AddQueueEnabledVoter(bool voter_is_enabled)40 void TaskQueue::AddQueueEnabledVoter(bool voter_is_enabled) {
41 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
42 ++voter_count_;
43 if (voter_is_enabled)
44 ++enabled_voter_count_;
45 }
46
RemoveQueueEnabledVoter(bool voter_is_enabled)47 void TaskQueue::RemoveQueueEnabledVoter(bool voter_is_enabled) {
48 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
49 // Voters and task queues are often stored in pairs, and the voter is often
50 // destroyed after the queue is shut down.
51 if (!impl_) {
52 return;
53 }
54
55 bool was_enabled = AreAllQueueEnabledVotersEnabled();
56 if (voter_is_enabled) {
57 --enabled_voter_count_;
58 DCHECK_GE(enabled_voter_count_, 0);
59 }
60
61 --voter_count_;
62 DCHECK_GE(voter_count_, 0);
63
64 bool is_enabled = AreAllQueueEnabledVotersEnabled();
65 if (was_enabled != is_enabled)
66 impl_->SetQueueEnabled(is_enabled);
67 }
68
AreAllQueueEnabledVotersEnabled() const69 bool TaskQueue::AreAllQueueEnabledVotersEnabled() const {
70 return enabled_voter_count_ == voter_count_;
71 }
72
OnQueueEnabledVoteChanged(bool enabled)73 void TaskQueue::OnQueueEnabledVoteChanged(bool enabled) {
74 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
75 bool was_enabled = AreAllQueueEnabledVotersEnabled();
76 if (enabled) {
77 ++enabled_voter_count_;
78 DCHECK_LE(enabled_voter_count_, voter_count_);
79 } else {
80 --enabled_voter_count_;
81 DCHECK_GE(enabled_voter_count_, 0);
82 }
83
84 bool is_enabled = AreAllQueueEnabledVotersEnabled();
85 if (was_enabled != is_enabled)
86 impl_->SetQueueEnabled(is_enabled);
87 }
88
TaskQueue(std::unique_ptr<internal::TaskQueueImpl> impl,const TaskQueue::Spec & spec)89 TaskQueue::TaskQueue(std::unique_ptr<internal::TaskQueueImpl> impl,
90 const TaskQueue::Spec& spec)
91 : impl_(std::move(impl)),
92 sequence_manager_(impl_->GetSequenceManagerWeakPtr()),
93 associated_thread_((impl_->sequence_manager())
94 ? impl_->sequence_manager()->associated_thread()
95 : MakeRefCounted<internal::AssociatedThreadId>()),
96 default_task_runner_(impl_->CreateTaskRunner(kTaskTypeNone)),
97 name_(impl_->GetProtoName()) {}
98
~TaskQueue()99 TaskQueue::~TaskQueue() {
100 ShutdownTaskQueueGracefully();
101 }
102
ShutdownTaskQueueGracefully()103 void TaskQueue::ShutdownTaskQueueGracefully() {
104 // scoped_refptr guarantees us that this object isn't used.
105 if (!impl_)
106 return;
107 if (impl_->IsUnregistered())
108 return;
109
110 // If we've not been unregistered then this must occur on the main thread.
111 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
112 impl_->ResetThrottler();
113 impl_->sequence_manager()->ShutdownTaskQueueGracefully(TakeTaskQueueImpl());
114 }
115
TaskTiming(bool has_wall_time,bool has_thread_time)116 TaskQueue::TaskTiming::TaskTiming(bool has_wall_time, bool has_thread_time)
117 : has_wall_time_(has_wall_time), has_thread_time_(has_thread_time) {}
118
RecordTaskStart(LazyNow * now)119 void TaskQueue::TaskTiming::RecordTaskStart(LazyNow* now) {
120 DCHECK_EQ(State::NotStarted, state_);
121 state_ = State::Running;
122
123 if (has_wall_time())
124 start_time_ = now->Now();
125 if (has_thread_time())
126 start_thread_time_ = base::ThreadTicks::Now();
127 }
128
RecordTaskEnd(LazyNow * now)129 void TaskQueue::TaskTiming::RecordTaskEnd(LazyNow* now) {
130 DCHECK(state_ == State::Running || state_ == State::Finished);
131 if (state_ == State::Finished)
132 return;
133 state_ = State::Finished;
134
135 if (has_wall_time())
136 end_time_ = now->Now();
137 if (has_thread_time())
138 end_thread_time_ = base::ThreadTicks::Now();
139 }
140
ShutdownTaskQueue()141 void TaskQueue::ShutdownTaskQueue() {
142 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
143 // TODO(crbug.com/1413795): Fix that some task queues get shut down more than
144 // once.
145 if (!impl_) {
146 return;
147 }
148 if (!sequence_manager_) {
149 TakeTaskQueueImpl().reset();
150 return;
151 }
152 sequence_manager_->UnregisterTaskQueueImpl(TakeTaskQueueImpl());
153 }
154
CreateTaskRunner(TaskType task_type)155 scoped_refptr<SingleThreadTaskRunner> TaskQueue::CreateTaskRunner(
156 TaskType task_type) {
157 // We only need to lock if we're not on the main thread.
158 base::internal::CheckedAutoLockMaybe lock(IsOnMainThread() ? &impl_lock_
159 : nullptr);
160 DCHECK(impl_);
161 return impl_->CreateTaskRunner(task_type);
162 }
163
164 std::unique_ptr<TaskQueue::QueueEnabledVoter>
CreateQueueEnabledVoter()165 TaskQueue::CreateQueueEnabledVoter() {
166 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
167 DCHECK(impl_);
168 return WrapUnique(new QueueEnabledVoter(this));
169 }
170
IsQueueEnabled() const171 bool TaskQueue::IsQueueEnabled() const {
172 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
173 DCHECK(impl_);
174 return impl_->IsQueueEnabled();
175 }
176
IsEmpty() const177 bool TaskQueue::IsEmpty() const {
178 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
179 DCHECK(impl_);
180 return impl_->IsEmpty();
181 }
182
GetNumberOfPendingTasks() const183 size_t TaskQueue::GetNumberOfPendingTasks() const {
184 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
185 DCHECK(impl_);
186 return impl_->GetNumberOfPendingTasks();
187 }
188
HasTaskToRunImmediatelyOrReadyDelayedTask() const189 bool TaskQueue::HasTaskToRunImmediatelyOrReadyDelayedTask() const {
190 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
191 DCHECK(impl_);
192 return impl_->HasTaskToRunImmediatelyOrReadyDelayedTask();
193 }
194
GetNextDesiredWakeUp()195 absl::optional<WakeUp> TaskQueue::GetNextDesiredWakeUp() {
196 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
197 DCHECK(impl_);
198 return impl_->GetNextDesiredWakeUp();
199 }
200
UpdateWakeUp(LazyNow * lazy_now)201 void TaskQueue::UpdateWakeUp(LazyNow* lazy_now) {
202 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
203 DCHECK(impl_);
204 impl_->UpdateWakeUp(lazy_now);
205 }
206
SetQueuePriority(TaskQueue::QueuePriority priority)207 void TaskQueue::SetQueuePriority(TaskQueue::QueuePriority priority) {
208 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
209 DCHECK(impl_);
210 impl_->SetQueuePriority(priority);
211 }
212
GetQueuePriority() const213 TaskQueue::QueuePriority TaskQueue::GetQueuePriority() const {
214 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
215 // TODO(crbug.com/1413795): change this to DCHECK(impl_) since task queues
216 // should not be used after shutdown.
217 DCHECK(impl_);
218 return impl_->GetQueuePriority();
219 }
220
AddTaskObserver(TaskObserver * task_observer)221 void TaskQueue::AddTaskObserver(TaskObserver* task_observer) {
222 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
223 DCHECK(impl_);
224 impl_->AddTaskObserver(task_observer);
225 }
226
RemoveTaskObserver(TaskObserver * task_observer)227 void TaskQueue::RemoveTaskObserver(TaskObserver* task_observer) {
228 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
229 DCHECK(impl_);
230 impl_->RemoveTaskObserver(task_observer);
231 }
232
InsertFence(InsertFencePosition position)233 void TaskQueue::InsertFence(InsertFencePosition position) {
234 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
235 DCHECK(impl_);
236 impl_->InsertFence(position);
237 }
238
InsertFenceAt(TimeTicks time)239 void TaskQueue::InsertFenceAt(TimeTicks time) {
240 impl_->InsertFenceAt(time);
241 }
242
RemoveFence()243 void TaskQueue::RemoveFence() {
244 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
245 DCHECK(impl_);
246 impl_->RemoveFence();
247 }
248
HasActiveFence()249 bool TaskQueue::HasActiveFence() {
250 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
251 DCHECK(impl_);
252 return impl_->HasActiveFence();
253 }
254
BlockedByFence() const255 bool TaskQueue::BlockedByFence() const {
256 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
257 DCHECK(impl_);
258 return impl_->BlockedByFence();
259 }
260
GetName() const261 const char* TaskQueue::GetName() const {
262 return perfetto::protos::pbzero::SequenceManagerTask::QueueName_Name(name_);
263 }
264
WriteIntoTrace(perfetto::TracedValue context) const265 void TaskQueue::WriteIntoTrace(perfetto::TracedValue context) const {
266 auto dict = std::move(context).WriteDictionary();
267 dict.Add("name", name_);
268 }
269
SetThrottler(Throttler * throttler)270 void TaskQueue::SetThrottler(Throttler* throttler) {
271 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
272 DCHECK(impl_);
273 // |throttler| is guaranteed to outlive TaskQueue and TaskQueueImpl lifecycle
274 // is controlled by |this|.
275 impl_->SetThrottler(throttler);
276 }
277
ResetThrottler()278 void TaskQueue::ResetThrottler() {
279 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
280 DCHECK(impl_);
281 impl_->ResetThrottler();
282 }
283
SetShouldReportPostedTasksWhenDisabled(bool should_report)284 void TaskQueue::SetShouldReportPostedTasksWhenDisabled(bool should_report) {
285 impl_->SetShouldReportPostedTasksWhenDisabled(should_report);
286 }
287
IsOnMainThread() const288 bool TaskQueue::IsOnMainThread() const {
289 return associated_thread_->IsBoundToCurrentThread();
290 }
291
TakeTaskQueueImpl()292 std::unique_ptr<internal::TaskQueueImpl> TaskQueue::TakeTaskQueueImpl() {
293 base::internal::CheckedAutoLock lock(impl_lock_);
294 DCHECK(impl_);
295 return std::move(impl_);
296 }
297
SetOnTaskStartedHandler(OnTaskStartedHandler handler)298 void TaskQueue::SetOnTaskStartedHandler(OnTaskStartedHandler handler) {
299 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
300 DCHECK(impl_);
301 impl_->SetOnTaskStartedHandler(std::move(handler));
302 }
303
SetOnTaskCompletedHandler(OnTaskCompletedHandler handler)304 void TaskQueue::SetOnTaskCompletedHandler(OnTaskCompletedHandler handler) {
305 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
306 DCHECK(impl_);
307 impl_->SetOnTaskCompletedHandler(std::move(handler));
308 }
309
310 std::unique_ptr<TaskQueue::OnTaskPostedCallbackHandle>
AddOnTaskPostedHandler(OnTaskPostedHandler handler)311 TaskQueue::AddOnTaskPostedHandler(OnTaskPostedHandler handler) {
312 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
313 DCHECK(impl_);
314 return impl_->AddOnTaskPostedHandler(std::move(handler));
315 }
316
SetTaskExecutionTraceLogger(TaskExecutionTraceLogger logger)317 void TaskQueue::SetTaskExecutionTraceLogger(TaskExecutionTraceLogger logger) {
318 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
319 DCHECK(impl_);
320 impl_->SetTaskExecutionTraceLogger(std::move(logger));
321 }
322
323 } // namespace sequence_manager
324 } // namespace base
325