• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/sequence_manager/task_queue_impl.h"
6 
7 #include <inttypes.h>
8 
9 #include <memory>
10 #include <utility>
11 
12 #include "base/check.h"
13 #include "base/compiler_specific.h"
14 #include "base/feature_list.h"
15 #include "base/logging.h"
16 #include "base/memory/scoped_refptr.h"
17 #include "base/metrics/histogram_macros.h"
18 #include "base/notreached.h"
19 #include "base/observer_list.h"
20 #include "base/ranges/algorithm.h"
21 #include "base/strings/stringprintf.h"
22 #include "base/task/common/scoped_defer_task_posting.h"
23 #include "base/task/default_delayed_task_handle_delegate.h"
24 #include "base/task/sequence_manager/associated_thread_id.h"
25 #include "base/task/sequence_manager/delayed_task_handle_delegate.h"
26 #include "base/task/sequence_manager/fence.h"
27 #include "base/task/sequence_manager/sequence_manager_impl.h"
28 #include "base/task/sequence_manager/task_order.h"
29 #include "base/task/sequence_manager/wake_up_queue.h"
30 #include "base/task/sequence_manager/work_queue.h"
31 #include "base/task/task_features.h"
32 #include "base/task/task_observer.h"
33 #include "base/threading/thread_restrictions.h"
34 #include "base/time/time.h"
35 #include "base/trace_event/base_tracing.h"
36 #include "base/types/pass_key.h"
37 #include "build/build_config.h"
38 #include "third_party/abseil-cpp/absl/container/inlined_vector.h"
39 #include "third_party/abseil-cpp/absl/types/optional.h"
40 
41 namespace base {
42 namespace sequence_manager {
43 
44 namespace internal {
45 
46 namespace {
47 
48 // An atomic is used here because the value is queried from other threads when
49 // tasks are posted cross-thread, which can race with its initialization.
50 std::atomic<base::TimeDelta> g_max_precise_delay{kDefaultMaxPreciseDelay};
51 #if BUILDFLAG(IS_WIN)
52 // An atomic is used here because the flag is queried from other threads when
53 // tasks are posted cross-thread, which can race with its initialization.
54 std::atomic_bool g_explicit_high_resolution_timer_win{true};
55 #endif  // BUILDFLAG(IS_WIN)
56 
57 }  // namespace
58 
GuardedTaskPoster(TaskQueueImpl * outer)59 TaskQueueImpl::GuardedTaskPoster::GuardedTaskPoster(TaskQueueImpl* outer)
60     : outer_(outer) {}
61 
~GuardedTaskPoster()62 TaskQueueImpl::GuardedTaskPoster::~GuardedTaskPoster() {}
63 
PostTask(PostedTask task)64 bool TaskQueueImpl::GuardedTaskPoster::PostTask(PostedTask task) {
65   // Do not process new PostTasks while we are handling a PostTask (tracing
66   // has to do this) as it can lead to a deadlock and defer it instead.
67   ScopedDeferTaskPosting disallow_task_posting;
68 
69   auto token = operations_controller_.TryBeginOperation();
70   if (!token)
71     return false;
72 
73   outer_->PostTask(std::move(task));
74   return true;
75 }
76 
PostCancelableTask(PostedTask task)77 DelayedTaskHandle TaskQueueImpl::GuardedTaskPoster::PostCancelableTask(
78     PostedTask task) {
79   // Do not process new PostTasks while we are handling a PostTask (tracing
80   // has to do this) as it can lead to a deadlock and defer it instead.
81   ScopedDeferTaskPosting disallow_task_posting;
82 
83   auto token = operations_controller_.TryBeginOperation();
84   if (!token)
85     return DelayedTaskHandle();
86 
87   auto delayed_task_handle_delegate =
88       std::make_unique<DelayedTaskHandleDelegate>(outer_);
89   task.delayed_task_handle_delegate = delayed_task_handle_delegate->AsWeakPtr();
90 
91   outer_->PostTask(std::move(task));
92   DCHECK(delayed_task_handle_delegate->IsValid());
93   return DelayedTaskHandle(std::move(delayed_task_handle_delegate));
94 }
95 
TaskRunner(scoped_refptr<GuardedTaskPoster> task_poster,scoped_refptr<const AssociatedThreadId> associated_thread,TaskType task_type)96 TaskQueueImpl::TaskRunner::TaskRunner(
97     scoped_refptr<GuardedTaskPoster> task_poster,
98     scoped_refptr<const AssociatedThreadId> associated_thread,
99     TaskType task_type)
100     : task_poster_(std::move(task_poster)),
101       associated_thread_(std::move(associated_thread)),
102       task_type_(task_type) {}
103 
~TaskRunner()104 TaskQueueImpl::TaskRunner::~TaskRunner() {}
105 
PostDelayedTask(const Location & location,OnceClosure callback,TimeDelta delay)106 bool TaskQueueImpl::TaskRunner::PostDelayedTask(const Location& location,
107                                                 OnceClosure callback,
108                                                 TimeDelta delay) {
109   return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
110                                            delay, Nestable::kNestable,
111                                            task_type_));
112 }
113 
PostDelayedTaskAt(subtle::PostDelayedTaskPassKey,const Location & location,OnceClosure callback,TimeTicks delayed_run_time,base::subtle::DelayPolicy delay_policy)114 bool TaskQueueImpl::TaskRunner::PostDelayedTaskAt(
115     subtle::PostDelayedTaskPassKey,
116     const Location& location,
117     OnceClosure callback,
118     TimeTicks delayed_run_time,
119     base::subtle::DelayPolicy delay_policy) {
120   return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
121                                            delayed_run_time, delay_policy,
122                                            Nestable::kNestable, task_type_));
123 }
124 
PostCancelableDelayedTaskAt(subtle::PostDelayedTaskPassKey pass_key,const Location & location,OnceClosure callback,TimeTicks delayed_run_time,base::subtle::DelayPolicy delay_policy)125 DelayedTaskHandle TaskQueueImpl::TaskRunner::PostCancelableDelayedTaskAt(
126     subtle::PostDelayedTaskPassKey pass_key,
127     const Location& location,
128     OnceClosure callback,
129     TimeTicks delayed_run_time,
130     base::subtle::DelayPolicy delay_policy) {
131   return task_poster_->PostCancelableTask(
132       PostedTask(this, std::move(callback), location, delayed_run_time,
133                  delay_policy, Nestable::kNestable, task_type_));
134 }
135 
PostCancelableDelayedTask(subtle::PostDelayedTaskPassKey pass_key,const Location & location,OnceClosure callback,TimeDelta delay)136 DelayedTaskHandle TaskQueueImpl::TaskRunner::PostCancelableDelayedTask(
137     subtle::PostDelayedTaskPassKey pass_key,
138     const Location& location,
139     OnceClosure callback,
140     TimeDelta delay) {
141   return task_poster_->PostCancelableTask(
142       PostedTask(this, std::move(callback), location, delay,
143                  Nestable::kNestable, task_type_));
144 }
145 
PostNonNestableDelayedTask(const Location & location,OnceClosure callback,TimeDelta delay)146 bool TaskQueueImpl::TaskRunner::PostNonNestableDelayedTask(
147     const Location& location,
148     OnceClosure callback,
149     TimeDelta delay) {
150   return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
151                                            delay, Nestable::kNonNestable,
152                                            task_type_));
153 }
154 
RunsTasksInCurrentSequence() const155 bool TaskQueueImpl::TaskRunner::RunsTasksInCurrentSequence() const {
156   return associated_thread_->IsBoundToCurrentThread();
157 }
158 
159 // static
InitializeFeatures()160 void TaskQueueImpl::InitializeFeatures() {
161   g_max_precise_delay = kMaxPreciseDelay.Get();
162 #if BUILDFLAG(IS_WIN)
163   g_explicit_high_resolution_timer_win.store(
164       FeatureList::IsEnabled(kExplicitHighResolutionTimerWin),
165       std::memory_order_relaxed);
166 #endif  // BUILDFLAG(IS_WIN)
167 }
168 
TaskQueueImpl(SequenceManagerImpl * sequence_manager,WakeUpQueue * wake_up_queue,const TaskQueue::Spec & spec)169 TaskQueueImpl::TaskQueueImpl(SequenceManagerImpl* sequence_manager,
170                              WakeUpQueue* wake_up_queue,
171                              const TaskQueue::Spec& spec)
172     : name_(spec.name),
173       sequence_manager_(sequence_manager),
174       associated_thread_(sequence_manager
175                              ? sequence_manager->associated_thread()
176                              : AssociatedThreadId::CreateBound()),
177       task_poster_(MakeRefCounted<GuardedTaskPoster>(this)),
178       main_thread_only_(this, wake_up_queue),
179       empty_queues_to_reload_handle_(
180           sequence_manager
181               ? sequence_manager->GetFlagToRequestReloadForEmptyQueue(this)
182               : AtomicFlagSet::AtomicFlag()),
183       should_monitor_quiescence_(spec.should_monitor_quiescence),
184       should_notify_observers_(spec.should_notify_observers),
185       delayed_fence_allowed_(spec.delayed_fence_allowed),
186       default_task_runner_(CreateTaskRunner(kTaskTypeNone)) {
187   UpdateCrossThreadQueueStateLocked();
188   // SequenceManager can't be set later, so we need to prevent task runners
189   // from posting any tasks.
190   if (sequence_manager_)
191     task_poster_->StartAcceptingOperations();
192 }
193 
~TaskQueueImpl()194 TaskQueueImpl::~TaskQueueImpl() {
195 #if DCHECK_IS_ON()
196   base::internal::CheckedAutoLock lock(any_thread_lock_);
197   // NOTE this check shouldn't fire because |SequenceManagerImpl::queues_|
198   // contains a strong reference to this TaskQueueImpl and the
199   // SequenceManagerImpl destructor calls UnregisterTaskQueue on all task
200   // queues.
201   DCHECK(any_thread_.unregistered)
202       << "UnregisterTaskQueue must be called first!";
203 #endif
204 }
205 
206 TaskQueueImpl::AnyThread::AnyThread() = default;
207 TaskQueueImpl::AnyThread::~AnyThread() = default;
208 
209 TaskQueueImpl::AnyThread::TracingOnly::TracingOnly() = default;
210 TaskQueueImpl::AnyThread::TracingOnly::~TracingOnly() = default;
211 
MainThreadOnly(TaskQueueImpl * task_queue,WakeUpQueue * wake_up_queue)212 TaskQueueImpl::MainThreadOnly::MainThreadOnly(TaskQueueImpl* task_queue,
213                                               WakeUpQueue* wake_up_queue)
214     : wake_up_queue(wake_up_queue),
215       delayed_work_queue(
216           new WorkQueue(task_queue, "delayed", WorkQueue::QueueType::kDelayed)),
217       immediate_work_queue(new WorkQueue(task_queue,
218                                          "immediate",
219                                          WorkQueue::QueueType::kImmediate)) {}
220 
221 TaskQueueImpl::MainThreadOnly::~MainThreadOnly() = default;
222 
CreateTaskRunner(TaskType task_type) const223 scoped_refptr<SingleThreadTaskRunner> TaskQueueImpl::CreateTaskRunner(
224     TaskType task_type) const {
225   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
226   return MakeRefCounted<TaskRunner>(task_poster_, associated_thread_,
227                                     task_type);
228 }
229 
task_runner() const230 const scoped_refptr<SingleThreadTaskRunner>& TaskQueueImpl::task_runner()
231     const {
232   return default_task_runner_;
233 }
234 
UnregisterTaskQueue()235 void TaskQueueImpl::UnregisterTaskQueue() {
236   TRACE_EVENT0("base", "TaskQueueImpl::UnregisterTaskQueue");
237   // Invalidate weak pointers now so no voters reference this in a partially
238   // torn down state.
239   voter_weak_ptr_factory_.InvalidateWeakPtrs();
240   // Detach task runners.
241   {
242     ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
243     task_poster_->ShutdownAndWaitForZeroOperations();
244   }
245 
246   TaskDeque immediate_incoming_queue;
247   base::flat_map<raw_ptr<OnTaskPostedCallbackHandleImpl>, OnTaskPostedHandler>
248       on_task_posted_handlers;
249 
250   {
251     base::internal::CheckedAutoLock lock(any_thread_lock_);
252     any_thread_.unregistered = true;
253     immediate_incoming_queue.swap(any_thread_.immediate_incoming_queue);
254 
255     for (auto& handler : any_thread_.on_task_posted_handlers)
256       handler.first->UnregisterTaskQueue();
257     any_thread_.on_task_posted_handlers.swap(on_task_posted_handlers);
258   }
259 
260   if (main_thread_only().wake_up_queue) {
261     main_thread_only().wake_up_queue->UnregisterQueue(this);
262   }
263 
264   main_thread_only().on_task_started_handler = OnTaskStartedHandler();
265   main_thread_only().on_task_completed_handler = OnTaskCompletedHandler();
266   main_thread_only().wake_up_queue = nullptr;
267   main_thread_only().throttler = nullptr;
268   empty_queues_to_reload_handle_.ReleaseAtomicFlag();
269 
270   // It is possible for a task to hold a scoped_refptr to this, which
271   // will lead to TaskQueueImpl destructor being called when deleting a task.
272   // To avoid use-after-free, we need to clear all fields of a task queue
273   // before starting to delete the tasks.
274   // All work queues and priority queues containing tasks should be moved to
275   // local variables on stack (std::move for unique_ptrs and swap for queues)
276   // before clearing them and deleting tasks.
277 
278   // Flush the queues outside of the lock because TSAN complains about a lock
279   // order inversion for tasks that are posted from within a lock, with a
280   // destructor that acquires the same lock.
281 
282   DelayedIncomingQueue delayed_incoming_queue;
283   delayed_incoming_queue.swap(&main_thread_only().delayed_incoming_queue);
284   std::unique_ptr<WorkQueue> immediate_work_queue =
285       std::move(main_thread_only().immediate_work_queue);
286   std::unique_ptr<WorkQueue> delayed_work_queue =
287       std::move(main_thread_only().delayed_work_queue);
288 }
289 
GetName() const290 const char* TaskQueueImpl::GetName() const {
291   return perfetto::protos::pbzero::SequenceManagerTask::QueueName_Name(name_);
292 }
293 
GetProtoName() const294 QueueName TaskQueueImpl::GetProtoName() const {
295   return name_;
296 }
297 
PostTask(PostedTask task)298 void TaskQueueImpl::PostTask(PostedTask task) {
299   CurrentThread current_thread =
300       associated_thread_->IsBoundToCurrentThread()
301           ? TaskQueueImpl::CurrentThread::kMainThread
302           : TaskQueueImpl::CurrentThread::kNotMainThread;
303 
304 #if DCHECK_IS_ON()
305   TimeDelta delay = GetTaskDelayAdjustment(current_thread);
306   if (absl::holds_alternative<base::TimeTicks>(
307           task.delay_or_delayed_run_time)) {
308     absl::get<base::TimeTicks>(task.delay_or_delayed_run_time) += delay;
309   } else {
310     absl::get<base::TimeDelta>(task.delay_or_delayed_run_time) += delay;
311   }
312 #endif  // DCHECK_IS_ON()
313 
314   if (!task.is_delayed()) {
315     PostImmediateTaskImpl(std::move(task), current_thread);
316   } else {
317     PostDelayedTaskImpl(std::move(task), current_thread);
318   }
319 }
320 
RemoveCancelableTask(HeapHandle heap_handle)321 void TaskQueueImpl::RemoveCancelableTask(HeapHandle heap_handle) {
322   // Can only cancel from the current thread.
323   DCHECK(associated_thread_->IsBoundToCurrentThread());
324   DCHECK(heap_handle.IsValid());
325 
326   main_thread_only().delayed_incoming_queue.remove(heap_handle);
327 
328   // Only update the delayed wake up if the top task is removed.
329   if (heap_handle.index() == 0u) {
330     LazyNow lazy_now(sequence_manager_->main_thread_clock());
331     UpdateWakeUp(&lazy_now);
332   }
333 }
334 
GetTaskDelayAdjustment(CurrentThread current_thread)335 TimeDelta TaskQueueImpl::GetTaskDelayAdjustment(CurrentThread current_thread) {
336 #if DCHECK_IS_ON()
337   if (current_thread == TaskQueueImpl::CurrentThread::kNotMainThread) {
338     base::internal::CheckedAutoLock lock(any_thread_lock_);
339     // Add a per-priority delay to cross thread tasks. This can help diagnose
340     // scheduler induced flakiness by making things flake most of the time.
341     return sequence_manager_->settings()
342         .priority_settings
343         .per_priority_cross_thread_task_delay()[any_thread_.queue_set_index];
344   } else {
345     return sequence_manager_->settings()
346         .priority_settings.per_priority_same_thread_task_delay()
347             [main_thread_only().immediate_work_queue->work_queue_set_index()];
348   }
349 #else
350   // No delay adjustment.
351   return TimeDelta();
352 #endif  // DCHECK_IS_ON()
353 }
354 
PostImmediateTaskImpl(PostedTask task,CurrentThread current_thread)355 void TaskQueueImpl::PostImmediateTaskImpl(PostedTask task,
356                                           CurrentThread current_thread) {
357   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
358   // for details.
359   CHECK(task.callback);
360 
361   bool should_schedule_work = false;
362   {
363     // TODO(alexclarke): Maybe add a main thread only immediate_incoming_queue
364     // See https://crbug.com/901800
365     base::internal::CheckedAutoLock lock(any_thread_lock_);
366     bool add_queue_time_to_tasks = sequence_manager_->GetAddQueueTimeToTasks();
367     TimeTicks queue_time;
368     if (add_queue_time_to_tasks || delayed_fence_allowed_)
369       queue_time = sequence_manager_->any_thread_clock()->NowTicks();
370 
371     // The sequence number must be incremented atomically with pushing onto the
372     // incoming queue. Otherwise if there are several threads posting task we
373     // risk breaking the assumption that sequence numbers increase monotonically
374     // within a queue.
375     EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber();
376     bool was_immediate_incoming_queue_empty =
377         any_thread_.immediate_incoming_queue.empty();
378     any_thread_.immediate_incoming_queue.push_back(
379         Task(std::move(task), sequence_number, sequence_number, queue_time));
380 
381 #if DCHECK_IS_ON()
382     any_thread_.immediate_incoming_queue.back().cross_thread_ =
383         (current_thread == TaskQueueImpl::CurrentThread::kNotMainThread);
384 #endif
385 
386     sequence_manager_->WillQueueTask(
387         &any_thread_.immediate_incoming_queue.back());
388     MaybeReportIpcTaskQueuedFromAnyThreadLocked(
389         any_thread_.immediate_incoming_queue.back());
390 
391     for (auto& handler : any_thread_.on_task_posted_handlers) {
392       DCHECK(!handler.second.is_null());
393       handler.second.Run(any_thread_.immediate_incoming_queue.back());
394     }
395 
396     // If this queue was completely empty, then the SequenceManager needs to be
397     // informed so it can reload the work queue and add us to the
398     // TaskQueueSelector which can only be done from the main thread. In
399     // addition it may need to schedule a DoWork if this queue isn't blocked.
400     if (was_immediate_incoming_queue_empty &&
401         any_thread_.immediate_work_queue_empty) {
402       empty_queues_to_reload_handle_.SetActive(true);
403       should_schedule_work =
404           any_thread_.post_immediate_task_should_schedule_work;
405     }
406   }
407 
408   // On windows it's important to call this outside of a lock because calling a
409   // pump while holding a lock can result in priority inversions. See
410   // http://shortn/_ntnKNqjDQT for a discussion.
411   //
412   // Calling ScheduleWork outside the lock should be safe, only the main thread
413   // can mutate |any_thread_.post_immediate_task_should_schedule_work|. If it
414   // transitions to false we call ScheduleWork redundantly that's harmless. If
415   // it transitions to true, the side effect of
416   // |empty_queues_to_reload_handle_SetActive(true)| is guaranteed to be picked
417   // up by the ThreadController's call to SequenceManagerImpl::DelayTillNextTask
418   // when it computes what continuation (if any) is needed.
419   if (should_schedule_work)
420     sequence_manager_->ScheduleWork();
421 
422   TraceQueueSize();
423 }
424 
PostDelayedTaskImpl(PostedTask posted_task,CurrentThread current_thread)425 void TaskQueueImpl::PostDelayedTaskImpl(PostedTask posted_task,
426                                         CurrentThread current_thread) {
427   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
428   // for details.
429   CHECK(posted_task.callback);
430 
431   if (current_thread == CurrentThread::kMainThread) {
432     LazyNow lazy_now(sequence_manager_->main_thread_clock());
433     Task pending_task = MakeDelayedTask(std::move(posted_task), &lazy_now);
434     sequence_manager_->MaybeAddLeewayToTask(pending_task);
435     PushOntoDelayedIncomingQueueFromMainThread(
436         std::move(pending_task), &lazy_now,
437         /* notify_task_annotator */ true);
438   } else {
439     LazyNow lazy_now(sequence_manager_->any_thread_clock());
440     PushOntoDelayedIncomingQueue(
441         MakeDelayedTask(std::move(posted_task), &lazy_now));
442   }
443 }
444 
PushOntoDelayedIncomingQueueFromMainThread(Task pending_task,LazyNow * lazy_now,bool notify_task_annotator)445 void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread(
446     Task pending_task,
447     LazyNow* lazy_now,
448     bool notify_task_annotator) {
449 #if DCHECK_IS_ON()
450   pending_task.cross_thread_ = false;
451 #endif
452 
453   if (notify_task_annotator) {
454     sequence_manager_->WillQueueTask(&pending_task);
455     MaybeReportIpcTaskQueuedFromMainThread(pending_task);
456   }
457   main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
458   UpdateWakeUp(lazy_now);
459 
460   TraceQueueSize();
461 }
462 
PushOntoDelayedIncomingQueue(Task pending_task)463 void TaskQueueImpl::PushOntoDelayedIncomingQueue(Task pending_task) {
464   sequence_manager_->WillQueueTask(&pending_task);
465   MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(pending_task);
466 
467 #if DCHECK_IS_ON()
468   pending_task.cross_thread_ = true;
469 #endif
470 
471   // TODO(altimin): Add a copy method to Task to capture metadata here.
472   auto task_runner = pending_task.task_runner;
473   const auto task_type = pending_task.task_type;
474   PostImmediateTaskImpl(
475       PostedTask(std::move(task_runner),
476                  BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask,
477                           Unretained(this), std::move(pending_task)),
478                  FROM_HERE, TimeDelta(), Nestable::kNonNestable, task_type),
479       CurrentThread::kNotMainThread);
480 }
481 
ScheduleDelayedWorkTask(Task pending_task)482 void TaskQueueImpl::ScheduleDelayedWorkTask(Task pending_task) {
483   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
484   sequence_manager_->MaybeAddLeewayToTask(pending_task);
485   TimeTicks now = sequence_manager_->main_thread_clock()->NowTicks();
486   LazyNow lazy_now(now);
487   // A delayed task is ready to run as soon as earliest_delayed_run_time() is
488   // reached.
489   if (pending_task.earliest_delayed_run_time() <= now) {
490     // If |delayed_run_time| is in the past then push it onto the work queue
491     // immediately. To ensure the right task ordering we need to temporarily
492     // push it onto the |delayed_incoming_queue|.
493     pending_task.delayed_run_time = now;
494     main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
495     MoveReadyDelayedTasksToWorkQueue(
496         &lazy_now, sequence_manager_->GetNextSequenceNumber());
497   } else {
498     // If |delayed_run_time| is in the future we can queue it as normal.
499     PushOntoDelayedIncomingQueueFromMainThread(std::move(pending_task),
500                                                &lazy_now, false);
501   }
502   TraceQueueSize();
503 }
504 
ReloadEmptyImmediateWorkQueue()505 void TaskQueueImpl::ReloadEmptyImmediateWorkQueue() {
506   DCHECK(main_thread_only().immediate_work_queue->Empty());
507   main_thread_only().immediate_work_queue->TakeImmediateIncomingQueueTasks();
508 
509   if (main_thread_only().throttler && IsQueueEnabled()) {
510     main_thread_only().throttler->OnHasImmediateTask();
511   }
512 }
513 
TakeImmediateIncomingQueueTasks(TaskDeque * queue)514 void TaskQueueImpl::TakeImmediateIncomingQueueTasks(TaskDeque* queue) {
515   DCHECK(queue->empty());
516   // Now is a good time to consider reducing the empty queue's capacity if we're
517   // wasting memory, before we make it the `immediate_incoming_queue`.
518   queue->MaybeShrinkQueue();
519 
520   base::internal::CheckedAutoLock lock(any_thread_lock_);
521   queue->swap(any_thread_.immediate_incoming_queue);
522 
523   // Activate delayed fence if necessary. This is ideologically similar to
524   // ActivateDelayedFenceIfNeeded, but due to immediate tasks being posted
525   // from any thread we can't generate an enqueue order for the fence there,
526   // so we have to check all immediate tasks and use their enqueue order for
527   // a fence.
528   if (main_thread_only().delayed_fence) {
529     for (const Task& task : *queue) {
530       DCHECK(!task.queue_time.is_null());
531       DCHECK(task.delayed_run_time.is_null());
532       if (task.queue_time >= main_thread_only().delayed_fence.value()) {
533         main_thread_only().delayed_fence = absl::nullopt;
534         DCHECK(!main_thread_only().current_fence);
535         main_thread_only().current_fence = Fence(task.task_order());
536         // Do not trigger WorkQueueSets notification when taking incoming
537         // immediate queue.
538         main_thread_only().immediate_work_queue->InsertFenceSilently(
539             *main_thread_only().current_fence);
540         main_thread_only().delayed_work_queue->InsertFenceSilently(
541             *main_thread_only().current_fence);
542         break;
543       }
544     }
545   }
546 
547   UpdateCrossThreadQueueStateLocked();
548 }
549 
IsEmpty() const550 bool TaskQueueImpl::IsEmpty() const {
551   if (!main_thread_only().delayed_work_queue->Empty() ||
552       !main_thread_only().delayed_incoming_queue.empty() ||
553       !main_thread_only().immediate_work_queue->Empty()) {
554     return false;
555   }
556 
557   base::internal::CheckedAutoLock lock(any_thread_lock_);
558   return any_thread_.immediate_incoming_queue.empty();
559 }
560 
GetNumberOfPendingTasks() const561 size_t TaskQueueImpl::GetNumberOfPendingTasks() const {
562   size_t task_count = 0;
563   task_count += main_thread_only().delayed_work_queue->Size();
564   task_count += main_thread_only().delayed_incoming_queue.size();
565   task_count += main_thread_only().immediate_work_queue->Size();
566 
567   base::internal::CheckedAutoLock lock(any_thread_lock_);
568   task_count += any_thread_.immediate_incoming_queue.size();
569   return task_count;
570 }
571 
HasTaskToRunImmediatelyOrReadyDelayedTask() const572 bool TaskQueueImpl::HasTaskToRunImmediatelyOrReadyDelayedTask() const {
573   // Any work queue tasks count as immediate work.
574   if (!main_thread_only().delayed_work_queue->Empty() ||
575       !main_thread_only().immediate_work_queue->Empty()) {
576     return true;
577   }
578 
579   // Tasks on |delayed_incoming_queue| that could run now, count as
580   // immediate work.
581   if (!main_thread_only().delayed_incoming_queue.empty() &&
582       main_thread_only().delayed_incoming_queue.top().delayed_run_time <=
583           sequence_manager_->main_thread_clock()->NowTicks()) {
584     return true;
585   }
586 
587   // Finally tasks on |immediate_incoming_queue| count as immediate work.
588   base::internal::CheckedAutoLock lock(any_thread_lock_);
589   return !any_thread_.immediate_incoming_queue.empty();
590 }
591 
GetNextDesiredWakeUp()592 absl::optional<WakeUp> TaskQueueImpl::GetNextDesiredWakeUp() {
593   // Note we don't scheduled a wake-up for disabled queues.
594   if (main_thread_only().delayed_incoming_queue.empty() || !IsQueueEnabled())
595     return absl::nullopt;
596 
597   const auto& top_task = main_thread_only().delayed_incoming_queue.top();
598 
599   // High resolution is needed if the queue contains high resolution tasks and
600   // has a priority index <= kNormalPriority (precise execution time is
601   // unnecessary for a low priority queue).
602   WakeUpResolution resolution = has_pending_high_resolution_tasks() &&
603                                         GetQueuePriority() <= DefaultPriority()
604                                     ? WakeUpResolution::kHigh
605                                     : WakeUpResolution::kLow;
606   subtle::DelayPolicy delay_policy = top_task.delay_policy;
607   if (GetQueuePriority() > DefaultPriority() &&
608       delay_policy == subtle::DelayPolicy::kPrecise) {
609     delay_policy = subtle::DelayPolicy::kFlexibleNoSooner;
610   }
611   return WakeUp{top_task.delayed_run_time, top_task.leeway, resolution,
612                 delay_policy};
613 }
614 
OnWakeUp(LazyNow * lazy_now,EnqueueOrder enqueue_order)615 void TaskQueueImpl::OnWakeUp(LazyNow* lazy_now, EnqueueOrder enqueue_order) {
616   MoveReadyDelayedTasksToWorkQueue(lazy_now, enqueue_order);
617   if (main_thread_only().throttler) {
618     main_thread_only().throttler->OnWakeUp(lazy_now);
619   }
620 }
621 
RemoveAllCanceledDelayedTasksFromFront(LazyNow * lazy_now)622 bool TaskQueueImpl::RemoveAllCanceledDelayedTasksFromFront(LazyNow* lazy_now) {
623   // Because task destructors could have a side-effect of posting new tasks, we
624   // move all the cancelled tasks into a temporary container before deleting
625   // them. This is to avoid the queue from changing while iterating over it.
626   absl::InlinedVector<Task, 8> tasks_to_delete;
627 
628   while (!main_thread_only().delayed_incoming_queue.empty()) {
629     const Task& task = main_thread_only().delayed_incoming_queue.top();
630     CHECK(task.task);
631     if (!task.task.IsCancelled())
632       break;
633 
634     tasks_to_delete.push_back(
635         main_thread_only().delayed_incoming_queue.take_top());
636   }
637 
638   if (!tasks_to_delete.empty()) {
639     UpdateWakeUp(lazy_now);
640     return true;
641   }
642 
643   return false;
644 }
645 
MoveReadyDelayedTasksToWorkQueue(LazyNow * lazy_now,EnqueueOrder enqueue_order)646 void TaskQueueImpl::MoveReadyDelayedTasksToWorkQueue(
647     LazyNow* lazy_now,
648     EnqueueOrder enqueue_order) {
649   // Enqueue all delayed tasks that should be running now, skipping any that
650   // have been canceled.
651   WorkQueue::TaskPusher delayed_work_queue_task_pusher(
652       main_thread_only().delayed_work_queue->CreateTaskPusher());
653 
654   // Because task destructors could have a side-effect of posting new tasks, we
655   // move all the cancelled tasks into a temporary container before deleting
656   // them. This is to avoid the queue from changing while iterating over it.
657   absl::InlinedVector<Task, 8> tasks_to_delete;
658 
659   while (!main_thread_only().delayed_incoming_queue.empty()) {
660     const Task& task = main_thread_only().delayed_incoming_queue.top();
661     CHECK(task.task);
662 
663     // Leave the top task alone if it hasn't been canceled and it is not ready.
664     const bool is_cancelled = task.task.IsCancelled();
665     if (!is_cancelled && task.earliest_delayed_run_time() > lazy_now->Now())
666       break;
667 
668     Task ready_task = main_thread_only().delayed_incoming_queue.take_top();
669     if (is_cancelled) {
670       tasks_to_delete.push_back(std::move(ready_task));
671       continue;
672     }
673 
674     // The top task is ready to run. Move it to the delayed work queue.
675 #if DCHECK_IS_ON()
676     if (sequence_manager_->settings().log_task_delay_expiry)
677       VLOG(0) << GetName() << " Delay expired for "
678               << ready_task.posted_from.ToString();
679 #endif  // DCHECK_IS_ON()
680     DCHECK(!ready_task.delayed_run_time.is_null());
681     DCHECK(!ready_task.enqueue_order_set());
682     ready_task.set_enqueue_order(enqueue_order);
683     ActivateDelayedFenceIfNeeded(ready_task);
684 
685     delayed_work_queue_task_pusher.Push(std::move(ready_task));
686   }
687 
688   // Explicitly delete tasks last.
689   tasks_to_delete.clear();
690 
691   UpdateWakeUp(lazy_now);
692 }
693 
TraceQueueSize() const694 void TaskQueueImpl::TraceQueueSize() const {
695   bool is_tracing;
696   TRACE_EVENT_CATEGORY_GROUP_ENABLED(
697       TRACE_DISABLED_BY_DEFAULT("sequence_manager"), &is_tracing);
698   if (!is_tracing)
699     return;
700 
701   // It's only safe to access the work queues from the main thread.
702   // TODO(alexclarke): We should find another way of tracing this
703   if (!associated_thread_->IsBoundToCurrentThread())
704     return;
705 
706   size_t total_task_count;
707   {
708     base::internal::CheckedAutoLock lock(any_thread_lock_);
709     total_task_count = any_thread_.immediate_incoming_queue.size() +
710                        main_thread_only().immediate_work_queue->Size() +
711                        main_thread_only().delayed_work_queue->Size() +
712                        main_thread_only().delayed_incoming_queue.size();
713   }
714   TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), GetName(),
715                  total_task_count);
716 }
717 
SetQueuePriority(TaskQueue::QueuePriority priority)718 void TaskQueueImpl::SetQueuePriority(TaskQueue::QueuePriority priority) {
719   const TaskQueue::QueuePriority previous_priority = GetQueuePriority();
720   if (priority == previous_priority)
721     return;
722   sequence_manager_->main_thread_only().selector.SetQueuePriority(this,
723                                                                   priority);
724 
725 #if BUILDFLAG(IS_WIN)
726   // Updating queue priority can change whether high resolution timer is needed.
727   LazyNow lazy_now(sequence_manager_->main_thread_clock());
728   UpdateWakeUp(&lazy_now);
729 #endif
730 
731   if (priority > DefaultPriority()) {
732     // |priority| is now lower than the default, so update accordingly.
733     main_thread_only()
734         .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
735         EnqueueOrder::max();
736   } else if (previous_priority > DefaultPriority()) {
737     // |priority| is no longer lower than the default, so record current
738     // sequence number.
739     DCHECK_EQ(
740         main_thread_only()
741             .enqueue_order_at_which_we_became_unblocked_with_normal_priority,
742         EnqueueOrder::max());
743     main_thread_only()
744         .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
745         sequence_manager_->GetNextSequenceNumber();
746   }
747 }
748 
GetQueuePriority() const749 TaskQueue::QueuePriority TaskQueueImpl::GetQueuePriority() const {
750   size_t set_index = immediate_work_queue()->work_queue_set_index();
751   DCHECK_EQ(set_index, delayed_work_queue()->work_queue_set_index());
752   return static_cast<TaskQueue::QueuePriority>(set_index);
753 }
754 
AsValue(TimeTicks now,bool force_verbose) const755 Value::Dict TaskQueueImpl::AsValue(TimeTicks now, bool force_verbose) const {
756   base::internal::CheckedAutoLock lock(any_thread_lock_);
757   Value::Dict state;
758   state.Set("name", GetName());
759   if (any_thread_.unregistered) {
760     state.Set("unregistered", true);
761     return state;
762   }
763   DCHECK(main_thread_only().delayed_work_queue);
764   DCHECK(main_thread_only().immediate_work_queue);
765 
766   state.Set("task_queue_id",
767             StringPrintf("0x%" PRIx64, static_cast<uint64_t>(
768                                            reinterpret_cast<uintptr_t>(this))));
769   state.Set("enabled", IsQueueEnabled());
770   // TODO(crbug.com/1334256): Make base::Value able to store an int64_t and
771   // remove the various static_casts below.
772   state.Set("any_thread_.immediate_incoming_queuesize",
773             static_cast<int>(any_thread_.immediate_incoming_queue.size()));
774   state.Set("delayed_incoming_queue_size",
775             static_cast<int>(main_thread_only().delayed_incoming_queue.size()));
776   state.Set("immediate_work_queue_size",
777             static_cast<int>(main_thread_only().immediate_work_queue->Size()));
778   state.Set("delayed_work_queue_size",
779             static_cast<int>(main_thread_only().delayed_work_queue->Size()));
780 
781   state.Set("any_thread_.immediate_incoming_queuecapacity",
782             static_cast<int>(any_thread_.immediate_incoming_queue.capacity()));
783   state.Set("immediate_work_queue_capacity",
784             static_cast<int>(immediate_work_queue()->Capacity()));
785   state.Set("delayed_work_queue_capacity",
786             static_cast<int>(delayed_work_queue()->Capacity()));
787 
788   if (!main_thread_only().delayed_incoming_queue.empty()) {
789     TimeDelta delay_to_next_task =
790         (main_thread_only().delayed_incoming_queue.top().delayed_run_time -
791          sequence_manager_->main_thread_clock()->NowTicks());
792     state.Set("delay_to_next_task_ms", delay_to_next_task.InMillisecondsF());
793   }
794   if (main_thread_only().current_fence) {
795     Value::Dict fence_state;
796     fence_state.Set(
797         "enqueue_order",
798         static_cast<int>(
799             main_thread_only().current_fence->task_order().enqueue_order()));
800     fence_state.Set("activated_in_wake_up", !main_thread_only()
801                                                  .current_fence->task_order()
802                                                  .delayed_run_time()
803                                                  .is_null());
804     state.Set("current_fence", std::move(fence_state));
805   }
806   if (main_thread_only().delayed_fence) {
807     state.Set("delayed_fence_seconds_from_now",
808               (main_thread_only().delayed_fence.value() - now).InSecondsF());
809   }
810 
811   bool verbose = false;
812   TRACE_EVENT_CATEGORY_GROUP_ENABLED(
813       TRACE_DISABLED_BY_DEFAULT("sequence_manager.verbose_snapshots"),
814       &verbose);
815 
816   if (verbose || force_verbose) {
817     state.Set("immediate_incoming_queue",
818               QueueAsValue(any_thread_.immediate_incoming_queue, now));
819     state.Set("delayed_work_queue",
820               main_thread_only().delayed_work_queue->AsValue(now));
821     state.Set("immediate_work_queue",
822               main_thread_only().immediate_work_queue->AsValue(now));
823     state.Set("delayed_incoming_queue",
824               main_thread_only().delayed_incoming_queue.AsValue(now));
825   }
826   state.Set("priority", GetQueuePriority());
827   return state;
828 }
829 
AddTaskObserver(TaskObserver * task_observer)830 void TaskQueueImpl::AddTaskObserver(TaskObserver* task_observer) {
831   main_thread_only().task_observers.AddObserver(task_observer);
832 }
833 
RemoveTaskObserver(TaskObserver * task_observer)834 void TaskQueueImpl::RemoveTaskObserver(TaskObserver* task_observer) {
835   main_thread_only().task_observers.RemoveObserver(task_observer);
836 }
837 
NotifyWillProcessTask(const Task & task,bool was_blocked_or_low_priority)838 void TaskQueueImpl::NotifyWillProcessTask(const Task& task,
839                                           bool was_blocked_or_low_priority) {
840   DCHECK(should_notify_observers_);
841 
842   for (auto& observer : main_thread_only().task_observers)
843     observer.WillProcessTask(task, was_blocked_or_low_priority);
844 }
845 
NotifyDidProcessTask(const Task & task)846 void TaskQueueImpl::NotifyDidProcessTask(const Task& task) {
847   DCHECK(should_notify_observers_);
848   for (auto& observer : main_thread_only().task_observers)
849     observer.DidProcessTask(task);
850 }
851 
InsertFence(TaskQueue::InsertFencePosition position)852 void TaskQueueImpl::InsertFence(TaskQueue::InsertFencePosition position) {
853   Fence new_fence = position == TaskQueue::InsertFencePosition::kNow
854                         ? Fence::CreateWithEnqueueOrder(
855                               sequence_manager_->GetNextSequenceNumber())
856                         : Fence::BlockingFence();
857   InsertFence(new_fence);
858 }
859 
InsertFence(Fence current_fence)860 void TaskQueueImpl::InsertFence(Fence current_fence) {
861   // Only one fence may be present at a time.
862   main_thread_only().delayed_fence = absl::nullopt;
863 
864   absl::optional<Fence> previous_fence = main_thread_only().current_fence;
865 
866   // Tasks posted after this point will have a strictly higher enqueue order
867   // and will be blocked from running.
868   main_thread_only().current_fence = current_fence;
869   bool front_task_unblocked =
870       main_thread_only().immediate_work_queue->InsertFence(current_fence);
871   front_task_unblocked |=
872       main_thread_only().delayed_work_queue->InsertFence(current_fence);
873 
874   {
875     base::internal::CheckedAutoLock lock(any_thread_lock_);
876     if (!front_task_unblocked && previous_fence &&
877         previous_fence->task_order() < current_fence.task_order()) {
878       if (!any_thread_.immediate_incoming_queue.empty() &&
879           any_thread_.immediate_incoming_queue.front().task_order() >
880               previous_fence->task_order() &&
881           any_thread_.immediate_incoming_queue.front().task_order() <
882               current_fence.task_order()) {
883         front_task_unblocked = true;
884       }
885     }
886 
887     UpdateCrossThreadQueueStateLocked();
888   }
889 
890   if (IsQueueEnabled() && front_task_unblocked) {
891     OnQueueUnblocked();
892     sequence_manager_->ScheduleWork();
893   }
894 }
895 
InsertFenceAt(TimeTicks time)896 void TaskQueueImpl::InsertFenceAt(TimeTicks time) {
897   DCHECK(delayed_fence_allowed_)
898       << "Delayed fences are not supported for this queue. Enable them "
899          "explicitly in TaskQueue::Spec when creating the queue";
900 
901   // Task queue can have only one fence, delayed or not.
902   RemoveFence();
903   main_thread_only().delayed_fence = time;
904 }
905 
RemoveFence()906 void TaskQueueImpl::RemoveFence() {
907   absl::optional<Fence> previous_fence = main_thread_only().current_fence;
908   main_thread_only().current_fence = absl::nullopt;
909   main_thread_only().delayed_fence = absl::nullopt;
910 
911   bool front_task_unblocked =
912       main_thread_only().immediate_work_queue->RemoveFence();
913   front_task_unblocked |= main_thread_only().delayed_work_queue->RemoveFence();
914 
915   {
916     base::internal::CheckedAutoLock lock(any_thread_lock_);
917     if (!front_task_unblocked && previous_fence) {
918       if (!any_thread_.immediate_incoming_queue.empty() &&
919           any_thread_.immediate_incoming_queue.front().task_order() >
920               previous_fence->task_order()) {
921         front_task_unblocked = true;
922       }
923     }
924 
925     UpdateCrossThreadQueueStateLocked();
926   }
927 
928   if (IsQueueEnabled() && front_task_unblocked) {
929     OnQueueUnblocked();
930     sequence_manager_->ScheduleWork();
931   }
932 }
933 
BlockedByFence() const934 bool TaskQueueImpl::BlockedByFence() const {
935   if (!main_thread_only().current_fence)
936     return false;
937 
938   if (!main_thread_only().immediate_work_queue->BlockedByFence() ||
939       !main_thread_only().delayed_work_queue->BlockedByFence()) {
940     return false;
941   }
942 
943   base::internal::CheckedAutoLock lock(any_thread_lock_);
944   if (any_thread_.immediate_incoming_queue.empty())
945     return true;
946 
947   return any_thread_.immediate_incoming_queue.front().task_order() >
948          main_thread_only().current_fence->task_order();
949 }
950 
HasActiveFence()951 bool TaskQueueImpl::HasActiveFence() {
952   if (main_thread_only().delayed_fence &&
953       sequence_manager_->main_thread_clock()->NowTicks() >
954           main_thread_only().delayed_fence.value()) {
955     return true;
956   }
957   return !!main_thread_only().current_fence;
958 }
959 
CouldTaskRun(EnqueueOrder enqueue_order) const960 bool TaskQueueImpl::CouldTaskRun(EnqueueOrder enqueue_order) const {
961   if (!IsQueueEnabled())
962     return false;
963 
964   if (!main_thread_only().current_fence)
965     return true;
966 
967   // TODO(crbug.com/1249857): This should use TaskOrder. This is currently only
968   // used for tests and is fine as-is, but we should be using `TaskOrder` for
969   // task comparisons. Also this test should be renamed with a testing suffix as
970   // it is not used in production.
971   return enqueue_order <
972          main_thread_only().current_fence->task_order().enqueue_order();
973 }
974 
WasBlockedOrLowPriority(EnqueueOrder enqueue_order) const975 bool TaskQueueImpl::WasBlockedOrLowPriority(EnqueueOrder enqueue_order) const {
976   return enqueue_order <
977          main_thread_only()
978              .enqueue_order_at_which_we_became_unblocked_with_normal_priority;
979 }
980 
981 // static
QueueAsValue(const TaskDeque & queue,TimeTicks now)982 Value::List TaskQueueImpl::QueueAsValue(const TaskDeque& queue, TimeTicks now) {
983   Value::List state;
984   for (const Task& task : queue)
985     state.Append(TaskAsValue(task, now));
986   return state;
987 }
988 
989 // static
TaskAsValue(const Task & task,TimeTicks now)990 Value::Dict TaskQueueImpl::TaskAsValue(const Task& task, TimeTicks now) {
991   Value::Dict state;
992   state.Set("posted_from", task.posted_from.ToString());
993   if (task.enqueue_order_set())
994     state.Set("enqueue_order", static_cast<int>(task.enqueue_order()));
995   state.Set("sequence_num", task.sequence_num);
996   state.Set("nestable", task.nestable == Nestable::kNestable);
997   state.Set("is_high_res", task.is_high_res);
998   state.Set("is_cancelled", task.task.IsCancelled());
999   state.Set("delayed_run_time",
1000             (task.delayed_run_time - TimeTicks()).InMillisecondsF());
1001   const TimeDelta delayed_run_time_milliseconds_from_now =
1002       task.delayed_run_time.is_null() ? TimeDelta()
1003                                       : (task.delayed_run_time - now);
1004   state.Set("delayed_run_time_milliseconds_from_now",
1005             delayed_run_time_milliseconds_from_now.InMillisecondsF());
1006   return state;
1007 }
1008 
MakeDelayedTask(PostedTask delayed_task,LazyNow * lazy_now) const1009 Task TaskQueueImpl::MakeDelayedTask(PostedTask delayed_task,
1010                                     LazyNow* lazy_now) const {
1011   EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber();
1012   base::TimeDelta delay;
1013   WakeUpResolution resolution = WakeUpResolution::kLow;
1014 #if BUILDFLAG(IS_WIN)
1015   const bool explicit_high_resolution_timer_win =
1016       g_explicit_high_resolution_timer_win.load(std::memory_order_relaxed);
1017 #endif  // BUILDFLAG(IS_WIN)
1018   if (absl::holds_alternative<base::TimeDelta>(
1019           delayed_task.delay_or_delayed_run_time)) {
1020     delay = absl::get<base::TimeDelta>(delayed_task.delay_or_delayed_run_time);
1021     delayed_task.delay_or_delayed_run_time = lazy_now->Now() + delay;
1022   } else {
1023     delay = absl::get<base::TimeTicks>(delayed_task.delay_or_delayed_run_time) -
1024             lazy_now->Now();
1025   }
1026 #if BUILDFLAG(IS_WIN)
1027   if (!explicit_high_resolution_timer_win &&
1028       delay < (2 * base::Milliseconds(Time::kMinLowResolutionThresholdMs))) {
1029     // Outside the kExplicitHighResolutionTimerWin experiment, We consider the
1030     // task needs a high resolution timer if the delay is more than 0 and less
1031     // than 32ms. This caps the relative error to less than 50% : a 33ms wait
1032     // can wake at 48ms since the default resolution on Windows is between 10
1033     // and 15ms.
1034     resolution = WakeUpResolution::kHigh;
1035   }
1036 #endif  // BUILDFLAG(IS_WIN)
1037   delayed_task.delay_policy = subtle::MaybeOverrideDelayPolicy(
1038       delayed_task.delay_policy, delay,
1039       g_max_precise_delay.load(std::memory_order_relaxed));
1040   // leeway isn't specified yet since this may be called from any thread.
1041   return Task(std::move(delayed_task), sequence_number, EnqueueOrder(),
1042               lazy_now->Now(), resolution);
1043 }
1044 
IsQueueEnabled() const1045 bool TaskQueueImpl::IsQueueEnabled() const {
1046   return main_thread_only().is_enabled;
1047 }
1048 
SetQueueEnabled(bool enabled)1049 void TaskQueueImpl::SetQueueEnabled(bool enabled) {
1050   if (main_thread_only().is_enabled == enabled)
1051     return;
1052 
1053   // Update the |main_thread_only_| struct.
1054   main_thread_only().is_enabled = enabled;
1055   main_thread_only().disabled_time = absl::nullopt;
1056 
1057   // |sequence_manager_| can be null in tests.
1058   if (!sequence_manager_)
1059     return;
1060 
1061   LazyNow lazy_now(sequence_manager_->main_thread_clock());
1062 
1063   if (!enabled) {
1064     bool tracing_enabled = false;
1065     TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1066                                        &tracing_enabled);
1067     main_thread_only().disabled_time = lazy_now.Now();
1068   } else {
1069     // Override reporting if the queue is becoming enabled again.
1070     main_thread_only().should_report_posted_tasks_when_disabled = false;
1071   }
1072 
1073   // If there is a throttler, it will be notified of pending delayed and
1074   // immediate tasks inside UpdateWakeUp().
1075   UpdateWakeUp(&lazy_now);
1076 
1077   {
1078     base::internal::CheckedAutoLock lock(any_thread_lock_);
1079     UpdateCrossThreadQueueStateLocked();
1080 
1081     // Copy over the task-reporting related state.
1082     any_thread_.tracing_only.is_enabled = enabled;
1083     any_thread_.tracing_only.disabled_time = main_thread_only().disabled_time;
1084     any_thread_.tracing_only.should_report_posted_tasks_when_disabled =
1085         main_thread_only().should_report_posted_tasks_when_disabled;
1086   }
1087 
1088   // Finally, enable or disable the queue with the selector.
1089   if (enabled) {
1090     // Note the selector calls SequenceManager::OnTaskQueueEnabled which posts
1091     // a DoWork if needed.
1092     sequence_manager_->main_thread_only().selector.EnableQueue(this);
1093 
1094     if (!BlockedByFence())
1095       OnQueueUnblocked();
1096   } else {
1097     sequence_manager_->main_thread_only().selector.DisableQueue(this);
1098   }
1099 }
1100 
SetShouldReportPostedTasksWhenDisabled(bool should_report)1101 void TaskQueueImpl::SetShouldReportPostedTasksWhenDisabled(bool should_report) {
1102   if (main_thread_only().should_report_posted_tasks_when_disabled ==
1103       should_report)
1104     return;
1105 
1106   // Only observe transitions turning the reporting on if tracing is enabled.
1107   if (should_report) {
1108     bool tracing_enabled = false;
1109     TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1110                                        &tracing_enabled);
1111     if (!tracing_enabled)
1112       return;
1113   }
1114 
1115   main_thread_only().should_report_posted_tasks_when_disabled = should_report;
1116 
1117   // Mirror the state to the AnyThread struct as well.
1118   {
1119     base::internal::CheckedAutoLock lock(any_thread_lock_);
1120     any_thread_.tracing_only.should_report_posted_tasks_when_disabled =
1121         should_report;
1122   }
1123 }
1124 
UpdateCrossThreadQueueStateLocked()1125 void TaskQueueImpl::UpdateCrossThreadQueueStateLocked() {
1126   any_thread_.immediate_work_queue_empty =
1127       main_thread_only().immediate_work_queue->Empty();
1128 
1129   if (main_thread_only().throttler) {
1130     // If there's a Throttler, always ScheduleWork() when immediate work is
1131     // posted and the queue is enabled, to ensure that
1132     // Throttler::OnHasImmediateTask() is invoked.
1133     any_thread_.post_immediate_task_should_schedule_work = IsQueueEnabled();
1134   } else {
1135     // Otherwise, ScheduleWork() only if the queue is enabled and there isn't a
1136     // fence to prevent the task from being executed.
1137     any_thread_.post_immediate_task_should_schedule_work =
1138         IsQueueEnabled() && !main_thread_only().current_fence;
1139   }
1140 
1141 #if DCHECK_IS_ON()
1142   any_thread_.queue_set_index =
1143       main_thread_only().immediate_work_queue->work_queue_set_index();
1144 #endif
1145 }
1146 
ReclaimMemory(TimeTicks now)1147 void TaskQueueImpl::ReclaimMemory(TimeTicks now) {
1148   if (main_thread_only().delayed_incoming_queue.empty())
1149     return;
1150 
1151   main_thread_only().delayed_incoming_queue.SweepCancelledTasks(
1152       sequence_manager_);
1153 
1154   // If deleting one of the cancelled tasks shut down this queue, bail out.
1155   // Note that in this scenario |this| is still valid, but some fields of the
1156   // queue have been cleared out by |UnregisterTaskQueue|.
1157   if (!main_thread_only().delayed_work_queue) {
1158     return;
1159   }
1160 
1161   LazyNow lazy_now(now);
1162   UpdateWakeUp(&lazy_now);
1163 
1164   // Also consider shrinking the work queue if it's wasting memory.
1165   main_thread_only().delayed_work_queue->MaybeShrinkQueue();
1166   main_thread_only().immediate_work_queue->MaybeShrinkQueue();
1167 
1168   {
1169     base::internal::CheckedAutoLock lock(any_thread_lock_);
1170     any_thread_.immediate_incoming_queue.MaybeShrinkQueue();
1171   }
1172 }
1173 
PushImmediateIncomingTaskForTest(Task task)1174 void TaskQueueImpl::PushImmediateIncomingTaskForTest(Task task) {
1175   base::internal::CheckedAutoLock lock(any_thread_lock_);
1176   any_thread_.immediate_incoming_queue.push_back(std::move(task));
1177 }
1178 
RequeueDeferredNonNestableTask(DeferredNonNestableTask task)1179 void TaskQueueImpl::RequeueDeferredNonNestableTask(
1180     DeferredNonNestableTask task) {
1181   DCHECK(task.task.nestable == Nestable::kNonNestable);
1182 
1183   // It's possible that the queue was unregistered since the task was posted.
1184   // Skip the task in that case.
1185   if (!main_thread_only().delayed_work_queue)
1186     return;
1187 
1188   // The re-queued tasks have to be pushed onto the front because we'd otherwise
1189   // violate the strict monotonically increasing enqueue order within the
1190   // WorkQueue.  We can't assign them a new enqueue order here because that will
1191   // not behave correctly with fences and things will break (e.g Idle TQ).
1192   if (task.work_queue_type == WorkQueueType::kDelayed) {
1193     main_thread_only().delayed_work_queue->PushNonNestableTaskToFront(
1194         std::move(task.task));
1195   } else {
1196     // We're about to push |task| onto an empty |immediate_work_queue|
1197     // (bypassing |immediate_incoming_queue_|). As such, we no longer need to
1198     // reload if we were planning to. The flag must be cleared while holding
1199     // the lock to avoid a cross-thread post task setting it again before
1200     // we actually make |immediate_work_queue| non-empty.
1201     if (main_thread_only().immediate_work_queue->Empty()) {
1202       base::internal::CheckedAutoLock lock(any_thread_lock_);
1203       empty_queues_to_reload_handle_.SetActive(false);
1204 
1205       any_thread_.immediate_work_queue_empty = false;
1206       main_thread_only().immediate_work_queue->PushNonNestableTaskToFront(
1207           std::move(task.task));
1208 
1209     } else {
1210       main_thread_only().immediate_work_queue->PushNonNestableTaskToFront(
1211           std::move(task.task));
1212     }
1213   }
1214 }
1215 
SetThrottler(TaskQueue::Throttler * throttler)1216 void TaskQueueImpl::SetThrottler(TaskQueue::Throttler* throttler) {
1217   DCHECK(throttler);
1218   DCHECK(!main_thread_only().throttler)
1219       << "Can't assign two different throttlers to "
1220          "base::sequence_manager:TaskQueue";
1221   // `throttler` is guaranteed to outlive this object.
1222   main_thread_only().throttler = throttler;
1223 }
1224 
ResetThrottler()1225 void TaskQueueImpl::ResetThrottler() {
1226   main_thread_only().throttler = nullptr;
1227   LazyNow lazy_now(sequence_manager_->main_thread_clock());
1228   // The current delayed wake up may have been determined by the Throttler.
1229   // Update it now that there is no Throttler.
1230   UpdateWakeUp(&lazy_now);
1231 }
1232 
UpdateWakeUp(LazyNow * lazy_now)1233 void TaskQueueImpl::UpdateWakeUp(LazyNow* lazy_now) {
1234   absl::optional<WakeUp> wake_up = GetNextDesiredWakeUp();
1235   if (main_thread_only().throttler && IsQueueEnabled()) {
1236     // GetNextAllowedWakeUp() may return a non-null wake_up even if |wake_up| is
1237     // nullopt, e.g. to throttle immediate tasks.
1238     wake_up = main_thread_only().throttler->GetNextAllowedWakeUp(
1239         lazy_now, wake_up, HasTaskToRunImmediatelyOrReadyDelayedTask());
1240   }
1241   SetNextWakeUp(lazy_now, wake_up);
1242 }
1243 
SetNextWakeUp(LazyNow * lazy_now,absl::optional<WakeUp> wake_up)1244 void TaskQueueImpl::SetNextWakeUp(LazyNow* lazy_now,
1245                                   absl::optional<WakeUp> wake_up) {
1246   if (main_thread_only().scheduled_wake_up == wake_up)
1247     return;
1248   main_thread_only().scheduled_wake_up = wake_up;
1249   main_thread_only().wake_up_queue->SetNextWakeUpForQueue(this, lazy_now,
1250                                                           wake_up);
1251 }
1252 
HasTaskToRunImmediately() const1253 bool TaskQueueImpl::HasTaskToRunImmediately() const {
1254   // Any work queue tasks count as immediate work.
1255   if (!main_thread_only().delayed_work_queue->Empty() ||
1256       !main_thread_only().immediate_work_queue->Empty()) {
1257     return true;
1258   }
1259 
1260   // Finally tasks on |immediate_incoming_queue| count as immediate work.
1261   base::internal::CheckedAutoLock lock(any_thread_lock_);
1262   return !any_thread_.immediate_incoming_queue.empty();
1263 }
1264 
HasTaskToRunImmediatelyLocked() const1265 bool TaskQueueImpl::HasTaskToRunImmediatelyLocked() const {
1266   return !main_thread_only().delayed_work_queue->Empty() ||
1267          !main_thread_only().immediate_work_queue->Empty() ||
1268          !any_thread_.immediate_incoming_queue.empty();
1269 }
1270 
SetOnTaskStartedHandler(TaskQueueImpl::OnTaskStartedHandler handler)1271 void TaskQueueImpl::SetOnTaskStartedHandler(
1272     TaskQueueImpl::OnTaskStartedHandler handler) {
1273   DCHECK(should_notify_observers_ || handler.is_null());
1274   main_thread_only().on_task_started_handler = std::move(handler);
1275 }
1276 
OnTaskStarted(const Task & task,const TaskQueue::TaskTiming & task_timing)1277 void TaskQueueImpl::OnTaskStarted(const Task& task,
1278                                   const TaskQueue::TaskTiming& task_timing) {
1279   if (!main_thread_only().on_task_started_handler.is_null())
1280     main_thread_only().on_task_started_handler.Run(task, task_timing);
1281 }
1282 
SetOnTaskCompletedHandler(TaskQueueImpl::OnTaskCompletedHandler handler)1283 void TaskQueueImpl::SetOnTaskCompletedHandler(
1284     TaskQueueImpl::OnTaskCompletedHandler handler) {
1285   DCHECK(should_notify_observers_ || handler.is_null());
1286   main_thread_only().on_task_completed_handler = std::move(handler);
1287 }
1288 
OnTaskCompleted(const Task & task,TaskQueue::TaskTiming * task_timing,LazyNow * lazy_now)1289 void TaskQueueImpl::OnTaskCompleted(const Task& task,
1290                                     TaskQueue::TaskTiming* task_timing,
1291                                     LazyNow* lazy_now) {
1292   if (!main_thread_only().on_task_completed_handler.is_null()) {
1293     main_thread_only().on_task_completed_handler.Run(task, task_timing,
1294                                                      lazy_now);
1295   }
1296 }
1297 
RequiresTaskTiming() const1298 bool TaskQueueImpl::RequiresTaskTiming() const {
1299   return !main_thread_only().on_task_started_handler.is_null() ||
1300          !main_thread_only().on_task_completed_handler.is_null();
1301 }
1302 
1303 std::unique_ptr<TaskQueue::OnTaskPostedCallbackHandle>
AddOnTaskPostedHandler(OnTaskPostedHandler handler)1304 TaskQueueImpl::AddOnTaskPostedHandler(OnTaskPostedHandler handler) {
1305   DCHECK(should_notify_observers_ && !handler.is_null());
1306   std::unique_ptr<OnTaskPostedCallbackHandleImpl> handle =
1307       std::make_unique<OnTaskPostedCallbackHandleImpl>(this,
1308                                                        associated_thread_);
1309   base::internal::CheckedAutoLock lock(any_thread_lock_);
1310   any_thread_.on_task_posted_handlers.insert(
1311       {handle.get(), std::move(handler)});
1312   return handle;
1313 }
1314 
RemoveOnTaskPostedHandler(TaskQueueImpl::OnTaskPostedCallbackHandleImpl * on_task_posted_callback_handle)1315 void TaskQueueImpl::RemoveOnTaskPostedHandler(
1316     TaskQueueImpl::OnTaskPostedCallbackHandleImpl*
1317         on_task_posted_callback_handle) {
1318   base::internal::CheckedAutoLock lock(any_thread_lock_);
1319   any_thread_.on_task_posted_handlers.erase(on_task_posted_callback_handle);
1320 }
1321 
SetTaskExecutionTraceLogger(TaskExecutionTraceLogger logger)1322 void TaskQueueImpl::SetTaskExecutionTraceLogger(
1323     TaskExecutionTraceLogger logger) {
1324   DCHECK(should_notify_observers_ || logger.is_null());
1325   main_thread_only().task_execution_trace_logger = std::move(logger);
1326 }
1327 
IsUnregistered() const1328 bool TaskQueueImpl::IsUnregistered() const {
1329   base::internal::CheckedAutoLock lock(any_thread_lock_);
1330   return any_thread_.unregistered;
1331 }
1332 
GetSequenceManagerWeakPtr()1333 WeakPtr<SequenceManagerImpl> TaskQueueImpl::GetSequenceManagerWeakPtr() {
1334   return sequence_manager_->GetWeakPtr();
1335 }
1336 
ActivateDelayedFenceIfNeeded(const Task & task)1337 void TaskQueueImpl::ActivateDelayedFenceIfNeeded(const Task& task) {
1338   if (!main_thread_only().delayed_fence)
1339     return;
1340   if (main_thread_only().delayed_fence.value() > task.delayed_run_time)
1341     return;
1342   InsertFence(Fence(task.task_order()));
1343   main_thread_only().delayed_fence = absl::nullopt;
1344 }
1345 
MaybeReportIpcTaskQueuedFromMainThread(const Task & pending_task)1346 void TaskQueueImpl::MaybeReportIpcTaskQueuedFromMainThread(
1347     const Task& pending_task) {
1348   if (!pending_task.ipc_hash)
1349     return;
1350 
1351   // It's possible that tracing was just enabled and no disabled time has been
1352   // stored. In that case, skip emitting the event.
1353   if (!main_thread_only().disabled_time)
1354     return;
1355 
1356   bool tracing_enabled = false;
1357   TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1358                                      &tracing_enabled);
1359   if (!tracing_enabled)
1360     return;
1361 
1362   if (main_thread_only().is_enabled ||
1363       !main_thread_only().should_report_posted_tasks_when_disabled) {
1364     return;
1365   }
1366 
1367   base::TimeDelta time_since_disabled =
1368       sequence_manager_->main_thread_clock()->NowTicks() -
1369       main_thread_only().disabled_time.value();
1370 
1371   ReportIpcTaskQueued(pending_task, time_since_disabled);
1372 }
1373 
ShouldReportIpcTaskQueuedFromAnyThreadLocked(base::TimeDelta * time_since_disabled)1374 bool TaskQueueImpl::ShouldReportIpcTaskQueuedFromAnyThreadLocked(
1375     base::TimeDelta* time_since_disabled) {
1376   // It's possible that tracing was just enabled and no disabled time has been
1377   // stored. In that case, skip emitting the event.
1378   if (!any_thread_.tracing_only.disabled_time)
1379     return false;
1380 
1381   if (any_thread_.tracing_only.is_enabled ||
1382       any_thread_.tracing_only.should_report_posted_tasks_when_disabled) {
1383     return false;
1384   }
1385 
1386   *time_since_disabled = sequence_manager_->any_thread_clock()->NowTicks() -
1387                          any_thread_.tracing_only.disabled_time.value();
1388   return true;
1389 }
1390 
MaybeReportIpcTaskQueuedFromAnyThreadLocked(const Task & pending_task)1391 void TaskQueueImpl::MaybeReportIpcTaskQueuedFromAnyThreadLocked(
1392     const Task& pending_task) {
1393   if (!pending_task.ipc_hash)
1394     return;
1395 
1396   bool tracing_enabled = false;
1397   TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1398                                      &tracing_enabled);
1399   if (!tracing_enabled)
1400     return;
1401 
1402   base::TimeDelta time_since_disabled;
1403   if (ShouldReportIpcTaskQueuedFromAnyThreadLocked(&time_since_disabled))
1404     ReportIpcTaskQueued(pending_task, time_since_disabled);
1405 }
1406 
MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(const Task & pending_task)1407 void TaskQueueImpl::MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(
1408     const Task& pending_task) {
1409   if (!pending_task.ipc_hash)
1410     return;
1411 
1412   bool tracing_enabled = false;
1413   TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1414                                      &tracing_enabled);
1415   if (!tracing_enabled)
1416     return;
1417 
1418   base::TimeDelta time_since_disabled;
1419   bool should_report = false;
1420   {
1421     base::internal::CheckedAutoLock lock(any_thread_lock_);
1422     should_report =
1423         ShouldReportIpcTaskQueuedFromAnyThreadLocked(&time_since_disabled);
1424   }
1425 
1426   if (should_report)
1427     ReportIpcTaskQueued(pending_task, time_since_disabled);
1428 }
1429 
ReportIpcTaskQueued(const Task & pending_task,const base::TimeDelta & time_since_disabled)1430 void TaskQueueImpl::ReportIpcTaskQueued(
1431     const Task& pending_task,
1432     const base::TimeDelta& time_since_disabled) {
1433   TRACE_EVENT_INSTANT(
1434       TRACE_DISABLED_BY_DEFAULT("lifecycles"), "task_posted_to_disabled_queue",
1435       [&](perfetto::EventContext ctx) {
1436         auto* proto = ctx.event<perfetto::protos::pbzero::ChromeTrackEvent>()
1437                           ->set_chrome_task_posted_to_disabled_queue();
1438         proto->set_time_since_disabled_ms(
1439             checked_cast<uint64_t>(time_since_disabled.InMilliseconds()));
1440         proto->set_ipc_hash(pending_task.ipc_hash);
1441         proto->set_source_location_iid(
1442             base::trace_event::InternedSourceLocation::Get(
1443                 &ctx, pending_task.posted_from));
1444       });
1445 }
1446 
OnQueueUnblocked()1447 void TaskQueueImpl::OnQueueUnblocked() {
1448   DCHECK(IsQueueEnabled());
1449   DCHECK(!BlockedByFence());
1450 
1451   main_thread_only().enqueue_order_at_which_we_became_unblocked =
1452       sequence_manager_->GetNextSequenceNumber();
1453   if (GetQueuePriority() <= DefaultPriority()) {
1454     // We are default priority or more important so update
1455     // |enqueue_order_at_which_we_became_unblocked_with_normal_priority|.
1456     main_thread_only()
1457         .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
1458         main_thread_only().enqueue_order_at_which_we_became_unblocked;
1459   }
1460 }
1461 
1462 std::unique_ptr<TaskQueue::QueueEnabledVoter>
CreateQueueEnabledVoter()1463 TaskQueueImpl::CreateQueueEnabledVoter() {
1464   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
1465   return WrapUnique(
1466       new TaskQueue::QueueEnabledVoter(voter_weak_ptr_factory_.GetWeakPtr()));
1467 }
1468 
AddQueueEnabledVoter(bool voter_is_enabled,TaskQueue::QueueEnabledVoter & voter)1469 void TaskQueueImpl::AddQueueEnabledVoter(bool voter_is_enabled,
1470                                          TaskQueue::QueueEnabledVoter& voter) {
1471   ++main_thread_only().voter_count;
1472   if (voter_is_enabled) {
1473     ++main_thread_only().enabled_voter_count;
1474   }
1475 }
1476 
RemoveQueueEnabledVoter(bool voter_is_enabled,TaskQueue::QueueEnabledVoter & voter)1477 void TaskQueueImpl::RemoveQueueEnabledVoter(
1478     bool voter_is_enabled,
1479     TaskQueue::QueueEnabledVoter& voter) {
1480   bool was_enabled = AreAllQueueEnabledVotersEnabled();
1481   if (voter_is_enabled) {
1482     --main_thread_only().enabled_voter_count;
1483     DCHECK_GE(main_thread_only().enabled_voter_count, 0);
1484   }
1485 
1486   --main_thread_only().voter_count;
1487   DCHECK_GE(main_thread_only().voter_count, 0);
1488 
1489   bool is_enabled = AreAllQueueEnabledVotersEnabled();
1490   if (was_enabled != is_enabled) {
1491     SetQueueEnabled(is_enabled);
1492   }
1493 }
1494 
OnQueueEnabledVoteChanged(bool enabled)1495 void TaskQueueImpl::OnQueueEnabledVoteChanged(bool enabled) {
1496   bool was_enabled = AreAllQueueEnabledVotersEnabled();
1497   if (enabled) {
1498     ++main_thread_only().enabled_voter_count;
1499     DCHECK_LE(main_thread_only().enabled_voter_count,
1500               main_thread_only().voter_count);
1501   } else {
1502     --main_thread_only().enabled_voter_count;
1503     DCHECK_GE(main_thread_only().enabled_voter_count, 0);
1504   }
1505 
1506   bool is_enabled = AreAllQueueEnabledVotersEnabled();
1507   if (was_enabled != is_enabled) {
1508     SetQueueEnabled(is_enabled);
1509   }
1510 }
1511 
CompleteInitializationOnBoundThread()1512 void TaskQueueImpl::CompleteInitializationOnBoundThread() {
1513   voter_weak_ptr_factory_.BindToCurrentSequence(PassKey<TaskQueueImpl>());
1514 }
1515 
DefaultPriority() const1516 TaskQueue::QueuePriority TaskQueueImpl::DefaultPriority() const {
1517   return sequence_manager()->settings().priority_settings.default_priority();
1518 }
1519 
1520 TaskQueueImpl::DelayedIncomingQueue::DelayedIncomingQueue() = default;
1521 TaskQueueImpl::DelayedIncomingQueue::~DelayedIncomingQueue() = default;
1522 
push(Task task)1523 void TaskQueueImpl::DelayedIncomingQueue::push(Task task) {
1524   // TODO(crbug.com/1247285): Remove this once the cause of corrupted tasks in
1525   // the queue is understood.
1526   CHECK(task.task);
1527   if (task.is_high_res)
1528     pending_high_res_tasks_++;
1529   queue_.insert(std::move(task));
1530 }
1531 
remove(HeapHandle heap_handle)1532 void TaskQueueImpl::DelayedIncomingQueue::remove(HeapHandle heap_handle) {
1533   DCHECK(!empty());
1534   DCHECK_LT(heap_handle.index(), queue_.size());
1535   Task task = queue_.take(heap_handle);
1536   if (task.is_high_res) {
1537     pending_high_res_tasks_--;
1538     DCHECK_GE(pending_high_res_tasks_, 0);
1539   }
1540 }
1541 
take_top()1542 Task TaskQueueImpl::DelayedIncomingQueue::take_top() {
1543   DCHECK(!empty());
1544   if (queue_.top().is_high_res) {
1545     pending_high_res_tasks_--;
1546     DCHECK_GE(pending_high_res_tasks_, 0);
1547   }
1548   return queue_.take_top();
1549 }
1550 
swap(DelayedIncomingQueue * rhs)1551 void TaskQueueImpl::DelayedIncomingQueue::swap(DelayedIncomingQueue* rhs) {
1552   std::swap(pending_high_res_tasks_, rhs->pending_high_res_tasks_);
1553   std::swap(queue_, rhs->queue_);
1554 }
1555 
SweepCancelledTasks(SequenceManagerImpl * sequence_manager)1556 void TaskQueueImpl::DelayedIncomingQueue::SweepCancelledTasks(
1557     SequenceManagerImpl* sequence_manager) {
1558   // Note: IntrusiveHeap::EraseIf() is safe against re-entrancy caused by
1559   // deleted tasks posting new tasks.
1560   queue_.EraseIf([this](const Task& task) {
1561     if (task.task.IsCancelled()) {
1562       if (task.is_high_res) {
1563         --pending_high_res_tasks_;
1564         DCHECK_GE(pending_high_res_tasks_, 0);
1565       }
1566       return true;
1567     }
1568     return false;
1569   });
1570 }
1571 
AsValue(TimeTicks now) const1572 Value::List TaskQueueImpl::DelayedIncomingQueue::AsValue(TimeTicks now) const {
1573   Value::List state;
1574   for (const Task& task : queue_)
1575     state.Append(TaskAsValue(task, now));
1576   return state;
1577 }
1578 
operator ()(const Task & lhs,const Task & rhs) const1579 bool TaskQueueImpl::DelayedIncomingQueue::Compare::operator()(
1580     const Task& lhs,
1581     const Task& rhs) const {
1582   // Delayed tasks are ordered by latest_delayed_run_time(). The top task may
1583   // not be the first task eligible to run, but tasks will always become ripe
1584   // before their latest_delayed_run_time().
1585   const TimeTicks lhs_latest_delayed_run_time = lhs.latest_delayed_run_time();
1586   const TimeTicks rhs_latest_delayed_run_time = rhs.latest_delayed_run_time();
1587   if (lhs_latest_delayed_run_time == rhs_latest_delayed_run_time)
1588     return lhs.sequence_num > rhs.sequence_num;
1589   return lhs_latest_delayed_run_time > rhs_latest_delayed_run_time;
1590 }
1591 
OnTaskPostedCallbackHandleImpl(TaskQueueImpl * task_queue_impl,scoped_refptr<const AssociatedThreadId> associated_thread)1592 TaskQueueImpl::OnTaskPostedCallbackHandleImpl::OnTaskPostedCallbackHandleImpl(
1593     TaskQueueImpl* task_queue_impl,
1594     scoped_refptr<const AssociatedThreadId> associated_thread)
1595     : task_queue_impl_(task_queue_impl),
1596       associated_thread_(std::move(associated_thread)) {
1597   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
1598 }
1599 
1600 TaskQueueImpl::OnTaskPostedCallbackHandleImpl::
~OnTaskPostedCallbackHandleImpl()1601     ~OnTaskPostedCallbackHandleImpl() {
1602   DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
1603   if (task_queue_impl_)
1604     task_queue_impl_->RemoveOnTaskPostedHandler(this);
1605 }
1606 
1607 }  // namespace internal
1608 }  // namespace sequence_manager
1609 }  // namespace base
1610