1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task/sequence_manager/task_queue_impl.h"
6
7 #include <inttypes.h>
8
9 #include <memory>
10 #include <utility>
11
12 #include "base/check.h"
13 #include "base/compiler_specific.h"
14 #include "base/containers/stack_container.h"
15 #include "base/feature_list.h"
16 #include "base/logging.h"
17 #include "base/memory/scoped_refptr.h"
18 #include "base/metrics/histogram_macros.h"
19 #include "base/notreached.h"
20 #include "base/observer_list.h"
21 #include "base/ranges/algorithm.h"
22 #include "base/strings/stringprintf.h"
23 #include "base/task/common/scoped_defer_task_posting.h"
24 #include "base/task/default_delayed_task_handle_delegate.h"
25 #include "base/task/sequence_manager/associated_thread_id.h"
26 #include "base/task/sequence_manager/delayed_task_handle_delegate.h"
27 #include "base/task/sequence_manager/fence.h"
28 #include "base/task/sequence_manager/sequence_manager_impl.h"
29 #include "base/task/sequence_manager/task_order.h"
30 #include "base/task/sequence_manager/wake_up_queue.h"
31 #include "base/task/sequence_manager/work_queue.h"
32 #include "base/task/task_features.h"
33 #include "base/task/task_observer.h"
34 #include "base/threading/thread_restrictions.h"
35 #include "base/time/time.h"
36 #include "base/trace_event/base_tracing.h"
37 #include "build/build_config.h"
38 #include "third_party/abseil-cpp/absl/types/optional.h"
39
40 namespace base {
41 namespace sequence_manager {
42
43 namespace {
44
// Controls whether cancelled tasks are removed from the delayed queue.
// Enabled by default. TaskQueueImpl::InitializeFeatures() caches the state of
// this feature in |g_is_sweep_cancelled_tasks_enabled|.
BASE_FEATURE(kSweepCancelledTasks,
             "SweepCancelledTasks",
             FEATURE_ENABLED_BY_DEFAULT);
49
50 } // namespace
51
52 namespace internal {
53
54 namespace {
55
// Cache of the state of the kRemoveCanceledTasksInTaskQueue,
// kSweepCancelledTasks and kExplicitHighResolutionTimerWin features. This
// avoids the need to constantly query their enabled state through
// FeatureList::IsEnabled().
//
// The initializers below are the values observed until
// TaskQueueImpl::InitializeFeatures() overwrites them with the actual feature
// states.
bool g_is_remove_canceled_tasks_in_task_queue_enabled = true;
bool g_is_sweep_cancelled_tasks_enabled =
    kSweepCancelledTasks.default_state == FEATURE_ENABLED_BY_DEFAULT;
#if BUILDFLAG(IS_WIN)
// An atomic is used here because the flag is queried from other threads when
// tasks are posted cross-thread, which can race with its initialization.
std::atomic_bool g_explicit_high_resolution_timer_win{false};
#endif  // BUILDFLAG(IS_WIN)
68
69 } // namespace
70
GuardedTaskPoster(TaskQueueImpl * outer)71 TaskQueueImpl::GuardedTaskPoster::GuardedTaskPoster(TaskQueueImpl* outer)
72 : outer_(outer) {}
73
~GuardedTaskPoster()74 TaskQueueImpl::GuardedTaskPoster::~GuardedTaskPoster() {}
75
PostTask(PostedTask task)76 bool TaskQueueImpl::GuardedTaskPoster::PostTask(PostedTask task) {
77 // Do not process new PostTasks while we are handling a PostTask (tracing
78 // has to do this) as it can lead to a deadlock and defer it instead.
79 ScopedDeferTaskPosting disallow_task_posting;
80
81 auto token = operations_controller_.TryBeginOperation();
82 if (!token)
83 return false;
84
85 outer_->PostTask(std::move(task));
86 return true;
87 }
88
PostCancelableTask(PostedTask task)89 DelayedTaskHandle TaskQueueImpl::GuardedTaskPoster::PostCancelableTask(
90 PostedTask task) {
91 // Do not process new PostTasks while we are handling a PostTask (tracing
92 // has to do this) as it can lead to a deadlock and defer it instead.
93 ScopedDeferTaskPosting disallow_task_posting;
94
95 auto token = operations_controller_.TryBeginOperation();
96 if (!token)
97 return DelayedTaskHandle();
98
99 auto delayed_task_handle_delegate =
100 std::make_unique<DelayedTaskHandleDelegate>(outer_);
101 task.delayed_task_handle_delegate = delayed_task_handle_delegate->AsWeakPtr();
102
103 outer_->PostTask(std::move(task));
104 DCHECK(delayed_task_handle_delegate->IsValid());
105 return DelayedTaskHandle(std::move(delayed_task_handle_delegate));
106 }
107
TaskRunner(scoped_refptr<GuardedTaskPoster> task_poster,scoped_refptr<const AssociatedThreadId> associated_thread,TaskType task_type)108 TaskQueueImpl::TaskRunner::TaskRunner(
109 scoped_refptr<GuardedTaskPoster> task_poster,
110 scoped_refptr<const AssociatedThreadId> associated_thread,
111 TaskType task_type)
112 : task_poster_(std::move(task_poster)),
113 associated_thread_(std::move(associated_thread)),
114 task_type_(task_type) {}
115
~TaskRunner()116 TaskQueueImpl::TaskRunner::~TaskRunner() {}
117
PostDelayedTask(const Location & location,OnceClosure callback,TimeDelta delay)118 bool TaskQueueImpl::TaskRunner::PostDelayedTask(const Location& location,
119 OnceClosure callback,
120 TimeDelta delay) {
121 return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
122 delay, Nestable::kNestable,
123 task_type_));
124 }
125
PostDelayedTaskAt(subtle::PostDelayedTaskPassKey,const Location & location,OnceClosure callback,TimeTicks delayed_run_time,base::subtle::DelayPolicy delay_policy)126 bool TaskQueueImpl::TaskRunner::PostDelayedTaskAt(
127 subtle::PostDelayedTaskPassKey,
128 const Location& location,
129 OnceClosure callback,
130 TimeTicks delayed_run_time,
131 base::subtle::DelayPolicy delay_policy) {
132 return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
133 delayed_run_time, delay_policy,
134 Nestable::kNestable, task_type_));
135 }
136
PostCancelableDelayedTaskAt(subtle::PostDelayedTaskPassKey pass_key,const Location & location,OnceClosure callback,TimeTicks delayed_run_time,base::subtle::DelayPolicy delay_policy)137 DelayedTaskHandle TaskQueueImpl::TaskRunner::PostCancelableDelayedTaskAt(
138 subtle::PostDelayedTaskPassKey pass_key,
139 const Location& location,
140 OnceClosure callback,
141 TimeTicks delayed_run_time,
142 base::subtle::DelayPolicy delay_policy) {
143 if (!g_is_remove_canceled_tasks_in_task_queue_enabled) {
144 // This will revert to PostDelayedTaskAt() with default DelayedTaskHandle.
145 return SequencedTaskRunner::PostCancelableDelayedTaskAt(
146 pass_key, location, std::move(callback), delayed_run_time,
147 delay_policy);
148 }
149 return task_poster_->PostCancelableTask(
150 PostedTask(this, std::move(callback), location, delayed_run_time,
151 delay_policy, Nestable::kNestable, task_type_));
152 }
153
PostCancelableDelayedTask(subtle::PostDelayedTaskPassKey pass_key,const Location & location,OnceClosure callback,TimeDelta delay)154 DelayedTaskHandle TaskQueueImpl::TaskRunner::PostCancelableDelayedTask(
155 subtle::PostDelayedTaskPassKey pass_key,
156 const Location& location,
157 OnceClosure callback,
158 TimeDelta delay) {
159 if (!g_is_remove_canceled_tasks_in_task_queue_enabled) {
160 return SequencedTaskRunner::PostCancelableDelayedTask(
161 pass_key, location, std::move(callback), delay);
162 }
163
164 return task_poster_->PostCancelableTask(
165 PostedTask(this, std::move(callback), location, delay,
166 Nestable::kNestable, task_type_));
167 }
168
PostNonNestableDelayedTask(const Location & location,OnceClosure callback,TimeDelta delay)169 bool TaskQueueImpl::TaskRunner::PostNonNestableDelayedTask(
170 const Location& location,
171 OnceClosure callback,
172 TimeDelta delay) {
173 return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
174 delay, Nestable::kNonNestable,
175 task_type_));
176 }
177
RunsTasksInCurrentSequence() const178 bool TaskQueueImpl::TaskRunner::RunsTasksInCurrentSequence() const {
179 return associated_thread_->IsBoundToCurrentThread();
180 }
181
182 // static
InitializeFeatures()183 void TaskQueueImpl::InitializeFeatures() {
184 ApplyRemoveCanceledTasksInTaskQueue();
185 g_is_sweep_cancelled_tasks_enabled =
186 FeatureList::IsEnabled(kSweepCancelledTasks);
187 #if BUILDFLAG(IS_WIN)
188 g_explicit_high_resolution_timer_win.store(
189 FeatureList::IsEnabled(kExplicitHighResolutionTimerWin),
190 std::memory_order_relaxed);
191 #endif // BUILDFLAG(IS_WIN)
192 }
193
194 // static
ApplyRemoveCanceledTasksInTaskQueue()195 void TaskQueueImpl::ApplyRemoveCanceledTasksInTaskQueue() {
196 // Since kRemoveCanceledTasksInTaskQueue is not constexpr (forbidden for
197 // Features), it cannot be used to initialize
198 // |g_is_remove_canceled_tasks_in_task_queue_enabled| at compile time. At
199 // least DCHECK that its initial value matches the default value of the
200 // feature here.
201 DCHECK_EQ(g_is_remove_canceled_tasks_in_task_queue_enabled,
202 kRemoveCanceledTasksInTaskQueue.default_state ==
203 FEATURE_ENABLED_BY_DEFAULT);
204 g_is_remove_canceled_tasks_in_task_queue_enabled =
205 FeatureList::IsEnabled(kRemoveCanceledTasksInTaskQueue);
206 }
207
208 // static
ResetRemoveCanceledTasksInTaskQueueForTesting()209 void TaskQueueImpl::ResetRemoveCanceledTasksInTaskQueueForTesting() {
210 g_is_remove_canceled_tasks_in_task_queue_enabled =
211 kRemoveCanceledTasksInTaskQueue.default_state ==
212 FEATURE_ENABLED_BY_DEFAULT;
213 }
214
TaskQueueImpl(SequenceManagerImpl * sequence_manager,WakeUpQueue * wake_up_queue,const TaskQueue::Spec & spec)215 TaskQueueImpl::TaskQueueImpl(SequenceManagerImpl* sequence_manager,
216 WakeUpQueue* wake_up_queue,
217 const TaskQueue::Spec& spec)
218 : name_(spec.name),
219 sequence_manager_(sequence_manager),
220 associated_thread_(sequence_manager
221 ? sequence_manager->associated_thread()
222 : AssociatedThreadId::CreateBound()),
223 task_poster_(MakeRefCounted<GuardedTaskPoster>(this)),
224 main_thread_only_(this, wake_up_queue),
225 empty_queues_to_reload_handle_(
226 sequence_manager
227 ? sequence_manager->GetFlagToRequestReloadForEmptyQueue(this)
228 : AtomicFlagSet::AtomicFlag()),
229 should_monitor_quiescence_(spec.should_monitor_quiescence),
230 should_notify_observers_(spec.should_notify_observers),
231 delayed_fence_allowed_(spec.delayed_fence_allowed) {
232 UpdateCrossThreadQueueStateLocked();
233 // SequenceManager can't be set later, so we need to prevent task runners
234 // from posting any tasks.
235 if (sequence_manager_)
236 task_poster_->StartAcceptingOperations();
237 }
238
// Only verifies (in DCHECK builds) that the queue has been unregistered before
// it is destroyed.
TaskQueueImpl::~TaskQueueImpl() {
#if DCHECK_IS_ON()
  // |any_thread_| is read under its lock.
  base::internal::CheckedAutoLock lock(any_thread_lock_);
  // NOTE this check shouldn't fire because |SequenceManagerImpl::queues_|
  // contains a strong reference to this TaskQueueImpl and the
  // SequenceManagerImpl destructor calls UnregisterTaskQueue on all task
  // queues.
  DCHECK(any_thread_.unregistered)
      << "UnregisterTaskQueue must be called first!";
#endif
}
250
// Out-of-line defaulted constructor and destructor for the cross-thread
// state struct.
TaskQueueImpl::AnyThread::AnyThread() = default;
TaskQueueImpl::AnyThread::~AnyThread() = default;

