• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/sequence_manager/task_queue_impl.h"
6 
7 #include <inttypes.h>
8 
9 #include <memory>
10 #include <optional>
11 #include <utility>
12 
13 #include "base/check.h"
14 #include "base/compiler_specific.h"
15 #include "base/feature_list.h"
16 #include "base/logging.h"
17 #include "base/memory/scoped_refptr.h"
18 #include "base/metrics/histogram_macros.h"
19 #include "base/notreached.h"
20 #include "base/observer_list.h"
21 #include "base/ranges/algorithm.h"
22 #include "base/sequence_token.h"
23 #include "base/strings/stringprintf.h"
24 #include "base/task/common/scoped_defer_task_posting.h"
25 #include "base/task/default_delayed_task_handle_delegate.h"
26 #include "base/task/sequence_manager/associated_thread_id.h"
27 #include "base/task/sequence_manager/delayed_task_handle_delegate.h"
28 #include "base/task/sequence_manager/fence.h"
29 #include "base/task/sequence_manager/sequence_manager_impl.h"
30 #include "base/task/sequence_manager/task_order.h"
31 #include "base/task/sequence_manager/wake_up_queue.h"
32 #include "base/task/sequence_manager/work_queue.h"
33 #include "base/task/single_thread_task_runner.h"
34 #include "base/task/task_features.h"
35 #include "base/task/task_observer.h"
36 #include "base/threading/thread_restrictions.h"
37 #include "base/time/time.h"
38 #include "base/trace_event/base_tracing.h"
39 #include "build/build_config.h"
40 #include "third_party/abseil-cpp/absl/container/inlined_vector.h"
41 
42 namespace base {
43 namespace sequence_manager {
44 
45 namespace internal {
46 
47 // This class outside the anonymous namespace exists to allow being a friend of
48 // `SingleThreadTaskRunner::CurrentDefaultHandle` in order to access
49 // `SingleThreadTaskRunner::CurrentDefaultHandle::MayAlreadyExist`.
50 class CurrentDefaultHandleOverrideForRunOrPostTask {
51  public:
CurrentDefaultHandleOverrideForRunOrPostTask(scoped_refptr<SequencedTaskRunner> task_runner)52   explicit CurrentDefaultHandleOverrideForRunOrPostTask(
53       scoped_refptr<SequencedTaskRunner> task_runner)
54       : sttr_override_(
55             nullptr,
56             SingleThreadTaskRunner::CurrentDefaultHandle::MayAlreadyExist{}),
57         str_override_(std::move(task_runner)) {}
58 
59  private:
60   SingleThreadTaskRunner::CurrentDefaultHandle sttr_override_;
61   SequencedTaskRunner::CurrentDefaultHandle str_override_;
62 };
63 
64 namespace {
65 
66 // An atomic is used here because the value is queried from other threads when
67 // tasks are posted cross-thread, which can race with its initialization.
68 std::atomic<base::TimeDelta> g_max_precise_delay{kDefaultMaxPreciseDelay};
69 #if BUILDFLAG(IS_WIN)
70 // An atomic is used here because the flag is queried from other threads when
71 // tasks are posted cross-thread, which can race with its initialization.
72 std::atomic_bool g_explicit_high_resolution_timer_win{true};
73 #endif  // BUILDFLAG(IS_WIN)
74 
RunTaskSynchronously(AssociatedThreadId * associated_thread,scoped_refptr<SingleThreadTaskRunner> task_runner,OnceClosure closure)75 void RunTaskSynchronously(AssociatedThreadId* associated_thread,
76                           scoped_refptr<SingleThreadTaskRunner> task_runner,
77                           OnceClosure closure) {
78   base::internal::TaskScope sequence_scope(
79       associated_thread->GetBoundSequenceToken(),
80       /* is_thread_bound=*/false,
81       /* is_running_synchronously=*/true);
82   CurrentDefaultHandleOverrideForRunOrPostTask task_runner_override(
83       std::move(task_runner));
84   associated_thread->StartInSequenceWithCurrentThread();
85   std::move(closure).Run();
86   associated_thread->StopInSequenceWithCurrentThread();
87 }
88 
89 }  // namespace
90 
GuardedTaskPoster(TaskQueueImpl * outer)91 TaskQueueImpl::GuardedTaskPoster::GuardedTaskPoster(TaskQueueImpl* outer)
92     : outer_(outer) {}
93 
94 TaskQueueImpl::GuardedTaskPoster::~GuardedTaskPoster() = default;
95 
PostTask(PostedTask task)96 bool TaskQueueImpl::GuardedTaskPoster::PostTask(PostedTask task) {
97   // Do not process new PostTasks while we are handling a PostTask (tracing
98   // has to do this) as it can lead to a deadlock and defer it instead.
99   ScopedDeferTaskPosting disallow_task_posting;
100 
101   auto token = operations_controller_.TryBeginOperation();
102   if (!token)
103     return false;
104 
105   outer_->PostTask(std::move(task));
106   return true;
107 }
108 
PostCancelableTask(PostedTask task)109 DelayedTaskHandle TaskQueueImpl::GuardedTaskPoster::PostCancelableTask(
110     PostedTask task) {
111   // Do not process new PostTasks while we are handling a PostTask (tracing
112   // has to do this) as it can lead to a deadlock and defer it instead.
113   ScopedDeferTaskPosting disallow_task_posting;
114 
115   auto token = operations_controller_.TryBeginOperation();
116   if (!token)
117     return DelayedTaskHandle();
118 
119   auto delayed_task_handle_delegate =
120       std::make_unique<DelayedTaskHandleDelegate>(outer_);
121   task.delayed_task_handle_delegate = delayed_task_handle_delegate->AsWeakPtr();
122 
123   outer_->PostTask(std::move(task));
124   DCHECK(delayed_task_handle_delegate->IsValid());
125   return DelayedTaskHandle(std::move(delayed_task_handle_delegate));
126 }
127 
RunOrPostTask(PostedTask task)128 bool TaskQueueImpl::GuardedTaskPoster::RunOrPostTask(PostedTask task) {
129   auto token = operations_controller_.TryBeginOperation();
130   if (!token) {
131     return false;
132   }
133 
134   auto sync_work_auth =
135       outer_->sequence_manager_->TryAcquireSyncWorkAuthorization();
136   // The queue may be disabled immediately after checking
137   // `IsQueueEnabledFromAnyThread()`. That won't prevent the task from running.
138   if (sync_work_auth.IsValid() && outer_->IsQueueEnabledFromAnyThread()) {
139     RunTaskSynchronously(outer_->associated_thread_.get(),
140                          outer_->sequence_manager_->GetTaskRunner(),
141                          std::move(task.callback));
142     return true;
143   }
144 
145   return PostTask(std::move(task));
146 }
147 
TaskRunner(scoped_refptr<GuardedTaskPoster> task_poster,scoped_refptr<AssociatedThreadId> associated_thread,TaskType task_type)148 TaskQueueImpl::TaskRunner::TaskRunner(
149     scoped_refptr<GuardedTaskPoster> task_poster,
150     scoped_refptr<AssociatedThreadId> associated_thread,
151     TaskType task_type)
152     : task_poster_(std::move(task_poster)),
153       associated_thread_(std::move(associated_thread)),
154       task_type_(task_type) {}
155 
156 TaskQueueImpl::TaskRunner::~TaskRunner() = default;
157 
PostDelayedTask(const Location & location,OnceClosure callback,TimeDelta delay)158 bool TaskQueueImpl::TaskRunner::PostDelayedTask(const Location& location,
159                                                 OnceClosure callback,
160                                                 TimeDelta delay) {
161   return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
162                                            delay, Nestable::kNestable,
163                                            task_type_));
164 }
165 
PostDelayedTaskAt(subtle::PostDelayedTaskPassKey,const Location & location,OnceClosure callback,TimeTicks delayed_run_time,base::subtle::DelayPolicy delay_policy)166 bool TaskQueueImpl::TaskRunner::PostDelayedTaskAt(
167     subtle::PostDelayedTaskPassKey,
168     const Location& location,
169     OnceClosure callback,
170     TimeTicks delayed_run_time,
171     base::subtle::DelayPolicy delay_policy) {
172   return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
173                                            delayed_run_time, delay_policy,
174                                            Nestable::kNestable, task_type_));
175 }
176 
PostCancelableDelayedTaskAt(subtle::PostDelayedTaskPassKey pass_key,const Location & location,OnceClosure callback,TimeTicks delayed_run_time,base::subtle::DelayPolicy delay_policy)177 DelayedTaskHandle TaskQueueImpl::TaskRunner::PostCancelableDelayedTaskAt(
178     subtle::PostDelayedTaskPassKey pass_key,
179     const Location& location,
180     OnceClosure callback,
181     TimeTicks delayed_run_time,
182     base::subtle::DelayPolicy delay_policy) {
183   return task_poster_->PostCancelableTask(
184       PostedTask(this, std::move(callback), location, delayed_run_time,
185                  delay_policy, Nestable::kNestable, task_type_));
186 }
187 
PostCancelableDelayedTask(subtle::PostDelayedTaskPassKey pass_key,const Location & location,OnceClosure callback,TimeDelta delay)188 DelayedTaskHandle TaskQueueImpl::TaskRunner::PostCancelableDelayedTask(
189     subtle::PostDelayedTaskPassKey pass_key,
190     const Location& location,
191     OnceClosure callback,
192     TimeDelta delay) {
193   return task_poster_->PostCancelableTask(
194       PostedTask(this, std::move(callback), location, delay,
195                  Nestable::kNestable, task_type_));
196 }
197 
PostNonNestableDelayedTask(const Location & location,OnceClosure callback,TimeDelta delay)198 bool TaskQueueImpl::TaskRunner::PostNonNestableDelayedTask(
199     const Location& location,
200     OnceClosure callback,
201     TimeDelta delay) {
202   return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
203                                            delay, Nestable::kNonNestable,
204                                            task_type_));
205 }
206 
RunOrPostTask(subtle::RunOrPostTaskPassKey,const Location & location,OnceClosure callback)207 bool TaskQueueImpl::TaskRunner::RunOrPostTask(subtle::RunOrPostTaskPassKey,
208                                               const Location& location,
209                                               OnceClosure callback) {
210   return task_poster_->RunOrPostTask(
211       PostedTask(this, std::move(callback), location, TimeDelta(),
212                  Nestable::kNestable, task_type_));
213 }
214 
BelongsToCurrentThread() const215 bool TaskQueueImpl::TaskRunner::BelongsToCurrentThread() const {
216   return associated_thread_->IsBoundToCurrentThread();
217 }
218 
RunsTasksInCurrentSequence() const219 bool TaskQueueImpl::TaskRunner::RunsTasksInCurrentSequence() const {
220   // Return true on the bound thread. This works even after `thread_local`
221   // destruction.
222   if (BelongsToCurrentThread()) {
223     return true;
224   }
225 
226   // Return true in a `RunOrPostTask` callback running synchronously on a
227   // different thread.
228   if (associated_thread_->IsBound() &&
229       associated_thread_->GetBoundSequenceToken() ==
230           base::internal::SequenceToken::GetForCurrentThread()) {
231     return true;
232   }
233 
234   return false;
235 }
236 
237 // static
InitializeFeatures()238 void TaskQueueImpl::InitializeFeatures() {
239   g_max_precise_delay = kMaxPreciseDelay.Get();
240 #if BUILDFLAG(IS_WIN)
241   g_explicit_high_resolution_timer_win.store(
242       FeatureList::IsEnabled(kExplicitHighResolutionTimerWin),
243       std::memory_order_relaxed);
244 #endif  // BUILDFLAG(IS_WIN)
245 }
246 
TaskQueueImpl(SequenceManagerImpl * sequence_manager,WakeUpQueue * wake_up_queue,const TaskQueue::Spec & spec)247 TaskQueueImpl::TaskQueueImpl(SequenceManagerImpl* sequence_manager,
248                              WakeUpQueue* wake_up_queue,
249                              const TaskQueue::Spec& spec)
250     : name_(spec.name),
251       sequence_manager_(sequence_manager),
252       associated_thread_(sequence_manager
253                              ? sequence_manager->associated_thread()
254                              : AssociatedThreadId::CreateBound()),
255       task_poster_(MakeRefCounted<GuardedTaskPoster>(this)),
256       main_thread_only_(this, wake_up_queue),
257       empty_queues_to_reload_handle_(
258           sequence_manager
259               ? sequence_manager->GetFlagToRequestReloadForEmptyQueue(this)
260               : AtomicFlagSet::AtomicFlag()),
261       should_monitor_quiescence_(spec.should_monitor_quiescence),
262       should_notify_observers_(spec.should_notify_observers),
263       delayed_fence_allowed_(spec.delayed_fence_allowed),
264       default_task_runner_(CreateTaskRunner(kTaskTypeNone)) {
265   UpdateCrossThreadQueueStateLocked();
266   // SequenceManager can't be set later, so we need to prevent task runners
267   // from posting any tasks.
268   if (sequence_manager_)
269     task_poster_->StartAcceptingOperations();
270 }
271 
~TaskQueueImpl()272 TaskQueueImpl::~TaskQueueImpl() {
273 #if DCHECK_IS_ON()
274   base::internal::CheckedAutoLock lock(any_thread_lock_);
275   // NOTE this check shouldn't fire because |SequenceManagerImpl::queues_|
276   // contains a strong reference to this TaskQueueImpl and the
277   // SequenceManagerImpl destructor calls UnregisterTaskQueue on all task
278   // queues.
279   DCHECK(any_thread_.unregistered)
280       << "UnregisterTaskQueue must be called first!";
281 #endif
282 }
283 
284 TaskQueueImpl::AnyThread::AnyThread() = default;
285 TaskQueueImpl::AnyThread::~AnyThread() = default;
286 
287 TaskQueueImpl::AnyThread::TracingOnly::TracingOnly() = default;
288 TaskQueueImpl::AnyThread::TracingOnly::~TracingOnly() = default;
289 
MainThreadOnly(TaskQueueImpl * task_queue,WakeUpQueue * wake_up_queue)290 TaskQueueImpl::MainThreadOnly::MainThreadOnly(TaskQueueImpl* task_queue,
291                                               WakeUpQueue* wake_up_queue)
292     : wake_up_queue(wake_up_queue),
293       delayed_work_queue(
294           new WorkQueue(task_queue, "delayed", WorkQueue::QueueType::kDelayed)),
295       immediate_work_queue(new WorkQueue(task_queue,
296                                          "immediate",
297                                          WorkQueue::QueueType::kImmediate)) {}
298 
299 TaskQueueImpl::MainThreadOnly::~MainThreadOnly() = default;
300 
CreateTaskRunner(TaskType task_type) const301 scoped_refptr<SingleThreadTaskRunner> TaskQueueImpl::CreateTaskRunner(
302     TaskType task_type) const {
303   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
304   return MakeRefCounted<TaskRunner>(task_poster_, associated_thread_,
305                                     task_type);
306 }
307 
task_runner() const308 const scoped_refptr<SingleThreadTaskRunner>& TaskQueueImpl::task_runner()
309     const {
310   return default_task_runner_;
311 }
312 
UnregisterTaskQueue()313 void TaskQueueImpl::UnregisterTaskQueue() {
314   TRACE_EVENT0("base", "TaskQueueImpl::UnregisterTaskQueue");
315   // Invalidate weak pointers now so no voters reference this in a partially
316   // torn down state.
317   voter_weak_ptr_factory_.InvalidateWeakPtrs();
318   // Detach task runners.
319   {
320     ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
321     task_poster_->ShutdownAndWaitForZeroOperations();
322   }
323 
324   TaskDeque immediate_incoming_queue;
325   base::flat_map<raw_ptr<OnTaskPostedCallbackHandleImpl>, OnTaskPostedHandler>
326       on_task_posted_handlers;
327 
328   {
329     base::internal::CheckedAutoLock lock(any_thread_lock_);
330     any_thread_.unregistered = true;
331     immediate_incoming_queue.swap(any_thread_.immediate_incoming_queue);
332 
333     for (auto& handler : any_thread_.on_task_posted_handlers)
334       handler.first->UnregisterTaskQueue();
335     any_thread_.on_task_posted_handlers.swap(on_task_posted_handlers);
336   }
337 
338   if (main_thread_only().wake_up_queue) {
339     main_thread_only().wake_up_queue->UnregisterQueue(this);
340   }
341 
342   main_thread_only().on_task_started_handler = OnTaskStartedHandler();
343   main_thread_only().on_task_completed_handler = OnTaskCompletedHandler();
344   main_thread_only().wake_up_queue = nullptr;
345   main_thread_only().throttler = nullptr;
346   empty_queues_to_reload_handle_.ReleaseAtomicFlag();
347 
348   // It is possible for a task to hold a scoped_refptr to this, which
349   // will lead to TaskQueueImpl destructor being called when deleting a task.
350   // To avoid use-after-free, we need to clear all fields of a task queue
351   // before starting to delete the tasks.
352   // All work queues and priority queues containing tasks should be moved to
353   // local variables on stack (std::move for unique_ptrs and swap for queues)
354   // before clearing them and deleting tasks.
355 
356   // Flush the queues outside of the lock because TSAN complains about a lock
357   // order inversion for tasks that are posted from within a lock, with a
358   // destructor that acquires the same lock.
359 
360   DelayedIncomingQueue delayed_incoming_queue;
361   delayed_incoming_queue.swap(&main_thread_only().delayed_incoming_queue);
362   std::unique_ptr<WorkQueue> immediate_work_queue =
363       std::move(main_thread_only().immediate_work_queue);
364   std::unique_ptr<WorkQueue> delayed_work_queue =
365       std::move(main_thread_only().delayed_work_queue);
366 }
367 
GetName() const368 const char* TaskQueueImpl::GetName() const {
369   return perfetto::protos::pbzero::SequenceManagerTask::QueueName_Name(name_);
370 }
371 
GetProtoName() const372 QueueName TaskQueueImpl::GetProtoName() const {
373   return name_;
374 }
375 
PostTask(PostedTask task)376 void TaskQueueImpl::PostTask(PostedTask task) {
377   CurrentThread current_thread =
378       associated_thread_->IsBoundToCurrentThread()
379           ? TaskQueueImpl::CurrentThread::kMainThread
380           : TaskQueueImpl::CurrentThread::kNotMainThread;
381 
382 #if DCHECK_IS_ON()
383   TimeDelta delay = GetTaskDelayAdjustment(current_thread);
384   if (absl::holds_alternative<base::TimeTicks>(
385           task.delay_or_delayed_run_time)) {
386     absl::get<base::TimeTicks>(task.delay_or_delayed_run_time) += delay;
387   } else {
388     absl::get<base::TimeDelta>(task.delay_or_delayed_run_time) += delay;
389   }
390 #endif  // DCHECK_IS_ON()
391 
392   if (!task.is_delayed()) {
393     PostImmediateTaskImpl(std::move(task), current_thread);
394   } else {
395     PostDelayedTaskImpl(std::move(task), current_thread);
396   }
397 }
398 
RemoveCancelableTask(HeapHandle heap_handle)399 void TaskQueueImpl::RemoveCancelableTask(HeapHandle heap_handle) {
400   associated_thread_->AssertInSequenceWithCurrentThread();
401   DCHECK(heap_handle.IsValid());
402 
403   main_thread_only().delayed_incoming_queue.remove(heap_handle);
404 
405   // Only update the delayed wake up if the top task is removed and we're
406   // running on the main thread (a `RunOrPostTask` callback may run outside the
407   // main thread, but in sequence with it -- it's not safe to invoke
408   // `MessagePump::ScheduleDelayedWork()` in that context).
409   if (heap_handle.index() == 0u &&
410       associated_thread_->IsBoundToCurrentThread()) {
411     LazyNow lazy_now(sequence_manager_->main_thread_clock());
412     UpdateWakeUp(&lazy_now);
413   }
414 }
415 
GetTaskDelayAdjustment(CurrentThread current_thread)416 TimeDelta TaskQueueImpl::GetTaskDelayAdjustment(CurrentThread current_thread) {
417 #if DCHECK_IS_ON()
418   if (current_thread == TaskQueueImpl::CurrentThread::kNotMainThread) {
419     base::internal::CheckedAutoLock lock(any_thread_lock_);
420     // Add a per-priority delay to cross thread tasks. This can help diagnose
421     // scheduler induced flakiness by making things flake most of the time.
422     return sequence_manager_->settings()
423         .priority_settings
424         .per_priority_cross_thread_task_delay()[any_thread_.queue_set_index];
425   } else {
426     return sequence_manager_->settings()
427         .priority_settings.per_priority_same_thread_task_delay()
428             [main_thread_only().immediate_work_queue->work_queue_set_index()];
429   }
430 #else
431   // No delay adjustment.
432   return TimeDelta();
433 #endif  // DCHECK_IS_ON()
434 }
435 
PostImmediateTaskImpl(PostedTask task,CurrentThread current_thread)436 void TaskQueueImpl::PostImmediateTaskImpl(PostedTask task,
437                                           CurrentThread current_thread) {
438   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
439   // for details.
440   CHECK(task.callback);
441 
442   bool should_schedule_work = false;
443   {
444     // TODO(alexclarke): Maybe add a main thread only immediate_incoming_queue
445     // See https://crbug.com/901800
446     base::internal::CheckedAutoLock lock(any_thread_lock_);
447     bool add_queue_time_to_tasks = sequence_manager_->GetAddQueueTimeToTasks();
448     TimeTicks queue_time;
449     bool config_category_enabled = false;
450 #if BUILDFLAG(ENABLE_BASE_TRACING)
451     config_category_enabled =
452         TRACE_EVENT_CATEGORY_ENABLED("config.scheduler.record_task_post_time");
453 #endif
454     if (config_category_enabled || add_queue_time_to_tasks ||
455         delayed_fence_allowed_) {
456       queue_time = sequence_manager_->any_thread_clock()->NowTicks();
457     }
458 
459     // The sequence number must be incremented atomically with pushing onto the
460     // incoming queue. Otherwise if there are several threads posting task we
461     // risk breaking the assumption that sequence numbers increase monotonically
462     // within a queue.
463     EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber();
464     bool was_immediate_incoming_queue_empty =
465         any_thread_.immediate_incoming_queue.empty();
466     any_thread_.immediate_incoming_queue.push_back(
467         Task(std::move(task), sequence_number, sequence_number, queue_time));
468 
469 #if DCHECK_IS_ON()
470     any_thread_.immediate_incoming_queue.back().cross_thread_ =
471         (current_thread == TaskQueueImpl::CurrentThread::kNotMainThread);
472 #endif
473 
474     sequence_manager_->WillQueueTask(
475         &any_thread_.immediate_incoming_queue.back());
476     MaybeReportIpcTaskQueuedFromAnyThreadLocked(
477         any_thread_.immediate_incoming_queue.back());
478 
479     for (auto& handler : any_thread_.on_task_posted_handlers) {
480       DCHECK(!handler.second.is_null());
481       handler.second.Run(any_thread_.immediate_incoming_queue.back());
482     }
483 
484     // If this queue was completely empty, then the SequenceManager needs to be
485     // informed so it can reload the work queue and add us to the
486     // TaskQueueSelector which can only be done from the main thread. In
487     // addition it may need to schedule a DoWork if this queue isn't blocked.
488     if (was_immediate_incoming_queue_empty &&
489         any_thread_.immediate_work_queue_empty) {
490       sequence_manager_->WillRequestReloadImmediateWorkQueue();
491       empty_queues_to_reload_handle_.SetActive(true);
492       should_schedule_work =
493           any_thread_.post_immediate_task_should_schedule_work;
494     }
495   }
496 
497   // On windows it's important to call this outside of a lock because calling a
498   // pump while holding a lock can result in priority inversions. See
499   // http://shortn/_ntnKNqjDQT for a discussion.
500   //
501   // Calling ScheduleWork outside the lock should be safe, only the main thread
502   // can mutate |any_thread_.post_immediate_task_should_schedule_work|. If it
503   // transitions to false we call ScheduleWork redundantly that's harmless. If
504   // it transitions to true, the side effect of
505   // |empty_queues_to_reload_handle_SetActive(true)| is guaranteed to be picked
506   // up by the ThreadController's call to SequenceManagerImpl::DelayTillNextTask
507   // when it computes what continuation (if any) is needed.
508   if (should_schedule_work)
509     sequence_manager_->ScheduleWork();
510 
511   TraceQueueSize();
512 }
513 
PostDelayedTaskImpl(PostedTask posted_task,CurrentThread current_thread)514 void TaskQueueImpl::PostDelayedTaskImpl(PostedTask posted_task,
515                                         CurrentThread current_thread) {
516   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
517   // for details.
518   CHECK(posted_task.callback);
519 
520   if (current_thread == CurrentThread::kMainThread) {
521     LazyNow lazy_now(sequence_manager_->main_thread_clock());
522     Task pending_task = MakeDelayedTask(std::move(posted_task), &lazy_now);
523     sequence_manager_->MaybeAddLeewayToTask(pending_task);
524     PushOntoDelayedIncomingQueueFromMainThread(
525         std::move(pending_task), &lazy_now,
526         /* notify_task_annotator */ true);
527   } else {
528     LazyNow lazy_now(sequence_manager_->any_thread_clock());
529     PushOntoDelayedIncomingQueue(
530         MakeDelayedTask(std::move(posted_task), &lazy_now));
531   }
532 }
533 
PushOntoDelayedIncomingQueueFromMainThread(Task pending_task,LazyNow * lazy_now,bool notify_task_annotator)534 void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread(
535     Task pending_task,
536     LazyNow* lazy_now,
537     bool notify_task_annotator) {
538 #if DCHECK_IS_ON()
539   pending_task.cross_thread_ = false;
540 #endif
541 
542   if (notify_task_annotator) {
543     sequence_manager_->WillQueueTask(&pending_task);
544     MaybeReportIpcTaskQueuedFromMainThread(pending_task);
545   }
546   main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
547   UpdateWakeUp(lazy_now);
548 
549   TraceQueueSize();
550 }
551 
PushOntoDelayedIncomingQueue(Task pending_task)552 void TaskQueueImpl::PushOntoDelayedIncomingQueue(Task pending_task) {
553   sequence_manager_->WillQueueTask(&pending_task);
554   MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(pending_task);
555 
556 #if DCHECK_IS_ON()
557   pending_task.cross_thread_ = true;
558 #endif
559 
560   // TODO(altimin): Add a copy method to Task to capture metadata here.
561   auto task_runner = pending_task.task_runner;
562   const auto task_type = pending_task.task_type;
563   PostImmediateTaskImpl(
564       PostedTask(std::move(task_runner),
565                  BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask,
566                           Unretained(this), std::move(pending_task)),
567                  FROM_HERE, TimeDelta(), Nestable::kNonNestable, task_type),
568       CurrentThread::kNotMainThread);
569 }
570 
ScheduleDelayedWorkTask(Task pending_task)571 void TaskQueueImpl::ScheduleDelayedWorkTask(Task pending_task) {
572   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
573   sequence_manager_->MaybeAddLeewayToTask(pending_task);
574   TimeTicks now = sequence_manager_->main_thread_clock()->NowTicks();
575   LazyNow lazy_now(now);
576   // A delayed task is ready to run as soon as earliest_delayed_run_time() is
577   // reached.
578   if (pending_task.earliest_delayed_run_time() <= now) {
579     // If |delayed_run_time| is in the past then push it onto the work queue
580     // immediately. To ensure the right task ordering we need to temporarily
581     // push it onto the |delayed_incoming_queue|.
582     pending_task.delayed_run_time = now;
583     main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
584     MoveReadyDelayedTasksToWorkQueue(
585         &lazy_now, sequence_manager_->GetNextSequenceNumber());
586   } else {
587     // If |delayed_run_time| is in the future we can queue it as normal.
588     PushOntoDelayedIncomingQueueFromMainThread(std::move(pending_task),
589                                                &lazy_now, false);
590   }
591   TraceQueueSize();
592 }
593 
ReloadEmptyImmediateWorkQueue()594 void TaskQueueImpl::ReloadEmptyImmediateWorkQueue() {
595   DCHECK(main_thread_only().immediate_work_queue->Empty());
596   main_thread_only().immediate_work_queue->TakeImmediateIncomingQueueTasks();
597 
598   if (main_thread_only().throttler && IsQueueEnabled()) {
599     main_thread_only().throttler->OnHasImmediateTask();
600   }
601 }
602 
TakeImmediateIncomingQueueTasks(TaskDeque * queue)603 void TaskQueueImpl::TakeImmediateIncomingQueueTasks(TaskDeque* queue) {
604   DCHECK(queue->empty());
605   // Now is a good time to consider reducing the empty queue's capacity if we're
606   // wasting memory, before we make it the `immediate_incoming_queue`.
607   queue->MaybeShrinkQueue();
608 
609   base::internal::CheckedAutoLock lock(any_thread_lock_);
610   queue->swap(any_thread_.immediate_incoming_queue);
611 
612   // Activate delayed fence if necessary. This is ideologically similar to
613   // ActivateDelayedFenceIfNeeded, but due to immediate tasks being posted
614   // from any thread we can't generate an enqueue order for the fence there,
615   // so we have to check all immediate tasks and use their enqueue order for
616   // a fence.
617   if (main_thread_only().delayed_fence) {
618     for (const Task& task : *queue) {
619       DCHECK(!task.queue_time.is_null());
620       DCHECK(task.delayed_run_time.is_null());
621       if (task.queue_time >= main_thread_only().delayed_fence.value()) {
622         main_thread_only().delayed_fence = std::nullopt;
623         DCHECK(!main_thread_only().current_fence);
624         main_thread_only().current_fence = Fence(task.task_order());
625         // Do not trigger WorkQueueSets notification when taking incoming
626         // immediate queue.
627         main_thread_only().immediate_work_queue->InsertFenceSilently(
628             *main_thread_only().current_fence);
629         main_thread_only().delayed_work_queue->InsertFenceSilently(
630             *main_thread_only().current_fence);
631         break;
632       }
633     }
634   }
635 
636   UpdateCrossThreadQueueStateLocked();
637 }
638 
IsEmpty() const639 bool TaskQueueImpl::IsEmpty() const {
640   if (!main_thread_only().delayed_work_queue->Empty() ||
641       !main_thread_only().delayed_incoming_queue.empty() ||
642       !main_thread_only().immediate_work_queue->Empty()) {
643     return false;
644   }
645 
646   base::internal::CheckedAutoLock lock(any_thread_lock_);
647   return any_thread_.immediate_incoming_queue.empty();
648 }
649 
GetNumberOfPendingTasks() const650 size_t TaskQueueImpl::GetNumberOfPendingTasks() const {
651   size_t task_count = 0;
652   task_count += main_thread_only().delayed_work_queue->Size();
653   task_count += main_thread_only().delayed_incoming_queue.size();
654   task_count += main_thread_only().immediate_work_queue->Size();
655 
656   base::internal::CheckedAutoLock lock(any_thread_lock_);
657   task_count += any_thread_.immediate_incoming_queue.size();
658   return task_count;
659 }
660 
HasTaskToRunImmediatelyOrReadyDelayedTask() const661 bool TaskQueueImpl::HasTaskToRunImmediatelyOrReadyDelayedTask() const {
662   // Any work queue tasks count as immediate work.
663   if (!main_thread_only().delayed_work_queue->Empty() ||
664       !main_thread_only().immediate_work_queue->Empty()) {
665     return true;
666   }
667 
668   // Tasks on |delayed_incoming_queue| that could run now, count as
669   // immediate work.
670   if (!main_thread_only().delayed_incoming_queue.empty() &&
671       main_thread_only().delayed_incoming_queue.top().delayed_run_time <=
672           sequence_manager_->main_thread_clock()->NowTicks()) {
673     return true;
674   }
675 
676   // Finally tasks on |immediate_incoming_queue| count as immediate work.
677   base::internal::CheckedAutoLock lock(any_thread_lock_);
678   return !any_thread_.immediate_incoming_queue.empty();
679 }
680 
GetNextDesiredWakeUp()681 std::optional<WakeUp> TaskQueueImpl::GetNextDesiredWakeUp() {
682   // Note we don't scheduled a wake-up for disabled queues.
683   if (main_thread_only().delayed_incoming_queue.empty() || !IsQueueEnabled())
684     return std::nullopt;
685 
686   const auto& top_task = main_thread_only().delayed_incoming_queue.top();
687 
688   // High resolution is needed if the queue contains high resolution tasks and
689   // has a priority index <= kNormalPriority (precise execution time is
690   // unnecessary for a low priority queue).
691   WakeUpResolution resolution = has_pending_high_resolution_tasks() &&
692                                         GetQueuePriority() <= DefaultPriority()
693                                     ? WakeUpResolution::kHigh
694                                     : WakeUpResolution::kLow;
695   subtle::DelayPolicy delay_policy = top_task.delay_policy;
696   if (GetQueuePriority() > DefaultPriority() &&
697       delay_policy == subtle::DelayPolicy::kPrecise) {
698     delay_policy = subtle::DelayPolicy::kFlexibleNoSooner;
699   }
700   return WakeUp{top_task.delayed_run_time, top_task.leeway, resolution,
701                 delay_policy};
702 }
703 
OnWakeUp(LazyNow * lazy_now,EnqueueOrder enqueue_order)704 void TaskQueueImpl::OnWakeUp(LazyNow* lazy_now, EnqueueOrder enqueue_order) {
705   MoveReadyDelayedTasksToWorkQueue(lazy_now, enqueue_order);
706   if (main_thread_only().throttler) {
707     main_thread_only().throttler->OnWakeUp(lazy_now);
708   }
709 }
710 
RemoveAllCanceledDelayedTasksFromFront(LazyNow * lazy_now)711 bool TaskQueueImpl::RemoveAllCanceledDelayedTasksFromFront(LazyNow* lazy_now) {
712   // Because task destructors could have a side-effect of posting new tasks, we
713   // move all the cancelled tasks into a temporary container before deleting
714   // them. This is to avoid the queue from changing while iterating over it.
715   absl::InlinedVector<Task, 8> tasks_to_delete;
716 
717   while (!main_thread_only().delayed_incoming_queue.empty()) {
718     const Task& task = main_thread_only().delayed_incoming_queue.top();
719     CHECK(task.task);
720     if (!task.task.IsCancelled())
721       break;
722 
723     tasks_to_delete.push_back(
724         main_thread_only().delayed_incoming_queue.take_top());
725   }
726 
727   if (!tasks_to_delete.empty()) {
728     UpdateWakeUp(lazy_now);
729     return true;
730   }
731 
732   return false;
733 }
734 
MoveReadyDelayedTasksToWorkQueue(LazyNow * lazy_now,EnqueueOrder enqueue_order)735 void TaskQueueImpl::MoveReadyDelayedTasksToWorkQueue(
736     LazyNow* lazy_now,
737     EnqueueOrder enqueue_order) {
738   // Enqueue all delayed tasks that should be running now, skipping any that
739   // have been canceled.
740   WorkQueue::TaskPusher delayed_work_queue_task_pusher(
741       main_thread_only().delayed_work_queue->CreateTaskPusher());
742 
743   // Because task destructors could have a side-effect of posting new tasks, we
744   // move all the cancelled tasks into a temporary container before deleting
745   // them. This is to avoid the queue from changing while iterating over it.
746   absl::InlinedVector<Task, 8> tasks_to_delete;
747 
748   while (!main_thread_only().delayed_incoming_queue.empty()) {
749     const Task& task = main_thread_only().delayed_incoming_queue.top();
750     CHECK(task.task);
751 
752     // Leave the top task alone if it hasn't been canceled and it is not ready.
753     const bool is_cancelled = task.task.IsCancelled();
754     if (!is_cancelled && task.earliest_delayed_run_time() > lazy_now->Now())
755       break;
756 
757     Task ready_task = main_thread_only().delayed_incoming_queue.take_top();
758     if (is_cancelled) {
759       tasks_to_delete.push_back(std::move(ready_task));
760       continue;
761     }
762 
763     // The top task is ready to run. Move it to the delayed work queue.
764 #if DCHECK_IS_ON()
765     if (sequence_manager_->settings().log_task_delay_expiry)
766       VLOG(0) << GetName() << " Delay expired for "
767               << ready_task.posted_from.ToString();
768 #endif  // DCHECK_IS_ON()
769     DCHECK(!ready_task.delayed_run_time.is_null());
770     DCHECK(!ready_task.enqueue_order_set());
771     ready_task.set_enqueue_order(enqueue_order);
772     ActivateDelayedFenceIfNeeded(ready_task);
773 
774     delayed_work_queue_task_pusher.Push(std::move(ready_task));
775   }
776 
777   // Explicitly delete tasks last.
778   tasks_to_delete.clear();
779 
780   UpdateWakeUp(lazy_now);
781 }
782 
TraceQueueSize() const783 void TaskQueueImpl::TraceQueueSize() const {
784   bool is_tracing;
785   TRACE_EVENT_CATEGORY_GROUP_ENABLED(
786       TRACE_DISABLED_BY_DEFAULT("sequence_manager"), &is_tracing);
787   if (!is_tracing)
788     return;
789 
790   // It's only safe to access the work queues from the main thread.
791   // TODO(alexclarke): We should find another way of tracing this
792   if (!associated_thread_->IsBoundToCurrentThread())
793     return;
794 
795   size_t total_task_count;
796   {
797     base::internal::CheckedAutoLock lock(any_thread_lock_);
798     total_task_count = any_thread_.immediate_incoming_queue.size() +
799                        main_thread_only().immediate_work_queue->Size() +
800                        main_thread_only().delayed_work_queue->Size() +
801                        main_thread_only().delayed_incoming_queue.size();
802   }
803   TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), GetName(),
804                  total_task_count);
805 }
806 
SetQueuePriority(TaskQueue::QueuePriority priority)807 void TaskQueueImpl::SetQueuePriority(TaskQueue::QueuePriority priority) {
808   const TaskQueue::QueuePriority previous_priority = GetQueuePriority();
809   if (priority == previous_priority)
810     return;
811   sequence_manager_->main_thread_only().selector.SetQueuePriority(this,
812                                                                   priority);
813 
814 #if BUILDFLAG(IS_WIN)
815   // Updating queue priority can change whether high resolution timer is needed.
816   LazyNow lazy_now(sequence_manager_->main_thread_clock());
817   UpdateWakeUp(&lazy_now);
818 #endif
819 
820   if (priority > DefaultPriority()) {
821     // |priority| is now lower than the default, so update accordingly.
822     main_thread_only()
823         .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
824         EnqueueOrder::max();
825   } else if (previous_priority > DefaultPriority()) {
826     // |priority| is no longer lower than the default, so record current
827     // sequence number.
828     DCHECK_EQ(
829         main_thread_only()
830             .enqueue_order_at_which_we_became_unblocked_with_normal_priority,
831         EnqueueOrder::max());
832     main_thread_only()
833         .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
834         sequence_manager_->GetNextSequenceNumber();
835   }
836 }
837 
GetQueuePriority() const838 TaskQueue::QueuePriority TaskQueueImpl::GetQueuePriority() const {
839   size_t set_index = immediate_work_queue()->work_queue_set_index();
840   DCHECK_EQ(set_index, delayed_work_queue()->work_queue_set_index());
841   return static_cast<TaskQueue::QueuePriority>(set_index);
842 }
843 
AsValue(TimeTicks now,bool force_verbose) const844 Value::Dict TaskQueueImpl::AsValue(TimeTicks now, bool force_verbose) const {
845   base::internal::CheckedAutoLock lock(any_thread_lock_);
846   Value::Dict state;
847   state.Set("name", GetName());
848   if (any_thread_.unregistered) {
849     state.Set("unregistered", true);
850     return state;
851   }
852   DCHECK(main_thread_only().delayed_work_queue);
853   DCHECK(main_thread_only().immediate_work_queue);
854 
855   state.Set("task_queue_id",
856             StringPrintf("0x%" PRIx64, static_cast<uint64_t>(
857                                            reinterpret_cast<uintptr_t>(this))));
858   state.Set("enabled", IsQueueEnabled());
859   // TODO(crbug.com/40228085): Make base::Value able to store an int64_t and
860   // remove the various static_casts below.
861   state.Set("any_thread_.immediate_incoming_queuesize",
862             static_cast<int>(any_thread_.immediate_incoming_queue.size()));
863   state.Set("delayed_incoming_queue_size",
864             static_cast<int>(main_thread_only().delayed_incoming_queue.size()));
865   state.Set("immediate_work_queue_size",
866             static_cast<int>(main_thread_only().immediate_work_queue->Size()));
867   state.Set("delayed_work_queue_size",
868             static_cast<int>(main_thread_only().delayed_work_queue->Size()));
869 
870   state.Set("any_thread_.immediate_incoming_queuecapacity",
871             static_cast<int>(any_thread_.immediate_incoming_queue.capacity()));
872   state.Set("immediate_work_queue_capacity",
873             static_cast<int>(immediate_work_queue()->Capacity()));
874   state.Set("delayed_work_queue_capacity",
875             static_cast<int>(delayed_work_queue()->Capacity()));
876 
877   if (!main_thread_only().delayed_incoming_queue.empty()) {
878     TimeDelta delay_to_next_task =
879         (main_thread_only().delayed_incoming_queue.top().delayed_run_time -
880          sequence_manager_->main_thread_clock()->NowTicks());
881     state.Set("delay_to_next_task_ms", delay_to_next_task.InMillisecondsF());
882   }
883   if (main_thread_only().current_fence) {
884     Value::Dict fence_state;
885     fence_state.Set(
886         "enqueue_order",
887         static_cast<int>(
888             main_thread_only().current_fence->task_order().enqueue_order()));
889     fence_state.Set("activated_in_wake_up", !main_thread_only()
890                                                  .current_fence->task_order()
891                                                  .delayed_run_time()
892                                                  .is_null());
893     state.Set("current_fence", std::move(fence_state));
894   }
895   if (main_thread_only().delayed_fence) {
896     state.Set("delayed_fence_seconds_from_now",
897               (main_thread_only().delayed_fence.value() - now).InSecondsF());
898   }
899 
900   bool verbose = false;
901   TRACE_EVENT_CATEGORY_GROUP_ENABLED(
902       TRACE_DISABLED_BY_DEFAULT("sequence_manager.verbose_snapshots"),
903       &verbose);
904 
905   if (verbose || force_verbose) {
906     state.Set("immediate_incoming_queue",
907               QueueAsValue(any_thread_.immediate_incoming_queue, now));
908     state.Set("delayed_work_queue",
909               main_thread_only().delayed_work_queue->AsValue(now));
910     state.Set("immediate_work_queue",
911               main_thread_only().immediate_work_queue->AsValue(now));
912     state.Set("delayed_incoming_queue",
913               main_thread_only().delayed_incoming_queue.AsValue(now));
914   }
915   state.Set("priority", GetQueuePriority());
916   return state;
917 }
918 
AddTaskObserver(TaskObserver * task_observer)919 void TaskQueueImpl::AddTaskObserver(TaskObserver* task_observer) {
920   main_thread_only().task_observers.AddObserver(task_observer);
921 }
922 
RemoveTaskObserver(TaskObserver * task_observer)923 void TaskQueueImpl::RemoveTaskObserver(TaskObserver* task_observer) {
924   main_thread_only().task_observers.RemoveObserver(task_observer);
925 }
926 
NotifyWillProcessTask(const Task & task,bool was_blocked_or_low_priority)927 void TaskQueueImpl::NotifyWillProcessTask(const Task& task,
928                                           bool was_blocked_or_low_priority) {
929   DCHECK(should_notify_observers_);
930 
931   for (auto& observer : main_thread_only().task_observers)
932     observer.WillProcessTask(task, was_blocked_or_low_priority);
933 }
934 
NotifyDidProcessTask(const Task & task)935 void TaskQueueImpl::NotifyDidProcessTask(const Task& task) {
936   DCHECK(should_notify_observers_);
937   for (auto& observer : main_thread_only().task_observers)
938     observer.DidProcessTask(task);
939 }
940 
InsertFence(TaskQueue::InsertFencePosition position)941 void TaskQueueImpl::InsertFence(TaskQueue::InsertFencePosition position) {
942   Fence new_fence = position == TaskQueue::InsertFencePosition::kNow
943                         ? Fence::CreateWithEnqueueOrder(
944                               sequence_manager_->GetNextSequenceNumber())
945                         : Fence::BlockingFence();
946   InsertFence(new_fence);
947 }
948 
InsertFence(Fence current_fence)949 void TaskQueueImpl::InsertFence(Fence current_fence) {
950   // Only one fence may be present at a time.
951   main_thread_only().delayed_fence = std::nullopt;
952 
953   std::optional<Fence> previous_fence = main_thread_only().current_fence;
954 
955   // Tasks posted after this point will have a strictly higher enqueue order
956   // and will be blocked from running.
957   main_thread_only().current_fence = current_fence;
958   bool front_task_unblocked =
959       main_thread_only().immediate_work_queue->InsertFence(current_fence);
960   front_task_unblocked |=
961       main_thread_only().delayed_work_queue->InsertFence(current_fence);
962 
963   {
964     base::internal::CheckedAutoLock lock(any_thread_lock_);
965     if (!front_task_unblocked && previous_fence &&
966         previous_fence->task_order() < current_fence.task_order()) {
967       if (!any_thread_.immediate_incoming_queue.empty() &&
968           any_thread_.immediate_incoming_queue.front().task_order() >
969               previous_fence->task_order() &&
970           any_thread_.immediate_incoming_queue.front().task_order() <
971               current_fence.task_order()) {
972         front_task_unblocked = true;
973       }
974     }
975 
976     UpdateCrossThreadQueueStateLocked();
977   }
978 
979   if (IsQueueEnabled() && front_task_unblocked) {
980     OnQueueUnblocked();
981     sequence_manager_->ScheduleWork();
982   }
983 }
984 
InsertFenceAt(TimeTicks time)985 void TaskQueueImpl::InsertFenceAt(TimeTicks time) {
986   DCHECK(delayed_fence_allowed_)
987       << "Delayed fences are not supported for this queue. Enable them "
988          "explicitly in TaskQueue::Spec when creating the queue";
989 
990   // Task queue can have only one fence, delayed or not.
991   RemoveFence();
992   main_thread_only().delayed_fence = time;
993 }
994 
RemoveFence()995 void TaskQueueImpl::RemoveFence() {
996   std::optional<Fence> previous_fence = main_thread_only().current_fence;
997   main_thread_only().current_fence = std::nullopt;
998   main_thread_only().delayed_fence = std::nullopt;
999 
1000   bool front_task_unblocked =
1001       main_thread_only().immediate_work_queue->RemoveFence();
1002   front_task_unblocked |= main_thread_only().delayed_work_queue->RemoveFence();
1003 
1004   {
1005     base::internal::CheckedAutoLock lock(any_thread_lock_);
1006     if (!front_task_unblocked && previous_fence) {
1007       if (!any_thread_.immediate_incoming_queue.empty() &&
1008           any_thread_.immediate_incoming_queue.front().task_order() >
1009               previous_fence->task_order()) {
1010         front_task_unblocked = true;
1011       }
1012     }
1013 
1014     UpdateCrossThreadQueueStateLocked();
1015   }
1016 
1017   if (IsQueueEnabled() && front_task_unblocked) {
1018     OnQueueUnblocked();
1019     sequence_manager_->ScheduleWork();
1020   }
1021 }
1022 
BlockedByFence() const1023 bool TaskQueueImpl::BlockedByFence() const {
1024   if (!main_thread_only().current_fence)
1025     return false;
1026 
1027   if (!main_thread_only().immediate_work_queue->BlockedByFence() ||
1028       !main_thread_only().delayed_work_queue->BlockedByFence()) {
1029     return false;
1030   }
1031 
1032   base::internal::CheckedAutoLock lock(any_thread_lock_);
1033   if (any_thread_.immediate_incoming_queue.empty())
1034     return true;
1035 
1036   return any_thread_.immediate_incoming_queue.front().task_order() >
1037          main_thread_only().current_fence->task_order();
1038 }
1039 
HasActiveFence()1040 bool TaskQueueImpl::HasActiveFence() {
1041   if (main_thread_only().delayed_fence &&
1042       sequence_manager_->main_thread_clock()->NowTicks() >
1043           main_thread_only().delayed_fence.value()) {
1044     return true;
1045   }
1046   return !!main_thread_only().current_fence;
1047 }
1048 
CouldTaskRun(EnqueueOrder enqueue_order) const1049 bool TaskQueueImpl::CouldTaskRun(EnqueueOrder enqueue_order) const {
1050   if (!IsQueueEnabled())
1051     return false;
1052 
1053   if (!main_thread_only().current_fence)
1054     return true;
1055 
1056   // TODO(crbug.com/40791504): This should use TaskOrder. This is currently only
1057   // used for tests and is fine as-is, but we should be using `TaskOrder` for
1058   // task comparisons. Also this test should be renamed with a testing suffix as
1059   // it is not used in production.
1060   return enqueue_order <
1061          main_thread_only().current_fence->task_order().enqueue_order();
1062 }
1063 
WasBlockedOrLowPriority(EnqueueOrder enqueue_order) const1064 bool TaskQueueImpl::WasBlockedOrLowPriority(EnqueueOrder enqueue_order) const {
1065   return enqueue_order <
1066          main_thread_only()
1067              .enqueue_order_at_which_we_became_unblocked_with_normal_priority;
1068 }
1069 
1070 // static
QueueAsValue(const TaskDeque & queue,TimeTicks now)1071 Value::List TaskQueueImpl::QueueAsValue(const TaskDeque& queue, TimeTicks now) {
1072   Value::List state;
1073   for (const Task& task : queue)
1074     state.Append(TaskAsValue(task, now));
1075   return state;
1076 }
1077 
1078 // static
TaskAsValue(const Task & task,TimeTicks now)1079 Value::Dict TaskQueueImpl::TaskAsValue(const Task& task, TimeTicks now) {
1080   Value::Dict state;
1081   state.Set("posted_from", task.posted_from.ToString());
1082   if (task.enqueue_order_set())
1083     state.Set("enqueue_order", static_cast<int>(task.enqueue_order()));
1084   state.Set("sequence_num", task.sequence_num);
1085   state.Set("nestable", task.nestable == Nestable::kNestable);
1086   state.Set("is_high_res", task.is_high_res);
1087   state.Set("is_cancelled", task.task.IsCancelled());
1088   state.Set("delayed_run_time",
1089             (task.delayed_run_time - TimeTicks()).InMillisecondsF());
1090   const TimeDelta delayed_run_time_milliseconds_from_now =
1091       task.delayed_run_time.is_null() ? TimeDelta()
1092                                       : (task.delayed_run_time - now);
1093   state.Set("delayed_run_time_milliseconds_from_now",
1094             delayed_run_time_milliseconds_from_now.InMillisecondsF());
1095   return state;
1096 }
1097 
MakeDelayedTask(PostedTask delayed_task,LazyNow * lazy_now) const1098 Task TaskQueueImpl::MakeDelayedTask(PostedTask delayed_task,
1099                                     LazyNow* lazy_now) const {
1100   EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber();
1101   base::TimeDelta delay;
1102   WakeUpResolution resolution = WakeUpResolution::kLow;
1103 #if BUILDFLAG(IS_WIN)
1104   const bool explicit_high_resolution_timer_win =
1105       g_explicit_high_resolution_timer_win.load(std::memory_order_relaxed);
1106 #endif  // BUILDFLAG(IS_WIN)
1107   if (absl::holds_alternative<base::TimeDelta>(
1108           delayed_task.delay_or_delayed_run_time)) {
1109     delay = absl::get<base::TimeDelta>(delayed_task.delay_or_delayed_run_time);
1110     delayed_task.delay_or_delayed_run_time = lazy_now->Now() + delay;
1111   } else {
1112     delay = absl::get<base::TimeTicks>(delayed_task.delay_or_delayed_run_time) -
1113             lazy_now->Now();
1114   }
1115 #if BUILDFLAG(IS_WIN)
1116   if (!explicit_high_resolution_timer_win &&
1117       delay < (2 * base::Milliseconds(Time::kMinLowResolutionThresholdMs))) {
1118     // Outside the kExplicitHighResolutionTimerWin experiment, We consider the
1119     // task needs a high resolution timer if the delay is more than 0 and less
1120     // than 32ms. This caps the relative error to less than 50% : a 33ms wait
1121     // can wake at 48ms since the default resolution on Windows is between 10
1122     // and 15ms.
1123     resolution = WakeUpResolution::kHigh;
1124   }
1125 #endif  // BUILDFLAG(IS_WIN)
1126   delayed_task.delay_policy = subtle::MaybeOverrideDelayPolicy(
1127       delayed_task.delay_policy, delay,
1128       g_max_precise_delay.load(std::memory_order_relaxed));
1129   // leeway isn't specified yet since this may be called from any thread.
1130   return Task(std::move(delayed_task), sequence_number, EnqueueOrder(),
1131               lazy_now->Now(), resolution);
1132 }
1133 
IsQueueEnabled() const1134 bool TaskQueueImpl::IsQueueEnabled() const {
1135   return main_thread_only().is_enabled;
1136 }
1137 
SetQueueEnabled(bool enabled)1138 void TaskQueueImpl::SetQueueEnabled(bool enabled) {
1139   if (main_thread_only().is_enabled == enabled)
1140     return;
1141 
1142   // Update the |main_thread_only_| struct.
1143   main_thread_only().is_enabled = enabled;
1144   main_thread_only().disabled_time = std::nullopt;
1145 
1146   // |sequence_manager_| can be null in tests.
1147   if (!sequence_manager_)
1148     return;
1149 
1150   LazyNow lazy_now(sequence_manager_->main_thread_clock());
1151 
1152   if (!enabled) {
1153     bool tracing_enabled = false;
1154     TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1155                                        &tracing_enabled);
1156     main_thread_only().disabled_time = lazy_now.Now();
1157   } else {
1158     // Override reporting if the queue is becoming enabled again.
1159     main_thread_only().should_report_posted_tasks_when_disabled = false;
1160   }
1161 
1162   // If there is a throttler, it will be notified of pending delayed and
1163   // immediate tasks inside UpdateWakeUp().
1164   UpdateWakeUp(&lazy_now);
1165 
1166   {
1167     base::internal::CheckedAutoLock lock(any_thread_lock_);
1168     UpdateCrossThreadQueueStateLocked();
1169 
1170     // Copy over the task-reporting related state.
1171     any_thread_.is_enabled = enabled;
1172     any_thread_.tracing_only.disabled_time = main_thread_only().disabled_time;
1173     any_thread_.tracing_only.should_report_posted_tasks_when_disabled =
1174         main_thread_only().should_report_posted_tasks_when_disabled;
1175   }
1176 
1177   // Finally, enable or disable the queue with the selector.
1178   if (enabled) {
1179     // Note the selector calls SequenceManager::OnTaskQueueEnabled which posts
1180     // a DoWork if needed.
1181     sequence_manager_->main_thread_only().selector.EnableQueue(this);
1182 
1183     if (!BlockedByFence())
1184       OnQueueUnblocked();
1185   } else {
1186     sequence_manager_->main_thread_only().selector.DisableQueue(this);
1187   }
1188 }
1189 
SetShouldReportPostedTasksWhenDisabled(bool should_report)1190 void TaskQueueImpl::SetShouldReportPostedTasksWhenDisabled(bool should_report) {
1191   if (main_thread_only().should_report_posted_tasks_when_disabled ==
1192       should_report)
1193     return;
1194 
1195   // Only observe transitions turning the reporting on if tracing is enabled.
1196   if (should_report) {
1197     bool tracing_enabled = false;
1198     TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1199                                        &tracing_enabled);
1200     if (!tracing_enabled)
1201       return;
1202   }
1203 
1204   main_thread_only().should_report_posted_tasks_when_disabled = should_report;
1205 
1206   // Mirror the state to the AnyThread struct as well.
1207   {
1208     base::internal::CheckedAutoLock lock(any_thread_lock_);
1209     any_thread_.tracing_only.should_report_posted_tasks_when_disabled =
1210         should_report;
1211   }
1212 }
1213 
UpdateCrossThreadQueueStateLocked()1214 void TaskQueueImpl::UpdateCrossThreadQueueStateLocked() {
1215   any_thread_.immediate_work_queue_empty =
1216       main_thread_only().immediate_work_queue->Empty();
1217   any_thread_.is_enabled = main_thread_only().is_enabled;
1218 
1219   if (main_thread_only().throttler) {
1220     // If there's a Throttler, always ScheduleWork() when immediate work is
1221     // posted and the queue is enabled, to ensure that
1222     // Throttler::OnHasImmediateTask() is invoked.
1223     any_thread_.post_immediate_task_should_schedule_work = IsQueueEnabled();
1224   } else {
1225     // Otherwise, ScheduleWork() only if the queue is enabled and there isn't a
1226     // fence to prevent the task from being executed.
1227     any_thread_.post_immediate_task_should_schedule_work =
1228         IsQueueEnabled() && !main_thread_only().current_fence;
1229   }
1230 
1231 #if DCHECK_IS_ON()
1232   any_thread_.queue_set_index =
1233       main_thread_only().immediate_work_queue->work_queue_set_index();
1234 #endif
1235 }
1236 
ReclaimMemory(TimeTicks now)1237 void TaskQueueImpl::ReclaimMemory(TimeTicks now) {
1238   if (main_thread_only().delayed_incoming_queue.empty())
1239     return;
1240 
1241   main_thread_only().delayed_incoming_queue.SweepCancelledTasks(
1242       sequence_manager_);
1243 
1244   // If deleting one of the cancelled tasks shut down this queue, bail out.
1245   // Note that in this scenario |this| is still valid, but some fields of the
1246   // queue have been cleared out by |UnregisterTaskQueue|.
1247   if (!main_thread_only().delayed_work_queue) {
1248     return;
1249   }
1250 
1251   LazyNow lazy_now(now);
1252   UpdateWakeUp(&lazy_now);
1253 
1254   // Also consider shrinking the work queue if it's wasting memory.
1255   main_thread_only().delayed_work_queue->MaybeShrinkQueue();
1256   main_thread_only().immediate_work_queue->MaybeShrinkQueue();
1257 
1258   {
1259     base::internal::CheckedAutoLock lock(any_thread_lock_);
1260     any_thread_.immediate_incoming_queue.MaybeShrinkQueue();
1261   }
1262 }
1263 
PushImmediateIncomingTaskForTest(Task task)1264 void TaskQueueImpl::PushImmediateIncomingTaskForTest(Task task) {
1265   base::internal::CheckedAutoLock lock(any_thread_lock_);
1266   any_thread_.immediate_incoming_queue.push_back(std::move(task));
1267 }
1268 
RequeueDeferredNonNestableTask(DeferredNonNestableTask task)1269 void TaskQueueImpl::RequeueDeferredNonNestableTask(
1270     DeferredNonNestableTask task) {
1271   DCHECK(task.task.nestable == Nestable::kNonNestable);
1272 
1273   // It's possible that the queue was unregistered since the task was posted.
1274   // Skip the task in that case.
1275   if (!main_thread_only().delayed_work_queue)
1276     return;
1277 
1278   // The re-queued tasks have to be pushed onto the front because we'd otherwise
1279   // violate the strict monotonically increasing enqueue order within the
1280   // WorkQueue.  We can't assign them a new enqueue order here because that will
1281   // not behave correctly with fences and things will break (e.g Idle TQ).
1282   if (task.work_queue_type == WorkQueueType::kDelayed) {
1283     main_thread_only().delayed_work_queue->PushNonNestableTaskToFront(
1284         std::move(task.task));
1285   } else {
1286     // We're about to push |task| onto an empty |immediate_work_queue|
1287     // (bypassing |immediate_incoming_queue_|). As such, we no longer need to
1288     // reload if we were planning to. The flag must be cleared while holding
1289     // the lock to avoid a cross-thread post task setting it again before
1290     // we actually make |immediate_work_queue| non-empty.
1291     if (main_thread_only().immediate_work_queue->Empty()) {
1292       base::internal::CheckedAutoLock lock(any_thread_lock_);
1293       empty_queues_to_reload_handle_.SetActive(false);
1294 
1295       any_thread_.immediate_work_queue_empty = false;
1296       main_thread_only().immediate_work_queue->PushNonNestableTaskToFront(
1297           std::move(task.task));
1298 
1299     } else {
1300       main_thread_only().immediate_work_queue->PushNonNestableTaskToFront(
1301           std::move(task.task));
1302     }
1303   }
1304 }
1305 
SetThrottler(TaskQueue::Throttler * throttler)1306 void TaskQueueImpl::SetThrottler(TaskQueue::Throttler* throttler) {
1307   DCHECK(throttler);
1308   DCHECK(!main_thread_only().throttler)
1309       << "Can't assign two different throttlers to "
1310          "base::sequence_manager:TaskQueue";
1311   // `throttler` is guaranteed to outlive this object.
1312   main_thread_only().throttler = throttler;
1313 }
1314 
ResetThrottler()1315 void TaskQueueImpl::ResetThrottler() {
1316   main_thread_only().throttler = nullptr;
1317   LazyNow lazy_now(sequence_manager_->main_thread_clock());
1318   // The current delayed wake up may have been determined by the Throttler.
1319   // Update it now that there is no Throttler.
1320   UpdateWakeUp(&lazy_now);
1321 }
1322 
UpdateWakeUp(LazyNow * lazy_now)1323 void TaskQueueImpl::UpdateWakeUp(LazyNow* lazy_now) {
1324   std::optional<WakeUp> wake_up = GetNextDesiredWakeUp();
1325   if (main_thread_only().throttler && IsQueueEnabled()) {
1326     // GetNextAllowedWakeUp() may return a non-null wake_up even if |wake_up| is
1327     // nullopt, e.g. to throttle immediate tasks.
1328     wake_up = main_thread_only().throttler->GetNextAllowedWakeUp(
1329         lazy_now, wake_up, HasTaskToRunImmediatelyOrReadyDelayedTask());
1330   }
1331   SetNextWakeUp(lazy_now, wake_up);
1332 }
1333 
SetNextWakeUp(LazyNow * lazy_now,std::optional<WakeUp> wake_up)1334 void TaskQueueImpl::SetNextWakeUp(LazyNow* lazy_now,
1335                                   std::optional<WakeUp> wake_up) {
1336   if (main_thread_only().scheduled_wake_up == wake_up)
1337     return;
1338   main_thread_only().scheduled_wake_up = wake_up;
1339   main_thread_only().wake_up_queue->SetNextWakeUpForQueue(this, lazy_now,
1340                                                           wake_up);
1341 }
1342 
HasTaskToRunImmediately() const1343 bool TaskQueueImpl::HasTaskToRunImmediately() const {
1344   // Any work queue tasks count as immediate work.
1345   if (!main_thread_only().delayed_work_queue->Empty() ||
1346       !main_thread_only().immediate_work_queue->Empty()) {
1347     return true;
1348   }
1349 
1350   // Finally tasks on |immediate_incoming_queue| count as immediate work.
1351   base::internal::CheckedAutoLock lock(any_thread_lock_);
1352   return !any_thread_.immediate_incoming_queue.empty();
1353 }
1354 
HasTaskToRunImmediatelyLocked() const1355 bool TaskQueueImpl::HasTaskToRunImmediatelyLocked() const {
1356   return !main_thread_only().delayed_work_queue->Empty() ||
1357          !main_thread_only().immediate_work_queue->Empty() ||
1358          !any_thread_.immediate_incoming_queue.empty();
1359 }
1360 
SetOnTaskStartedHandler(TaskQueueImpl::OnTaskStartedHandler handler)1361 void TaskQueueImpl::SetOnTaskStartedHandler(
1362     TaskQueueImpl::OnTaskStartedHandler handler) {
1363   DCHECK(should_notify_observers_ || handler.is_null());
1364   main_thread_only().on_task_started_handler = std::move(handler);
1365 }
1366 
OnTaskStarted(const Task & task,const TaskQueue::TaskTiming & task_timing)1367 void TaskQueueImpl::OnTaskStarted(const Task& task,
1368                                   const TaskQueue::TaskTiming& task_timing) {
1369   if (!main_thread_only().on_task_started_handler.is_null())
1370     main_thread_only().on_task_started_handler.Run(task, task_timing);
1371 }
1372 
SetOnTaskCompletedHandler(TaskQueueImpl::OnTaskCompletedHandler handler)1373 void TaskQueueImpl::SetOnTaskCompletedHandler(
1374     TaskQueueImpl::OnTaskCompletedHandler handler) {
1375   DCHECK(should_notify_observers_ || handler.is_null());
1376   main_thread_only().on_task_completed_handler = std::move(handler);
1377 }
1378 
OnTaskCompleted(const Task & task,TaskQueue::TaskTiming * task_timing,LazyNow * lazy_now)1379 void TaskQueueImpl::OnTaskCompleted(const Task& task,
1380                                     TaskQueue::TaskTiming* task_timing,
1381                                     LazyNow* lazy_now) {
1382   if (!main_thread_only().on_task_completed_handler.is_null()) {
1383     main_thread_only().on_task_completed_handler.Run(task, task_timing,
1384                                                      lazy_now);
1385   }
1386 }
1387 
RequiresTaskTiming() const1388 bool TaskQueueImpl::RequiresTaskTiming() const {
1389   return !main_thread_only().on_task_started_handler.is_null() ||
1390          !main_thread_only().on_task_completed_handler.is_null();
1391 }
1392 
1393 std::unique_ptr<TaskQueue::OnTaskPostedCallbackHandle>
AddOnTaskPostedHandler(OnTaskPostedHandler handler)1394 TaskQueueImpl::AddOnTaskPostedHandler(OnTaskPostedHandler handler) {
1395   DCHECK(should_notify_observers_ && !handler.is_null());
1396   std::unique_ptr<OnTaskPostedCallbackHandleImpl> handle =
1397       std::make_unique<OnTaskPostedCallbackHandleImpl>(this,
1398                                                        associated_thread_);
1399   base::internal::CheckedAutoLock lock(any_thread_lock_);
1400   any_thread_.on_task_posted_handlers.insert(
1401       {handle.get(), std::move(handler)});
1402   return handle;
1403 }
1404 
RemoveOnTaskPostedHandler(TaskQueueImpl::OnTaskPostedCallbackHandleImpl * on_task_posted_callback_handle)1405 void TaskQueueImpl::RemoveOnTaskPostedHandler(
1406     TaskQueueImpl::OnTaskPostedCallbackHandleImpl*
1407         on_task_posted_callback_handle) {
1408   base::internal::CheckedAutoLock lock(any_thread_lock_);
1409   any_thread_.on_task_posted_handlers.erase(on_task_posted_callback_handle);
1410 }
1411 
SetTaskExecutionTraceLogger(TaskExecutionTraceLogger logger)1412 void TaskQueueImpl::SetTaskExecutionTraceLogger(
1413     TaskExecutionTraceLogger logger) {
1414   DCHECK(should_notify_observers_ || logger.is_null());
1415   main_thread_only().task_execution_trace_logger = std::move(logger);
1416 }
1417 
IsUnregistered() const1418 bool TaskQueueImpl::IsUnregistered() const {
1419   base::internal::CheckedAutoLock lock(any_thread_lock_);
1420   return any_thread_.unregistered;
1421 }
1422 
GetSequenceManagerWeakPtr()1423 WeakPtr<SequenceManagerImpl> TaskQueueImpl::GetSequenceManagerWeakPtr() {
1424   return sequence_manager_->GetWeakPtr();
1425 }
1426 
ActivateDelayedFenceIfNeeded(const Task & task)1427 void TaskQueueImpl::ActivateDelayedFenceIfNeeded(const Task& task) {
1428   if (!main_thread_only().delayed_fence)
1429     return;
1430   if (main_thread_only().delayed_fence.value() > task.delayed_run_time)
1431     return;
1432   InsertFence(Fence(task.task_order()));
1433   main_thread_only().delayed_fence = std::nullopt;
1434 }
1435 
MaybeReportIpcTaskQueuedFromMainThread(const Task & pending_task)1436 void TaskQueueImpl::MaybeReportIpcTaskQueuedFromMainThread(
1437     const Task& pending_task) {
1438   if (!pending_task.ipc_hash)
1439     return;
1440 
1441   // It's possible that tracing was just enabled and no disabled time has been
1442   // stored. In that case, skip emitting the event.
1443   if (!main_thread_only().disabled_time)
1444     return;
1445 
1446   bool tracing_enabled = false;
1447   TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1448                                      &tracing_enabled);
1449   if (!tracing_enabled)
1450     return;
1451 
1452   if (main_thread_only().is_enabled ||
1453       !main_thread_only().should_report_posted_tasks_when_disabled) {
1454     return;
1455   }
1456 
1457   base::TimeDelta time_since_disabled =
1458       sequence_manager_->main_thread_clock()->NowTicks() -
1459       main_thread_only().disabled_time.value();
1460 
1461   ReportIpcTaskQueued(pending_task, time_since_disabled);
1462 }
1463 
ShouldReportIpcTaskQueuedFromAnyThreadLocked(base::TimeDelta * time_since_disabled)1464 bool TaskQueueImpl::ShouldReportIpcTaskQueuedFromAnyThreadLocked(
1465     base::TimeDelta* time_since_disabled) {
1466   // It's possible that tracing was just enabled and no disabled time has been
1467   // stored. In that case, skip emitting the event.
1468   if (!any_thread_.tracing_only.disabled_time)
1469     return false;
1470 
1471   if (any_thread_.is_enabled ||
1472       any_thread_.tracing_only.should_report_posted_tasks_when_disabled) {
1473     return false;
1474   }
1475 
1476   *time_since_disabled = sequence_manager_->any_thread_clock()->NowTicks() -
1477                          any_thread_.tracing_only.disabled_time.value();
1478   return true;
1479 }
1480 
MaybeReportIpcTaskQueuedFromAnyThreadLocked(const Task & pending_task)1481 void TaskQueueImpl::MaybeReportIpcTaskQueuedFromAnyThreadLocked(
1482     const Task& pending_task) {
1483   if (!pending_task.ipc_hash)
1484     return;
1485 
1486   bool tracing_enabled = false;
1487   TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1488                                      &tracing_enabled);
1489   if (!tracing_enabled)
1490     return;
1491 
1492   base::TimeDelta time_since_disabled;
1493   if (ShouldReportIpcTaskQueuedFromAnyThreadLocked(&time_since_disabled))
1494     ReportIpcTaskQueued(pending_task, time_since_disabled);
1495 }
1496 
MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(const Task & pending_task)1497 void TaskQueueImpl::MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(
1498     const Task& pending_task) {
1499   if (!pending_task.ipc_hash)
1500     return;
1501 
1502   bool tracing_enabled = false;
1503   TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1504                                      &tracing_enabled);
1505   if (!tracing_enabled)
1506     return;
1507 
1508   base::TimeDelta time_since_disabled;
1509   bool should_report = false;
1510   {
1511     base::internal::CheckedAutoLock lock(any_thread_lock_);
1512     should_report =
1513         ShouldReportIpcTaskQueuedFromAnyThreadLocked(&time_since_disabled);
1514   }
1515 
1516   if (should_report)
1517     ReportIpcTaskQueued(pending_task, time_since_disabled);
1518 }
1519 
ReportIpcTaskQueued(const Task & pending_task,const base::TimeDelta & time_since_disabled)1520 void TaskQueueImpl::ReportIpcTaskQueued(
1521     const Task& pending_task,
1522     const base::TimeDelta& time_since_disabled) {
1523   TRACE_EVENT_INSTANT(
1524       TRACE_DISABLED_BY_DEFAULT("lifecycles"), "task_posted_to_disabled_queue",
1525       [&](perfetto::EventContext ctx) {
1526         auto* proto = ctx.event<perfetto::protos::pbzero::ChromeTrackEvent>()
1527                           ->set_chrome_task_posted_to_disabled_queue();
1528         proto->set_time_since_disabled_ms(
1529             checked_cast<uint64_t>(time_since_disabled.InMilliseconds()));
1530         proto->set_ipc_hash(pending_task.ipc_hash);
1531         proto->set_source_location_iid(
1532             base::trace_event::InternedSourceLocation::Get(
1533                 &ctx, pending_task.posted_from));
1534       });
1535 }
1536 
OnQueueUnblocked()1537 void TaskQueueImpl::OnQueueUnblocked() {
1538   DCHECK(IsQueueEnabled());
1539   DCHECK(!BlockedByFence());
1540 
1541   main_thread_only().enqueue_order_at_which_we_became_unblocked =
1542       sequence_manager_->GetNextSequenceNumber();
1543   if (GetQueuePriority() <= DefaultPriority()) {
1544     // We are default priority or more important so update
1545     // |enqueue_order_at_which_we_became_unblocked_with_normal_priority|.
1546     main_thread_only()
1547         .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
1548         main_thread_only().enqueue_order_at_which_we_became_unblocked;
1549   }
1550 }
1551 
1552 std::unique_ptr<TaskQueue::QueueEnabledVoter>
CreateQueueEnabledVoter()1553 TaskQueueImpl::CreateQueueEnabledVoter() {
1554   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
1555   return WrapUnique(
1556       new TaskQueue::QueueEnabledVoter(voter_weak_ptr_factory_.GetWeakPtr()));
1557 }
1558 
AddQueueEnabledVoter(bool voter_is_enabled,TaskQueue::QueueEnabledVoter & voter)1559 void TaskQueueImpl::AddQueueEnabledVoter(bool voter_is_enabled,
1560                                          TaskQueue::QueueEnabledVoter& voter) {
1561   ++main_thread_only().voter_count;
1562   if (voter_is_enabled) {
1563     ++main_thread_only().enabled_voter_count;
1564   }
1565 }
1566 
RemoveQueueEnabledVoter(bool voter_is_enabled,TaskQueue::QueueEnabledVoter & voter)1567 void TaskQueueImpl::RemoveQueueEnabledVoter(
1568     bool voter_is_enabled,
1569     TaskQueue::QueueEnabledVoter& voter) {
1570   bool was_enabled = AreAllQueueEnabledVotersEnabled();
1571   if (voter_is_enabled) {
1572     --main_thread_only().enabled_voter_count;
1573     DCHECK_GE(main_thread_only().enabled_voter_count, 0);
1574   }
1575 
1576   --main_thread_only().voter_count;
1577   DCHECK_GE(main_thread_only().voter_count, 0);
1578 
1579   bool is_enabled = AreAllQueueEnabledVotersEnabled();
1580   if (was_enabled != is_enabled) {
1581     SetQueueEnabled(is_enabled);
1582   }
1583 }
1584 
OnQueueEnabledVoteChanged(bool enabled)1585 void TaskQueueImpl::OnQueueEnabledVoteChanged(bool enabled) {
1586   bool was_enabled = AreAllQueueEnabledVotersEnabled();
1587   if (enabled) {
1588     ++main_thread_only().enabled_voter_count;
1589     DCHECK_LE(main_thread_only().enabled_voter_count,
1590               main_thread_only().voter_count);
1591   } else {
1592     --main_thread_only().enabled_voter_count;
1593     DCHECK_GE(main_thread_only().enabled_voter_count, 0);
1594   }
1595 
1596   bool is_enabled = AreAllQueueEnabledVotersEnabled();
1597   if (was_enabled != is_enabled) {
1598     SetQueueEnabled(is_enabled);
1599   }
1600 }
1601 
CompleteInitializationOnBoundThread()1602 void TaskQueueImpl::CompleteInitializationOnBoundThread() {
1603   voter_weak_ptr_factory_.BindToCurrentSequence(
1604       subtle::BindWeakPtrFactoryPassKey());
1605 }
1606 
DefaultPriority() const1607 TaskQueue::QueuePriority TaskQueueImpl::DefaultPriority() const {
1608   return sequence_manager()->settings().priority_settings.default_priority();
1609 }
1610 
IsQueueEnabledFromAnyThread() const1611 bool TaskQueueImpl::IsQueueEnabledFromAnyThread() const {
1612   base::internal::CheckedAutoLock lock(any_thread_lock_);
1613   return any_thread_.is_enabled;
1614 }
1615 
// Out-of-line defaulted constructor and destructor for DelayedIncomingQueue.
TaskQueueImpl::DelayedIncomingQueue::DelayedIncomingQueue() = default;
TaskQueueImpl::DelayedIncomingQueue::~DelayedIncomingQueue() = default;
1618 
push(Task task)1619 void TaskQueueImpl::DelayedIncomingQueue::push(Task task) {
1620   // TODO(crbug.com/40789839): Remove this once the cause of corrupted tasks in
1621   // the queue is understood.
1622   CHECK(task.task);
1623   if (task.is_high_res)
1624     pending_high_res_tasks_++;
1625   queue_.insert(std::move(task));
1626 }
1627 
remove(HeapHandle heap_handle)1628 void TaskQueueImpl::DelayedIncomingQueue::remove(HeapHandle heap_handle) {
1629   DCHECK(!empty());
1630   DCHECK_LT(heap_handle.index(), queue_.size());
1631   Task task = queue_.take(heap_handle);
1632   if (task.is_high_res) {
1633     pending_high_res_tasks_--;
1634     DCHECK_GE(pending_high_res_tasks_, 0);
1635   }
1636 }
1637 
take_top()1638 Task TaskQueueImpl::DelayedIncomingQueue::take_top() {
1639   DCHECK(!empty());
1640   if (queue_.top().is_high_res) {
1641     pending_high_res_tasks_--;
1642     DCHECK_GE(pending_high_res_tasks_, 0);
1643   }
1644   return queue_.take_top();
1645 }
1646 
swap(DelayedIncomingQueue * rhs)1647 void TaskQueueImpl::DelayedIncomingQueue::swap(DelayedIncomingQueue* rhs) {
1648   std::swap(pending_high_res_tasks_, rhs->pending_high_res_tasks_);
1649   std::swap(queue_, rhs->queue_);
1650 }
1651 
SweepCancelledTasks(SequenceManagerImpl * sequence_manager)1652 void TaskQueueImpl::DelayedIncomingQueue::SweepCancelledTasks(
1653     SequenceManagerImpl* sequence_manager) {
1654   // Note: IntrusiveHeap::EraseIf() is safe against re-entrancy caused by
1655   // deleted tasks posting new tasks.
1656   queue_.EraseIf([this](const Task& task) {
1657     if (task.task.IsCancelled()) {
1658       if (task.is_high_res) {
1659         --pending_high_res_tasks_;
1660         DCHECK_GE(pending_high_res_tasks_, 0);
1661       }
1662       return true;
1663     }
1664     return false;
1665   });
1666 }
1667 
AsValue(TimeTicks now) const1668 Value::List TaskQueueImpl::DelayedIncomingQueue::AsValue(TimeTicks now) const {
1669   Value::List state;
1670   for (const Task& task : queue_)
1671     state.Append(TaskAsValue(task, now));
1672   return state;
1673 }
1674 
operator ()(const Task & lhs,const Task & rhs) const1675 bool TaskQueueImpl::DelayedIncomingQueue::Compare::operator()(
1676     const Task& lhs,
1677     const Task& rhs) const {
1678   // Delayed tasks are ordered by latest_delayed_run_time(). The top task may
1679   // not be the first task eligible to run, but tasks will always become ripe
1680   // before their latest_delayed_run_time().
1681   const TimeTicks lhs_latest_delayed_run_time = lhs.latest_delayed_run_time();
1682   const TimeTicks rhs_latest_delayed_run_time = rhs.latest_delayed_run_time();
1683   if (lhs_latest_delayed_run_time == rhs_latest_delayed_run_time)
1684     return lhs.sequence_num > rhs.sequence_num;
1685   return lhs_latest_delayed_run_time > rhs_latest_delayed_run_time;
1686 }
1687 
// Constructs a handle that refers back to |task_queue_impl| and keeps a
// reference to |associated_thread|. Must be created on the associated thread
// (DCHECKed).
TaskQueueImpl::OnTaskPostedCallbackHandleImpl::OnTaskPostedCallbackHandleImpl(
    TaskQueueImpl* task_queue_impl,
    scoped_refptr<const AssociatedThreadId> associated_thread)
    : task_queue_impl_(task_queue_impl),
      associated_thread_(std::move(associated_thread)) {
  DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
}
1695 
1696 TaskQueueImpl::OnTaskPostedCallbackHandleImpl::
~OnTaskPostedCallbackHandleImpl()1697     ~OnTaskPostedCallbackHandleImpl() {
1698   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
1699   if (task_queue_impl_)
1700     task_queue_impl_->RemoveOnTaskPostedHandler(this);
1701 }
1702 
1703 }  // namespace internal
1704 }  // namespace sequence_manager
1705 }  // namespace base
1706