// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/sequence_manager/task_queue_impl.h"

#include <inttypes.h>

#include <atomic>
#include <memory>
#include <utility>

#include "base/check.h"
#include "base/compiler_specific.h"
#include "base/feature_list.h"
#include "base/logging.h"
#include "base/memory/scoped_refptr.h"
#include "base/metrics/histogram_macros.h"
#include "base/notreached.h"
#include "base/observer_list.h"
#include "base/ranges/algorithm.h"
#include "base/strings/stringprintf.h"
#include "base/task/common/scoped_defer_task_posting.h"
#include "base/task/default_delayed_task_handle_delegate.h"
#include "base/task/sequence_manager/associated_thread_id.h"
#include "base/task/sequence_manager/delayed_task_handle_delegate.h"
#include "base/task/sequence_manager/fence.h"
#include "base/task/sequence_manager/sequence_manager_impl.h"
#include "base/task/sequence_manager/task_order.h"
#include "base/task/sequence_manager/wake_up_queue.h"
#include "base/task/sequence_manager/work_queue.h"
#include "base/task/task_features.h"
#include "base/task/task_observer.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "base/trace_event/base_tracing.h"
#include "base/types/pass_key.h"
#include "build/build_config.h"
#include "third_party/abseil-cpp/absl/container/inlined_vector.h"
#include "third_party/abseil-cpp/absl/types/optional.h"

namespace base {
namespace sequence_manager {

namespace internal {

namespace {

// An atomic is used here because the value is queried from other threads when
// tasks are posted cross-thread, which can race with its initialization.
std::atomic<base::TimeDelta> g_max_precise_delay{kDefaultMaxPreciseDelay};
#if BUILDFLAG(IS_WIN)
// An atomic is used here because the flag is queried from other threads when
// tasks are posted cross-thread, which can race with its initialization.
std::atomic_bool g_explicit_high_resolution_timer_win{true};
#endif  // BUILDFLAG(IS_WIN)

}  // namespace

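// GuardedTaskPoster gates task posting on the queue still accepting work: each
// post runs under an operation token, which lets UnregisterTaskQueue() shut
// down posting and wait for in-flight cross-thread posts to drain before the
// queue is torn down.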
TaskQueueImpl::GuardedTaskPoster::GuardedTaskPoster(TaskQueueImpl* outer)
    : outer_(outer) {}

TaskQueueImpl::GuardedTaskPoster::~GuardedTaskPoster() {}

bool TaskQueueImpl::GuardedTaskPoster::PostTask(PostedTask task) {
  // Do not process new PostTasks while we are handling a PostTask (tracing
  // has to do this), as it can lead to a deadlock; defer them instead.
  ScopedDeferTaskPosting disallow_task_posting;

  auto token = operations_controller_.TryBeginOperation();
  if (!token)
    return false;

  outer_->PostTask(std::move(task));
  return true;
}

DelayedTaskHandle TaskQueueImpl::GuardedTaskPoster::PostCancelableTask(
    PostedTask task) {
  // Do not process new PostTasks while we are handling a PostTask (tracing
  // has to do this), as it can lead to a deadlock; defer them instead.
  ScopedDeferTaskPosting disallow_task_posting;

  auto token = operations_controller_.TryBeginOperation();
  if (!token)
    return DelayedTaskHandle();

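  // The returned handle owns the delegate; the posted task only keeps a weak
  // pointer to it, which is what allows the handle to cancel the task later.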
  auto delayed_task_handle_delegate =
      std::make_unique<DelayedTaskHandleDelegate>(outer_);
  task.delayed_task_handle_delegate = delayed_task_handle_delegate->AsWeakPtr();

  outer_->PostTask(std::move(task));
  DCHECK(delayed_task_handle_delegate->IsValid());
  return DelayedTaskHandle(std::move(delayed_task_handle_delegate));
}

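// TaskRunner is the public face handed out to clients. It forwards every post
// through the guarded poster, so posting fails cleanly (returns false) once
// the queue has been unregistered, even if the runner outlives the queue.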
TaskQueueImpl::TaskRunner::TaskRunner(
    scoped_refptr<GuardedTaskPoster> task_poster,
    scoped_refptr<const AssociatedThreadId> associated_thread,
    TaskType task_type)
    : task_poster_(std::move(task_poster)),
      associated_thread_(std::move(associated_thread)),
      task_type_(task_type) {}

TaskQueueImpl::TaskRunner::~TaskRunner() {}

bool TaskQueueImpl::TaskRunner::PostDelayedTask(const Location& location,
                                                OnceClosure callback,
                                                TimeDelta delay) {
  return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
                                           delay, Nestable::kNestable,
                                           task_type_));
}

bool TaskQueueImpl::TaskRunner::PostDelayedTaskAt(
    subtle::PostDelayedTaskPassKey,
    const Location& location,
    OnceClosure callback,
    TimeTicks delayed_run_time,
    base::subtle::DelayPolicy delay_policy) {
  return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
                                           delayed_run_time, delay_policy,
                                           Nestable::kNestable, task_type_));
}

DelayedTaskHandle TaskQueueImpl::TaskRunner::PostCancelableDelayedTaskAt(
    subtle::PostDelayedTaskPassKey pass_key,
    const Location& location,
    OnceClosure callback,
    TimeTicks delayed_run_time,
    base::subtle::DelayPolicy delay_policy) {
  return task_poster_->PostCancelableTask(
      PostedTask(this, std::move(callback), location, delayed_run_time,
                 delay_policy, Nestable::kNestable, task_type_));
}

DelayedTaskHandle TaskQueueImpl::TaskRunner::PostCancelableDelayedTask(
    subtle::PostDelayedTaskPassKey pass_key,
    const Location& location,
    OnceClosure callback,
    TimeDelta delay) {
  return task_poster_->PostCancelableTask(
      PostedTask(this, std::move(callback), location, delay,
                 Nestable::kNestable, task_type_));
}

bool TaskQueueImpl::TaskRunner::PostNonNestableDelayedTask(
    const Location& location,
    OnceClosure callback,
    TimeDelta delay) {
  return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
                                           delay, Nestable::kNonNestable,
                                           task_type_));
}

bool TaskQueueImpl::TaskRunner::RunsTasksInCurrentSequence() const {
  return associated_thread_->IsBoundToCurrentThread();
}

// static
void TaskQueueImpl::InitializeFeatures() {
  g_max_precise_delay = kMaxPreciseDelay.Get();
#if BUILDFLAG(IS_WIN)
  g_explicit_high_resolution_timer_win.store(
      FeatureList::IsEnabled(kExplicitHighResolutionTimerWin),
      std::memory_order_relaxed);
#endif  // BUILDFLAG(IS_WIN)
}

TaskQueueImpl::TaskQueueImpl(SequenceManagerImpl* sequence_manager,
                             WakeUpQueue* wake_up_queue,
                             const TaskQueue::Spec& spec)
    : name_(spec.name),
      sequence_manager_(sequence_manager),
      associated_thread_(sequence_manager
                             ? sequence_manager->associated_thread()
                             : AssociatedThreadId::CreateBound()),
      task_poster_(MakeRefCounted<GuardedTaskPoster>(this)),
      main_thread_only_(this, wake_up_queue),
      empty_queues_to_reload_handle_(
          sequence_manager
              ? sequence_manager->GetFlagToRequestReloadForEmptyQueue(this)
              : AtomicFlagSet::AtomicFlag()),
      should_monitor_quiescence_(spec.should_monitor_quiescence),
      should_notify_observers_(spec.should_notify_observers),
      delayed_fence_allowed_(spec.delayed_fence_allowed),
      default_task_runner_(CreateTaskRunner(kTaskTypeNone)) {
  UpdateCrossThreadQueueStateLocked();
  // SequenceManager can't be set later, so we need to prevent task runners
  // from posting any tasks.
  if (sequence_manager_)
    task_poster_->StartAcceptingOperations();
}

TaskQueueImpl::~TaskQueueImpl() {
#if DCHECK_IS_ON()
  base::internal::CheckedAutoLock lock(any_thread_lock_);
  // NOTE this check shouldn't fire because |SequenceManagerImpl::queues_|
  // contains a strong reference to this TaskQueueImpl and the
  // SequenceManagerImpl destructor calls UnregisterTaskQueue on all task
  // queues.
  DCHECK(any_thread_.unregistered)
      << "UnregisterTaskQueue must be called first!";
#endif
}

TaskQueueImpl::AnyThread::AnyThread() = default;
TaskQueueImpl::AnyThread::~AnyThread() = default;

TaskQueueImpl::AnyThread::TracingOnly::TracingOnly() = default;
TaskQueueImpl::AnyThread::TracingOnly::~TracingOnly() = default;

TaskQueueImpl::MainThreadOnly::MainThreadOnly(TaskQueueImpl* task_queue,
                                              WakeUpQueue* wake_up_queue)
    : wake_up_queue(wake_up_queue),
      delayed_work_queue(
          new WorkQueue(task_queue, "delayed", WorkQueue::QueueType::kDelayed)),
      immediate_work_queue(new WorkQueue(task_queue,
                                         "immediate",
                                         WorkQueue::QueueType::kImmediate)) {}

TaskQueueImpl::MainThreadOnly::~MainThreadOnly() = default;

scoped_refptr<SingleThreadTaskRunner> TaskQueueImpl::CreateTaskRunner(
    TaskType task_type) const {
  DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
  return MakeRefCounted<TaskRunner>(task_poster_, associated_thread_,
                                    task_type);
}

const scoped_refptr<SingleThreadTaskRunner>& TaskQueueImpl::task_runner()
    const {
  return default_task_runner_;
}

void TaskQueueImpl::UnregisterTaskQueue() {
  TRACE_EVENT0("base", "TaskQueueImpl::UnregisterTaskQueue");
  // Invalidate weak pointers now so no voters reference this in a partially
  // torn down state.
  voter_weak_ptr_factory_.InvalidateWeakPtrs();
  // Detach task runners.
  {
    ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
    task_poster_->ShutdownAndWaitForZeroOperations();
  }

  TaskDeque immediate_incoming_queue;
  base::flat_map<raw_ptr<OnTaskPostedCallbackHandleImpl>, OnTaskPostedHandler>
      on_task_posted_handlers;

  {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    any_thread_.unregistered = true;
    immediate_incoming_queue.swap(any_thread_.immediate_incoming_queue);

    for (auto& handler : any_thread_.on_task_posted_handlers)
      handler.first->UnregisterTaskQueue();
    any_thread_.on_task_posted_handlers.swap(on_task_posted_handlers);
  }

  if (main_thread_only().wake_up_queue) {
    main_thread_only().wake_up_queue->UnregisterQueue(this);
  }

  main_thread_only().on_task_started_handler = OnTaskStartedHandler();
  main_thread_only().on_task_completed_handler = OnTaskCompletedHandler();
  main_thread_only().wake_up_queue = nullptr;
  main_thread_only().throttler = nullptr;
  empty_queues_to_reload_handle_.ReleaseAtomicFlag();

  // It is possible for a task to hold a scoped_refptr to this, which will
  // lead to the TaskQueueImpl destructor being called when deleting a task.
  // To avoid use-after-free, we need to clear all fields of a task queue
  // before starting to delete the tasks.
  // All work queues and priority queues containing tasks should be moved to
  // local variables on the stack (std::move for unique_ptrs and swap for
  // queues) before clearing them and deleting tasks.

  // Flush the queues outside of the lock because TSAN complains about a lock
  // order inversion for tasks that are posted from within a lock, with a
  // destructor that acquires the same lock.

  DelayedIncomingQueue delayed_incoming_queue;
  delayed_incoming_queue.swap(&main_thread_only().delayed_incoming_queue);
  std::unique_ptr<WorkQueue> immediate_work_queue =
      std::move(main_thread_only().immediate_work_queue);
  std::unique_ptr<WorkQueue> delayed_work_queue =
      std::move(main_thread_only().delayed_work_queue);
}

const char* TaskQueueImpl::GetName() const {
  return perfetto::protos::pbzero::SequenceManagerTask::QueueName_Name(name_);
}

QueueName TaskQueueImpl::GetProtoName() const {
  return name_;
}

void TaskQueueImpl::PostTask(PostedTask task) {
  CurrentThread current_thread =
      associated_thread_->IsBoundToCurrentThread()
          ? TaskQueueImpl::CurrentThread::kMainThread
          : TaskQueueImpl::CurrentThread::kNotMainThread;

#if DCHECK_IS_ON()
  TimeDelta delay = GetTaskDelayAdjustment(current_thread);
  if (absl::holds_alternative<base::TimeTicks>(
          task.delay_or_delayed_run_time)) {
    absl::get<base::TimeTicks>(task.delay_or_delayed_run_time) += delay;
  } else {
    absl::get<base::TimeDelta>(task.delay_or_delayed_run_time) += delay;
  }
#endif  // DCHECK_IS_ON()

  if (!task.is_delayed()) {
    PostImmediateTaskImpl(std::move(task), current_thread);
  } else {
    PostDelayedTaskImpl(std::move(task), current_thread);
  }
}

void TaskQueueImpl::RemoveCancelableTask(HeapHandle heap_handle) {
  // Can only cancel from the current thread.
  DCHECK(associated_thread_->IsBoundToCurrentThread());
  DCHECK(heap_handle.IsValid());

  main_thread_only().delayed_incoming_queue.remove(heap_handle);

  // Only update the delayed wake up if the top task is removed.
  if (heap_handle.index() == 0u) {
    LazyNow lazy_now(sequence_manager_->main_thread_clock());
    UpdateWakeUp(&lazy_now);
  }
}

TimeDelta TaskQueueImpl::GetTaskDelayAdjustment(CurrentThread current_thread) {
#if DCHECK_IS_ON()
  if (current_thread == TaskQueueImpl::CurrentThread::kNotMainThread) {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    // Add a per-priority delay to cross thread tasks. This can help diagnose
    // scheduler induced flakiness by making things flake most of the time.
    return sequence_manager_->settings()
        .priority_settings
        .per_priority_cross_thread_task_delay()[any_thread_.queue_set_index];
  } else {
    return sequence_manager_->settings()
        .priority_settings.per_priority_same_thread_task_delay()
            [main_thread_only().immediate_work_queue->work_queue_set_index()];
  }
#else
  // No delay adjustment.
  return TimeDelta();
#endif  // DCHECK_IS_ON()
}

void TaskQueueImpl::PostImmediateTaskImpl(PostedTask task,
                                          CurrentThread current_thread) {
  // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
  // for details.
  CHECK(task.callback);

  bool should_schedule_work = false;
  {
    // TODO(alexclarke): Maybe add a main thread only immediate_incoming_queue.
    // See https://crbug.com/901800
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    bool add_queue_time_to_tasks = sequence_manager_->GetAddQueueTimeToTasks();
    TimeTicks queue_time;
    if (add_queue_time_to_tasks || delayed_fence_allowed_)
      queue_time = sequence_manager_->any_thread_clock()->NowTicks();

    // The sequence number must be incremented atomically with pushing onto the
    // incoming queue. Otherwise, if there are several threads posting tasks,
    // we risk breaking the assumption that sequence numbers increase
    // monotonically within a queue.
    EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber();
    bool was_immediate_incoming_queue_empty =
        any_thread_.immediate_incoming_queue.empty();
    any_thread_.immediate_incoming_queue.push_back(
        Task(std::move(task), sequence_number, sequence_number, queue_time));

#if DCHECK_IS_ON()
    any_thread_.immediate_incoming_queue.back().cross_thread_ =
        (current_thread == TaskQueueImpl::CurrentThread::kNotMainThread);
#endif

    sequence_manager_->WillQueueTask(
        &any_thread_.immediate_incoming_queue.back());
    MaybeReportIpcTaskQueuedFromAnyThreadLocked(
        any_thread_.immediate_incoming_queue.back());

    for (auto& handler : any_thread_.on_task_posted_handlers) {
      DCHECK(!handler.second.is_null());
      handler.second.Run(any_thread_.immediate_incoming_queue.back());
    }

    // If this queue was completely empty, then the SequenceManager needs to be
    // informed so it can reload the work queue and add us to the
    // TaskQueueSelector, which can only be done from the main thread. In
    // addition it may need to schedule a DoWork if this queue isn't blocked.
    if (was_immediate_incoming_queue_empty &&
        any_thread_.immediate_work_queue_empty) {
      empty_queues_to_reload_handle_.SetActive(true);
      should_schedule_work =
          any_thread_.post_immediate_task_should_schedule_work;
    }
  }

  // On Windows it's important to call this outside of a lock because calling a
  // pump while holding a lock can result in priority inversions. See
  // http://shortn/_ntnKNqjDQT for a discussion.
  //
  // Calling ScheduleWork outside the lock should be safe; only the main thread
  // can mutate |any_thread_.post_immediate_task_should_schedule_work|. If it
  // transitions to false we may call ScheduleWork redundantly, which is
  // harmless. If it transitions to true, the side effect of
  // |empty_queues_to_reload_handle_.SetActive(true)| is guaranteed to be
  // picked up by the ThreadController's call to
  // SequenceManagerImpl::DelayTillNextTask when it computes what continuation
  // (if any) is needed.
  if (should_schedule_work)
    sequence_manager_->ScheduleWork();

  TraceQueueSize();
}

void TaskQueueImpl::PostDelayedTaskImpl(PostedTask posted_task,
                                        CurrentThread current_thread) {
  // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
  // for details.
  CHECK(posted_task.callback);

  if (current_thread == CurrentThread::kMainThread) {
    LazyNow lazy_now(sequence_manager_->main_thread_clock());
    Task pending_task = MakeDelayedTask(std::move(posted_task), &lazy_now);
    sequence_manager_->MaybeAddLeewayToTask(pending_task);
    PushOntoDelayedIncomingQueueFromMainThread(
        std::move(pending_task), &lazy_now,
        /* notify_task_annotator */ true);
  } else {
    LazyNow lazy_now(sequence_manager_->any_thread_clock());
    PushOntoDelayedIncomingQueue(
        MakeDelayedTask(std::move(posted_task), &lazy_now));
  }
}

void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread(
    Task pending_task,
    LazyNow* lazy_now,
    bool notify_task_annotator) {
#if DCHECK_IS_ON()
  pending_task.cross_thread_ = false;
#endif

  if (notify_task_annotator) {
    sequence_manager_->WillQueueTask(&pending_task);
    MaybeReportIpcTaskQueuedFromMainThread(pending_task);
  }
  main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
  UpdateWakeUp(lazy_now);

  TraceQueueSize();
}

void TaskQueueImpl::PushOntoDelayedIncomingQueue(Task pending_task) {
  sequence_manager_->WillQueueTask(&pending_task);
  MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(pending_task);

#if DCHECK_IS_ON()
  pending_task.cross_thread_ = true;
#endif

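  // The delayed incoming queue is main-thread only, so a delayed task posted
  // from another thread is wrapped in an immediate task that re-enqueues it on
  // the main thread via ScheduleDelayedWorkTask().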
  // TODO(altimin): Add a copy method to Task to capture metadata here.
  auto task_runner = pending_task.task_runner;
  const auto task_type = pending_task.task_type;
  PostImmediateTaskImpl(
      PostedTask(std::move(task_runner),
                 BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask,
                          Unretained(this), std::move(pending_task)),
                 FROM_HERE, TimeDelta(), Nestable::kNonNestable, task_type),
      CurrentThread::kNotMainThread);
}

void TaskQueueImpl::ScheduleDelayedWorkTask(Task pending_task) {
  DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
  sequence_manager_->MaybeAddLeewayToTask(pending_task);
  TimeTicks now = sequence_manager_->main_thread_clock()->NowTicks();
  LazyNow lazy_now(now);
  // A delayed task is ready to run as soon as earliest_delayed_run_time() is
  // reached.
  if (pending_task.earliest_delayed_run_time() <= now) {
    // If |delayed_run_time| is in the past then push it onto the work queue
    // immediately. To ensure the right task ordering we need to temporarily
    // push it onto the |delayed_incoming_queue|.
    pending_task.delayed_run_time = now;
    main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
    MoveReadyDelayedTasksToWorkQueue(
        &lazy_now, sequence_manager_->GetNextSequenceNumber());
  } else {
    // If |delayed_run_time| is in the future we can queue it as normal.
    PushOntoDelayedIncomingQueueFromMainThread(std::move(pending_task),
                                               &lazy_now, false);
  }
  TraceQueueSize();
}

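// Services the reload request raised when a cross-thread post found the queue
// empty: moves all tasks from |immediate_incoming_queue| into the (empty)
// immediate work queue in one batch.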
void TaskQueueImpl::ReloadEmptyImmediateWorkQueue() {
  DCHECK(main_thread_only().immediate_work_queue->Empty());
  main_thread_only().immediate_work_queue->TakeImmediateIncomingQueueTasks();

  if (main_thread_only().throttler && IsQueueEnabled()) {
    main_thread_only().throttler->OnHasImmediateTask();
  }
}

void TaskQueueImpl::TakeImmediateIncomingQueueTasks(TaskDeque* queue) {
  DCHECK(queue->empty());
  // Now is a good time to consider reducing the empty queue's capacity if
  // we're wasting memory, before we make it the `immediate_incoming_queue`.
  queue->MaybeShrinkQueue();

  base::internal::CheckedAutoLock lock(any_thread_lock_);
  queue->swap(any_thread_.immediate_incoming_queue);

  // Activate the delayed fence if necessary. This is conceptually similar to
  // ActivateDelayedFenceIfNeeded, but because immediate tasks can be posted
  // from any thread we can't generate an enqueue order for the fence there,
  // so we have to check all immediate tasks and use their enqueue order for
  // the fence.
  if (main_thread_only().delayed_fence) {
    for (const Task& task : *queue) {
      DCHECK(!task.queue_time.is_null());
      DCHECK(task.delayed_run_time.is_null());
      if (task.queue_time >= main_thread_only().delayed_fence.value()) {
        main_thread_only().delayed_fence = absl::nullopt;
        DCHECK(!main_thread_only().current_fence);
        main_thread_only().current_fence = Fence(task.task_order());
        // Do not trigger the WorkQueueSets notification when taking the
        // incoming immediate queue.
        main_thread_only().immediate_work_queue->InsertFenceSilently(
            *main_thread_only().current_fence);
        main_thread_only().delayed_work_queue->InsertFenceSilently(
            *main_thread_only().current_fence);
        break;
      }
    }
  }

  UpdateCrossThreadQueueStateLocked();
}

bool TaskQueueImpl::IsEmpty() const {
  if (!main_thread_only().delayed_work_queue->Empty() ||
      !main_thread_only().delayed_incoming_queue.empty() ||
      !main_thread_only().immediate_work_queue->Empty()) {
    return false;
  }

  base::internal::CheckedAutoLock lock(any_thread_lock_);
  return any_thread_.immediate_incoming_queue.empty();
}

size_t TaskQueueImpl::GetNumberOfPendingTasks() const {
  size_t task_count = 0;
  task_count += main_thread_only().delayed_work_queue->Size();
  task_count += main_thread_only().delayed_incoming_queue.size();
  task_count += main_thread_only().immediate_work_queue->Size();

  base::internal::CheckedAutoLock lock(any_thread_lock_);
  task_count += any_thread_.immediate_incoming_queue.size();
  return task_count;
}

bool TaskQueueImpl::HasTaskToRunImmediatelyOrReadyDelayedTask() const {
  // Any work queue tasks count as immediate work.
  if (!main_thread_only().delayed_work_queue->Empty() ||
      !main_thread_only().immediate_work_queue->Empty()) {
    return true;
  }

  // Tasks on |delayed_incoming_queue| that could run now, count as
  // immediate work.
  if (!main_thread_only().delayed_incoming_queue.empty() &&
      main_thread_only().delayed_incoming_queue.top().delayed_run_time <=
          sequence_manager_->main_thread_clock()->NowTicks()) {
    return true;
  }

  // Finally tasks on |immediate_incoming_queue| count as immediate work.
  base::internal::CheckedAutoLock lock(any_thread_lock_);
  return !any_thread_.immediate_incoming_queue.empty();
}

absl::optional<WakeUp> TaskQueueImpl::GetNextDesiredWakeUp() {
  // Note we don't schedule a wake-up for disabled queues.
  if (main_thread_only().delayed_incoming_queue.empty() || !IsQueueEnabled())
    return absl::nullopt;

  const auto& top_task = main_thread_only().delayed_incoming_queue.top();

  // High resolution is needed if the queue contains high resolution tasks and
  // has a priority index <= kNormalPriority (precise execution time is
  // unnecessary for a low priority queue).
  WakeUpResolution resolution = has_pending_high_resolution_tasks() &&
                                        GetQueuePriority() <= DefaultPriority()
                                    ? WakeUpResolution::kHigh
                                    : WakeUpResolution::kLow;
  subtle::DelayPolicy delay_policy = top_task.delay_policy;
  if (GetQueuePriority() > DefaultPriority() &&
      delay_policy == subtle::DelayPolicy::kPrecise) {
    delay_policy = subtle::DelayPolicy::kFlexibleNoSooner;
  }
  return WakeUp{top_task.delayed_run_time, top_task.leeway, resolution,
                delay_policy};
}

void TaskQueueImpl::OnWakeUp(LazyNow* lazy_now, EnqueueOrder enqueue_order) {
  MoveReadyDelayedTasksToWorkQueue(lazy_now, enqueue_order);
  if (main_thread_only().throttler) {
    main_thread_only().throttler->OnWakeUp(lazy_now);
  }
}

bool TaskQueueImpl::RemoveAllCanceledDelayedTasksFromFront(LazyNow* lazy_now) {
  // Because task destructors could have a side-effect of posting new tasks, we
  // move all the cancelled tasks into a temporary container before deleting
  // them. This way the queue cannot change while we iterate over it.
  absl::InlinedVector<Task, 8> tasks_to_delete;

  while (!main_thread_only().delayed_incoming_queue.empty()) {
    const Task& task = main_thread_only().delayed_incoming_queue.top();
    CHECK(task.task);
    if (!task.task.IsCancelled())
      break;

    tasks_to_delete.push_back(
        main_thread_only().delayed_incoming_queue.take_top());
  }

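  // A wake-up update is only needed if something was actually removed, since
  // only the front task determines the next wake-up.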
  if (!tasks_to_delete.empty()) {
    UpdateWakeUp(lazy_now);
    return true;
  }

  return false;
}

void TaskQueueImpl::MoveReadyDelayedTasksToWorkQueue(
    LazyNow* lazy_now,
    EnqueueOrder enqueue_order) {
  // Enqueue all delayed tasks that should be running now, skipping any that
  // have been canceled.
  WorkQueue::TaskPusher delayed_work_queue_task_pusher(
      main_thread_only().delayed_work_queue->CreateTaskPusher());

  // Because task destructors could have a side-effect of posting new tasks, we
  // move all the cancelled tasks into a temporary container before deleting
  // them. This way the queue cannot change while we iterate over it.
  absl::InlinedVector<Task, 8> tasks_to_delete;

  while (!main_thread_only().delayed_incoming_queue.empty()) {
    const Task& task = main_thread_only().delayed_incoming_queue.top();
    CHECK(task.task);

    // Leave the top task alone if it hasn't been canceled and it is not ready.
    const bool is_cancelled = task.task.IsCancelled();
    if (!is_cancelled && task.earliest_delayed_run_time() > lazy_now->Now())
      break;

    Task ready_task = main_thread_only().delayed_incoming_queue.take_top();
    if (is_cancelled) {
      tasks_to_delete.push_back(std::move(ready_task));
      continue;
    }

    // The top task is ready to run. Move it to the delayed work queue.
#if DCHECK_IS_ON()
    if (sequence_manager_->settings().log_task_delay_expiry)
      VLOG(0) << GetName() << " Delay expired for "
              << ready_task.posted_from.ToString();
#endif  // DCHECK_IS_ON()
    DCHECK(!ready_task.delayed_run_time.is_null());
    DCHECK(!ready_task.enqueue_order_set());
    ready_task.set_enqueue_order(enqueue_order);
    ActivateDelayedFenceIfNeeded(ready_task);

    delayed_work_queue_task_pusher.Push(std::move(ready_task));
  }

  // Explicitly delete tasks last.
  tasks_to_delete.clear();

  UpdateWakeUp(lazy_now);
}

void TaskQueueImpl::TraceQueueSize() const {
  bool is_tracing;
  TRACE_EVENT_CATEGORY_GROUP_ENABLED(
      TRACE_DISABLED_BY_DEFAULT("sequence_manager"), &is_tracing);
  if (!is_tracing)
    return;

  // It's only safe to access the work queues from the main thread.
  // TODO(alexclarke): We should find another way of tracing this.
  if (!associated_thread_->IsBoundToCurrentThread())
    return;

  size_t total_task_count;
  {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    total_task_count = any_thread_.immediate_incoming_queue.size() +
                       main_thread_only().immediate_work_queue->Size() +
                       main_thread_only().delayed_work_queue->Size() +
                       main_thread_only().delayed_incoming_queue.size();
  }
  TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), GetName(),
                 total_task_count);
}

void TaskQueueImpl::SetQueuePriority(TaskQueue::QueuePriority priority) {
  const TaskQueue::QueuePriority previous_priority = GetQueuePriority();
  if (priority == previous_priority)
    return;
  sequence_manager_->main_thread_only().selector.SetQueuePriority(this,
                                                                  priority);

#if BUILDFLAG(IS_WIN)
  // Updating the queue priority can change whether a high resolution timer is
  // needed.
  LazyNow lazy_now(sequence_manager_->main_thread_clock());
  UpdateWakeUp(&lazy_now);
#endif

  if (priority > DefaultPriority()) {
    // |priority| is now lower than the default, so update accordingly.
    main_thread_only()
        .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
        EnqueueOrder::max();
  } else if (previous_priority > DefaultPriority()) {
    // |priority| is no longer lower than the default, so record the current
    // sequence number.
    DCHECK_EQ(
        main_thread_only()
            .enqueue_order_at_which_we_became_unblocked_with_normal_priority,
        EnqueueOrder::max());
    main_thread_only()
        .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
        sequence_manager_->GetNextSequenceNumber();
  }
}

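// The queue's priority doubles as its index into the WorkQueueSets, so it can
// be recovered from either work queue; the DCHECK below checks that the two
// stay in sync.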
TaskQueue::QueuePriority TaskQueueImpl::GetQueuePriority() const {
  size_t set_index = immediate_work_queue()->work_queue_set_index();
  DCHECK_EQ(set_index, delayed_work_queue()->work_queue_set_index());
  return static_cast<TaskQueue::QueuePriority>(set_index);
}

Value::Dict TaskQueueImpl::AsValue(TimeTicks now, bool force_verbose) const {
  base::internal::CheckedAutoLock lock(any_thread_lock_);
  Value::Dict state;
  state.Set("name", GetName());
  if (any_thread_.unregistered) {
    state.Set("unregistered", true);
    return state;
  }
  DCHECK(main_thread_only().delayed_work_queue);
  DCHECK(main_thread_only().immediate_work_queue);

  state.Set("task_queue_id",
            StringPrintf("0x%" PRIx64, static_cast<uint64_t>(
                                           reinterpret_cast<uintptr_t>(this))));
  state.Set("enabled", IsQueueEnabled());
  // TODO(crbug.com/1334256): Make base::Value able to store an int64_t and
  // remove the various static_casts below.
  state.Set("any_thread_.immediate_incoming_queue_size",
            static_cast<int>(any_thread_.immediate_incoming_queue.size()));
  state.Set("delayed_incoming_queue_size",
            static_cast<int>(main_thread_only().delayed_incoming_queue.size()));
  state.Set("immediate_work_queue_size",
            static_cast<int>(main_thread_only().immediate_work_queue->Size()));
  state.Set("delayed_work_queue_size",
            static_cast<int>(main_thread_only().delayed_work_queue->Size()));

  state.Set("any_thread_.immediate_incoming_queue_capacity",
            static_cast<int>(any_thread_.immediate_incoming_queue.capacity()));
  state.Set("immediate_work_queue_capacity",
            static_cast<int>(immediate_work_queue()->Capacity()));
  state.Set("delayed_work_queue_capacity",
            static_cast<int>(delayed_work_queue()->Capacity()));

  if (!main_thread_only().delayed_incoming_queue.empty()) {
    TimeDelta delay_to_next_task =
        (main_thread_only().delayed_incoming_queue.top().delayed_run_time -
         sequence_manager_->main_thread_clock()->NowTicks());
    state.Set("delay_to_next_task_ms", delay_to_next_task.InMillisecondsF());
  }
  if (main_thread_only().current_fence) {
    Value::Dict fence_state;
    fence_state.Set(
        "enqueue_order",
        static_cast<int>(
            main_thread_only().current_fence->task_order().enqueue_order()));
    fence_state.Set("activated_in_wake_up", !main_thread_only()
                                                 .current_fence->task_order()
                                                 .delayed_run_time()
                                                 .is_null());
    state.Set("current_fence", std::move(fence_state));
  }
  if (main_thread_only().delayed_fence) {
    state.Set("delayed_fence_seconds_from_now",
              (main_thread_only().delayed_fence.value() - now).InSecondsF());
  }

  bool verbose = false;
  TRACE_EVENT_CATEGORY_GROUP_ENABLED(
      TRACE_DISABLED_BY_DEFAULT("sequence_manager.verbose_snapshots"),
      &verbose);

  if (verbose || force_verbose) {
    state.Set("immediate_incoming_queue",
              QueueAsValue(any_thread_.immediate_incoming_queue, now));
    state.Set("delayed_work_queue",
              main_thread_only().delayed_work_queue->AsValue(now));
    state.Set("immediate_work_queue",
              main_thread_only().immediate_work_queue->AsValue(now));
    state.Set("delayed_incoming_queue",
              main_thread_only().delayed_incoming_queue.AsValue(now));
  }
  state.Set("priority", GetQueuePriority());
  return state;
}

void TaskQueueImpl::AddTaskObserver(TaskObserver* task_observer) {
  main_thread_only().task_observers.AddObserver(task_observer);
}

void TaskQueueImpl::RemoveTaskObserver(TaskObserver* task_observer) {
  main_thread_only().task_observers.RemoveObserver(task_observer);
}

void TaskQueueImpl::NotifyWillProcessTask(const Task& task,
                                          bool was_blocked_or_low_priority) {
  DCHECK(should_notify_observers_);

  for (auto& observer : main_thread_only().task_observers)
    observer.WillProcessTask(task, was_blocked_or_low_priority);
}

void TaskQueueImpl::NotifyDidProcessTask(const Task& task) {
  DCHECK(should_notify_observers_);
  for (auto& observer : main_thread_only().task_observers)
    observer.DidProcessTask(task);
}

void TaskQueueImpl::InsertFence(TaskQueue::InsertFencePosition position) {
  Fence new_fence = position == TaskQueue::InsertFencePosition::kNow
                        ? Fence::CreateWithEnqueueOrder(
                              sequence_manager_->GetNextSequenceNumber())
                        : Fence::BlockingFence();
  InsertFence(new_fence);
}

void TaskQueueImpl::InsertFence(Fence current_fence) {
  // Only one fence may be present at a time.
  main_thread_only().delayed_fence = absl::nullopt;

  absl::optional<Fence> previous_fence = main_thread_only().current_fence;

  // Tasks posted after this point will have a strictly higher enqueue order
  // and will be blocked from running.
  main_thread_only().current_fence = current_fence;
  bool front_task_unblocked =
      main_thread_only().immediate_work_queue->InsertFence(current_fence);
  front_task_unblocked |=
      main_thread_only().delayed_work_queue->InsertFence(current_fence);

  {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
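    // Moving the fence forward can unblock a task in the incoming queue: one
    // that was blocked by the previous fence but orders before the new one.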
    if (!front_task_unblocked && previous_fence &&
        previous_fence->task_order() < current_fence.task_order()) {
      if (!any_thread_.immediate_incoming_queue.empty() &&
          any_thread_.immediate_incoming_queue.front().task_order() >
              previous_fence->task_order() &&
          any_thread_.immediate_incoming_queue.front().task_order() <
              current_fence.task_order()) {
        front_task_unblocked = true;
      }
    }

    UpdateCrossThreadQueueStateLocked();
  }

  if (IsQueueEnabled() && front_task_unblocked) {
    OnQueueUnblocked();
    sequence_manager_->ScheduleWork();
  }
}

void TaskQueueImpl::InsertFenceAt(TimeTicks time) {
  DCHECK(delayed_fence_allowed_)
      << "Delayed fences are not supported for this queue. Enable them "
         "explicitly in TaskQueue::Spec when creating the queue";

  // A task queue can have only one fence, delayed or not.
  RemoveFence();
  main_thread_only().delayed_fence = time;
}

void TaskQueueImpl::RemoveFence() {
  absl::optional<Fence> previous_fence = main_thread_only().current_fence;
  main_thread_only().current_fence = absl::nullopt;
  main_thread_only().delayed_fence = absl::nullopt;

  bool front_task_unblocked =
      main_thread_only().immediate_work_queue->RemoveFence();
  front_task_unblocked |= main_thread_only().delayed_work_queue->RemoveFence();

  {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
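    // With the fence gone, a task in the incoming queue that was blocked by
    // the previous fence becomes the new runnable front task.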
    if (!front_task_unblocked && previous_fence) {
      if (!any_thread_.immediate_incoming_queue.empty() &&
          any_thread_.immediate_incoming_queue.front().task_order() >
              previous_fence->task_order()) {
        front_task_unblocked = true;
      }
    }

    UpdateCrossThreadQueueStateLocked();
  }

  if (IsQueueEnabled() && front_task_unblocked) {
    OnQueueUnblocked();
    sequence_manager_->ScheduleWork();
  }
}

bool TaskQueueImpl::BlockedByFence() const {
  if (!main_thread_only().current_fence)
    return false;

  if (!main_thread_only().immediate_work_queue->BlockedByFence() ||
      !main_thread_only().delayed_work_queue->BlockedByFence()) {
    return false;
  }

  base::internal::CheckedAutoLock lock(any_thread_lock_);
  if (any_thread_.immediate_incoming_queue.empty())
    return true;

  return any_thread_.immediate_incoming_queue.front().task_order() >
         main_thread_only().current_fence->task_order();
}

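// A delayed fence whose activation time has already passed counts as active,
// even if it hasn't yet been converted into a concrete fence.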
bool TaskQueueImpl::HasActiveFence() {
  if (main_thread_only().delayed_fence &&
      sequence_manager_->main_thread_clock()->NowTicks() >
          main_thread_only().delayed_fence.value()) {
    return true;
  }
  return !!main_thread_only().current_fence;
}

bool TaskQueueImpl::CouldTaskRun(EnqueueOrder enqueue_order) const {
  if (!IsQueueEnabled())
    return false;

  if (!main_thread_only().current_fence)
    return true;

  // TODO(crbug.com/1249857): This should use TaskOrder. This is currently only
  // used for tests and is fine as-is, but we should be using `TaskOrder` for
  // task comparisons. Also this method should be renamed with a testing suffix
  // as it is not used in production.
  return enqueue_order <
         main_thread_only().current_fence->task_order().enqueue_order();
}

bool TaskQueueImpl::WasBlockedOrLowPriority(EnqueueOrder enqueue_order) const {
  return enqueue_order <
         main_thread_only()
             .enqueue_order_at_which_we_became_unblocked_with_normal_priority;
}

// static
Value::List TaskQueueImpl::QueueAsValue(const TaskDeque& queue, TimeTicks now) {
  Value::List state;
  for (const Task& task : queue)
    state.Append(TaskAsValue(task, now));
  return state;
}

// static
Value::Dict TaskQueueImpl::TaskAsValue(const Task& task, TimeTicks now) {
  Value::Dict state;
  state.Set("posted_from", task.posted_from.ToString());
  if (task.enqueue_order_set())
    state.Set("enqueue_order", static_cast<int>(task.enqueue_order()));
  state.Set("sequence_num", task.sequence_num);
  state.Set("nestable", task.nestable == Nestable::kNestable);
  state.Set("is_high_res", task.is_high_res);
  state.Set("is_cancelled", task.task.IsCancelled());
  state.Set("delayed_run_time",
            (task.delayed_run_time - TimeTicks()).InMillisecondsF());
  const TimeDelta delayed_run_time_milliseconds_from_now =
      task.delayed_run_time.is_null() ? TimeDelta()
                                      : (task.delayed_run_time - now);
  state.Set("delayed_run_time_milliseconds_from_now",
            delayed_run_time_milliseconds_from_now.InMillisecondsF());
  return state;
}

Task TaskQueueImpl::MakeDelayedTask(PostedTask delayed_task,
                                    LazyNow* lazy_now) const {
  EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber();
  base::TimeDelta delay;
  WakeUpResolution resolution = WakeUpResolution::kLow;
#if BUILDFLAG(IS_WIN)
  const bool explicit_high_resolution_timer_win =
      g_explicit_high_resolution_timer_win.load(std::memory_order_relaxed);
#endif  // BUILDFLAG(IS_WIN)
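  // A PostedTask carries either a relative delay or an absolute run time.
  // Normalize to an absolute run time here, keeping the relative delay for the
  // resolution and delay-policy decisions below.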
  if (absl::holds_alternative<base::TimeDelta>(
          delayed_task.delay_or_delayed_run_time)) {
    delay = absl::get<base::TimeDelta>(delayed_task.delay_or_delayed_run_time);
    delayed_task.delay_or_delayed_run_time = lazy_now->Now() + delay;
  } else {
    delay = absl::get<base::TimeTicks>(delayed_task.delay_or_delayed_run_time) -
            lazy_now->Now();
  }
#if BUILDFLAG(IS_WIN)
  if (!explicit_high_resolution_timer_win &&
      delay < (2 * base::Milliseconds(Time::kMinLowResolutionThresholdMs))) {
    // Outside the kExplicitHighResolutionTimerWin experiment, we consider that
    // the task needs a high resolution timer if the delay is more than 0 and
    // less than 32ms. This caps the relative error to less than 50%: a 33ms
    // wait can wake at 48ms since the default resolution on Windows is between
    // 10 and 15ms.
    resolution = WakeUpResolution::kHigh;
  }
#endif  // BUILDFLAG(IS_WIN)
  delayed_task.delay_policy = subtle::MaybeOverrideDelayPolicy(
      delayed_task.delay_policy, delay,
      g_max_precise_delay.load(std::memory_order_relaxed));
  // The leeway isn't specified yet since this may be called from any thread.
  return Task(std::move(delayed_task), sequence_number, EnqueueOrder(),
              lazy_now->Now(), resolution);
}

bool TaskQueueImpl::IsQueueEnabled() const {
  return main_thread_only().is_enabled;
}

void TaskQueueImpl::SetQueueEnabled(bool enabled) {
  if (main_thread_only().is_enabled == enabled)
    return;

  // Update the |main_thread_only_| struct.
  main_thread_only().is_enabled = enabled;
  main_thread_only().disabled_time = absl::nullopt;

  // |sequence_manager_| can be null in tests.
  if (!sequence_manager_)
    return;

  LazyNow lazy_now(sequence_manager_->main_thread_clock());

  if (!enabled) {
    bool tracing_enabled = false;
    TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
                                       &tracing_enabled);
    main_thread_only().disabled_time = lazy_now.Now();
  } else {
    // Override reporting if the queue is becoming enabled again.
    main_thread_only().should_report_posted_tasks_when_disabled = false;
  }

  // If there is a throttler, it will be notified of pending delayed and
  // immediate tasks inside UpdateWakeUp().
  UpdateWakeUp(&lazy_now);

  {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    UpdateCrossThreadQueueStateLocked();

    // Copy over the task-reporting related state.
    any_thread_.tracing_only.is_enabled = enabled;
    any_thread_.tracing_only.disabled_time = main_thread_only().disabled_time;
    any_thread_.tracing_only.should_report_posted_tasks_when_disabled =
        main_thread_only().should_report_posted_tasks_when_disabled;
  }

  // Finally, enable or disable the queue with the selector.
  if (enabled) {
    // Note the selector calls SequenceManager::OnTaskQueueEnabled which posts
    // a DoWork if needed.
    sequence_manager_->main_thread_only().selector.EnableQueue(this);

    if (!BlockedByFence())
      OnQueueUnblocked();
  } else {
    sequence_manager_->main_thread_only().selector.DisableQueue(this);
  }
}

void TaskQueueImpl::SetShouldReportPostedTasksWhenDisabled(bool should_report) {
  if (main_thread_only().should_report_posted_tasks_when_disabled ==
      should_report)
    return;

  // Only observe transitions turning the reporting on if tracing is enabled.
  if (should_report) {
    bool tracing_enabled = false;
    TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
                                       &tracing_enabled);
    if (!tracing_enabled)
      return;
  }

  main_thread_only().should_report_posted_tasks_when_disabled = should_report;

  // Mirror the state to the AnyThread struct as well.
  {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    any_thread_.tracing_only.should_report_posted_tasks_when_disabled =
        should_report;
  }
}

void TaskQueueImpl::UpdateCrossThreadQueueStateLocked() {
  any_thread_.immediate_work_queue_empty =
      main_thread_only().immediate_work_queue->Empty();

  if (main_thread_only().throttler) {
    // If there's a Throttler, always ScheduleWork() when immediate work is
    // posted and the queue is enabled, to ensure that
    // Throttler::OnHasImmediateTask() is invoked.
    any_thread_.post_immediate_task_should_schedule_work = IsQueueEnabled();
  } else {
    // Otherwise, ScheduleWork() only if the queue is enabled and there isn't a
    // fence to prevent the task from being executed.
    any_thread_.post_immediate_task_should_schedule_work =
        IsQueueEnabled() && !main_thread_only().current_fence;
  }

#if DCHECK_IS_ON()
  any_thread_.queue_set_index =
      main_thread_only().immediate_work_queue->work_queue_set_index();
#endif
}

void TaskQueueImpl::ReclaimMemory(TimeTicks now) {
  if (main_thread_only().delayed_incoming_queue.empty())
    return;

  main_thread_only().delayed_incoming_queue.SweepCancelledTasks(
      sequence_manager_);

  // If deleting one of the cancelled tasks shut down this queue, bail out.
  // Note that in this scenario |this| is still valid, but some fields of the
  // queue have been cleared out by |UnregisterTaskQueue|.
  if (!main_thread_only().delayed_work_queue) {
    return;
  }

  LazyNow lazy_now(now);
  UpdateWakeUp(&lazy_now);

  // Also consider shrinking the work queues if they're wasting memory.
  main_thread_only().delayed_work_queue->MaybeShrinkQueue();
  main_thread_only().immediate_work_queue->MaybeShrinkQueue();

  {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    any_thread_.immediate_incoming_queue.MaybeShrinkQueue();
  }
}

void TaskQueueImpl::PushImmediateIncomingTaskForTest(Task task) {
  base::internal::CheckedAutoLock lock(any_thread_lock_);
  any_thread_.immediate_incoming_queue.push_back(std::move(task));
}

void TaskQueueImpl::RequeueDeferredNonNestableTask(
    DeferredNonNestableTask task) {
  DCHECK(task.task.nestable == Nestable::kNonNestable);

  // It's possible that the queue was unregistered since the task was posted.
  // Skip the task in that case.
  if (!main_thread_only().delayed_work_queue)
    return;

  // The re-queued tasks have to be pushed onto the front because we'd
  // otherwise violate the strict monotonically increasing enqueue order within
  // the WorkQueue. We can't assign them a new enqueue order here because that
  // will not behave correctly with fences and things will break (e.g. the Idle
  // TQ).
  if (task.work_queue_type == WorkQueueType::kDelayed) {
    main_thread_only().delayed_work_queue->PushNonNestableTaskToFront(
        std::move(task.task));
  } else {
    // We're about to push |task| onto an empty |immediate_work_queue|
    // (bypassing |immediate_incoming_queue_|). As such, we no longer need to
    // reload if we were planning to. The flag must be cleared while holding
    // the lock to avoid a cross-thread post task setting it again before
    // we actually make |immediate_work_queue| non-empty.
    if (main_thread_only().immediate_work_queue->Empty()) {
      base::internal::CheckedAutoLock lock(any_thread_lock_);
      empty_queues_to_reload_handle_.SetActive(false);

      any_thread_.immediate_work_queue_empty = false;
      main_thread_only().immediate_work_queue->PushNonNestableTaskToFront(
          std::move(task.task));

    } else {
      main_thread_only().immediate_work_queue->PushNonNestableTaskToFront(
          std::move(task.task));
    }
  }
}

void TaskQueueImpl::SetThrottler(TaskQueue::Throttler* throttler) {
  DCHECK(throttler);
  DCHECK(!main_thread_only().throttler)
      << "Can't assign two different throttlers to "
         "base::sequence_manager:TaskQueue";
  // `throttler` is guaranteed to outlive this object.
  main_thread_only().throttler = throttler;
}

void TaskQueueImpl::ResetThrottler() {
  main_thread_only().throttler = nullptr;
  LazyNow lazy_now(sequence_manager_->main_thread_clock());
  // The current delayed wake up may have been determined by the Throttler.
  // Update it now that there is no Throttler.
  UpdateWakeUp(&lazy_now);
}

void TaskQueueImpl::UpdateWakeUp(LazyNow* lazy_now) {
  absl::optional<WakeUp> wake_up = GetNextDesiredWakeUp();
  if (main_thread_only().throttler && IsQueueEnabled()) {
    // GetNextAllowedWakeUp() may return a non-null wake_up even if |wake_up|
    // is nullopt, e.g. to throttle immediate tasks.
    wake_up = main_thread_only().throttler->GetNextAllowedWakeUp(
        lazy_now, wake_up, HasTaskToRunImmediatelyOrReadyDelayedTask());
  }
  SetNextWakeUp(lazy_now, wake_up);
}

void TaskQueueImpl::SetNextWakeUp(LazyNow* lazy_now,
                                  absl::optional<WakeUp> wake_up) {
  if (main_thread_only().scheduled_wake_up == wake_up)
    return;
  main_thread_only().scheduled_wake_up = wake_up;
  main_thread_only().wake_up_queue->SetNextWakeUpForQueue(this, lazy_now,
                                                          wake_up);
}

bool TaskQueueImpl::HasTaskToRunImmediately() const {
  // Any work queue tasks count as immediate work.
  if (!main_thread_only().delayed_work_queue->Empty() ||
      !main_thread_only().immediate_work_queue->Empty()) {
    return true;
  }

  // Finally tasks on |immediate_incoming_queue| count as immediate work.
  base::internal::CheckedAutoLock lock(any_thread_lock_);
  return !any_thread_.immediate_incoming_queue.empty();
}

bool TaskQueueImpl::HasTaskToRunImmediatelyLocked() const {
  return !main_thread_only().delayed_work_queue->Empty() ||
         !main_thread_only().immediate_work_queue->Empty() ||
         !any_thread_.immediate_incoming_queue.empty();
}

void TaskQueueImpl::SetOnTaskStartedHandler(
    TaskQueueImpl::OnTaskStartedHandler handler) {
  DCHECK(should_notify_observers_ || handler.is_null());
  main_thread_only().on_task_started_handler = std::move(handler);
}

void TaskQueueImpl::OnTaskStarted(const Task& task,
                                  const TaskQueue::TaskTiming& task_timing) {
  if (!main_thread_only().on_task_started_handler.is_null())
    main_thread_only().on_task_started_handler.Run(task, task_timing);
}

void TaskQueueImpl::SetOnTaskCompletedHandler(
    TaskQueueImpl::OnTaskCompletedHandler handler) {
  DCHECK(should_notify_observers_ || handler.is_null());
  main_thread_only().on_task_completed_handler = std::move(handler);
}

void TaskQueueImpl::OnTaskCompleted(const Task& task,
                                    TaskQueue::TaskTiming* task_timing,
                                    LazyNow* lazy_now) {
  if (!main_thread_only().on_task_completed_handler.is_null()) {
    main_thread_only().on_task_completed_handler.Run(task, task_timing,
                                                     lazy_now);
  }
}

bool TaskQueueImpl::RequiresTaskTiming() const {
  return !main_thread_only().on_task_started_handler.is_null() ||
         !main_thread_only().on_task_completed_handler.is_null();
}

1303 std::unique_ptr<TaskQueue::OnTaskPostedCallbackHandle>
AddOnTaskPostedHandler(OnTaskPostedHandler handler)1304 TaskQueueImpl::AddOnTaskPostedHandler(OnTaskPostedHandler handler) {
1305 DCHECK(should_notify_observers_ && !handler.is_null());
1306 std::unique_ptr<OnTaskPostedCallbackHandleImpl> handle =
1307 std::make_unique<OnTaskPostedCallbackHandleImpl>(this,
1308 associated_thread_);
1309 base::internal::CheckedAutoLock lock(any_thread_lock_);
1310 any_thread_.on_task_posted_handlers.insert(
1311 {handle.get(), std::move(handler)});
1312 return handle;
1313 }
1314
RemoveOnTaskPostedHandler(TaskQueueImpl::OnTaskPostedCallbackHandleImpl * on_task_posted_callback_handle)1315 void TaskQueueImpl::RemoveOnTaskPostedHandler(
1316 TaskQueueImpl::OnTaskPostedCallbackHandleImpl*
1317 on_task_posted_callback_handle) {
1318 base::internal::CheckedAutoLock lock(any_thread_lock_);
1319 any_thread_.on_task_posted_handlers.erase(on_task_posted_callback_handle);
1320 }
1321
SetTaskExecutionTraceLogger(TaskExecutionTraceLogger logger)1322 void TaskQueueImpl::SetTaskExecutionTraceLogger(
1323 TaskExecutionTraceLogger logger) {
1324 DCHECK(should_notify_observers_ || logger.is_null());
1325 main_thread_only().task_execution_trace_logger = std::move(logger);
1326 }
1327
IsUnregistered() const1328 bool TaskQueueImpl::IsUnregistered() const {
1329 base::internal::CheckedAutoLock lock(any_thread_lock_);
1330 return any_thread_.unregistered;
1331 }
1332
GetSequenceManagerWeakPtr()1333 WeakPtr<SequenceManagerImpl> TaskQueueImpl::GetSequenceManagerWeakPtr() {
1334 return sequence_manager_->GetWeakPtr();
1335 }
1336
ActivateDelayedFenceIfNeeded(const Task & task)1337 void TaskQueueImpl::ActivateDelayedFenceIfNeeded(const Task& task) {
1338 if (!main_thread_only().delayed_fence)
1339 return;
1340 if (main_thread_only().delayed_fence.value() > task.delayed_run_time)
1341 return;
1342 InsertFence(Fence(task.task_order()));
1343 main_thread_only().delayed_fence = absl::nullopt;
1344 }
1345
void TaskQueueImpl::MaybeReportIpcTaskQueuedFromMainThread(
    const Task& pending_task) {
  if (!pending_task.ipc_hash)
    return;

  // It's possible that tracing was just enabled and no disabled time has been
  // stored. In that case, skip emitting the event.
  if (!main_thread_only().disabled_time)
    return;

  bool tracing_enabled = false;
  TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
                                     &tracing_enabled);
  if (!tracing_enabled)
    return;

  if (main_thread_only().is_enabled ||
      !main_thread_only().should_report_posted_tasks_when_disabled) {
    return;
  }

  base::TimeDelta time_since_disabled =
      sequence_manager_->main_thread_clock()->NowTicks() -
      main_thread_only().disabled_time.value();

  ReportIpcTaskQueued(pending_task, time_since_disabled);
}

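// Returns true and fills in |time_since_disabled| only when the queue is
// currently disabled and reporting of tasks posted while disabled was
// requested. Must be called with |any_thread_lock_| held.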
bool TaskQueueImpl::ShouldReportIpcTaskQueuedFromAnyThreadLocked(
    base::TimeDelta* time_since_disabled) {
  // It's possible that tracing was just enabled and no disabled time has been
  // stored. In that case, skip emitting the event.
  if (!any_thread_.tracing_only.disabled_time)
    return false;

  if (any_thread_.tracing_only.is_enabled ||
      !any_thread_.tracing_only.should_report_posted_tasks_when_disabled) {
    return false;
  }

  *time_since_disabled = sequence_manager_->any_thread_clock()->NowTicks() -
                         any_thread_.tracing_only.disabled_time.value();
  return true;
}

void TaskQueueImpl::MaybeReportIpcTaskQueuedFromAnyThreadLocked(
    const Task& pending_task) {
  if (!pending_task.ipc_hash)
    return;

  bool tracing_enabled = false;
  TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
                                     &tracing_enabled);
  if (!tracing_enabled)
    return;

  base::TimeDelta time_since_disabled;
  if (ShouldReportIpcTaskQueuedFromAnyThreadLocked(&time_since_disabled))
    ReportIpcTaskQueued(pending_task, time_since_disabled);
}

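// Same as above, but for callers that do not already hold |any_thread_lock_|.
// The tracing check runs first so the common case (tracing disabled) avoids
// taking the lock, and the trace event itself is emitted outside the critical
// section.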
void TaskQueueImpl::MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(
    const Task& pending_task) {
  if (!pending_task.ipc_hash)
    return;

  bool tracing_enabled = false;
  TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
                                     &tracing_enabled);
  if (!tracing_enabled)
    return;

  base::TimeDelta time_since_disabled;
  bool should_report = false;
  {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    should_report =
        ShouldReportIpcTaskQueuedFromAnyThreadLocked(&time_since_disabled);
  }

  if (should_report)
    ReportIpcTaskQueued(pending_task, time_since_disabled);
}

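// Emits an instant trace event describing an IPC task that was posted to this
// queue while it was disabled: how long the queue has been disabled, the IPC
// hash, and the (interned) posting location.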
void TaskQueueImpl::ReportIpcTaskQueued(
    const Task& pending_task,
    const base::TimeDelta& time_since_disabled) {
  TRACE_EVENT_INSTANT(
      TRACE_DISABLED_BY_DEFAULT("lifecycles"), "task_posted_to_disabled_queue",
      [&](perfetto::EventContext ctx) {
        auto* proto = ctx.event<perfetto::protos::pbzero::ChromeTrackEvent>()
                          ->set_chrome_task_posted_to_disabled_queue();
        proto->set_time_since_disabled_ms(
            checked_cast<uint64_t>(time_since_disabled.InMilliseconds()));
        proto->set_ipc_hash(pending_task.ipc_hash);
        proto->set_source_location_iid(
            base::trace_event::InternedSourceLocation::Get(
                &ctx, pending_task.posted_from));
      });
}

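// Records the sequence number at which the queue became unblocked. Tasks with
// an earlier enqueue order were posted while the queue was still blocked.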
void TaskQueueImpl::OnQueueUnblocked() {
  DCHECK(IsQueueEnabled());
  DCHECK(!BlockedByFence());

  main_thread_only().enqueue_order_at_which_we_became_unblocked =
      sequence_manager_->GetNextSequenceNumber();
  if (GetQueuePriority() <= DefaultPriority()) {
    // We are default priority or more important, so update
    // |enqueue_order_at_which_we_became_unblocked_with_normal_priority|.
    main_thread_only()
        .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
        main_thread_only().enqueue_order_at_which_we_became_unblocked;
  }
}

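// Creates a voter tied to this queue through a weak pointer, so outstanding
// voters are safely ignored once the queue is gone. Must be called on the
// queue's bound thread.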
std::unique_ptr<TaskQueue::QueueEnabledVoter>
TaskQueueImpl::CreateQueueEnabledVoter() {
  DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
  return WrapUnique(
      new TaskQueue::QueueEnabledVoter(voter_weak_ptr_factory_.GetWeakPtr()));
}

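// Voter bookkeeping: the queue is enabled only while every registered voter
// votes "enabled" (see AreAllQueueEnabledVotersEnabled()). Adding, removing,
// or flipping a voter recomputes that conjunction and calls SetQueueEnabled()
// when the overall result changes.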
void TaskQueueImpl::AddQueueEnabledVoter(bool voter_is_enabled,
                                         TaskQueue::QueueEnabledVoter& voter) {
  ++main_thread_only().voter_count;
  if (voter_is_enabled) {
    ++main_thread_only().enabled_voter_count;
  }
}

void TaskQueueImpl::RemoveQueueEnabledVoter(
    bool voter_is_enabled,
    TaskQueue::QueueEnabledVoter& voter) {
  bool was_enabled = AreAllQueueEnabledVotersEnabled();
  if (voter_is_enabled) {
    --main_thread_only().enabled_voter_count;
    DCHECK_GE(main_thread_only().enabled_voter_count, 0);
  }

  --main_thread_only().voter_count;
  DCHECK_GE(main_thread_only().voter_count, 0);

  bool is_enabled = AreAllQueueEnabledVotersEnabled();
  if (was_enabled != is_enabled) {
    SetQueueEnabled(is_enabled);
  }
}

void TaskQueueImpl::OnQueueEnabledVoteChanged(bool enabled) {
  bool was_enabled = AreAllQueueEnabledVotersEnabled();
  if (enabled) {
    ++main_thread_only().enabled_voter_count;
    DCHECK_LE(main_thread_only().enabled_voter_count,
              main_thread_only().voter_count);
  } else {
    --main_thread_only().enabled_voter_count;
    DCHECK_GE(main_thread_only().enabled_voter_count, 0);
  }

  bool is_enabled = AreAllQueueEnabledVotersEnabled();
  if (was_enabled != is_enabled) {
    SetQueueEnabled(is_enabled);
  }
}

void TaskQueueImpl::CompleteInitializationOnBoundThread() {
  voter_weak_ptr_factory_.BindToCurrentSequence(PassKey<TaskQueueImpl>());
}

TaskQueue::QueuePriority TaskQueueImpl::DefaultPriority() const {
  return sequence_manager()->settings().priority_settings.default_priority();
}

TaskQueueImpl::DelayedIncomingQueue::DelayedIncomingQueue() = default;
TaskQueueImpl::DelayedIncomingQueue::~DelayedIncomingQueue() = default;

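// DelayedIncomingQueue tracks the number of pending high-resolution tasks
// alongside the heap so callers can cheaply tell whether any queued task
// still requires a high-resolution timer. Every mutation below keeps
// |pending_high_res_tasks_| in sync with the heap's contents.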
void TaskQueueImpl::DelayedIncomingQueue::push(Task task) {
  // TODO(crbug.com/1247285): Remove this once the cause of corrupted tasks in
  // the queue is understood.
  CHECK(task.task);
  if (task.is_high_res)
    pending_high_res_tasks_++;
  queue_.insert(std::move(task));
}

void TaskQueueImpl::DelayedIncomingQueue::remove(HeapHandle heap_handle) {
  DCHECK(!empty());
  DCHECK_LT(heap_handle.index(), queue_.size());
  Task task = queue_.take(heap_handle);
  if (task.is_high_res) {
    pending_high_res_tasks_--;
    DCHECK_GE(pending_high_res_tasks_, 0);
  }
}

Task TaskQueueImpl::DelayedIncomingQueue::take_top() {
  DCHECK(!empty());
  if (queue_.top().is_high_res) {
    pending_high_res_tasks_--;
    DCHECK_GE(pending_high_res_tasks_, 0);
  }
  return queue_.take_top();
}

void TaskQueueImpl::DelayedIncomingQueue::swap(DelayedIncomingQueue* rhs) {
  std::swap(pending_high_res_tasks_, rhs->pending_high_res_tasks_);
  std::swap(queue_, rhs->queue_);
}

void TaskQueueImpl::DelayedIncomingQueue::SweepCancelledTasks(
    SequenceManagerImpl* sequence_manager) {
  // Note: IntrusiveHeap::EraseIf() is safe against re-entrancy caused by
  // deleted tasks posting new tasks.
  queue_.EraseIf([this](const Task& task) {
    if (task.task.IsCancelled()) {
      if (task.is_high_res) {
        --pending_high_res_tasks_;
        DCHECK_GE(pending_high_res_tasks_, 0);
      }
      return true;
    }
    return false;
  });
}

Value::List TaskQueueImpl::DelayedIncomingQueue::AsValue(TimeTicks now) const {
  Value::List state;
  for (const Task& task : queue_)
    state.Append(TaskAsValue(task, now));
  return state;
}

bool TaskQueueImpl::DelayedIncomingQueue::Compare::operator()(
    const Task& lhs,
    const Task& rhs) const {
  // Delayed tasks are ordered by latest_delayed_run_time(). The top task may
  // not be the first task eligible to run, but tasks will always become ripe
  // before their latest_delayed_run_time().
  const TimeTicks lhs_latest_delayed_run_time = lhs.latest_delayed_run_time();
  const TimeTicks rhs_latest_delayed_run_time = rhs.latest_delayed_run_time();
  if (lhs_latest_delayed_run_time == rhs_latest_delayed_run_time)
    return lhs.sequence_num > rhs.sequence_num;
  return lhs_latest_delayed_run_time > rhs_latest_delayed_run_time;
}

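// The callback handle keeps a raw pointer back to its queue, so it must be
// created and destroyed on the queue's bound thread. |task_queue_impl_| may
// have been cleared by then (e.g. if the queue was unregistered first), in
// which case there is no handler left to remove.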
TaskQueueImpl::OnTaskPostedCallbackHandleImpl::OnTaskPostedCallbackHandleImpl(
    TaskQueueImpl* task_queue_impl,
    scoped_refptr<const AssociatedThreadId> associated_thread)
    : task_queue_impl_(task_queue_impl),
      associated_thread_(std::move(associated_thread)) {
  DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
}

TaskQueueImpl::OnTaskPostedCallbackHandleImpl::
    ~OnTaskPostedCallbackHandleImpl() {
  DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
  if (task_queue_impl_)
    task_queue_impl_->RemoveOnTaskPostedHandler(this);
}

}  // namespace internal
}  // namespace sequence_manager
}  // namespace base