1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/sequence_manager/task_queue_impl.h"
6
7 #include <memory>
8 #include <utility>
9
10 #include "base/strings/stringprintf.h"
11 #include "base/task/sequence_manager/sequence_manager_impl.h"
12 #include "base/task/sequence_manager/time_domain.h"
13 #include "base/task/sequence_manager/work_queue.h"
14 #include "base/time/time.h"
15 #include "base/trace_event/blame_context.h"
16
17 namespace base {
18 namespace sequence_manager {
19
20 // static
PriorityToString(TaskQueue::QueuePriority priority)21 const char* TaskQueue::PriorityToString(TaskQueue::QueuePriority priority) {
22 switch (priority) {
23 case kControlPriority:
24 return "control";
25 case kHighestPriority:
26 return "highest";
27 case kHighPriority:
28 return "high";
29 case kNormalPriority:
30 return "normal";
31 case kLowPriority:
32 return "low";
33 case kBestEffortPriority:
34 return "best_effort";
35 default:
36 NOTREACHED();
37 return nullptr;
38 }
39 }
40
41 namespace internal {
42
TaskQueueImpl(SequenceManagerImpl * sequence_manager,TimeDomain * time_domain,const TaskQueue::Spec & spec)43 TaskQueueImpl::TaskQueueImpl(SequenceManagerImpl* sequence_manager,
44 TimeDomain* time_domain,
45 const TaskQueue::Spec& spec)
46 : name_(spec.name),
47 thread_id_(PlatformThread::CurrentId()),
48 any_thread_(sequence_manager, time_domain),
49 main_thread_only_(sequence_manager, this, time_domain),
50 should_monitor_quiescence_(spec.should_monitor_quiescence),
51 should_notify_observers_(spec.should_notify_observers) {
52 DCHECK(time_domain);
53 }
54
~TaskQueueImpl()55 TaskQueueImpl::~TaskQueueImpl() {
56 #if DCHECK_IS_ON()
57 AutoLock lock(any_thread_lock_);
58 // NOTE this check shouldn't fire because |SequenceManagerImpl::queues_|
59 // contains a strong reference to this TaskQueueImpl and the
60 // SequenceManagerImpl destructor calls UnregisterTaskQueue on all task
61 // queues.
62 DCHECK(!any_thread().sequence_manager)
63 << "UnregisterTaskQueue must be called first!";
64 #endif
65 }
66
PostTaskResult()67 TaskQueueImpl::PostTaskResult::PostTaskResult()
68 : success(false), task(OnceClosure(), Location()) {}
69
PostTaskResult(bool success,TaskQueue::PostedTask task)70 TaskQueueImpl::PostTaskResult::PostTaskResult(bool success,
71 TaskQueue::PostedTask task)
72 : success(success), task(std::move(task)) {}
73
PostTaskResult(PostTaskResult && move_from)74 TaskQueueImpl::PostTaskResult::PostTaskResult(PostTaskResult&& move_from)
75 : success(move_from.success), task(std::move(move_from.task)) {}
76
77 TaskQueueImpl::PostTaskResult::~PostTaskResult() = default;
78
Success()79 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostTaskResult::Success() {
80 return PostTaskResult(true, TaskQueue::PostedTask(OnceClosure(), Location()));
81 }
82
Fail(TaskQueue::PostedTask task)83 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostTaskResult::Fail(
84 TaskQueue::PostedTask task) {
85 return PostTaskResult(false, std::move(task));
86 }
87
Task(TaskQueue::PostedTask task,TimeTicks desired_run_time,EnqueueOrder sequence_number)88 TaskQueueImpl::Task::Task(TaskQueue::PostedTask task,
89 TimeTicks desired_run_time,
90 EnqueueOrder sequence_number)
91 : TaskQueue::Task(std::move(task), desired_run_time) {
92 // It might wrap around to a negative number but it's handled properly.
93 sequence_num = static_cast<int>(sequence_number);
94 }
95
Task(TaskQueue::PostedTask task,TimeTicks desired_run_time,EnqueueOrder sequence_number,EnqueueOrder enqueue_order)96 TaskQueueImpl::Task::Task(TaskQueue::PostedTask task,
97 TimeTicks desired_run_time,
98 EnqueueOrder sequence_number,
99 EnqueueOrder enqueue_order)
100 : TaskQueue::Task(std::move(task), desired_run_time),
101 enqueue_order_(enqueue_order) {
102 // It might wrap around to a negative number but it's handled properly.
103 sequence_num = static_cast<int>(sequence_number);
104 }
105
AnyThread(SequenceManagerImpl * sequence_manager,TimeDomain * time_domain)106 TaskQueueImpl::AnyThread::AnyThread(SequenceManagerImpl* sequence_manager,
107 TimeDomain* time_domain)
108 : sequence_manager(sequence_manager), time_domain(time_domain) {}
109
110 TaskQueueImpl::AnyThread::~AnyThread() = default;
111
MainThreadOnly(SequenceManagerImpl * sequence_manager,TaskQueueImpl * task_queue,TimeDomain * time_domain)112 TaskQueueImpl::MainThreadOnly::MainThreadOnly(
113 SequenceManagerImpl* sequence_manager,
114 TaskQueueImpl* task_queue,
115 TimeDomain* time_domain)
116 : sequence_manager(sequence_manager),
117 time_domain(time_domain),
118 delayed_work_queue(
119 new WorkQueue(task_queue, "delayed", WorkQueue::QueueType::kDelayed)),
120 immediate_work_queue(new WorkQueue(task_queue,
121 "immediate",
122 WorkQueue::QueueType::kImmediate)),
123 set_index(0),
124 is_enabled_refcount(0),
125 voter_refcount(0),
126 blame_context(nullptr),
127 is_enabled_for_test(true) {}
128
129 TaskQueueImpl::MainThreadOnly::~MainThreadOnly() = default;
130
UnregisterTaskQueue()131 void TaskQueueImpl::UnregisterTaskQueue() {
132 TaskDeque immediate_incoming_queue;
133
134 {
135 AutoLock lock(any_thread_lock_);
136 AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_);
137
138 if (main_thread_only().time_domain)
139 main_thread_only().time_domain->UnregisterQueue(this);
140
141 if (!any_thread().sequence_manager)
142 return;
143
144 main_thread_only().on_task_completed_handler = OnTaskCompletedHandler();
145 any_thread().time_domain = nullptr;
146 main_thread_only().time_domain = nullptr;
147
148 any_thread().sequence_manager = nullptr;
149 main_thread_only().sequence_manager = nullptr;
150 any_thread().on_next_wake_up_changed_callback =
151 OnNextWakeUpChangedCallback();
152 main_thread_only().on_next_wake_up_changed_callback =
153 OnNextWakeUpChangedCallback();
154 immediate_incoming_queue.swap(immediate_incoming_queue_);
155 }
156
157 // It is possible for a task to hold a scoped_refptr to this, which
158 // will lead to TaskQueueImpl destructor being called when deleting a task.
159 // To avoid use-after-free, we need to clear all fields of a task queue
160 // before starting to delete the tasks.
161 // All work queues and priority queues containing tasks should be moved to
162 // local variables on stack (std::move for unique_ptrs and swap for queues)
163 // before clearing them and deleting tasks.
164
165 // Flush the queues outside of the lock because TSAN complains about a lock
166 // order inversion for tasks that are posted from within a lock, with a
167 // destructor that acquires the same lock.
168
169 std::priority_queue<Task> delayed_incoming_queue;
170 delayed_incoming_queue.swap(main_thread_only().delayed_incoming_queue);
171
172 std::unique_ptr<WorkQueue> immediate_work_queue =
173 std::move(main_thread_only().immediate_work_queue);
174 std::unique_ptr<WorkQueue> delayed_work_queue =
175 std::move(main_thread_only().delayed_work_queue);
176 }
177
GetName() const178 const char* TaskQueueImpl::GetName() const {
179 return name_;
180 }
181
RunsTasksInCurrentSequence() const182 bool TaskQueueImpl::RunsTasksInCurrentSequence() const {
183 return PlatformThread::CurrentId() == thread_id_;
184 }
185
PostDelayedTask(TaskQueue::PostedTask task)186 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostDelayedTask(
187 TaskQueue::PostedTask task) {
188 if (task.delay.is_zero())
189 return PostImmediateTaskImpl(std::move(task));
190
191 return PostDelayedTaskImpl(std::move(task));
192 }
193
PostImmediateTaskImpl(TaskQueue::PostedTask task)194 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostImmediateTaskImpl(
195 TaskQueue::PostedTask task) {
196 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
197 // for details.
198 CHECK(task.callback);
199 AutoLock lock(any_thread_lock_);
200 if (!any_thread().sequence_manager)
201 return PostTaskResult::Fail(std::move(task));
202
203 EnqueueOrder sequence_number =
204 any_thread().sequence_manager->GetNextSequenceNumber();
205
206 PushOntoImmediateIncomingQueueLocked(Task(std::move(task),
207 any_thread().time_domain->Now(),
208 sequence_number, sequence_number));
209 return PostTaskResult::Success();
210 }
211
PostDelayedTaskImpl(TaskQueue::PostedTask task)212 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostDelayedTaskImpl(
213 TaskQueue::PostedTask task) {
214 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
215 // for details.
216 CHECK(task.callback);
217 DCHECK_GT(task.delay, TimeDelta());
218 if (PlatformThread::CurrentId() == thread_id_) {
219 // Lock-free fast path for delayed tasks posted from the main thread.
220 if (!main_thread_only().sequence_manager)
221 return PostTaskResult::Fail(std::move(task));
222
223 EnqueueOrder sequence_number =
224 main_thread_only().sequence_manager->GetNextSequenceNumber();
225
226 TimeTicks time_domain_now = main_thread_only().time_domain->Now();
227 TimeTicks time_domain_delayed_run_time = time_domain_now + task.delay;
228 PushOntoDelayedIncomingQueueFromMainThread(
229 Task(std::move(task), time_domain_delayed_run_time, sequence_number),
230 time_domain_now);
231 } else {
232 // NOTE posting a delayed task from a different thread is not expected to
233 // be common. This pathway is less optimal than perhaps it could be
234 // because it causes two main thread tasks to be run. Should this
235 // assumption prove to be false in future, we may need to revisit this.
236 AutoLock lock(any_thread_lock_);
237 if (!any_thread().sequence_manager)
238 return PostTaskResult::Fail(std::move(task));
239
240 EnqueueOrder sequence_number =
241 any_thread().sequence_manager->GetNextSequenceNumber();
242
243 TimeTicks time_domain_now = any_thread().time_domain->Now();
244 TimeTicks time_domain_delayed_run_time = time_domain_now + task.delay;
245 PushOntoDelayedIncomingQueueLocked(
246 Task(std::move(task), time_domain_delayed_run_time, sequence_number));
247 }
248 return PostTaskResult::Success();
249 }
250
PushOntoDelayedIncomingQueueFromMainThread(Task pending_task,TimeTicks now)251 void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread(
252 Task pending_task,
253 TimeTicks now) {
254 main_thread_only().sequence_manager->WillQueueTask(&pending_task);
255 main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
256
257 LazyNow lazy_now(now);
258 UpdateDelayedWakeUp(&lazy_now);
259
260 TraceQueueSize();
261 }
262
PushOntoDelayedIncomingQueueLocked(Task pending_task)263 void TaskQueueImpl::PushOntoDelayedIncomingQueueLocked(Task pending_task) {
264 any_thread().sequence_manager->WillQueueTask(&pending_task);
265
266 EnqueueOrder thread_hop_task_sequence_number =
267 any_thread().sequence_manager->GetNextSequenceNumber();
268 // TODO(altimin): Add a copy method to Task to capture metadata here.
269 PushOntoImmediateIncomingQueueLocked(Task(
270 TaskQueue::PostedTask(BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask,
271 Unretained(this), std::move(pending_task)),
272 FROM_HERE, TimeDelta(), Nestable::kNonNestable,
273 pending_task.task_type()),
274 TimeTicks(), thread_hop_task_sequence_number,
275 thread_hop_task_sequence_number));
276 }
277
ScheduleDelayedWorkTask(Task pending_task)278 void TaskQueueImpl::ScheduleDelayedWorkTask(Task pending_task) {
279 DCHECK(main_thread_checker_.CalledOnValidThread());
280 TimeTicks delayed_run_time = pending_task.delayed_run_time;
281 TimeTicks time_domain_now = main_thread_only().time_domain->Now();
282 if (delayed_run_time <= time_domain_now) {
283 // If |delayed_run_time| is in the past then push it onto the work queue
284 // immediately. To ensure the right task ordering we need to temporarily
285 // push it onto the |delayed_incoming_queue|.
286 delayed_run_time = time_domain_now;
287 pending_task.delayed_run_time = time_domain_now;
288 main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
289 LazyNow lazy_now(time_domain_now);
290 WakeUpForDelayedWork(&lazy_now);
291 } else {
292 // If |delayed_run_time| is in the future we can queue it as normal.
293 PushOntoDelayedIncomingQueueFromMainThread(std::move(pending_task),
294 time_domain_now);
295 }
296 TraceQueueSize();
297 }
298
PushOntoImmediateIncomingQueueLocked(Task task)299 void TaskQueueImpl::PushOntoImmediateIncomingQueueLocked(Task task) {
300 // If the |immediate_incoming_queue| is empty we need a DoWork posted to make
301 // it run.
302 bool was_immediate_incoming_queue_empty;
303
304 EnqueueOrder sequence_number = task.enqueue_order();
305 TimeTicks desired_run_time = task.delayed_run_time;
306
307 {
308 AutoLock lock(immediate_incoming_queue_lock_);
309 was_immediate_incoming_queue_empty = immediate_incoming_queue().empty();
310 any_thread().sequence_manager->WillQueueTask(&task);
311 immediate_incoming_queue().push_back(std::move(task));
312 }
313
314 if (was_immediate_incoming_queue_empty) {
315 // However there's no point posting a DoWork for a blocked queue. NB we can
316 // only tell if it's disabled from the main thread.
317 bool queue_is_blocked =
318 RunsTasksInCurrentSequence() &&
319 (!IsQueueEnabled() || main_thread_only().current_fence);
320 any_thread().sequence_manager->OnQueueHasIncomingImmediateWork(
321 this, sequence_number, queue_is_blocked);
322 if (!any_thread().on_next_wake_up_changed_callback.is_null())
323 any_thread().on_next_wake_up_changed_callback.Run(desired_run_time);
324 }
325
326 TraceQueueSize();
327 }
328
ReloadImmediateWorkQueueIfEmpty()329 void TaskQueueImpl::ReloadImmediateWorkQueueIfEmpty() {
330 if (!main_thread_only().immediate_work_queue->Empty())
331 return;
332
333 main_thread_only().immediate_work_queue->ReloadEmptyImmediateQueue();
334 }
335
ReloadEmptyImmediateQueue(TaskDeque * queue)336 void TaskQueueImpl::ReloadEmptyImmediateQueue(TaskDeque* queue) {
337 DCHECK(queue->empty());
338
339 AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_);
340 queue->swap(immediate_incoming_queue());
341
342 // Activate delayed fence if necessary. This is ideologically similar to
343 // ActivateDelayedFenceIfNeeded, but due to immediate tasks being posted
344 // from any thread we can't generate an enqueue order for the fence there,
345 // so we have to check all immediate tasks and use their enqueue order for
346 // a fence.
347 if (main_thread_only().delayed_fence) {
348 for (const Task& task : *queue) {
349 if (task.delayed_run_time >= main_thread_only().delayed_fence.value()) {
350 main_thread_only().delayed_fence = nullopt;
351 DCHECK(!main_thread_only().current_fence);
352 main_thread_only().current_fence = task.enqueue_order();
353 // Do not trigger WorkQueueSets notification when taking incoming
354 // immediate queue.
355 main_thread_only().immediate_work_queue->InsertFenceSilently(
356 main_thread_only().current_fence);
357 main_thread_only().delayed_work_queue->InsertFenceSilently(
358 main_thread_only().current_fence);
359 break;
360 }
361 }
362 }
363 }
364
IsEmpty() const365 bool TaskQueueImpl::IsEmpty() const {
366 if (!main_thread_only().delayed_work_queue->Empty() ||
367 !main_thread_only().delayed_incoming_queue.empty() ||
368 !main_thread_only().immediate_work_queue->Empty()) {
369 return false;
370 }
371
372 AutoLock lock(immediate_incoming_queue_lock_);
373 return immediate_incoming_queue().empty();
374 }
375
GetNumberOfPendingTasks() const376 size_t TaskQueueImpl::GetNumberOfPendingTasks() const {
377 size_t task_count = 0;
378 task_count += main_thread_only().delayed_work_queue->Size();
379 task_count += main_thread_only().delayed_incoming_queue.size();
380 task_count += main_thread_only().immediate_work_queue->Size();
381
382 AutoLock lock(immediate_incoming_queue_lock_);
383 task_count += immediate_incoming_queue().size();
384 return task_count;
385 }
386
HasTaskToRunImmediately() const387 bool TaskQueueImpl::HasTaskToRunImmediately() const {
388 // Any work queue tasks count as immediate work.
389 if (!main_thread_only().delayed_work_queue->Empty() ||
390 !main_thread_only().immediate_work_queue->Empty()) {
391 return true;
392 }
393
394 // Tasks on |delayed_incoming_queue| that could run now, count as
395 // immediate work.
396 if (!main_thread_only().delayed_incoming_queue.empty() &&
397 main_thread_only().delayed_incoming_queue.top().delayed_run_time <=
398 main_thread_only().time_domain->CreateLazyNow().Now()) {
399 return true;
400 }
401
402 // Finally tasks on |immediate_incoming_queue| count as immediate work.
403 AutoLock lock(immediate_incoming_queue_lock_);
404 return !immediate_incoming_queue().empty();
405 }
406
407 Optional<TaskQueueImpl::DelayedWakeUp>
GetNextScheduledWakeUpImpl()408 TaskQueueImpl::GetNextScheduledWakeUpImpl() {
409 // Note we don't scheduled a wake-up for disabled queues.
410 if (main_thread_only().delayed_incoming_queue.empty() || !IsQueueEnabled())
411 return nullopt;
412
413 return main_thread_only().delayed_incoming_queue.top().delayed_wake_up();
414 }
415
GetNextScheduledWakeUp()416 Optional<TimeTicks> TaskQueueImpl::GetNextScheduledWakeUp() {
417 Optional<DelayedWakeUp> wake_up = GetNextScheduledWakeUpImpl();
418 if (!wake_up)
419 return nullopt;
420 return wake_up->time;
421 }
422
WakeUpForDelayedWork(LazyNow * lazy_now)423 void TaskQueueImpl::WakeUpForDelayedWork(LazyNow* lazy_now) {
424 // Enqueue all delayed tasks that should be running now, skipping any that
425 // have been canceled.
426 while (!main_thread_only().delayed_incoming_queue.empty()) {
427 Task& task =
428 const_cast<Task&>(main_thread_only().delayed_incoming_queue.top());
429 if (!task.task || task.task.IsCancelled()) {
430 main_thread_only().delayed_incoming_queue.pop();
431 continue;
432 }
433 if (task.delayed_run_time > lazy_now->Now())
434 break;
435 ActivateDelayedFenceIfNeeded(task.delayed_run_time);
436 task.set_enqueue_order(
437 main_thread_only().sequence_manager->GetNextSequenceNumber());
438 main_thread_only().delayed_work_queue->Push(std::move(task));
439 main_thread_only().delayed_incoming_queue.pop();
440
441 // Normally WakeUpForDelayedWork is called inside DoWork, but it also
442 // can be called elsewhere (e.g. tests and fast-path for posting
443 // delayed tasks). Ensure that there is a DoWork posting. No-op inside
444 // existing DoWork due to DoWork deduplication.
445 if (IsQueueEnabled() || !main_thread_only().current_fence) {
446 main_thread_only().sequence_manager->MaybeScheduleImmediateWork(
447 FROM_HERE);
448 }
449 }
450
451 UpdateDelayedWakeUp(lazy_now);
452 }
453
TraceQueueSize() const454 void TaskQueueImpl::TraceQueueSize() const {
455 bool is_tracing;
456 TRACE_EVENT_CATEGORY_GROUP_ENABLED(
457 TRACE_DISABLED_BY_DEFAULT("sequence_manager"), &is_tracing);
458 if (!is_tracing)
459 return;
460
461 // It's only safe to access the work queues from the main thread.
462 // TODO(alexclarke): We should find another way of tracing this
463 if (PlatformThread::CurrentId() != thread_id_)
464 return;
465
466 AutoLock lock(immediate_incoming_queue_lock_);
467 TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), GetName(),
468 immediate_incoming_queue().size() +
469 main_thread_only().immediate_work_queue->Size() +
470 main_thread_only().delayed_work_queue->Size() +
471 main_thread_only().delayed_incoming_queue.size());
472 }
473
SetQueuePriority(TaskQueue::QueuePriority priority)474 void TaskQueueImpl::SetQueuePriority(TaskQueue::QueuePriority priority) {
475 if (!main_thread_only().sequence_manager || priority == GetQueuePriority())
476 return;
477 main_thread_only()
478 .sequence_manager->main_thread_only()
479 .selector.SetQueuePriority(this, priority);
480 }
481
GetQueuePriority() const482 TaskQueue::QueuePriority TaskQueueImpl::GetQueuePriority() const {
483 size_t set_index = immediate_work_queue()->work_queue_set_index();
484 DCHECK_EQ(set_index, delayed_work_queue()->work_queue_set_index());
485 return static_cast<TaskQueue::QueuePriority>(set_index);
486 }
487
AsValueInto(TimeTicks now,trace_event::TracedValue * state) const488 void TaskQueueImpl::AsValueInto(TimeTicks now,
489 trace_event::TracedValue* state) const {
490 AutoLock lock(any_thread_lock_);
491 AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_);
492 state->BeginDictionary();
493 state->SetString("name", GetName());
494 if (!main_thread_only().sequence_manager) {
495 state->SetBoolean("unregistered", true);
496 state->EndDictionary();
497 return;
498 }
499 DCHECK(main_thread_only().time_domain);
500 DCHECK(main_thread_only().delayed_work_queue);
501 DCHECK(main_thread_only().immediate_work_queue);
502
503 state->SetString(
504 "task_queue_id",
505 StringPrintf("0x%" PRIx64,
506 static_cast<uint64_t>(reinterpret_cast<uintptr_t>(this))));
507 state->SetBoolean("enabled", IsQueueEnabled());
508 state->SetString("time_domain_name",
509 main_thread_only().time_domain->GetName());
510 state->SetInteger("immediate_incoming_queue_size",
511 immediate_incoming_queue().size());
512 state->SetInteger("delayed_incoming_queue_size",
513 main_thread_only().delayed_incoming_queue.size());
514 state->SetInteger("immediate_work_queue_size",
515 main_thread_only().immediate_work_queue->Size());
516 state->SetInteger("delayed_work_queue_size",
517 main_thread_only().delayed_work_queue->Size());
518
519 if (!main_thread_only().delayed_incoming_queue.empty()) {
520 TimeDelta delay_to_next_task =
521 (main_thread_only().delayed_incoming_queue.top().delayed_run_time -
522 main_thread_only().time_domain->CreateLazyNow().Now());
523 state->SetDouble("delay_to_next_task_ms",
524 delay_to_next_task.InMillisecondsF());
525 }
526 if (main_thread_only().current_fence)
527 state->SetInteger("current_fence", main_thread_only().current_fence);
528 if (main_thread_only().delayed_fence) {
529 state->SetDouble(
530 "delayed_fence_seconds_from_now",
531 (main_thread_only().delayed_fence.value() - now).InSecondsF());
532 }
533
534 bool verbose = false;
535 TRACE_EVENT_CATEGORY_GROUP_ENABLED(
536 TRACE_DISABLED_BY_DEFAULT("sequence_manager.verbose_snapshots"),
537 &verbose);
538
539 if (verbose) {
540 state->BeginArray("immediate_incoming_queue");
541 QueueAsValueInto(immediate_incoming_queue(), now, state);
542 state->EndArray();
543 state->BeginArray("delayed_work_queue");
544 main_thread_only().delayed_work_queue->AsValueInto(now, state);
545 state->EndArray();
546 state->BeginArray("immediate_work_queue");
547 main_thread_only().immediate_work_queue->AsValueInto(now, state);
548 state->EndArray();
549 state->BeginArray("delayed_incoming_queue");
550 QueueAsValueInto(main_thread_only().delayed_incoming_queue, now, state);
551 state->EndArray();
552 }
553 state->SetString("priority", TaskQueue::PriorityToString(GetQueuePriority()));
554 state->EndDictionary();
555 }
556
AddTaskObserver(MessageLoop::TaskObserver * task_observer)557 void TaskQueueImpl::AddTaskObserver(MessageLoop::TaskObserver* task_observer) {
558 main_thread_only().task_observers.AddObserver(task_observer);
559 }
560
RemoveTaskObserver(MessageLoop::TaskObserver * task_observer)561 void TaskQueueImpl::RemoveTaskObserver(
562 MessageLoop::TaskObserver* task_observer) {
563 main_thread_only().task_observers.RemoveObserver(task_observer);
564 }
565
NotifyWillProcessTask(const PendingTask & pending_task)566 void TaskQueueImpl::NotifyWillProcessTask(const PendingTask& pending_task) {
567 DCHECK(should_notify_observers_);
568 if (main_thread_only().blame_context)
569 main_thread_only().blame_context->Enter();
570 for (auto& observer : main_thread_only().task_observers)
571 observer.WillProcessTask(pending_task);
572 }
573
NotifyDidProcessTask(const PendingTask & pending_task)574 void TaskQueueImpl::NotifyDidProcessTask(const PendingTask& pending_task) {
575 DCHECK(should_notify_observers_);
576 for (auto& observer : main_thread_only().task_observers)
577 observer.DidProcessTask(pending_task);
578 if (main_thread_only().blame_context)
579 main_thread_only().blame_context->Leave();
580 }
581
SetTimeDomain(TimeDomain * time_domain)582 void TaskQueueImpl::SetTimeDomain(TimeDomain* time_domain) {
583 {
584 AutoLock lock(any_thread_lock_);
585 DCHECK(time_domain);
586 // NOTE this is similar to checking |any_thread().sequence_manager| but
587 // the TaskQueueSelectorTests constructs TaskQueueImpl directly with a null
588 // sequence_manager. Instead we check |any_thread().time_domain| which is
589 // another way of asserting that UnregisterTaskQueue has not been called.
590 DCHECK(any_thread().time_domain);
591 if (!any_thread().time_domain)
592 return;
593 DCHECK(main_thread_checker_.CalledOnValidThread());
594 if (time_domain == main_thread_only().time_domain)
595 return;
596
597 any_thread().time_domain = time_domain;
598 }
599
600 main_thread_only().time_domain->UnregisterQueue(this);
601 main_thread_only().time_domain = time_domain;
602
603 LazyNow lazy_now = time_domain->CreateLazyNow();
604 // Clear scheduled wake up to ensure that new notifications are issued
605 // correctly.
606 // TODO(altimin): Remove this when we won't have to support changing time
607 // domains.
608 main_thread_only().scheduled_wake_up = nullopt;
609 UpdateDelayedWakeUp(&lazy_now);
610 }
611
GetTimeDomain() const612 TimeDomain* TaskQueueImpl::GetTimeDomain() const {
613 if (PlatformThread::CurrentId() == thread_id_)
614 return main_thread_only().time_domain;
615
616 AutoLock lock(any_thread_lock_);
617 return any_thread().time_domain;
618 }
619
SetBlameContext(trace_event::BlameContext * blame_context)620 void TaskQueueImpl::SetBlameContext(trace_event::BlameContext* blame_context) {
621 main_thread_only().blame_context = blame_context;
622 }
623
InsertFence(TaskQueue::InsertFencePosition position)624 void TaskQueueImpl::InsertFence(TaskQueue::InsertFencePosition position) {
625 if (!main_thread_only().sequence_manager)
626 return;
627
628 // Only one fence may be present at a time.
629 main_thread_only().delayed_fence = nullopt;
630
631 EnqueueOrder previous_fence = main_thread_only().current_fence;
632 EnqueueOrder current_fence =
633 position == TaskQueue::InsertFencePosition::kNow
634 ? main_thread_only().sequence_manager->GetNextSequenceNumber()
635 : EnqueueOrder::blocking_fence();
636
637 // Tasks posted after this point will have a strictly higher enqueue order
638 // and will be blocked from running.
639 main_thread_only().current_fence = current_fence;
640 bool task_unblocked =
641 main_thread_only().immediate_work_queue->InsertFence(current_fence);
642 task_unblocked |=
643 main_thread_only().delayed_work_queue->InsertFence(current_fence);
644
645 if (!task_unblocked && previous_fence && previous_fence < current_fence) {
646 AutoLock lock(immediate_incoming_queue_lock_);
647 if (!immediate_incoming_queue().empty() &&
648 immediate_incoming_queue().front().enqueue_order() > previous_fence &&
649 immediate_incoming_queue().front().enqueue_order() < current_fence) {
650 task_unblocked = true;
651 }
652 }
653
654 if (IsQueueEnabled() && task_unblocked) {
655 main_thread_only().sequence_manager->MaybeScheduleImmediateWork(FROM_HERE);
656 }
657 }
658
InsertFenceAt(TimeTicks time)659 void TaskQueueImpl::InsertFenceAt(TimeTicks time) {
660 // Task queue can have only one fence, delayed or not.
661 RemoveFence();
662 main_thread_only().delayed_fence = time;
663 }
664
RemoveFence()665 void TaskQueueImpl::RemoveFence() {
666 if (!main_thread_only().sequence_manager)
667 return;
668
669 EnqueueOrder previous_fence = main_thread_only().current_fence;
670 main_thread_only().current_fence = EnqueueOrder::none();
671 main_thread_only().delayed_fence = nullopt;
672
673 bool task_unblocked = main_thread_only().immediate_work_queue->RemoveFence();
674 task_unblocked |= main_thread_only().delayed_work_queue->RemoveFence();
675
676 if (!task_unblocked && previous_fence) {
677 AutoLock lock(immediate_incoming_queue_lock_);
678 if (!immediate_incoming_queue().empty() &&
679 immediate_incoming_queue().front().enqueue_order() > previous_fence) {
680 task_unblocked = true;
681 }
682 }
683
684 if (IsQueueEnabled() && task_unblocked) {
685 main_thread_only().sequence_manager->MaybeScheduleImmediateWork(FROM_HERE);
686 }
687 }
688
BlockedByFence() const689 bool TaskQueueImpl::BlockedByFence() const {
690 if (!main_thread_only().current_fence)
691 return false;
692
693 if (!main_thread_only().immediate_work_queue->BlockedByFence() ||
694 !main_thread_only().delayed_work_queue->BlockedByFence()) {
695 return false;
696 }
697
698 AutoLock lock(immediate_incoming_queue_lock_);
699 if (immediate_incoming_queue().empty())
700 return true;
701
702 return immediate_incoming_queue().front().enqueue_order() >
703 main_thread_only().current_fence;
704 }
705
HasActiveFence()706 bool TaskQueueImpl::HasActiveFence() {
707 if (main_thread_only().delayed_fence &&
708 main_thread_only().time_domain->Now() >
709 main_thread_only().delayed_fence.value()) {
710 return true;
711 }
712 return !!main_thread_only().current_fence;
713 }
714
CouldTaskRun(EnqueueOrder enqueue_order) const715 bool TaskQueueImpl::CouldTaskRun(EnqueueOrder enqueue_order) const {
716 if (!IsQueueEnabled())
717 return false;
718
719 if (!main_thread_only().current_fence)
720 return true;
721
722 return enqueue_order < main_thread_only().current_fence;
723 }
724
725 // static
QueueAsValueInto(const TaskDeque & queue,TimeTicks now,trace_event::TracedValue * state)726 void TaskQueueImpl::QueueAsValueInto(const TaskDeque& queue,
727 TimeTicks now,
728 trace_event::TracedValue* state) {
729 for (const Task& task : queue) {
730 TaskAsValueInto(task, now, state);
731 }
732 }
733
734 // static
QueueAsValueInto(const std::priority_queue<Task> & queue,TimeTicks now,trace_event::TracedValue * state)735 void TaskQueueImpl::QueueAsValueInto(const std::priority_queue<Task>& queue,
736 TimeTicks now,
737 trace_event::TracedValue* state) {
738 // Remove const to search |queue| in the destructive manner. Restore the
739 // content from |visited| later.
740 std::priority_queue<Task>* mutable_queue =
741 const_cast<std::priority_queue<Task>*>(&queue);
742 std::priority_queue<Task> visited;
743 while (!mutable_queue->empty()) {
744 TaskAsValueInto(mutable_queue->top(), now, state);
745 visited.push(std::move(const_cast<Task&>(mutable_queue->top())));
746 mutable_queue->pop();
747 }
748 *mutable_queue = std::move(visited);
749 }
750
751 // static
TaskAsValueInto(const Task & task,TimeTicks now,trace_event::TracedValue * state)752 void TaskQueueImpl::TaskAsValueInto(const Task& task,
753 TimeTicks now,
754 trace_event::TracedValue* state) {
755 state->BeginDictionary();
756 state->SetString("posted_from", task.posted_from.ToString());
757 if (task.enqueue_order_set())
758 state->SetInteger("enqueue_order", task.enqueue_order());
759 state->SetInteger("sequence_num", task.sequence_num);
760 state->SetBoolean("nestable", task.nestable == Nestable::kNestable);
761 state->SetBoolean("is_high_res", task.is_high_res);
762 state->SetBoolean("is_cancelled", task.task.IsCancelled());
763 state->SetDouble("delayed_run_time",
764 (task.delayed_run_time - TimeTicks()).InMillisecondsF());
765 state->SetDouble("delayed_run_time_milliseconds_from_now",
766 (task.delayed_run_time - now).InMillisecondsF());
767 state->EndDictionary();
768 }
769
QueueEnabledVoterImpl(scoped_refptr<TaskQueue> task_queue)770 TaskQueueImpl::QueueEnabledVoterImpl::QueueEnabledVoterImpl(
771 scoped_refptr<TaskQueue> task_queue)
772 : task_queue_(task_queue), enabled_(true) {}
773
~QueueEnabledVoterImpl()774 TaskQueueImpl::QueueEnabledVoterImpl::~QueueEnabledVoterImpl() {
775 if (task_queue_->GetTaskQueueImpl())
776 task_queue_->GetTaskQueueImpl()->RemoveQueueEnabledVoter(this);
777 }
778
SetQueueEnabled(bool enabled)779 void TaskQueueImpl::QueueEnabledVoterImpl::SetQueueEnabled(bool enabled) {
780 if (enabled_ == enabled)
781 return;
782
783 task_queue_->GetTaskQueueImpl()->OnQueueEnabledVoteChanged(enabled);
784 enabled_ = enabled;
785 }
786
RemoveQueueEnabledVoter(const QueueEnabledVoterImpl * voter)787 void TaskQueueImpl::RemoveQueueEnabledVoter(
788 const QueueEnabledVoterImpl* voter) {
789 // Bail out if we're being called from TaskQueueImpl::UnregisterTaskQueue.
790 if (!main_thread_only().time_domain)
791 return;
792
793 bool was_enabled = IsQueueEnabled();
794 if (voter->enabled_) {
795 main_thread_only().is_enabled_refcount--;
796 DCHECK_GE(main_thread_only().is_enabled_refcount, 0);
797 }
798
799 main_thread_only().voter_refcount--;
800 DCHECK_GE(main_thread_only().voter_refcount, 0);
801
802 bool is_enabled = IsQueueEnabled();
803 if (was_enabled != is_enabled)
804 EnableOrDisableWithSelector(is_enabled);
805 }
806
IsQueueEnabled() const807 bool TaskQueueImpl::IsQueueEnabled() const {
808 // By default is_enabled_refcount and voter_refcount both equal zero.
809 return (main_thread_only().is_enabled_refcount ==
810 main_thread_only().voter_refcount) &&
811 main_thread_only().is_enabled_for_test;
812 }
813
OnQueueEnabledVoteChanged(bool enabled)814 void TaskQueueImpl::OnQueueEnabledVoteChanged(bool enabled) {
815 bool was_enabled = IsQueueEnabled();
816 if (enabled) {
817 main_thread_only().is_enabled_refcount++;
818 DCHECK_LE(main_thread_only().is_enabled_refcount,
819 main_thread_only().voter_refcount);
820 } else {
821 main_thread_only().is_enabled_refcount--;
822 DCHECK_GE(main_thread_only().is_enabled_refcount, 0);
823 }
824
825 bool is_enabled = IsQueueEnabled();
826 if (was_enabled != is_enabled)
827 EnableOrDisableWithSelector(is_enabled);
828 }
829
EnableOrDisableWithSelector(bool enable)830 void TaskQueueImpl::EnableOrDisableWithSelector(bool enable) {
831 if (!main_thread_only().sequence_manager)
832 return;
833
834 LazyNow lazy_now = main_thread_only().time_domain->CreateLazyNow();
835 UpdateDelayedWakeUp(&lazy_now);
836
837 if (enable) {
838 if (HasPendingImmediateWork() &&
839 !main_thread_only().on_next_wake_up_changed_callback.is_null()) {
840 // Delayed work notification will be issued via time domain.
841 main_thread_only().on_next_wake_up_changed_callback.Run(TimeTicks());
842 }
843
844 // Note the selector calls SequenceManager::OnTaskQueueEnabled which posts
845 // a DoWork if needed.
846 main_thread_only()
847 .sequence_manager->main_thread_only()
848 .selector.EnableQueue(this);
849 } else {
850 main_thread_only()
851 .sequence_manager->main_thread_only()
852 .selector.DisableQueue(this);
853 }
854 }
855
856 std::unique_ptr<TaskQueue::QueueEnabledVoter>
CreateQueueEnabledVoter(scoped_refptr<TaskQueue> task_queue)857 TaskQueueImpl::CreateQueueEnabledVoter(scoped_refptr<TaskQueue> task_queue) {
858 DCHECK_EQ(task_queue->GetTaskQueueImpl(), this);
859 main_thread_only().voter_refcount++;
860 main_thread_only().is_enabled_refcount++;
861 return std::make_unique<QueueEnabledVoterImpl>(task_queue);
862 }
863
SweepCanceledDelayedTasks(TimeTicks now)864 void TaskQueueImpl::SweepCanceledDelayedTasks(TimeTicks now) {
865 if (main_thread_only().delayed_incoming_queue.empty())
866 return;
867
868 // Remove canceled tasks.
869 std::priority_queue<Task> remaining_tasks;
870 while (!main_thread_only().delayed_incoming_queue.empty()) {
871 if (!main_thread_only().delayed_incoming_queue.top().task.IsCancelled()) {
872 remaining_tasks.push(std::move(
873 const_cast<Task&>(main_thread_only().delayed_incoming_queue.top())));
874 }
875 main_thread_only().delayed_incoming_queue.pop();
876 }
877
878 main_thread_only().delayed_incoming_queue = std::move(remaining_tasks);
879
880 LazyNow lazy_now(now);
881 UpdateDelayedWakeUp(&lazy_now);
882 }
883
PushImmediateIncomingTaskForTest(TaskQueueImpl::Task && task)884 void TaskQueueImpl::PushImmediateIncomingTaskForTest(
885 TaskQueueImpl::Task&& task) {
886 AutoLock lock(immediate_incoming_queue_lock_);
887 immediate_incoming_queue().push_back(std::move(task));
888 }
889
RequeueDeferredNonNestableTask(DeferredNonNestableTask task)890 void TaskQueueImpl::RequeueDeferredNonNestableTask(
891 DeferredNonNestableTask task) {
892 DCHECK(task.task.nestable == Nestable::kNonNestable);
893 // The re-queued tasks have to be pushed onto the front because we'd otherwise
894 // violate the strict monotonically increasing enqueue order within the
895 // WorkQueue. We can't assign them a new enqueue order here because that will
896 // not behave correctly with fences and things will break (e.g Idle TQ).
897 if (task.work_queue_type == WorkQueueType::kDelayed) {
898 main_thread_only().delayed_work_queue->PushNonNestableTaskToFront(
899 std::move(task.task));
900 } else {
901 main_thread_only().immediate_work_queue->PushNonNestableTaskToFront(
902 std::move(task.task));
903 }
904 }
905
SetOnNextWakeUpChangedCallback(TaskQueueImpl::OnNextWakeUpChangedCallback callback)906 void TaskQueueImpl::SetOnNextWakeUpChangedCallback(
907 TaskQueueImpl::OnNextWakeUpChangedCallback callback) {
908 #if DCHECK_IS_ON()
909 if (callback) {
910 DCHECK(main_thread_only().on_next_wake_up_changed_callback.is_null())
911 << "Can't assign two different observers to "
912 "blink::scheduler::TaskQueue";
913 }
914 #endif
915 AutoLock lock(any_thread_lock_);
916 any_thread().on_next_wake_up_changed_callback = callback;
917 main_thread_only().on_next_wake_up_changed_callback = callback;
918 }
919
UpdateDelayedWakeUp(LazyNow * lazy_now)920 void TaskQueueImpl::UpdateDelayedWakeUp(LazyNow* lazy_now) {
921 return UpdateDelayedWakeUpImpl(lazy_now, GetNextScheduledWakeUpImpl());
922 }
923
UpdateDelayedWakeUpImpl(LazyNow * lazy_now,Optional<TaskQueueImpl::DelayedWakeUp> wake_up)924 void TaskQueueImpl::UpdateDelayedWakeUpImpl(
925 LazyNow* lazy_now,
926 Optional<TaskQueueImpl::DelayedWakeUp> wake_up) {
927 if (main_thread_only().scheduled_wake_up == wake_up)
928 return;
929 main_thread_only().scheduled_wake_up = wake_up;
930
931 if (wake_up &&
932 !main_thread_only().on_next_wake_up_changed_callback.is_null() &&
933 !HasPendingImmediateWork()) {
934 main_thread_only().on_next_wake_up_changed_callback.Run(wake_up->time);
935 }
936
937 main_thread_only().time_domain->SetNextWakeUpForQueue(this, wake_up,
938 lazy_now);
939 }
940
SetDelayedWakeUpForTesting(Optional<TaskQueueImpl::DelayedWakeUp> wake_up)941 void TaskQueueImpl::SetDelayedWakeUpForTesting(
942 Optional<TaskQueueImpl::DelayedWakeUp> wake_up) {
943 LazyNow lazy_now = main_thread_only().time_domain->CreateLazyNow();
944 UpdateDelayedWakeUpImpl(&lazy_now, wake_up);
945 }
946
HasPendingImmediateWork()947 bool TaskQueueImpl::HasPendingImmediateWork() {
948 // Any work queue tasks count as immediate work.
949 if (!main_thread_only().delayed_work_queue->Empty() ||
950 !main_thread_only().immediate_work_queue->Empty()) {
951 return true;
952 }
953
954 // Finally tasks on |immediate_incoming_queue| count as immediate work.
955 AutoLock lock(immediate_incoming_queue_lock_);
956 return !immediate_incoming_queue().empty();
957 }
958
SetOnTaskStartedHandler(TaskQueueImpl::OnTaskStartedHandler handler)959 void TaskQueueImpl::SetOnTaskStartedHandler(
960 TaskQueueImpl::OnTaskStartedHandler handler) {
961 main_thread_only().on_task_started_handler = std::move(handler);
962 }
963
OnTaskStarted(const TaskQueue::Task & task,const TaskQueue::TaskTiming & task_timing)964 void TaskQueueImpl::OnTaskStarted(const TaskQueue::Task& task,
965 const TaskQueue::TaskTiming& task_timing) {
966 if (!main_thread_only().on_task_started_handler.is_null())
967 main_thread_only().on_task_started_handler.Run(task, task_timing);
968 }
969
SetOnTaskCompletedHandler(TaskQueueImpl::OnTaskCompletedHandler handler)970 void TaskQueueImpl::SetOnTaskCompletedHandler(
971 TaskQueueImpl::OnTaskCompletedHandler handler) {
972 main_thread_only().on_task_completed_handler = std::move(handler);
973 }
974
OnTaskCompleted(const TaskQueue::Task & task,const TaskQueue::TaskTiming & task_timing)975 void TaskQueueImpl::OnTaskCompleted(const TaskQueue::Task& task,
976 const TaskQueue::TaskTiming& task_timing) {
977 if (!main_thread_only().on_task_completed_handler.is_null())
978 main_thread_only().on_task_completed_handler.Run(task, task_timing);
979 }
980
RequiresTaskTiming() const981 bool TaskQueueImpl::RequiresTaskTiming() const {
982 return !main_thread_only().on_task_started_handler.is_null() ||
983 !main_thread_only().on_task_completed_handler.is_null();
984 }
985
IsUnregistered() const986 bool TaskQueueImpl::IsUnregistered() const {
987 AutoLock lock(any_thread_lock_);
988 return !any_thread().sequence_manager;
989 }
990
GetSequenceManagerWeakPtr()991 WeakPtr<SequenceManagerImpl> TaskQueueImpl::GetSequenceManagerWeakPtr() {
992 return main_thread_only().sequence_manager->GetWeakPtr();
993 }
994
995 scoped_refptr<GracefulQueueShutdownHelper>
GetGracefulQueueShutdownHelper()996 TaskQueueImpl::GetGracefulQueueShutdownHelper() {
997 return main_thread_only().sequence_manager->GetGracefulQueueShutdownHelper();
998 }
999
SetQueueEnabledForTest(bool enabled)1000 void TaskQueueImpl::SetQueueEnabledForTest(bool enabled) {
1001 main_thread_only().is_enabled_for_test = enabled;
1002 EnableOrDisableWithSelector(IsQueueEnabled());
1003 }
1004
ActivateDelayedFenceIfNeeded(TimeTicks now)1005 void TaskQueueImpl::ActivateDelayedFenceIfNeeded(TimeTicks now) {
1006 if (!main_thread_only().delayed_fence)
1007 return;
1008 if (main_thread_only().delayed_fence.value() > now)
1009 return;
1010 InsertFence(TaskQueue::InsertFencePosition::kNow);
1011 main_thread_only().delayed_fence = nullopt;
1012 }
1013
1014 } // namespace internal
1015 } // namespace sequence_manager
1016 } // namespace base
1017