// Out-of-line defaulted constructor and destructor for the nested
// TracingOnly struct.
TaskQueueImpl::AnyThread::TracingOnly::TracingOnly() = default;
TaskQueueImpl::AnyThread::TracingOnly::~TracingOnly() = default;
256
MainThreadOnly(TaskQueueImpl * task_queue,WakeUpQueue * wake_up_queue)257 TaskQueueImpl::MainThreadOnly::MainThreadOnly(TaskQueueImpl* task_queue,
258 WakeUpQueue* wake_up_queue)
259 : wake_up_queue(wake_up_queue),
260 delayed_work_queue(
261 new WorkQueue(task_queue, "delayed", WorkQueue::QueueType::kDelayed)),
262 immediate_work_queue(new WorkQueue(task_queue,
263 "immediate",
264 WorkQueue::QueueType::kImmediate)) {}
265
266 TaskQueueImpl::MainThreadOnly::~MainThreadOnly() = default;
267
CreateTaskRunner(TaskType task_type) const268 scoped_refptr<SingleThreadTaskRunner> TaskQueueImpl::CreateTaskRunner(
269 TaskType task_type) const {
270 return MakeRefCounted<TaskRunner>(task_poster_, associated_thread_,
271 task_type);
272 }
273
// Shuts the queue down: stops accepting posted tasks, marks the queue as
// unregistered, drops handlers and the wake-up queue association, and moves
// all pending tasks into locals so they are destroyed when this function
// returns.
void TaskQueueImpl::UnregisterTaskQueue() {
  TRACE_EVENT0("base", "TaskQueueImpl::UnregisterTaskQueue");
  // Detach task runners.
  {
    ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
    task_poster_->ShutdownAndWaitForZeroOperations();
  }

  // Locals that take over the cross-thread state so that it is destroyed
  // after |any_thread_lock_| has been released.
  TaskDeque immediate_incoming_queue;
  base::flat_map<raw_ptr<OnTaskPostedCallbackHandleImpl>, OnTaskPostedHandler>
      on_task_posted_handlers;

  {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    any_thread_.unregistered = true;
    immediate_incoming_queue.swap(any_thread_.immediate_incoming_queue);

    // Notify every handle before the handler map is swapped out.
    for (auto& handler : any_thread_.on_task_posted_handlers)
      handler.first->UnregisterTaskQueue();
    any_thread_.on_task_posted_handlers.swap(on_task_posted_handlers);
  }

  if (main_thread_only().wake_up_queue) {
    main_thread_only().wake_up_queue->UnregisterQueue(this);
  }

  // Reset the remaining main-thread state.
  main_thread_only().on_task_started_handler = OnTaskStartedHandler();
  main_thread_only().on_task_completed_handler = OnTaskCompletedHandler();
  main_thread_only().wake_up_queue = nullptr;
  main_thread_only().throttler = nullptr;
  empty_queues_to_reload_handle_.ReleaseAtomicFlag();

  // It is possible for a task to hold a scoped_refptr to this, which
  // will lead to TaskQueueImpl destructor being called when deleting a task.
  // To avoid use-after-free, we need to clear all fields of a task queue
  // before starting to delete the tasks.
  // All work queues and priority queues containing tasks should be moved to
  // local variables on stack (std::move for unique_ptrs and swap for queues)
  // before clearing them and deleting tasks.

  // Flush the queues outside of the lock because TSAN complains about a lock
  // order inversion for tasks that are posted from within a lock, with a
  // destructor that acquires the same lock.

  DelayedIncomingQueue delayed_incoming_queue;
  delayed_incoming_queue.swap(&main_thread_only().delayed_incoming_queue);
  std::unique_ptr<WorkQueue> immediate_work_queue =
      std::move(main_thread_only().immediate_work_queue);
  std::unique_ptr<WorkQueue> delayed_work_queue =
      std::move(main_thread_only().delayed_work_queue);
  // The locals above go out of scope here, which deletes the tasks.
}
325
GetName() const326 const char* TaskQueueImpl::GetName() const {
327 return perfetto::protos::pbzero::SequenceManagerTask::QueueName_Name(name_);
328 }
329
GetProtoName() const330 QueueName TaskQueueImpl::GetProtoName() const {
331 return name_;
332 }
333
PostTask(PostedTask task)334 void TaskQueueImpl::PostTask(PostedTask task) {
335 CurrentThread current_thread =
336 associated_thread_->IsBoundToCurrentThread()
337 ? TaskQueueImpl::CurrentThread::kMainThread
338 : TaskQueueImpl::CurrentThread::kNotMainThread;
339
340 #if DCHECK_IS_ON()
341 TimeDelta delay = GetTaskDelayAdjustment(current_thread);
342 if (absl::holds_alternative<base::TimeTicks>(
343 task.delay_or_delayed_run_time)) {
344 absl::get<base::TimeTicks>(task.delay_or_delayed_run_time) += delay;
345 } else {
346 absl::get<base::TimeDelta>(task.delay_or_delayed_run_time) += delay;
347 }
348 #endif // DCHECK_IS_ON()
349
350 if (!task.is_delayed()) {
351 PostImmediateTaskImpl(std::move(task), current_thread);
352 } else {
353 PostDelayedTaskImpl(std::move(task), current_thread);
354 }
355 }
356
RemoveCancelableTask(HeapHandle heap_handle)357 void TaskQueueImpl::RemoveCancelableTask(HeapHandle heap_handle) {
358 // Can only cancel from the current thread.
359 DCHECK(associated_thread_->IsBoundToCurrentThread());
360 DCHECK(heap_handle.IsValid());
361
362 main_thread_only().delayed_incoming_queue.remove(heap_handle);
363
364 // Only update the delayed wake up if the top task is removed.
365 if (heap_handle.index() == 0u) {
366 LazyNow lazy_now(sequence_manager_->main_thread_clock());
367 UpdateWakeUp(&lazy_now);
368 }
369 }
370
GetTaskDelayAdjustment(CurrentThread current_thread)371 TimeDelta TaskQueueImpl::GetTaskDelayAdjustment(CurrentThread current_thread) {
372 #if DCHECK_IS_ON()
373 if (current_thread == TaskQueueImpl::CurrentThread::kNotMainThread) {
374 base::internal::CheckedAutoLock lock(any_thread_lock_);
375 // Add a per-priority delay to cross thread tasks. This can help diagnose
376 // scheduler induced flakiness by making things flake most of the time.
377 return sequence_manager_->settings()
378 .priority_settings
379 .per_priority_cross_thread_task_delay()[any_thread_.queue_set_index];
380 } else {
381 return sequence_manager_->settings()
382 .priority_settings.per_priority_same_thread_task_delay()
383 [main_thread_only().immediate_work_queue->work_queue_set_index()];
384 }
385 #else
386 // No delay adjustment.
387 return TimeDelta();
388 #endif // DCHECK_IS_ON()
389 }
390
// Appends |task| to the cross-thread immediate incoming queue under
// |any_thread_lock_|, runs the task-posted handlers, and, once the lock is
// released, asks the SequenceManager to schedule work if required.
void TaskQueueImpl::PostImmediateTaskImpl(PostedTask task,
                                          CurrentThread current_thread) {
  // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
  // for details.
  CHECK(task.callback);

  bool should_schedule_work = false;
  {
    // TODO(alexclarke): Maybe add a main thread only immediate_incoming_queue
    // See https://crbug.com/901800
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    bool add_queue_time_to_tasks = sequence_manager_->GetAddQueueTimeToTasks();
    // |queue_time| stays null unless queue times are requested or delayed
    // fences are allowed for this queue.
    TimeTicks queue_time;
    if (add_queue_time_to_tasks || delayed_fence_allowed_)
      queue_time = sequence_manager_->any_thread_clock()->NowTicks();

    // The sequence number must be incremented atomically with pushing onto the
    // incoming queue. Otherwise if there are several threads posting task we
    // risk breaking the assumption that sequence numbers increase monotonically
    // within a queue.
    EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber();
    bool was_immediate_incoming_queue_empty =
        any_thread_.immediate_incoming_queue.empty();
    any_thread_.immediate_incoming_queue.push_back(
        Task(std::move(task), sequence_number, sequence_number, queue_time));

#if DCHECK_IS_ON()
    any_thread_.immediate_incoming_queue.back().cross_thread_ =
        (current_thread == TaskQueueImpl::CurrentThread::kNotMainThread);
#endif

    sequence_manager_->WillQueueTask(
        &any_thread_.immediate_incoming_queue.back());
    MaybeReportIpcTaskQueuedFromAnyThreadLocked(
        any_thread_.immediate_incoming_queue.back());

    // Handlers run while the lock is held and see the task just queued.
    for (auto& handler : any_thread_.on_task_posted_handlers) {
      DCHECK(!handler.second.is_null());
      handler.second.Run(any_thread_.immediate_incoming_queue.back());
    }

    // If this queue was completely empty, then the SequenceManager needs to be
    // informed so it can reload the work queue and add us to the
    // TaskQueueSelector which can only be done from the main thread. In
    // addition it may need to schedule a DoWork if this queue isn't blocked.
    if (was_immediate_incoming_queue_empty &&
        any_thread_.immediate_work_queue_empty) {
      empty_queues_to_reload_handle_.SetActive(true);
      should_schedule_work =
          any_thread_.post_immediate_task_should_schedule_work;
    }
  }

  // On windows it's important to call this outside of a lock because calling a
  // pump while holding a lock can result in priority inversions. See
  // http://shortn/_ntnKNqjDQT for a discussion.
  //
  // Calling ScheduleWork outside the lock should be safe, only the main thread
  // can mutate |any_thread_.post_immediate_task_should_schedule_work|. If it
  // transitions to false we call ScheduleWork redundantly, which is harmless.
  // If it transitions to true, the side effect of
  // |empty_queues_to_reload_handle_.SetActive(true)| is guaranteed to be picked
  // up by the ThreadController's call to SequenceManagerImpl::DelayTillNextTask
  // when it computes what continuation (if any) is needed.
  if (should_schedule_work)
    sequence_manager_->ScheduleWork();

  TraceQueueSize();
}
460
PostDelayedTaskImpl(PostedTask posted_task,CurrentThread current_thread)461 void TaskQueueImpl::PostDelayedTaskImpl(PostedTask posted_task,
462 CurrentThread current_thread) {
463 // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
464 // for details.
465 CHECK(posted_task.callback);
466
467 if (current_thread == CurrentThread::kMainThread) {
468 LazyNow lazy_now(sequence_manager_->main_thread_clock());
469 Task pending_task = MakeDelayedTask(std::move(posted_task), &lazy_now);
470 sequence_manager_->MaybeAddLeewayToTask(pending_task);
471 PushOntoDelayedIncomingQueueFromMainThread(
472 std::move(pending_task), &lazy_now,
473 /* notify_task_annotator */ true);
474 } else {
475 LazyNow lazy_now(sequence_manager_->any_thread_clock());
476 PushOntoDelayedIncomingQueue(
477 MakeDelayedTask(std::move(posted_task), &lazy_now));
478 }
479 }
480
PushOntoDelayedIncomingQueueFromMainThread(Task pending_task,LazyNow * lazy_now,bool notify_task_annotator)481 void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread(
482 Task pending_task,
483 LazyNow* lazy_now,
484 bool notify_task_annotator) {
485 #if DCHECK_IS_ON()
486 pending_task.cross_thread_ = false;
487 #endif
488
489 if (notify_task_annotator) {
490 sequence_manager_->WillQueueTask(&pending_task);
491 MaybeReportIpcTaskQueuedFromMainThread(pending_task);
492 }
493 RecordQueuingDelayedTaskMetrics(pending_task, lazy_now);
494 main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
495 UpdateWakeUp(lazy_now);
496
497 TraceQueueSize();
498 }
499
PushOntoDelayedIncomingQueue(Task pending_task)500 void TaskQueueImpl::PushOntoDelayedIncomingQueue(Task pending_task) {
501 sequence_manager_->WillQueueTask(&pending_task);
502 MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(pending_task);
503
504 #if DCHECK_IS_ON()
505 pending_task.cross_thread_ = true;
506 #endif
507
508 // TODO(altimin): Add a copy method to Task to capture metadata here.
509 auto task_runner = pending_task.task_runner;
510 const auto task_type = pending_task.task_type;
511 PostImmediateTaskImpl(
512 PostedTask(std::move(task_runner),
513 BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask,
514 Unretained(this), std::move(pending_task)),
515 FROM_HERE, TimeDelta(), Nestable::kNonNestable, task_type),
516 CurrentThread::kNotMainThread);
517 }
518
ScheduleDelayedWorkTask(Task pending_task)519 void TaskQueueImpl::ScheduleDelayedWorkTask(Task pending_task) {
520 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
521 sequence_manager_->MaybeAddLeewayToTask(pending_task);
522 TimeTicks now = sequence_manager_->main_thread_clock()->NowTicks();
523 LazyNow lazy_now(now);
524 // A delayed task is ready to run as soon as earliest_delayed_run_time() is
525 // reached.
526 if (pending_task.earliest_delayed_run_time() <= now) {
527 // If |delayed_run_time| is in the past then push it onto the work queue
528 // immediately. To ensure the right task ordering we need to temporarily
529 // push it onto the |delayed_incoming_queue|.
530 pending_task.delayed_run_time = now;
531 main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
532 MoveReadyDelayedTasksToWorkQueue(
533 &lazy_now, sequence_manager_->GetNextSequenceNumber());
534 } else {
535 // If |delayed_run_time| is in the future we can queue it as normal.
536 PushOntoDelayedIncomingQueueFromMainThread(std::move(pending_task),
537 &lazy_now, false);
538 }
539 TraceQueueSize();
540 }
541
RecordQueuingDelayedTaskMetrics(const Task & pending_task,LazyNow * lazy_now)542 void TaskQueueImpl::RecordQueuingDelayedTaskMetrics(const Task& pending_task,
543 LazyNow* lazy_now) {
544 // The sampling depends on having a high-resolution clock.
545 if (!base::TimeTicks::IsHighResolution())
546 return;
547
548 // A sample is taken on average every kSampleRate tasks.
549 static constexpr int kSampleRate = 10000;
550
551 // Use pseudorandom sampling to avoid "running jank," which may occur
552 // when emitting many samples to a histogram in parallel. (This function is
553 // called a lot in parallel.) See https://crbug/1254354 for more details. The
554 // current time is used as a source of pseudorandomness.
555 if (((lazy_now->Now() - TimeTicks::UnixEpoch()).InMicroseconds() ^
556 pending_task.sequence_num) %
557 kSampleRate ==
558 0) {
559 // The |delay| will be different than the delay passed to PostDelayedTask
560 // for cross-thread delayed tasks.
561 const TimeDelta delay = pending_task.delayed_run_time - lazy_now->Now();
562 UMA_HISTOGRAM_LONG_TIMES("Scheduler.TaskQueueImpl.PostDelayedTaskDelay",
563 delay);
564 UMA_HISTOGRAM_COUNTS_1000(
565 "Scheduler.TaskQueueImpl.DelayedIncomingQueueSize",
566 static_cast<int>(main_thread_only().delayed_incoming_queue.size()));
567 }
568 }
569
ReloadEmptyImmediateWorkQueue()570 void TaskQueueImpl::ReloadEmptyImmediateWorkQueue() {
571 DCHECK(main_thread_only().immediate_work_queue->Empty());
572 main_thread_only().immediate_work_queue->TakeImmediateIncomingQueueTasks();
573
574 if (main_thread_only().throttler && IsQueueEnabled()) {
575 main_thread_only().throttler->OnHasImmediateTask();
576 }
577 }
578
TakeImmediateIncomingQueueTasks(TaskDeque * queue)579 void TaskQueueImpl::TakeImmediateIncomingQueueTasks(TaskDeque* queue) {
580 DCHECK(queue->empty());
581 // Now is a good time to consider reducing the empty queue's capacity if we're
582 // wasting memory, before we make it the `immediate_incoming_queue`.
583 queue->MaybeShrinkQueue();
584
585 base::internal::CheckedAutoLock lock(any_thread_lock_);
586 queue->swap(any_thread_.immediate_incoming_queue);
587
588 // Activate delayed fence if necessary. This is ideologically similar to
589 // ActivateDelayedFenceIfNeeded, but due to immediate tasks being posted
590 // from any thread we can't generate an enqueue order for the fence there,
591 // so we have to check all immediate tasks and use their enqueue order for
592 // a fence.
593 if (main_thread_only().delayed_fence) {
594 for (const Task& task : *queue) {
595 DCHECK(!task.queue_time.is_null());
596 DCHECK(task.delayed_run_time.is_null());
597 if (task.queue_time >= main_thread_only().delayed_fence.value()) {
598 main_thread_only().delayed_fence = absl::nullopt;
599 DCHECK(!main_thread_only().current_fence);
600 main_thread_only().current_fence = Fence(task.task_order());
601 // Do not trigger WorkQueueSets notification when taking incoming
602 // immediate queue.
603 main_thread_only().immediate_work_queue->InsertFenceSilently(
604 *main_thread_only().current_fence);
605 main_thread_only().delayed_work_queue->InsertFenceSilently(
606 *main_thread_only().current_fence);
607 break;
608 }
609 }
610 }
611
612 UpdateCrossThreadQueueStateLocked();
613 }
614
IsEmpty() const615 bool TaskQueueImpl::IsEmpty() const {
616 if (!main_thread_only().delayed_work_queue->Empty() ||
617 !main_thread_only().delayed_incoming_queue.empty() ||
618 !main_thread_only().immediate_work_queue->Empty()) {
619 return false;
620 }
621
622 base::internal::CheckedAutoLock lock(any_thread_lock_);
623 return any_thread_.immediate_incoming_queue.empty();
624 }
625
GetNumberOfPendingTasks() const626 size_t TaskQueueImpl::GetNumberOfPendingTasks() const {
627 size_t task_count = 0;
628 task_count += main_thread_only().delayed_work_queue->Size();
629 task_count += main_thread_only().delayed_incoming_queue.size();
630 task_count += main_thread_only().immediate_work_queue->Size();
631
632 base::internal::CheckedAutoLock lock(any_thread_lock_);
633 task_count += any_thread_.immediate_incoming_queue.size();
634 return task_count;
635 }
636
HasTaskToRunImmediatelyOrReadyDelayedTask() const637 bool TaskQueueImpl::HasTaskToRunImmediatelyOrReadyDelayedTask() const {
638 // Any work queue tasks count as immediate work.
639 if (!main_thread_only().delayed_work_queue->Empty() ||
640 !main_thread_only().immediate_work_queue->Empty()) {
641 return true;
642 }
643
644 // Tasks on |delayed_incoming_queue| that could run now, count as
645 // immediate work.
646 if (!main_thread_only().delayed_incoming_queue.empty() &&
647 main_thread_only().delayed_incoming_queue.top().delayed_run_time <=
648 sequence_manager_->main_thread_clock()->NowTicks()) {
649 return true;
650 }
651
652 // Finally tasks on |immediate_incoming_queue| count as immediate work.
653 base::internal::CheckedAutoLock lock(any_thread_lock_);
654 return !any_thread_.immediate_incoming_queue.empty();
655 }
656
GetNextDesiredWakeUp()657 absl::optional<WakeUp> TaskQueueImpl::GetNextDesiredWakeUp() {
658 // Note we don't scheduled a wake-up for disabled queues.
659 if (main_thread_only().delayed_incoming_queue.empty() || !IsQueueEnabled())
660 return absl::nullopt;
661
662 // High resolution is needed if the queue contains high resolution tasks and
663 // has a priority index <= kNormalPriority (precise execution time is
664 // unnecessary for a low priority queue).
665 WakeUpResolution resolution = has_pending_high_resolution_tasks() &&
666 GetQueuePriority() <= DefaultPriority()
667 ? WakeUpResolution::kHigh
668 : WakeUpResolution::kLow;
669
670 const auto& top_task = main_thread_only().delayed_incoming_queue.top();
671 return WakeUp{top_task.delayed_run_time, top_task.leeway, resolution,
672 top_task.delay_policy};
673 }
674
OnWakeUp(LazyNow * lazy_now,EnqueueOrder enqueue_order)675 void TaskQueueImpl::OnWakeUp(LazyNow* lazy_now, EnqueueOrder enqueue_order) {
676 MoveReadyDelayedTasksToWorkQueue(lazy_now, enqueue_order);
677 if (main_thread_only().throttler) {
678 main_thread_only().throttler->OnWakeUp(lazy_now);
679 }
680 }
681
RemoveAllCanceledDelayedTasksFromFront(LazyNow * lazy_now)682 bool TaskQueueImpl::RemoveAllCanceledDelayedTasksFromFront(LazyNow* lazy_now) {
683 // Because task destructors could have a side-effect of posting new tasks, we
684 // move all the cancelled tasks into a temporary container before deleting
685 // them. This is to avoid the queue from changing while iterating over it.
686 StackVector<Task, 8> tasks_to_delete;
687
688 while (!main_thread_only().delayed_incoming_queue.empty()) {
689 const Task& task = main_thread_only().delayed_incoming_queue.top();
690 CHECK(task.task);
691 if (!task.task.IsCancelled())
692 break;
693
694 tasks_to_delete->push_back(
695 main_thread_only().delayed_incoming_queue.take_top());
696 }
697
698 if (!tasks_to_delete->empty()) {
699 UpdateWakeUp(lazy_now);
700 return true;
701 }
702
703 return false;
704 }
705
MoveReadyDelayedTasksToWorkQueue(LazyNow * lazy_now,EnqueueOrder enqueue_order)706 void TaskQueueImpl::MoveReadyDelayedTasksToWorkQueue(
707 LazyNow* lazy_now,
708 EnqueueOrder enqueue_order) {
709 // Enqueue all delayed tasks that should be running now, skipping any that
710 // have been canceled.
711 WorkQueue::TaskPusher delayed_work_queue_task_pusher(
712 main_thread_only().delayed_work_queue->CreateTaskPusher());
713
714 // Because task destructors could have a side-effect of posting new tasks, we
715 // move all the cancelled tasks into a temporary container before deleting
716 // them. This is to avoid the queue from changing while iterating over it.
717 StackVector<Task, 8> tasks_to_delete;
718
719 while (!main_thread_only().delayed_incoming_queue.empty()) {
720 const Task& task = main_thread_only().delayed_incoming_queue.top();
721 CHECK(task.task);
722
723 // Leave the top task alone if it hasn't been canceled and it is not ready.
724 const bool is_cancelled = task.task.IsCancelled();
725 if (!is_cancelled && task.earliest_delayed_run_time() > lazy_now->Now())
726 break;
727
728 Task ready_task = main_thread_only().delayed_incoming_queue.take_top();
729 if (is_cancelled) {
730 tasks_to_delete->push_back(std::move(ready_task));
731 continue;
732 }
733
734 // The top task is ready to run. Move it to the delayed work queue.
735 #if DCHECK_IS_ON()
736 if (sequence_manager_->settings().log_task_delay_expiry)
737 VLOG(0) << GetName() << " Delay expired for "
738 << ready_task.posted_from.ToString();
739 #endif // DCHECK_IS_ON()
740 DCHECK(!ready_task.delayed_run_time.is_null());
741 DCHECK(!ready_task.enqueue_order_set());
742 ready_task.set_enqueue_order(enqueue_order);
743 ActivateDelayedFenceIfNeeded(ready_task);
744
745 delayed_work_queue_task_pusher.Push(std::move(ready_task));
746 }
747
748 // Explicitly delete tasks last.
749 tasks_to_delete->clear();
750
751 UpdateWakeUp(lazy_now);
752 }
753
TraceQueueSize() const754 void TaskQueueImpl::TraceQueueSize() const {
755 bool is_tracing;
756 TRACE_EVENT_CATEGORY_GROUP_ENABLED(
757 TRACE_DISABLED_BY_DEFAULT("sequence_manager"), &is_tracing);
758 if (!is_tracing)
759 return;
760
761 // It's only safe to access the work queues from the main thread.
762 // TODO(alexclarke): We should find another way of tracing this
763 if (!associated_thread_->IsBoundToCurrentThread())
764 return;
765
766 size_t total_task_count;
767 {
768 base::internal::CheckedAutoLock lock(any_thread_lock_);
769 total_task_count = any_thread_.immediate_incoming_queue.size() +
770 main_thread_only().immediate_work_queue->Size() +
771 main_thread_only().delayed_work_queue->Size() +
772 main_thread_only().delayed_incoming_queue.size();
773 }
774 TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), GetName(),
775 total_task_count);
776 }
777
SetQueuePriority(TaskQueue::QueuePriority priority)778 void TaskQueueImpl::SetQueuePriority(TaskQueue::QueuePriority priority) {
779 const TaskQueue::QueuePriority previous_priority = GetQueuePriority();
780 if (priority == previous_priority)
781 return;
782 sequence_manager_->main_thread_only().selector.SetQueuePriority(this,
783 priority);
784
785 #if BUILDFLAG(IS_WIN)
786 // Updating queue priority can change whether high resolution timer is needed.
787 LazyNow lazy_now(sequence_manager_->main_thread_clock());
788 UpdateWakeUp(&lazy_now);
789 #endif
790
791 if (priority > DefaultPriority()) {
792 // |priority| is now lower than the default, so update accordingly.
793 main_thread_only()
794 .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
795 EnqueueOrder::max();
796 } else if (previous_priority > DefaultPriority()) {
797 // |priority| is no longer lower than the default, so record current
798 // sequence number.
799 DCHECK_EQ(
800 main_thread_only()
801 .enqueue_order_at_which_we_became_unblocked_with_normal_priority,
802 EnqueueOrder::max());
803 main_thread_only()
804 .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
805 sequence_manager_->GetNextSequenceNumber();
806 }
807 }
808
GetQueuePriority() const809 TaskQueue::QueuePriority TaskQueueImpl::GetQueuePriority() const {
810 size_t set_index = immediate_work_queue()->work_queue_set_index();
811 DCHECK_EQ(set_index, delayed_work_queue()->work_queue_set_index());
812 return static_cast<TaskQueue::QueuePriority>(set_index);
813 }
814
AsValue(TimeTicks now,bool force_verbose) const815 Value::Dict TaskQueueImpl::AsValue(TimeTicks now, bool force_verbose) const {
816 base::internal::CheckedAutoLock lock(any_thread_lock_);
817 Value::Dict state;
818 state.Set("name", GetName());
819 if (any_thread_.unregistered) {
820 state.Set("unregistered", true);
821 return state;
822 }
823 DCHECK(main_thread_only().delayed_work_queue);
824 DCHECK(main_thread_only().immediate_work_queue);
825
826 state.Set("task_queue_id",
827 StringPrintf("0x%" PRIx64, static_cast<uint64_t>(
828 reinterpret_cast<uintptr_t>(this))));
829 state.Set("enabled", IsQueueEnabled());
830 // TODO(crbug.com/1334256): Make base::Value able to store an int64_t and
831 // remove the various static_casts below.
832 state.Set("any_thread_.immediate_incoming_queuesize",
833 static_cast<int>(any_thread_.immediate_incoming_queue.size()));
834 state.Set("delayed_incoming_queue_size",
835 static_cast<int>(main_thread_only().delayed_incoming_queue.size()));
836 state.Set("immediate_work_queue_size",
837 static_cast<int>(main_thread_only().immediate_work_queue->Size()));
838 state.Set("delayed_work_queue_size",
839 static_cast<int>(main_thread_only().delayed_work_queue->Size()));
840
841 state.Set("any_thread_.immediate_incoming_queuecapacity",
842 static_cast<int>(any_thread_.immediate_incoming_queue.capacity()));
843 state.Set("immediate_work_queue_capacity",
844 static_cast<int>(immediate_work_queue()->Capacity()));
845 state.Set("delayed_work_queue_capacity",
846 static_cast<int>(delayed_work_queue()->Capacity()));
847
848 if (!main_thread_only().delayed_incoming_queue.empty()) {
849 TimeDelta delay_to_next_task =
850 (main_thread_only().delayed_incoming_queue.top().delayed_run_time -
851 sequence_manager_->main_thread_clock()->NowTicks());
852 state.Set("delay_to_next_task_ms", delay_to_next_task.InMillisecondsF());
853 }
854 if (main_thread_only().current_fence) {
855 Value::Dict fence_state;
856 fence_state.Set(
857 "enqueue_order",
858 static_cast<int>(
859 main_thread_only().current_fence->task_order().enqueue_order()));
860 fence_state.Set("activated_in_wake_up", !main_thread_only()
861 .current_fence->task_order()
862 .delayed_run_time()
863 .is_null());
864 state.Set("current_fence", std::move(fence_state));
865 }
866 if (main_thread_only().delayed_fence) {
867 state.Set("delayed_fence_seconds_from_now",
868 (main_thread_only().delayed_fence.value() - now).InSecondsF());
869 }
870
871 bool verbose = false;
872 TRACE_EVENT_CATEGORY_GROUP_ENABLED(
873 TRACE_DISABLED_BY_DEFAULT("sequence_manager.verbose_snapshots"),
874 &verbose);
875
876 if (verbose || force_verbose) {
877 state.Set("immediate_incoming_queue",
878 QueueAsValue(any_thread_.immediate_incoming_queue, now));
879 state.Set("delayed_work_queue",
880 main_thread_only().delayed_work_queue->AsValue(now));
881 state.Set("immediate_work_queue",
882 main_thread_only().immediate_work_queue->AsValue(now));
883 state.Set("delayed_incoming_queue",
884 main_thread_only().delayed_incoming_queue.AsValue(now));
885 }
886 state.Set("priority", GetQueuePriority());
887 return state;
888 }
889
AddTaskObserver(TaskObserver * task_observer)890 void TaskQueueImpl::AddTaskObserver(TaskObserver* task_observer) {
891 main_thread_only().task_observers.AddObserver(task_observer);
892 }
893
RemoveTaskObserver(TaskObserver * task_observer)894 void TaskQueueImpl::RemoveTaskObserver(TaskObserver* task_observer) {
895 main_thread_only().task_observers.RemoveObserver(task_observer);
896 }
897
NotifyWillProcessTask(const Task & task,bool was_blocked_or_low_priority)898 void TaskQueueImpl::NotifyWillProcessTask(const Task& task,
899 bool was_blocked_or_low_priority) {
900 DCHECK(should_notify_observers_);
901
902 for (auto& observer : main_thread_only().task_observers)
903 observer.WillProcessTask(task, was_blocked_or_low_priority);
904 }
905
NotifyDidProcessTask(const Task & task)906 void TaskQueueImpl::NotifyDidProcessTask(const Task& task) {
907 DCHECK(should_notify_observers_);
908 for (auto& observer : main_thread_only().task_observers)
909 observer.DidProcessTask(task);
910 }
911
InsertFence(TaskQueue::InsertFencePosition position)912 void TaskQueueImpl::InsertFence(TaskQueue::InsertFencePosition position) {
913 Fence new_fence = position == TaskQueue::InsertFencePosition::kNow
914 ? Fence::CreateWithEnqueueOrder(
915 sequence_manager_->GetNextSequenceNumber())
916 : Fence::BlockingFence();
917 InsertFence(new_fence);
918 }
919
InsertFence(Fence current_fence)920 void TaskQueueImpl::InsertFence(Fence current_fence) {
921 // Only one fence may be present at a time.
922 main_thread_only().delayed_fence = absl::nullopt;
923
924 absl::optional<Fence> previous_fence = main_thread_only().current_fence;
925
926 // Tasks posted after this point will have a strictly higher enqueue order
927 // and will be blocked from running.
928 main_thread_only().current_fence = current_fence;
929 bool front_task_unblocked =
930 main_thread_only().immediate_work_queue->InsertFence(current_fence);
931 front_task_unblocked |=
932 main_thread_only().delayed_work_queue->InsertFence(current_fence);
933
934 {
935 base::internal::CheckedAutoLock lock(any_thread_lock_);
936 if (!front_task_unblocked && previous_fence &&
937 previous_fence->task_order() < current_fence.task_order()) {
938 if (!any_thread_.immediate_incoming_queue.empty() &&
939 any_thread_.immediate_incoming_queue.front().task_order() >
940 previous_fence->task_order() &&
941 any_thread_.immediate_incoming_queue.front().task_order() <
942 current_fence.task_order()) {
943 front_task_unblocked = true;
944 }
945 }
946
947 UpdateCrossThreadQueueStateLocked();
948 }
949
950 if (IsQueueEnabled() && front_task_unblocked) {
951 OnQueueUnblocked();
952 sequence_manager_->ScheduleWork();
953 }
954 }
955
InsertFenceAt(TimeTicks time)956 void TaskQueueImpl::InsertFenceAt(TimeTicks time) {
957 DCHECK(delayed_fence_allowed_)
958 << "Delayed fences are not supported for this queue. Enable them "
959 "explicitly in TaskQueue::Spec when creating the queue";
960
961 // Task queue can have only one fence, delayed or not.
962 RemoveFence();
963 main_thread_only().delayed_fence = time;
964 }
965
RemoveFence()966 void TaskQueueImpl::RemoveFence() {
967 absl::optional<Fence> previous_fence = main_thread_only().current_fence;
968 main_thread_only().current_fence = absl::nullopt;
969 main_thread_only().delayed_fence = absl::nullopt;
970
971 bool front_task_unblocked =
972 main_thread_only().immediate_work_queue->RemoveFence();
973 front_task_unblocked |= main_thread_only().delayed_work_queue->RemoveFence();
974
975 {
976 base::internal::CheckedAutoLock lock(any_thread_lock_);
977 if (!front_task_unblocked && previous_fence) {
978 if (!any_thread_.immediate_incoming_queue.empty() &&
979 any_thread_.immediate_incoming_queue.front().task_order() >
980 previous_fence->task_order()) {
981 front_task_unblocked = true;
982 }
983 }
984
985 UpdateCrossThreadQueueStateLocked();
986 }
987
988 if (IsQueueEnabled() && front_task_unblocked) {
989 OnQueueUnblocked();
990 sequence_manager_->ScheduleWork();
991 }
992 }
993
BlockedByFence() const994 bool TaskQueueImpl::BlockedByFence() const {
995 if (!main_thread_only().current_fence)
996 return false;
997
998 if (!main_thread_only().immediate_work_queue->BlockedByFence() ||
999 !main_thread_only().delayed_work_queue->BlockedByFence()) {
1000 return false;
1001 }
1002
1003 base::internal::CheckedAutoLock lock(any_thread_lock_);
1004 if (any_thread_.immediate_incoming_queue.empty())
1005 return true;
1006
1007 return any_thread_.immediate_incoming_queue.front().task_order() >
1008 main_thread_only().current_fence->task_order();
1009 }
1010
HasActiveFence()1011 bool TaskQueueImpl::HasActiveFence() {
1012 if (main_thread_only().delayed_fence &&
1013 sequence_manager_->main_thread_clock()->NowTicks() >
1014 main_thread_only().delayed_fence.value()) {
1015 return true;
1016 }
1017 return !!main_thread_only().current_fence;
1018 }
1019
CouldTaskRun(EnqueueOrder enqueue_order) const1020 bool TaskQueueImpl::CouldTaskRun(EnqueueOrder enqueue_order) const {
1021 if (!IsQueueEnabled())
1022 return false;
1023
1024 if (!main_thread_only().current_fence)
1025 return true;
1026
1027 // TODO(crbug.com/1249857): This should use TaskOrder. This is currently only
1028 // used for tests and is fine as-is, but we should be using `TaskOrder` for
1029 // task comparisons. Also this test should be renamed with a testing suffix as
1030 // it is not used in production.
1031 return enqueue_order <
1032 main_thread_only().current_fence->task_order().enqueue_order();
1033 }
1034
WasBlockedOrLowPriority(EnqueueOrder enqueue_order) const1035 bool TaskQueueImpl::WasBlockedOrLowPriority(EnqueueOrder enqueue_order) const {
1036 return enqueue_order <
1037 main_thread_only()
1038 .enqueue_order_at_which_we_became_unblocked_with_normal_priority;
1039 }
1040
1041 // static
QueueAsValue(const TaskDeque & queue,TimeTicks now)1042 Value::List TaskQueueImpl::QueueAsValue(const TaskDeque& queue, TimeTicks now) {
1043 Value::List state;
1044 for (const Task& task : queue)
1045 state.Append(TaskAsValue(task, now));
1046 return state;
1047 }
1048
1049 // static
TaskAsValue(const Task & task,TimeTicks now)1050 Value::Dict TaskQueueImpl::TaskAsValue(const Task& task, TimeTicks now) {
1051 Value::Dict state;
1052 state.Set("posted_from", task.posted_from.ToString());
1053 if (task.enqueue_order_set())
1054 state.Set("enqueue_order", static_cast<int>(task.enqueue_order()));
1055 state.Set("sequence_num", task.sequence_num);
1056 state.Set("nestable", task.nestable == Nestable::kNestable);
1057 state.Set("is_high_res", task.is_high_res);
1058 state.Set("is_cancelled", task.task.IsCancelled());
1059 state.Set("delayed_run_time",
1060 (task.delayed_run_time - TimeTicks()).InMillisecondsF());
1061 const TimeDelta delayed_run_time_milliseconds_from_now =
1062 task.delayed_run_time.is_null() ? TimeDelta()
1063 : (task.delayed_run_time - now);
1064 state.Set("delayed_run_time_milliseconds_from_now",
1065 delayed_run_time_milliseconds_from_now.InMillisecondsF());
1066 return state;
1067 }
1068
// Converts |delayed_task| into a Task. A relative delay is turned into an
// absolute run time based on |lazy_now|; an absolute run time is left as is.
// The task gets a fresh sequence number, a default-constructed EnqueueOrder
// and a wake-up resolution. The resolution is kLow except on Windows, where a
// short delay selects kHigh unless the explicit-high-resolution-timer flag is
// set.
Task TaskQueueImpl::MakeDelayedTask(PostedTask delayed_task,
                                    LazyNow* lazy_now) const {
  EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber();
  // Stays zero unless a relative delay was supplied or, on Windows outside the
  // explicit-high-resolution mode, it is derived from the absolute run time.
  base::TimeDelta delay;
  WakeUpResolution resolution = WakeUpResolution::kLow;
#if BUILDFLAG(IS_WIN)
  const bool explicit_high_resolution_timer_win =
      g_explicit_high_resolution_timer_win.load(std::memory_order_relaxed);
#endif  // BUILDFLAG(IS_WIN)
  if (absl::holds_alternative<base::TimeDelta>(
          delayed_task.delay_or_delayed_run_time)) {
    // Relative delay: replace it with the absolute run time.
    delay = absl::get<base::TimeDelta>(delayed_task.delay_or_delayed_run_time);
    delayed_task.delay_or_delayed_run_time = lazy_now->Now() + delay;
  }
#if BUILDFLAG(IS_WIN)
  else if (!explicit_high_resolution_timer_win) {
    // Absolute run time: recover the remaining delay for the check below.
    delay = absl::get<base::TimeTicks>(delayed_task.delay_or_delayed_run_time) -
            lazy_now->Now();
  }
  if (!explicit_high_resolution_timer_win &&
      delay < (2 * base::Milliseconds(Time::kMinLowResolutionThresholdMs))) {
    // Outside the kExplicitHighResolutionTimerWin experiment, We consider the
    // task needs a high resolution timer if the delay is more than 0 and less
    // than 32ms. This caps the relative error to less than 50% : a 33ms wait
    // can wake at 48ms since the default resolution on Windows is between 10
    // and 15ms.
    // NOTE(review): only the upper bound is tested here; the "more than 0"
    // part presumably relies on callers passing a positive delay. Confirm.
    resolution = WakeUpResolution::kHigh;
  }
#endif  // BUILDFLAG(IS_WIN)
  // leeway isn't specified yet since this may be called from any thread.
  return Task(std::move(delayed_task), sequence_number, EnqueueOrder(),
              lazy_now->Now(), resolution);
}
1102
IsQueueEnabled() const1103 bool TaskQueueImpl::IsQueueEnabled() const {
1104 return main_thread_only().is_enabled;
1105 }
1106
SetQueueEnabled(bool enabled)1107 void TaskQueueImpl::SetQueueEnabled(bool enabled) {
1108 if (main_thread_only().is_enabled == enabled)
1109 return;
1110
1111 // Update the |main_thread_only_| struct.
1112 main_thread_only().is_enabled = enabled;
1113 main_thread_only().disabled_time = absl::nullopt;
1114
1115 // |sequence_manager_| can be null in tests.
1116 if (!sequence_manager_)
1117 return;
1118
1119 LazyNow lazy_now(sequence_manager_->main_thread_clock());
1120
1121 if (!enabled) {
1122 bool tracing_enabled = false;
1123 TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1124 &tracing_enabled);
1125 main_thread_only().disabled_time = lazy_now.Now();
1126 } else {
1127 // Override reporting if the queue is becoming enabled again.
1128 main_thread_only().should_report_posted_tasks_when_disabled = false;
1129 }
1130
1131 // If there is a throttler, it will be notified of pending delayed and
1132 // immediate tasks inside UpdateWakeUp().
1133 UpdateWakeUp(&lazy_now);
1134
1135 {
1136 base::internal::CheckedAutoLock lock(any_thread_lock_);
1137 UpdateCrossThreadQueueStateLocked();
1138
1139 // Copy over the task-reporting related state.
1140 any_thread_.tracing_only.is_enabled = enabled;
1141 any_thread_.tracing_only.disabled_time = main_thread_only().disabled_time;
1142 any_thread_.tracing_only.should_report_posted_tasks_when_disabled =
1143 main_thread_only().should_report_posted_tasks_when_disabled;
1144 }
1145
1146 // Finally, enable or disable the queue with the selector.
1147 if (enabled) {
1148 // Note the selector calls SequenceManager::OnTaskQueueEnabled which posts
1149 // a DoWork if needed.
1150 sequence_manager_->main_thread_only().selector.EnableQueue(this);
1151
1152 if (!BlockedByFence())
1153 OnQueueUnblocked();
1154 } else {
1155 sequence_manager_->main_thread_only().selector.DisableQueue(this);
1156 }
1157 }
1158
SetShouldReportPostedTasksWhenDisabled(bool should_report)1159 void TaskQueueImpl::SetShouldReportPostedTasksWhenDisabled(bool should_report) {
1160 if (main_thread_only().should_report_posted_tasks_when_disabled ==
1161 should_report)
1162 return;
1163
1164 // Only observe transitions turning the reporting on if tracing is enabled.
1165 if (should_report) {
1166 bool tracing_enabled = false;
1167 TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1168 &tracing_enabled);
1169 if (!tracing_enabled)
1170 return;
1171 }
1172
1173 main_thread_only().should_report_posted_tasks_when_disabled = should_report;
1174
1175 // Mirror the state to the AnyThread struct as well.
1176 {
1177 base::internal::CheckedAutoLock lock(any_thread_lock_);
1178 any_thread_.tracing_only.should_report_posted_tasks_when_disabled =
1179 should_report;
1180 }
1181 }
1182
UpdateCrossThreadQueueStateLocked()1183 void TaskQueueImpl::UpdateCrossThreadQueueStateLocked() {
1184 any_thread_.immediate_work_queue_empty =
1185 main_thread_only().immediate_work_queue->Empty();
1186
1187 if (main_thread_only().throttler) {
1188 // If there's a Throttler, always ScheduleWork() when immediate work is
1189 // posted and the queue is enabled, to ensure that
1190 // Throttler::OnHasImmediateTask() is invoked.
1191 any_thread_.post_immediate_task_should_schedule_work = IsQueueEnabled();
1192 } else {
1193 // Otherwise, ScheduleWork() only if the queue is enabled and there isn't a
1194 // fence to prevent the task from being executed.
1195 any_thread_.post_immediate_task_should_schedule_work =
1196 IsQueueEnabled() && !main_thread_only().current_fence;
1197 }
1198
1199 #if DCHECK_IS_ON()
1200 any_thread_.queue_set_index =
1201 main_thread_only().immediate_work_queue->work_queue_set_index();
1202 #endif
1203 }
1204
ReclaimMemory(TimeTicks now)1205 void TaskQueueImpl::ReclaimMemory(TimeTicks now) {
1206 if (main_thread_only().delayed_incoming_queue.empty())
1207 return;
1208
1209 if (g_is_sweep_cancelled_tasks_enabled) {
1210 main_thread_only().delayed_incoming_queue.SweepCancelledTasks(
1211 sequence_manager_);
1212
1213 // If deleting one of the cancelled tasks shut down this queue, bail out.
1214 // Note that in this scenario |this| is still valid, but some fields of the
1215 // queue have been cleared out by |UnregisterTaskQueue|.
1216 if (!main_thread_only().delayed_work_queue)
1217 return;
1218
1219 LazyNow lazy_now(now);
1220 UpdateWakeUp(&lazy_now);
1221 }
1222
1223 // Also consider shrinking the work queue if it's wasting memory.
1224 main_thread_only().delayed_work_queue->MaybeShrinkQueue();
1225 main_thread_only().immediate_work_queue->MaybeShrinkQueue();
1226
1227 {
1228 base::internal::CheckedAutoLock lock(any_thread_lock_);
1229 any_thread_.immediate_incoming_queue.MaybeShrinkQueue();
1230 }
1231 }
1232
PushImmediateIncomingTaskForTest(Task task)1233 void TaskQueueImpl::PushImmediateIncomingTaskForTest(Task task) {
1234 base::internal::CheckedAutoLock lock(any_thread_lock_);
1235 any_thread_.immediate_incoming_queue.push_back(std::move(task));
1236 }
1237
// Puts a previously deferred non-nestable task back at the front of the work
// queue it came from (delayed or immediate). The task is dropped if the queue
// has since been unregistered.
void TaskQueueImpl::RequeueDeferredNonNestableTask(
    DeferredNonNestableTask task) {
  DCHECK(task.task.nestable == Nestable::kNonNestable);

  // It's possible that the queue was unregistered since the task was posted.
  // Skip the task in that case.
  if (!main_thread_only().delayed_work_queue)
    return;

  // The re-queued tasks have to be pushed onto the front because we'd otherwise
  // violate the strict monotonically increasing enqueue order within the
  // WorkQueue.  We can't assign them a new enqueue order here because that will
  // not behave correctly with fences and things will break (e.g Idle TQ).
  if (task.work_queue_type == WorkQueueType::kDelayed) {
    main_thread_only().delayed_work_queue->PushNonNestableTaskToFront(
        std::move(task.task));
  } else {
    // We're about to push |task| onto an empty |immediate_work_queue|
    // (bypassing |immediate_incoming_queue_|). As such, we no longer need to
    // reload if we were planning to. The flag must be cleared while holding
    // the lock to avoid a cross-thread post task setting it again before
    // we actually make |immediate_work_queue| non-empty.
    if (main_thread_only().immediate_work_queue->Empty()) {
      base::internal::CheckedAutoLock lock(any_thread_lock_);
      empty_queues_to_reload_handle_.SetActive(false);

      // Update the cross-thread "empty" flag and push under the same lock.
      any_thread_.immediate_work_queue_empty = false;
      main_thread_only().immediate_work_queue->PushNonNestableTaskToFront(
          std::move(task.task));

    } else {
      // The work queue already has tasks, so no cross-thread state changes.
      main_thread_only().immediate_work_queue->PushNonNestableTaskToFront(
          std::move(task.task));
    }
  }
}
1274
SetThrottler(TaskQueue::Throttler * throttler)1275 void TaskQueueImpl::SetThrottler(TaskQueue::Throttler* throttler) {
1276 DCHECK(throttler);
1277 DCHECK(!main_thread_only().throttler)
1278 << "Can't assign two different throttlers to "
1279 "base::sequence_manager:TaskQueue";
1280
1281 main_thread_only().throttler = throttler;
1282 }
1283
ResetThrottler()1284 void TaskQueueImpl::ResetThrottler() {
1285 main_thread_only().throttler = nullptr;
1286 LazyNow lazy_now(sequence_manager_->main_thread_clock());
1287 // The current delayed wake up may have been determined by the Throttler.
1288 // Update it now that there is no Throttler.
1289 UpdateWakeUp(&lazy_now);
1290 }
1291
UpdateWakeUp(LazyNow * lazy_now)1292 void TaskQueueImpl::UpdateWakeUp(LazyNow* lazy_now) {
1293 absl::optional<WakeUp> wake_up = GetNextDesiredWakeUp();
1294 if (main_thread_only().throttler && IsQueueEnabled()) {
1295 // GetNextAllowedWakeUp() may return a non-null wake_up even if |wake_up| is
1296 // nullopt, e.g. to throttle immediate tasks.
1297 wake_up = main_thread_only().throttler->GetNextAllowedWakeUp(
1298 lazy_now, wake_up, HasTaskToRunImmediatelyOrReadyDelayedTask());
1299 }
1300 SetNextWakeUp(lazy_now, wake_up);
1301 }
1302
SetNextWakeUp(LazyNow * lazy_now,absl::optional<WakeUp> wake_up)1303 void TaskQueueImpl::SetNextWakeUp(LazyNow* lazy_now,
1304 absl::optional<WakeUp> wake_up) {
1305 if (main_thread_only().scheduled_wake_up == wake_up)
1306 return;
1307 main_thread_only().scheduled_wake_up = wake_up;
1308 main_thread_only().wake_up_queue->SetNextWakeUpForQueue(this, lazy_now,
1309 wake_up);
1310 }
1311
HasTaskToRunImmediately() const1312 bool TaskQueueImpl::HasTaskToRunImmediately() const {
1313 // Any work queue tasks count as immediate work.
1314 if (!main_thread_only().delayed_work_queue->Empty() ||
1315 !main_thread_only().immediate_work_queue->Empty()) {
1316 return true;
1317 }
1318
1319 // Finally tasks on |immediate_incoming_queue| count as immediate work.
1320 base::internal::CheckedAutoLock lock(any_thread_lock_);
1321 return !any_thread_.immediate_incoming_queue.empty();
1322 }
1323
HasTaskToRunImmediatelyLocked() const1324 bool TaskQueueImpl::HasTaskToRunImmediatelyLocked() const {
1325 return !main_thread_only().delayed_work_queue->Empty() ||
1326 !main_thread_only().immediate_work_queue->Empty() ||
1327 !any_thread_.immediate_incoming_queue.empty();
1328 }
1329
SetOnTaskStartedHandler(TaskQueueImpl::OnTaskStartedHandler handler)1330 void TaskQueueImpl::SetOnTaskStartedHandler(
1331 TaskQueueImpl::OnTaskStartedHandler handler) {
1332 DCHECK(should_notify_observers_ || handler.is_null());
1333 main_thread_only().on_task_started_handler = std::move(handler);
1334 }
1335
OnTaskStarted(const Task & task,const TaskQueue::TaskTiming & task_timing)1336 void TaskQueueImpl::OnTaskStarted(const Task& task,
1337 const TaskQueue::TaskTiming& task_timing) {
1338 if (!main_thread_only().on_task_started_handler.is_null())
1339 main_thread_only().on_task_started_handler.Run(task, task_timing);
1340 }
1341
SetOnTaskCompletedHandler(TaskQueueImpl::OnTaskCompletedHandler handler)1342 void TaskQueueImpl::SetOnTaskCompletedHandler(
1343 TaskQueueImpl::OnTaskCompletedHandler handler) {
1344 DCHECK(should_notify_observers_ || handler.is_null());
1345 main_thread_only().on_task_completed_handler = std::move(handler);
1346 }
1347
OnTaskCompleted(const Task & task,TaskQueue::TaskTiming * task_timing,LazyNow * lazy_now)1348 void TaskQueueImpl::OnTaskCompleted(const Task& task,
1349 TaskQueue::TaskTiming* task_timing,
1350 LazyNow* lazy_now) {
1351 if (!main_thread_only().on_task_completed_handler.is_null()) {
1352 main_thread_only().on_task_completed_handler.Run(task, task_timing,
1353 lazy_now);
1354 }
1355 }
1356
RequiresTaskTiming() const1357 bool TaskQueueImpl::RequiresTaskTiming() const {
1358 return !main_thread_only().on_task_started_handler.is_null() ||
1359 !main_thread_only().on_task_completed_handler.is_null();
1360 }
1361
1362 std::unique_ptr<TaskQueue::OnTaskPostedCallbackHandle>
AddOnTaskPostedHandler(OnTaskPostedHandler handler)1363 TaskQueueImpl::AddOnTaskPostedHandler(OnTaskPostedHandler handler) {
1364 DCHECK(should_notify_observers_ && !handler.is_null());
1365 std::unique_ptr<OnTaskPostedCallbackHandleImpl> handle =
1366 std::make_unique<OnTaskPostedCallbackHandleImpl>(this,
1367 associated_thread_);
1368 base::internal::CheckedAutoLock lock(any_thread_lock_);
1369 any_thread_.on_task_posted_handlers.insert(
1370 {handle.get(), std::move(handler)});
1371 return handle;
1372 }
1373
RemoveOnTaskPostedHandler(TaskQueueImpl::OnTaskPostedCallbackHandleImpl * on_task_posted_callback_handle)1374 void TaskQueueImpl::RemoveOnTaskPostedHandler(
1375 TaskQueueImpl::OnTaskPostedCallbackHandleImpl*
1376 on_task_posted_callback_handle) {
1377 base::internal::CheckedAutoLock lock(any_thread_lock_);
1378 any_thread_.on_task_posted_handlers.erase(on_task_posted_callback_handle);
1379 }
1380
SetTaskExecutionTraceLogger(TaskExecutionTraceLogger logger)1381 void TaskQueueImpl::SetTaskExecutionTraceLogger(
1382 TaskExecutionTraceLogger logger) {
1383 DCHECK(should_notify_observers_ || logger.is_null());
1384 main_thread_only().task_execution_trace_logger = std::move(logger);
1385 }
1386
IsUnregistered() const1387 bool TaskQueueImpl::IsUnregistered() const {
1388 base::internal::CheckedAutoLock lock(any_thread_lock_);
1389 return any_thread_.unregistered;
1390 }
1391
GetSequenceManagerWeakPtr()1392 WeakPtr<SequenceManagerImpl> TaskQueueImpl::GetSequenceManagerWeakPtr() {
1393 return sequence_manager_->GetWeakPtr();
1394 }
1395
ActivateDelayedFenceIfNeeded(const Task & task)1396 void TaskQueueImpl::ActivateDelayedFenceIfNeeded(const Task& task) {
1397 if (!main_thread_only().delayed_fence)
1398 return;
1399 if (main_thread_only().delayed_fence.value() > task.delayed_run_time)
1400 return;
1401 InsertFence(Fence(task.task_order()));
1402 main_thread_only().delayed_fence = absl::nullopt;
1403 }
1404
MaybeReportIpcTaskQueuedFromMainThread(const Task & pending_task)1405 void TaskQueueImpl::MaybeReportIpcTaskQueuedFromMainThread(
1406 const Task& pending_task) {
1407 if (!pending_task.ipc_hash)
1408 return;
1409
1410 // It's possible that tracing was just enabled and no disabled time has been
1411 // stored. In that case, skip emitting the event.
1412 if (!main_thread_only().disabled_time)
1413 return;
1414
1415 bool tracing_enabled = false;
1416 TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1417 &tracing_enabled);
1418 if (!tracing_enabled)
1419 return;
1420
1421 if (main_thread_only().is_enabled ||
1422 !main_thread_only().should_report_posted_tasks_when_disabled) {
1423 return;
1424 }
1425
1426 base::TimeDelta time_since_disabled =
1427 sequence_manager_->main_thread_clock()->NowTicks() -
1428 main_thread_only().disabled_time.value();
1429
1430 ReportIpcTaskQueued(pending_task, time_since_disabled);
1431 }
1432
ShouldReportIpcTaskQueuedFromAnyThreadLocked(base::TimeDelta * time_since_disabled)1433 bool TaskQueueImpl::ShouldReportIpcTaskQueuedFromAnyThreadLocked(
1434 base::TimeDelta* time_since_disabled) {
1435 // It's possible that tracing was just enabled and no disabled time has been
1436 // stored. In that case, skip emitting the event.
1437 if (!any_thread_.tracing_only.disabled_time)
1438 return false;
1439
1440 if (any_thread_.tracing_only.is_enabled ||
1441 any_thread_.tracing_only.should_report_posted_tasks_when_disabled) {
1442 return false;
1443 }
1444
1445 *time_since_disabled = sequence_manager_->any_thread_clock()->NowTicks() -
1446 any_thread_.tracing_only.disabled_time.value();
1447 return true;
1448 }
1449
MaybeReportIpcTaskQueuedFromAnyThreadLocked(const Task & pending_task)1450 void TaskQueueImpl::MaybeReportIpcTaskQueuedFromAnyThreadLocked(
1451 const Task& pending_task) {
1452 if (!pending_task.ipc_hash)
1453 return;
1454
1455 bool tracing_enabled = false;
1456 TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1457 &tracing_enabled);
1458 if (!tracing_enabled)
1459 return;
1460
1461 base::TimeDelta time_since_disabled;
1462 if (ShouldReportIpcTaskQueuedFromAnyThreadLocked(&time_since_disabled))
1463 ReportIpcTaskQueued(pending_task, time_since_disabled);
1464 }
1465
MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(const Task & pending_task)1466 void TaskQueueImpl::MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(
1467 const Task& pending_task) {
1468 if (!pending_task.ipc_hash)
1469 return;
1470
1471 bool tracing_enabled = false;
1472 TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1473 &tracing_enabled);
1474 if (!tracing_enabled)
1475 return;
1476
1477 base::TimeDelta time_since_disabled;
1478 bool should_report = false;
1479 {
1480 base::internal::CheckedAutoLock lock(any_thread_lock_);
1481 should_report =
1482 ShouldReportIpcTaskQueuedFromAnyThreadLocked(&time_since_disabled);
1483 }
1484
1485 if (should_report)
1486 ReportIpcTaskQueued(pending_task, time_since_disabled);
1487 }
1488
// Emits a "task_posted_to_disabled_queue" instant trace event in the
// disabled-by-default "lifecycles" category. The event records how long the
// queue has been disabled (in milliseconds), the task's IPC hash and the
// interned source location the task was posted from.
void TaskQueueImpl::ReportIpcTaskQueued(
    const Task& pending_task,
    const base::TimeDelta& time_since_disabled) {
  TRACE_EVENT_INSTANT(
      TRACE_DISABLED_BY_DEFAULT("lifecycles"), "task_posted_to_disabled_queue",
      [&](perfetto::EventContext ctx) {
        auto* proto = ctx.event<perfetto::protos::pbzero::ChromeTrackEvent>()
                          ->set_chrome_task_posted_to_disabled_queue();
        // checked_cast guards the signed-to-unsigned conversion of the
        // millisecond count.
        proto->set_time_since_disabled_ms(
            checked_cast<uint64_t>(time_since_disabled.InMilliseconds()));
        proto->set_ipc_hash(pending_task.ipc_hash);
        proto->set_source_location_iid(
            base::trace_event::InternedSourceLocation::Get(
                &ctx, pending_task.posted_from));
      });
}
1505
OnQueueUnblocked()1506 void TaskQueueImpl::OnQueueUnblocked() {
1507 DCHECK(IsQueueEnabled());
1508 DCHECK(!BlockedByFence());
1509
1510 main_thread_only().enqueue_order_at_which_we_became_unblocked =
1511 sequence_manager_->GetNextSequenceNumber();
1512 if (GetQueuePriority() <= DefaultPriority()) {
1513 // We are default priority or more important so update
1514 // |enqueue_order_at_which_we_became_unblocked_with_normal_priority|.
1515 main_thread_only()
1516 .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
1517 main_thread_only().enqueue_order_at_which_we_became_unblocked;
1518 }
1519 }
1520
// Out-of-line defaulted constructor and destructor for DelayedIncomingQueue.
TaskQueueImpl::DelayedIncomingQueue::DelayedIncomingQueue() = default;
TaskQueueImpl::DelayedIncomingQueue::~DelayedIncomingQueue() = default;
1523
push(Task task)1524 void TaskQueueImpl::DelayedIncomingQueue::push(Task task) {
1525 // TODO(crbug.com/1247285): Remove this once the cause of corrupted tasks in
1526 // the queue is understood.
1527 CHECK(task.task);
1528 if (task.is_high_res)
1529 pending_high_res_tasks_++;
1530 queue_.insert(std::move(task));
1531 }
1532
remove(HeapHandle heap_handle)1533 void TaskQueueImpl::DelayedIncomingQueue::remove(HeapHandle heap_handle) {
1534 DCHECK(!empty());
1535 DCHECK_LT(heap_handle.index(), queue_.size());
1536 Task task = queue_.take(heap_handle);
1537 if (task.is_high_res) {
1538 pending_high_res_tasks_--;
1539 DCHECK_GE(pending_high_res_tasks_, 0);
1540 }
1541 }
1542
take_top()1543 Task TaskQueueImpl::DelayedIncomingQueue::take_top() {
1544 DCHECK(!empty());
1545 if (queue_.top().is_high_res) {
1546 pending_high_res_tasks_--;
1547 DCHECK_GE(pending_high_res_tasks_, 0);
1548 }
1549 return queue_.take_top();
1550 }
1551
swap(DelayedIncomingQueue * rhs)1552 void TaskQueueImpl::DelayedIncomingQueue::swap(DelayedIncomingQueue* rhs) {
1553 std::swap(pending_high_res_tasks_, rhs->pending_high_res_tasks_);
1554 std::swap(queue_, rhs->queue_);
1555 }
1556
SweepCancelledTasks(SequenceManagerImpl * sequence_manager)1557 void TaskQueueImpl::DelayedIncomingQueue::SweepCancelledTasks(
1558 SequenceManagerImpl* sequence_manager) {
1559 // Note: IntrusiveHeap::EraseIf() is safe against re-entrancy caused by
1560 // deleted tasks posting new tasks.
1561 queue_.EraseIf([this](const Task& task) {
1562 if (task.task.IsCancelled()) {
1563 if (task.is_high_res) {
1564 --pending_high_res_tasks_;
1565 DCHECK_GE(pending_high_res_tasks_, 0);
1566 }
1567 return true;
1568 }
1569 return false;
1570 });
1571 }
1572
AsValue(TimeTicks now) const1573 Value::List TaskQueueImpl::DelayedIncomingQueue::AsValue(TimeTicks now) const {
1574 Value::List state;
1575 for (const Task& task : queue_)
1576 state.Append(TaskAsValue(task, now));
1577 return state;
1578 }
1579
operator ()(const Task & lhs,const Task & rhs) const1580 bool TaskQueueImpl::DelayedIncomingQueue::Compare::operator()(
1581 const Task& lhs,
1582 const Task& rhs) const {
1583 // Delayed tasks are ordered by latest_delayed_run_time(). The top task may
1584 // not be the first task eligible to run, but tasks will always become ripe
1585 // before their latest_delayed_run_time().
1586 const TimeTicks lhs_latest_delayed_run_time = lhs.latest_delayed_run_time();
1587 const TimeTicks rhs_latest_delayed_run_time = rhs.latest_delayed_run_time();
1588 if (lhs_latest_delayed_run_time == rhs_latest_delayed_run_time)
1589 return lhs.sequence_num > rhs.sequence_num;
1590 return lhs_latest_delayed_run_time > rhs_latest_delayed_run_time;
1591 }
1592
OnTaskPostedCallbackHandleImpl(TaskQueueImpl * task_queue_impl,scoped_refptr<const AssociatedThreadId> associated_thread)1593 TaskQueueImpl::OnTaskPostedCallbackHandleImpl::OnTaskPostedCallbackHandleImpl(
1594 TaskQueueImpl* task_queue_impl,
1595 scoped_refptr<const AssociatedThreadId> associated_thread)
1596 : task_queue_impl_(task_queue_impl),
1597 associated_thread_(std::move(associated_thread)) {
1598 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
1599 }
1600
1601 TaskQueueImpl::OnTaskPostedCallbackHandleImpl::
~OnTaskPostedCallbackHandleImpl()1602 ~OnTaskPostedCallbackHandleImpl() {
1603 DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
1604 if (task_queue_impl_)
1605 task_queue_impl_->RemoveOnTaskPostedHandler(this);
1606 }
1607
DefaultPriority() const1608 TaskQueue::QueuePriority TaskQueueImpl::DefaultPriority() const {
1609 return sequence_manager()->settings().priority_settings.default_priority();
1610 }
1611
1612 } // namespace internal
1613 } // namespace sequence_manager
1614 } // namespace base
1615