• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/sequence_manager/task_queue_impl.h"
6 
7 #include <inttypes.h>
8 
9 #include <memory>
10 #include <utility>
11 
12 #include "base/check.h"
13 #include "base/compiler_specific.h"
14 #include "base/containers/stack_container.h"
15 #include "base/feature_list.h"
16 #include "base/logging.h"
17 #include "base/memory/scoped_refptr.h"
18 #include "base/metrics/histogram_macros.h"
19 #include "base/notreached.h"
20 #include "base/observer_list.h"
21 #include "base/ranges/algorithm.h"
22 #include "base/strings/stringprintf.h"
23 #include "base/task/common/scoped_defer_task_posting.h"
24 #include "base/task/default_delayed_task_handle_delegate.h"
25 #include "base/task/sequence_manager/associated_thread_id.h"
26 #include "base/task/sequence_manager/delayed_task_handle_delegate.h"
27 #include "base/task/sequence_manager/fence.h"
28 #include "base/task/sequence_manager/sequence_manager_impl.h"
29 #include "base/task/sequence_manager/task_order.h"
30 #include "base/task/sequence_manager/wake_up_queue.h"
31 #include "base/task/sequence_manager/work_queue.h"
32 #include "base/task/task_features.h"
33 #include "base/task/task_observer.h"
34 #include "base/threading/thread_restrictions.h"
35 #include "base/time/time.h"
36 #include "base/trace_event/base_tracing.h"
37 #include "build/build_config.h"
38 #include "third_party/abseil-cpp/absl/types/optional.h"
39 
40 namespace base {
41 namespace sequence_manager {
42 
namespace {

// Controls whether cancelled tasks are removed from the delayed queue.
// The enabled state is cached in g_is_sweep_cancelled_tasks_enabled below so
// hot paths don't have to go through FeatureList::IsEnabled().
BASE_FEATURE(kSweepCancelledTasks,
             "SweepCancelledTasks",
             FEATURE_ENABLED_BY_DEFAULT);

}  // namespace
51 
52 namespace internal {
53 
namespace {

// Cache of the state of the kRemoveCanceledTasksInTaskQueue,
// kSweepCancelledTasks and kExplicitHighResolutionTimerWin features. This
// avoids the need to constantly query their enabled state through
// FeatureList::IsEnabled().
//
// The caches are initialized to the features' default states and updated by
// TaskQueueImpl::InitializeFeatures() once the FeatureList is available.
bool g_is_remove_canceled_tasks_in_task_queue_enabled = true;
bool g_is_sweep_cancelled_tasks_enabled =
    kSweepCancelledTasks.default_state == FEATURE_ENABLED_BY_DEFAULT;
#if BUILDFLAG(IS_WIN)
// An atomic is used here because the flag is queried from other threads when
// tasks are posted cross-thread, which can race with its initialization.
std::atomic_bool g_explicit_high_resolution_timer_win{false};
#endif  // BUILDFLAG(IS_WIN)

}  // namespace
70 
// Binds the poster to its owning TaskQueueImpl. Posting is rejected until
// StartAcceptingOperations() is called (see the TaskQueueImpl constructor).
TaskQueueImpl::GuardedTaskPoster::GuardedTaskPoster(TaskQueueImpl* outer)
    : outer_(outer) {}
73 
~GuardedTaskPoster()74 TaskQueueImpl::GuardedTaskPoster::~GuardedTaskPoster() {}
75 
PostTask(PostedTask task)76 bool TaskQueueImpl::GuardedTaskPoster::PostTask(PostedTask task) {
77   // Do not process new PostTasks while we are handling a PostTask (tracing
78   // has to do this) as it can lead to a deadlock and defer it instead.
79   ScopedDeferTaskPosting disallow_task_posting;
80 
81   auto token = operations_controller_.TryBeginOperation();
82   if (!token)
83     return false;
84 
85   outer_->PostTask(std::move(task));
86   return true;
87 }
88 
// Same as PostTask(), but additionally attaches a DelayedTaskHandleDelegate
// so the caller can cancel the task later. Returns an invalid handle if the
// poster has shut down.
DelayedTaskHandle TaskQueueImpl::GuardedTaskPoster::PostCancelableTask(
    PostedTask task) {
  // Do not process new PostTasks while we are handling a PostTask (tracing
  // has to do this) as it can lead to a deadlock and defer it instead.
  ScopedDeferTaskPosting disallow_task_posting;

  auto token = operations_controller_.TryBeginOperation();
  if (!token)
    return DelayedTaskHandle();

  auto delayed_task_handle_delegate =
      std::make_unique<DelayedTaskHandleDelegate>(outer_);
  task.delayed_task_handle_delegate = delayed_task_handle_delegate->AsWeakPtr();

  outer_->PostTask(std::move(task));
  // PostTask() must have bound the delegate to the queued task by now.
  DCHECK(delayed_task_handle_delegate->IsValid());
  return DelayedTaskHandle(std::move(delayed_task_handle_delegate));
}
107 
// A TaskRunner view over the queue: all posts are funneled through the
// GuardedTaskPoster and tagged with |task_type|.
TaskQueueImpl::TaskRunner::TaskRunner(
    scoped_refptr<GuardedTaskPoster> task_poster,
    scoped_refptr<const AssociatedThreadId> associated_thread,
    TaskType task_type)
    : task_poster_(std::move(task_poster)),
      associated_thread_(std::move(associated_thread)),
      task_type_(task_type) {}
115 
~TaskRunner()116 TaskQueueImpl::TaskRunner::~TaskRunner() {}
117 
PostDelayedTask(const Location & location,OnceClosure callback,TimeDelta delay)118 bool TaskQueueImpl::TaskRunner::PostDelayedTask(const Location& location,
119                                                 OnceClosure callback,
120                                                 TimeDelta delay) {
121   return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
122                                            delay, Nestable::kNestable,
123                                            task_type_));
124 }
125 
PostDelayedTaskAt(subtle::PostDelayedTaskPassKey,const Location & location,OnceClosure callback,TimeTicks delayed_run_time,base::subtle::DelayPolicy delay_policy)126 bool TaskQueueImpl::TaskRunner::PostDelayedTaskAt(
127     subtle::PostDelayedTaskPassKey,
128     const Location& location,
129     OnceClosure callback,
130     TimeTicks delayed_run_time,
131     base::subtle::DelayPolicy delay_policy) {
132   return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
133                                            delayed_run_time, delay_policy,
134                                            Nestable::kNestable, task_type_));
135 }
136 
PostCancelableDelayedTaskAt(subtle::PostDelayedTaskPassKey pass_key,const Location & location,OnceClosure callback,TimeTicks delayed_run_time,base::subtle::DelayPolicy delay_policy)137 DelayedTaskHandle TaskQueueImpl::TaskRunner::PostCancelableDelayedTaskAt(
138     subtle::PostDelayedTaskPassKey pass_key,
139     const Location& location,
140     OnceClosure callback,
141     TimeTicks delayed_run_time,
142     base::subtle::DelayPolicy delay_policy) {
143   if (!g_is_remove_canceled_tasks_in_task_queue_enabled) {
144     // This will revert to PostDelayedTaskAt() with default DelayedTaskHandle.
145     return SequencedTaskRunner::PostCancelableDelayedTaskAt(
146         pass_key, location, std::move(callback), delayed_run_time,
147         delay_policy);
148   }
149   return task_poster_->PostCancelableTask(
150       PostedTask(this, std::move(callback), location, delayed_run_time,
151                  delay_policy, Nestable::kNestable, task_type_));
152 }
153 
PostCancelableDelayedTask(subtle::PostDelayedTaskPassKey pass_key,const Location & location,OnceClosure callback,TimeDelta delay)154 DelayedTaskHandle TaskQueueImpl::TaskRunner::PostCancelableDelayedTask(
155     subtle::PostDelayedTaskPassKey pass_key,
156     const Location& location,
157     OnceClosure callback,
158     TimeDelta delay) {
159   if (!g_is_remove_canceled_tasks_in_task_queue_enabled) {
160     return SequencedTaskRunner::PostCancelableDelayedTask(
161         pass_key, location, std::move(callback), delay);
162   }
163 
164   return task_poster_->PostCancelableTask(
165       PostedTask(this, std::move(callback), location, delay,
166                  Nestable::kNestable, task_type_));
167 }
168 
PostNonNestableDelayedTask(const Location & location,OnceClosure callback,TimeDelta delay)169 bool TaskQueueImpl::TaskRunner::PostNonNestableDelayedTask(
170     const Location& location,
171     OnceClosure callback,
172     TimeDelta delay) {
173   return task_poster_->PostTask(PostedTask(this, std::move(callback), location,
174                                            delay, Nestable::kNonNestable,
175                                            task_type_));
176 }
177 
// True iff the calling thread is the one this queue is bound to.
bool TaskQueueImpl::TaskRunner::RunsTasksInCurrentSequence() const {
  return associated_thread_->IsBoundToCurrentThread();
}
181 
// static
// Snapshots the relevant base::Feature states into the file-local caches.
// Must be called after FeatureList initialization; until then the caches hold
// the features' default states.
void TaskQueueImpl::InitializeFeatures() {
  ApplyRemoveCanceledTasksInTaskQueue();
  g_is_sweep_cancelled_tasks_enabled =
      FeatureList::IsEnabled(kSweepCancelledTasks);
#if BUILDFLAG(IS_WIN)
  // Stored atomically because cross-thread task posting reads this flag and
  // can race with initialization.
  g_explicit_high_resolution_timer_win.store(
      FeatureList::IsEnabled(kExplicitHighResolutionTimerWin),
      std::memory_order_relaxed);
#endif  // BUILDFLAG(IS_WIN)
}
193 
// static
// Refreshes the cached enabled state of kRemoveCanceledTasksInTaskQueue from
// the FeatureList.
void TaskQueueImpl::ApplyRemoveCanceledTasksInTaskQueue() {
  // Since kRemoveCanceledTasksInTaskQueue is not constexpr (forbidden for
  // Features), it cannot be used to initialize
  // |g_is_remove_canceled_tasks_in_task_queue_enabled| at compile time. At
  // least DCHECK that its initial value matches the default value of the
  // feature here.
  DCHECK_EQ(g_is_remove_canceled_tasks_in_task_queue_enabled,
            kRemoveCanceledTasksInTaskQueue.default_state ==
                FEATURE_ENABLED_BY_DEFAULT);
  g_is_remove_canceled_tasks_in_task_queue_enabled =
      FeatureList::IsEnabled(kRemoveCanceledTasksInTaskQueue);
}
207 
// static
// Restores the cached kRemoveCanceledTasksInTaskQueue state to the feature's
// default, undoing ApplyRemoveCanceledTasksInTaskQueue() between tests.
void TaskQueueImpl::ResetRemoveCanceledTasksInTaskQueueForTesting() {
  g_is_remove_canceled_tasks_in_task_queue_enabled =
      kRemoveCanceledTasksInTaskQueue.default_state ==
      FEATURE_ENABLED_BY_DEFAULT;
}
214 
// Constructs a queue owned by |sequence_manager| (may be null in some setups;
// a null manager leaves the task poster closed so no tasks can be posted).
// |wake_up_queue| may be null, in which case no delayed wake-ups are
// scheduled for this queue.
TaskQueueImpl::TaskQueueImpl(SequenceManagerImpl* sequence_manager,
                             WakeUpQueue* wake_up_queue,
                             const TaskQueue::Spec& spec)
    : name_(spec.name),
      sequence_manager_(sequence_manager),
      associated_thread_(sequence_manager
                             ? sequence_manager->associated_thread()
                             : AssociatedThreadId::CreateBound()),
      task_poster_(MakeRefCounted<GuardedTaskPoster>(this)),
      main_thread_only_(this, wake_up_queue),
      empty_queues_to_reload_handle_(
          sequence_manager
              ? sequence_manager->GetFlagToRequestReloadForEmptyQueue(this)
              : AtomicFlagSet::AtomicFlag()),
      should_monitor_quiescence_(spec.should_monitor_quiescence),
      should_notify_observers_(spec.should_notify_observers),
      delayed_fence_allowed_(spec.delayed_fence_allowed) {
  UpdateCrossThreadQueueStateLocked();
  // SequenceManager can't be set later, so we need to prevent task runners
  // from posting any tasks.
  if (sequence_manager_)
    task_poster_->StartAcceptingOperations();
}
238 
// Destructor only verifies the unregistration contract; the actual teardown
// happens in UnregisterTaskQueue().
TaskQueueImpl::~TaskQueueImpl() {
#if DCHECK_IS_ON()
  base::internal::CheckedAutoLock lock(any_thread_lock_);
  // NOTE this check shouldn't fire because |SequenceManagerImpl::queues_|
  // contains a strong reference to this TaskQueueImpl and the
  // SequenceManagerImpl destructor calls UnregisterTaskQueue on all task
  // queues.
  DCHECK(any_thread_.unregistered)
      << "UnregisterTaskQueue must be called first!";
#endif
}
250 
// Out-of-line defaulted special members for the cross-thread state holders.
TaskQueueImpl::AnyThread::AnyThread() = default;
TaskQueueImpl::AnyThread::~AnyThread() = default;

TaskQueueImpl::AnyThread::TracingOnly::TracingOnly() = default;
TaskQueueImpl::AnyThread::TracingOnly::~TracingOnly() = default;
256 
MainThreadOnly(TaskQueueImpl * task_queue,WakeUpQueue * wake_up_queue)257 TaskQueueImpl::MainThreadOnly::MainThreadOnly(TaskQueueImpl* task_queue,
258                                               WakeUpQueue* wake_up_queue)
259     : wake_up_queue(wake_up_queue),
260       delayed_work_queue(
261           new WorkQueue(task_queue, "delayed", WorkQueue::QueueType::kDelayed)),
262       immediate_work_queue(new WorkQueue(task_queue,
263                                          "immediate",
264                                          WorkQueue::QueueType::kImmediate)) {}
265 
266 TaskQueueImpl::MainThreadOnly::~MainThreadOnly() = default;
267 
// Creates a new TaskRunner view over this queue whose tasks are tagged with
// |task_type|. The runner shares the queue's poster and thread binding.
scoped_refptr<SingleThreadTaskRunner> TaskQueueImpl::CreateTaskRunner(
    TaskType task_type) const {
  return MakeRefCounted<TaskRunner>(task_poster_, associated_thread_,
                                    task_type);
}
273 
// Detaches the queue from the SequenceManager: shuts down posting, clears all
// cross-thread and main-thread state, and destroys pending tasks. The
// ordering here is deliberate — see the comments below about destroying tasks
// only after all queue state has been cleared, and outside the lock.
void TaskQueueImpl::UnregisterTaskQueue() {
  TRACE_EVENT0("base", "TaskQueueImpl::UnregisterTaskQueue");
  // Detach task runners.
  {
    ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait;
    task_poster_->ShutdownAndWaitForZeroOperations();
  }

  TaskDeque immediate_incoming_queue;
  base::flat_map<raw_ptr<OnTaskPostedCallbackHandleImpl>, OnTaskPostedHandler>
      on_task_posted_handlers;

  {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    any_thread_.unregistered = true;
    // Swap (rather than destroy) under the lock; the tasks are deleted later,
    // outside the lock.
    immediate_incoming_queue.swap(any_thread_.immediate_incoming_queue);

    for (auto& handler : any_thread_.on_task_posted_handlers)
      handler.first->UnregisterTaskQueue();
    any_thread_.on_task_posted_handlers.swap(on_task_posted_handlers);
  }

  if (main_thread_only().wake_up_queue) {
    main_thread_only().wake_up_queue->UnregisterQueue(this);
  }

  main_thread_only().on_task_started_handler = OnTaskStartedHandler();
  main_thread_only().on_task_completed_handler = OnTaskCompletedHandler();
  main_thread_only().wake_up_queue = nullptr;
  main_thread_only().throttler = nullptr;
  empty_queues_to_reload_handle_.ReleaseAtomicFlag();

  // It is possible for a task to hold a scoped_refptr to this, which
  // will lead to TaskQueueImpl destructor being called when deleting a task.
  // To avoid use-after-free, we need to clear all fields of a task queue
  // before starting to delete the tasks.
  // All work queues and priority queues containing tasks should be moved to
  // local variables on stack (std::move for unique_ptrs and swap for queues)
  // before clearing them and deleting tasks.

  // Flush the queues outside of the lock because TSAN complains about a lock
  // order inversion for tasks that are posted from within a lock, with a
  // destructor that acquires the same lock.

  DelayedIncomingQueue delayed_incoming_queue;
  delayed_incoming_queue.swap(&main_thread_only().delayed_incoming_queue);
  std::unique_ptr<WorkQueue> immediate_work_queue =
      std::move(main_thread_only().immediate_work_queue);
  std::unique_ptr<WorkQueue> delayed_work_queue =
      std::move(main_thread_only().delayed_work_queue);
}
325 
// Human-readable queue name, resolved from the proto enum value.
const char* TaskQueueImpl::GetName() const {
  return perfetto::protos::pbzero::SequenceManagerTask::QueueName_Name(name_);
}

// Raw proto enum value of the queue name (used by tracing).
QueueName TaskQueueImpl::GetProtoName() const {
  return name_;
}
333 
// Entry point for all posts routed through GuardedTaskPoster: classifies the
// calling thread and dispatches to the immediate or delayed path.
void TaskQueueImpl::PostTask(PostedTask task) {
  CurrentThread current_thread =
      associated_thread_->IsBoundToCurrentThread()
          ? TaskQueueImpl::CurrentThread::kMainThread
          : TaskQueueImpl::CurrentThread::kNotMainThread;

#if DCHECK_IS_ON()
  // Debug-only delay injection to surface scheduling-order flakiness; see
  // GetTaskDelayAdjustment(). The adjustment is added to whichever form the
  // delay was posted in (relative TimeDelta or absolute TimeTicks).
  TimeDelta delay = GetTaskDelayAdjustment(current_thread);
  if (absl::holds_alternative<base::TimeTicks>(
          task.delay_or_delayed_run_time)) {
    absl::get<base::TimeTicks>(task.delay_or_delayed_run_time) += delay;
  } else {
    absl::get<base::TimeDelta>(task.delay_or_delayed_run_time) += delay;
  }
#endif  // DCHECK_IS_ON()

  if (!task.is_delayed()) {
    PostImmediateTaskImpl(std::move(task), current_thread);
  } else {
    PostDelayedTaskImpl(std::move(task), current_thread);
  }
}
356 
// Removes a canceled task from the delayed incoming queue, identified by its
// position in the heap. Called via DelayedTaskHandleDelegate.
void TaskQueueImpl::RemoveCancelableTask(HeapHandle heap_handle) {
  // Can only cancel from the current thread.
  DCHECK(associated_thread_->IsBoundToCurrentThread());
  DCHECK(heap_handle.IsValid());

  main_thread_only().delayed_incoming_queue.remove(heap_handle);

  // Only update the delayed wake up if the top task is removed, since only
  // the top of the heap determines the next wake-up time.
  if (heap_handle.index() == 0u) {
    LazyNow lazy_now(sequence_manager_->main_thread_clock());
    UpdateWakeUp(&lazy_now);
  }
}
370 
// Returns the debug-only artificial delay to add to a posted task, selected
// per-priority and per posting thread from the sequence manager's settings.
// Always zero in non-DCHECK builds.
TimeDelta TaskQueueImpl::GetTaskDelayAdjustment(CurrentThread current_thread) {
#if DCHECK_IS_ON()
  if (current_thread == TaskQueueImpl::CurrentThread::kNotMainThread) {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    // Add a per-priority delay to cross thread tasks. This can help diagnose
    // scheduler induced flakiness by making things flake most of the time.
    return sequence_manager_->settings()
        .priority_settings
        .per_priority_cross_thread_task_delay()[any_thread_.queue_set_index];
  } else {
    return sequence_manager_->settings()
        .priority_settings.per_priority_same_thread_task_delay()
            [main_thread_only().immediate_work_queue->work_queue_set_index()];
  }
#else
  // No delay adjustment.
  return TimeDelta();
#endif  // DCHECK_IS_ON()
}
390 
// Enqueues a non-delayed task onto the cross-thread immediate incoming queue
// and, if the queue transitioned from empty, arranges for the
// SequenceManager to reload it and possibly schedule a DoWork. ScheduleWork()
// is deliberately invoked outside the lock — see the comment below.
void TaskQueueImpl::PostImmediateTaskImpl(PostedTask task,
                                          CurrentThread current_thread) {
  // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
  // for details.
  CHECK(task.callback);

  bool should_schedule_work = false;
  {
    // TODO(alexclarke): Maybe add a main thread only immediate_incoming_queue
    // See https://crbug.com/901800
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    bool add_queue_time_to_tasks = sequence_manager_->GetAddQueueTimeToTasks();
    // Queue time is also needed when delayed fences are allowed, since
    // TakeImmediateIncomingQueueTasks() compares task.queue_time against the
    // fence time.
    TimeTicks queue_time;
    if (add_queue_time_to_tasks || delayed_fence_allowed_)
      queue_time = sequence_manager_->any_thread_clock()->NowTicks();

    // The sequence number must be incremented atomically with pushing onto the
    // incoming queue. Otherwise if there are several threads posting task we
    // risk breaking the assumption that sequence numbers increase monotonically
    // within a queue.
    EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber();
    bool was_immediate_incoming_queue_empty =
        any_thread_.immediate_incoming_queue.empty();
    any_thread_.immediate_incoming_queue.push_back(
        Task(std::move(task), sequence_number, sequence_number, queue_time));

#if DCHECK_IS_ON()
    any_thread_.immediate_incoming_queue.back().cross_thread_ =
        (current_thread == TaskQueueImpl::CurrentThread::kNotMainThread);
#endif

    sequence_manager_->WillQueueTask(
        &any_thread_.immediate_incoming_queue.back());
    MaybeReportIpcTaskQueuedFromAnyThreadLocked(
        any_thread_.immediate_incoming_queue.back());

    for (auto& handler : any_thread_.on_task_posted_handlers) {
      DCHECK(!handler.second.is_null());
      handler.second.Run(any_thread_.immediate_incoming_queue.back());
    }

    // If this queue was completely empty, then the SequenceManager needs to be
    // informed so it can reload the work queue and add us to the
    // TaskQueueSelector which can only be done from the main thread. In
    // addition it may need to schedule a DoWork if this queue isn't blocked.
    if (was_immediate_incoming_queue_empty &&
        any_thread_.immediate_work_queue_empty) {
      empty_queues_to_reload_handle_.SetActive(true);
      should_schedule_work =
          any_thread_.post_immediate_task_should_schedule_work;
    }
  }

  // On windows it's important to call this outside of a lock because calling a
  // pump while holding a lock can result in priority inversions. See
  // http://shortn/_ntnKNqjDQT for a discussion.
  //
  // Calling ScheduleWork outside the lock should be safe, only the main thread
  // can mutate |any_thread_.post_immediate_task_should_schedule_work|. If it
  // transitions to false we call ScheduleWork redundantly that's harmless. If
  // it transitions to true, the side effect of
  // |empty_queues_to_reload_handle_SetActive(true)| is guaranteed to be picked
  // up by the ThreadController's call to SequenceManagerImpl::DelayTillNextTask
  // when it computes what continuation (if any) is needed.
  if (should_schedule_work)
    sequence_manager_->ScheduleWork();

  TraceQueueSize();
}
460 
// Enqueues a delayed task. On the main thread it goes straight onto the
// delayed incoming queue; from other threads it is bounced through an
// immediate task so the heap is only ever touched on the main thread (see
// PushOntoDelayedIncomingQueue).
void TaskQueueImpl::PostDelayedTaskImpl(PostedTask posted_task,
                                        CurrentThread current_thread) {
  // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
  // for details.
  CHECK(posted_task.callback);

  if (current_thread == CurrentThread::kMainThread) {
    LazyNow lazy_now(sequence_manager_->main_thread_clock());
    Task pending_task = MakeDelayedTask(std::move(posted_task), &lazy_now);
    sequence_manager_->MaybeAddLeewayToTask(pending_task);
    PushOntoDelayedIncomingQueueFromMainThread(
        std::move(pending_task), &lazy_now,
        /* notify_task_annotator */ true);
  } else {
    LazyNow lazy_now(sequence_manager_->any_thread_clock());
    PushOntoDelayedIncomingQueue(
        MakeDelayedTask(std::move(posted_task), &lazy_now));
  }
}
480 
// Main-thread path: pushes |pending_task| onto the delayed incoming heap and
// refreshes the wake-up. |notify_task_annotator| is false when the task was
// already announced (the cross-thread path notifies before bouncing here).
void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread(
    Task pending_task,
    LazyNow* lazy_now,
    bool notify_task_annotator) {
#if DCHECK_IS_ON()
  pending_task.cross_thread_ = false;
#endif

  if (notify_task_annotator) {
    sequence_manager_->WillQueueTask(&pending_task);
    MaybeReportIpcTaskQueuedFromMainThread(pending_task);
  }
  RecordQueuingDelayedTaskMetrics(pending_task, lazy_now);
  main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
  // The new task may have become the earliest wake-up.
  UpdateWakeUp(lazy_now);

  TraceQueueSize();
}
499 
// Cross-thread path for delayed tasks: wraps |pending_task| in an immediate
// ScheduleDelayedWorkTask so the delayed incoming heap is only manipulated on
// the main thread.
void TaskQueueImpl::PushOntoDelayedIncomingQueue(Task pending_task) {
  sequence_manager_->WillQueueTask(&pending_task);
  MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(pending_task);

#if DCHECK_IS_ON()
  pending_task.cross_thread_ = true;
#endif

  // TODO(altimin): Add a copy method to Task to capture metadata here.
  // Read these before std::move(pending_task) below consumes the task.
  auto task_runner = pending_task.task_runner;
  const auto task_type = pending_task.task_type;
  PostImmediateTaskImpl(
      PostedTask(std::move(task_runner),
                 BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask,
                          Unretained(this), std::move(pending_task)),
                 FROM_HERE, TimeDelta(), Nestable::kNonNestable, task_type),
      CurrentThread::kNotMainThread);
}
518 
// Runs on the main thread as the bounce target of PushOntoDelayedIncomingQueue:
// inserts the cross-thread delayed task into the main-thread delayed machinery.
void TaskQueueImpl::ScheduleDelayedWorkTask(Task pending_task) {
  DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
  sequence_manager_->MaybeAddLeewayToTask(pending_task);
  TimeTicks now = sequence_manager_->main_thread_clock()->NowTicks();
  LazyNow lazy_now(now);
  // A delayed task is ready to run as soon as earliest_delayed_run_time() is
  // reached.
  if (pending_task.earliest_delayed_run_time() <= now) {
    // If |delayed_run_time| is in the past then push it onto the work queue
    // immediately. To ensure the right task ordering we need to temporarily
    // push it onto the |delayed_incoming_queue|.
    pending_task.delayed_run_time = now;
    main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
    MoveReadyDelayedTasksToWorkQueue(
        &lazy_now, sequence_manager_->GetNextSequenceNumber());
  } else {
    // If |delayed_run_time| is in the future we can queue it as normal.
    PushOntoDelayedIncomingQueueFromMainThread(std::move(pending_task),
                                               &lazy_now, false);
  }
  TraceQueueSize();
}
541 
// Emits (sampled) UMA metrics about the delay and queue depth when a delayed
// task is queued. No-ops without a high-resolution clock.
void TaskQueueImpl::RecordQueuingDelayedTaskMetrics(const Task& pending_task,
                                                    LazyNow* lazy_now) {
  // The sampling depends on having a high-resolution clock.
  if (!base::TimeTicks::IsHighResolution())
    return;

  // A sample is taken on average every kSampleRate tasks.
  static constexpr int kSampleRate = 10000;

  // Use pseudorandom sampling to avoid "running jank," which may occur
  // when emitting many samples to a histogram in parallel. (This function is
  // called a lot in parallel.) See https://crbug/1254354 for more details. The
  // current time is used as a source of pseudorandomness.
  if (((lazy_now->Now() - TimeTicks::UnixEpoch()).InMicroseconds() ^
       pending_task.sequence_num) %
          kSampleRate ==
      0) {
    // The |delay| will be different than the delay passed to PostDelayedTask
    // for cross-thread delayed tasks.
    const TimeDelta delay = pending_task.delayed_run_time - lazy_now->Now();
    UMA_HISTOGRAM_LONG_TIMES("Scheduler.TaskQueueImpl.PostDelayedTaskDelay",
                             delay);
    UMA_HISTOGRAM_COUNTS_1000(
        "Scheduler.TaskQueueImpl.DelayedIncomingQueueSize",
        static_cast<int>(main_thread_only().delayed_incoming_queue.size()));
  }
}
569 
// Refills the (empty) immediate work queue from the cross-thread incoming
// queue, and lets the throttler know new immediate work arrived if the queue
// is enabled.
void TaskQueueImpl::ReloadEmptyImmediateWorkQueue() {
  DCHECK(main_thread_only().immediate_work_queue->Empty());
  main_thread_only().immediate_work_queue->TakeImmediateIncomingQueueTasks();

  if (main_thread_only().throttler && IsQueueEnabled()) {
    main_thread_only().throttler->OnHasImmediateTask();
  }
}
578 
// Swaps the cross-thread immediate incoming queue's contents into |queue|
// (which must be empty) and, if a delayed fence is pending, converts it into
// a concrete fence based on the queue times of the taken tasks.
void TaskQueueImpl::TakeImmediateIncomingQueueTasks(TaskDeque* queue) {
  DCHECK(queue->empty());
  // Now is a good time to consider reducing the empty queue's capacity if we're
  // wasting memory, before we make it the `immediate_incoming_queue`.
  queue->MaybeShrinkQueue();

  base::internal::CheckedAutoLock lock(any_thread_lock_);
  queue->swap(any_thread_.immediate_incoming_queue);

  // Activate delayed fence if necessary. This is ideologically similar to
  // ActivateDelayedFenceIfNeeded, but due to immediate tasks being posted
  // from any thread we can't generate an enqueue order for the fence there,
  // so we have to check all immediate tasks and use their enqueue order for
  // a fence.
  if (main_thread_only().delayed_fence) {
    for (const Task& task : *queue) {
      DCHECK(!task.queue_time.is_null());
      DCHECK(task.delayed_run_time.is_null());
      if (task.queue_time >= main_thread_only().delayed_fence.value()) {
        main_thread_only().delayed_fence = absl::nullopt;
        DCHECK(!main_thread_only().current_fence);
        main_thread_only().current_fence = Fence(task.task_order());
        // Do not trigger WorkQueueSets notification when taking incoming
        // immediate queue.
        main_thread_only().immediate_work_queue->InsertFenceSilently(
            *main_thread_only().current_fence);
        main_thread_only().delayed_work_queue->InsertFenceSilently(
            *main_thread_only().current_fence);
        break;
      }
    }
  }

  UpdateCrossThreadQueueStateLocked();
}
614 
IsEmpty() const615 bool TaskQueueImpl::IsEmpty() const {
616   if (!main_thread_only().delayed_work_queue->Empty() ||
617       !main_thread_only().delayed_incoming_queue.empty() ||
618       !main_thread_only().immediate_work_queue->Empty()) {
619     return false;
620   }
621 
622   base::internal::CheckedAutoLock lock(any_thread_lock_);
623   return any_thread_.immediate_incoming_queue.empty();
624 }
625 
GetNumberOfPendingTasks() const626 size_t TaskQueueImpl::GetNumberOfPendingTasks() const {
627   size_t task_count = 0;
628   task_count += main_thread_only().delayed_work_queue->Size();
629   task_count += main_thread_only().delayed_incoming_queue.size();
630   task_count += main_thread_only().immediate_work_queue->Size();
631 
632   base::internal::CheckedAutoLock lock(any_thread_lock_);
633   task_count += any_thread_.immediate_incoming_queue.size();
634   return task_count;
635 }
636 
// True iff the queue holds work that could run right now: anything already in
// a work queue, a delayed task whose run time has arrived, or a pending
// immediate task.
bool TaskQueueImpl::HasTaskToRunImmediatelyOrReadyDelayedTask() const {
  // Any work queue tasks count as immediate work.
  if (!main_thread_only().delayed_work_queue->Empty() ||
      !main_thread_only().immediate_work_queue->Empty()) {
    return true;
  }

  // Tasks on |delayed_incoming_queue| that could run now, count as
  // immediate work.
  if (!main_thread_only().delayed_incoming_queue.empty() &&
      main_thread_only().delayed_incoming_queue.top().delayed_run_time <=
          sequence_manager_->main_thread_clock()->NowTicks()) {
    return true;
  }

  // Finally tasks on |immediate_incoming_queue| count as immediate work.
  base::internal::CheckedAutoLock lock(any_thread_lock_);
  return !any_thread_.immediate_incoming_queue.empty();
}
656 
// Returns the wake-up for the earliest pending delayed task, or nullopt if
// there is none or the queue is disabled.
absl::optional<WakeUp> TaskQueueImpl::GetNextDesiredWakeUp() {
  // Note we don't schedule a wake-up for disabled queues.
  if (main_thread_only().delayed_incoming_queue.empty() || !IsQueueEnabled())
    return absl::nullopt;

  // High resolution is needed if the queue contains high resolution tasks and
  // has a priority index <= kNormalPriority (precise execution time is
  // unnecessary for a low priority queue).
  WakeUpResolution resolution = has_pending_high_resolution_tasks() &&
                                        GetQueuePriority() <= DefaultPriority()
                                    ? WakeUpResolution::kHigh
                                    : WakeUpResolution::kLow;

  const auto& top_task = main_thread_only().delayed_incoming_queue.top();
  return WakeUp{top_task.delayed_run_time, top_task.leeway, resolution,
                top_task.delay_policy};
}
674 
// Invoked by the WakeUpQueue when this queue's wake-up fires: moves any
// now-ready delayed tasks into the work queue and notifies the throttler.
void TaskQueueImpl::OnWakeUp(LazyNow* lazy_now, EnqueueOrder enqueue_order) {
  MoveReadyDelayedTasksToWorkQueue(lazy_now, enqueue_order);
  if (main_thread_only().throttler) {
    main_thread_only().throttler->OnWakeUp(lazy_now);
  }
}
681 
// Pops consecutively cancelled tasks off the top of the delayed incoming
// heap. Returns true if anything was removed (in which case the wake-up is
// refreshed, since the heap top may have changed).
bool TaskQueueImpl::RemoveAllCanceledDelayedTasksFromFront(LazyNow* lazy_now) {
  // Because task destructors could have a side-effect of posting new tasks, we
  // move all the cancelled tasks into a temporary container before deleting
  // them. This is to avoid the queue from changing while iterating over it.
  StackVector<Task, 8> tasks_to_delete;

  while (!main_thread_only().delayed_incoming_queue.empty()) {
    const Task& task = main_thread_only().delayed_incoming_queue.top();
    CHECK(task.task);
    if (!task.task.IsCancelled())
      break;

    tasks_to_delete->push_back(
        main_thread_only().delayed_incoming_queue.take_top());
  }

  if (!tasks_to_delete->empty()) {
    UpdateWakeUp(lazy_now);
    return true;
  }

  // |tasks_to_delete| is destroyed (running the task destructors) on return.
  return false;
}
705 
// Moves every delayed task whose run time has arrived from
// |delayed_incoming_queue| into the delayed work queue, dropping cancelled
// tasks along the way, then refreshes the queue's scheduled wake-up.
void TaskQueueImpl::MoveReadyDelayedTasksToWorkQueue(
    LazyNow* lazy_now,
    EnqueueOrder enqueue_order) {
  // Enqueue all delayed tasks that should be running now, skipping any that
  // have been canceled.
  WorkQueue::TaskPusher delayed_work_queue_task_pusher(
      main_thread_only().delayed_work_queue->CreateTaskPusher());

  // Because task destructors could have a side-effect of posting new tasks, we
  // move all the cancelled tasks into a temporary container before deleting
  // them. This is to avoid the queue from changing while iterating over it.
  StackVector<Task, 8> tasks_to_delete;

  while (!main_thread_only().delayed_incoming_queue.empty()) {
    const Task& task = main_thread_only().delayed_incoming_queue.top();
    CHECK(task.task);

    // Leave the top task alone if it hasn't been canceled and it is not ready.
    const bool is_cancelled = task.task.IsCancelled();
    if (!is_cancelled && task.earliest_delayed_run_time() > lazy_now->Now())
      break;

    // Pop the task either way; cancelled tasks are deferred for deletion.
    Task ready_task = main_thread_only().delayed_incoming_queue.take_top();
    if (is_cancelled) {
      tasks_to_delete->push_back(std::move(ready_task));
      continue;
    }

    // The top task is ready to run. Move it to the delayed work queue.
#if DCHECK_IS_ON()
    if (sequence_manager_->settings().log_task_delay_expiry)
      VLOG(0) << GetName() << " Delay expired for "
              << ready_task.posted_from.ToString();
#endif  // DCHECK_IS_ON()
    DCHECK(!ready_task.delayed_run_time.is_null());
    // The enqueue order is assigned only now, when the task becomes runnable.
    DCHECK(!ready_task.enqueue_order_set());
    ready_task.set_enqueue_order(enqueue_order);
    // A pending delayed fence may activate as this task crosses its run time.
    ActivateDelayedFenceIfNeeded(ready_task);

    delayed_work_queue_task_pusher.Push(std::move(ready_task));
  }

  // Explicitly delete tasks last.
  tasks_to_delete->clear();

  UpdateWakeUp(lazy_now);
}
753 
// Emits the total number of queued tasks as a trace counter. No-ops unless
// the "sequence_manager" tracing category is enabled and we are on the main
// thread (the work queues are not safe to read from other threads).
void TaskQueueImpl::TraceQueueSize() const {
  bool is_tracing;
  TRACE_EVENT_CATEGORY_GROUP_ENABLED(
      TRACE_DISABLED_BY_DEFAULT("sequence_manager"), &is_tracing);
  if (!is_tracing)
    return;

  // It's only safe to access the work queues from the main thread.
  // TODO(alexclarke): We should find another way of tracing this
  if (!associated_thread_->IsBoundToCurrentThread())
    return;

  // Sum across all four task containers; the incoming immediate queue is
  // cross-thread and must be read under |any_thread_lock_|.
  size_t total_task_count;
  {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    total_task_count = any_thread_.immediate_incoming_queue.size() +
                       main_thread_only().immediate_work_queue->Size() +
                       main_thread_only().delayed_work_queue->Size() +
                       main_thread_only().delayed_incoming_queue.size();
  }
  TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), GetName(),
                 total_task_count);
}
777 
// Changes this queue's priority via the selector and maintains the
// bookkeeping used to decide whether tasks "were blocked or low priority".
void TaskQueueImpl::SetQueuePriority(TaskQueue::QueuePriority priority) {
  const TaskQueue::QueuePriority previous_priority = GetQueuePriority();
  if (priority == previous_priority)
    return;
  sequence_manager_->main_thread_only().selector.SetQueuePriority(this,
                                                                  priority);

#if BUILDFLAG(IS_WIN)
  // Updating queue priority can change whether high resolution timer is needed.
  LazyNow lazy_now(sequence_manager_->main_thread_clock());
  UpdateWakeUp(&lazy_now);
#endif

  // Larger priority values mean lower priority (see comparison with
  // DefaultPriority() below).
  if (priority > DefaultPriority()) {
    // |priority| is now lower than the default, so update accordingly.
    main_thread_only()
        .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
        EnqueueOrder::max();
  } else if (previous_priority > DefaultPriority()) {
    // |priority| is no longer lower than the default, so record current
    // sequence number.
    DCHECK_EQ(
        main_thread_only()
            .enqueue_order_at_which_we_became_unblocked_with_normal_priority,
        EnqueueOrder::max());
    main_thread_only()
        .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
        sequence_manager_->GetNextSequenceNumber();
  }
}
808 
GetQueuePriority() const809 TaskQueue::QueuePriority TaskQueueImpl::GetQueuePriority() const {
810   size_t set_index = immediate_work_queue()->work_queue_set_index();
811   DCHECK_EQ(set_index, delayed_work_queue()->work_queue_set_index());
812   return static_cast<TaskQueue::QueuePriority>(set_index);
813 }
814 
// Serializes the queue's state into a Value::Dict for tracing/debugging.
// |force_verbose| additionally dumps the individual tasks even when the
// verbose tracing category is off.
Value::Dict TaskQueueImpl::AsValue(TimeTicks now, bool force_verbose) const {
  base::internal::CheckedAutoLock lock(any_thread_lock_);
  Value::Dict state;
  state.Set("name", GetName());
  if (any_thread_.unregistered) {
    state.Set("unregistered", true);
    return state;
  }
  DCHECK(main_thread_only().delayed_work_queue);
  DCHECK(main_thread_only().immediate_work_queue);

  // Use the object address as a stable identifier for this queue.
  state.Set("task_queue_id",
            StringPrintf("0x%" PRIx64, static_cast<uint64_t>(
                                           reinterpret_cast<uintptr_t>(this))));
  state.Set("enabled", IsQueueEnabled());
  // TODO(crbug.com/1334256): Make base::Value able to store an int64_t and
  // remove the various static_casts below.
  // NOTE(review): the keys below read "...queuesize"/"...queuecapacity"
  // (missing underscore) — presumably a long-standing typo, but trace
  // consumers may depend on the exact strings, so they are kept verbatim.
  state.Set("any_thread_.immediate_incoming_queuesize",
            static_cast<int>(any_thread_.immediate_incoming_queue.size()));
  state.Set("delayed_incoming_queue_size",
            static_cast<int>(main_thread_only().delayed_incoming_queue.size()));
  state.Set("immediate_work_queue_size",
            static_cast<int>(main_thread_only().immediate_work_queue->Size()));
  state.Set("delayed_work_queue_size",
            static_cast<int>(main_thread_only().delayed_work_queue->Size()));

  state.Set("any_thread_.immediate_incoming_queuecapacity",
            static_cast<int>(any_thread_.immediate_incoming_queue.capacity()));
  state.Set("immediate_work_queue_capacity",
            static_cast<int>(immediate_work_queue()->Capacity()));
  state.Set("delayed_work_queue_capacity",
            static_cast<int>(delayed_work_queue()->Capacity()));

  // Time until the earliest pending delayed task becomes runnable.
  if (!main_thread_only().delayed_incoming_queue.empty()) {
    TimeDelta delay_to_next_task =
        (main_thread_only().delayed_incoming_queue.top().delayed_run_time -
         sequence_manager_->main_thread_clock()->NowTicks());
    state.Set("delay_to_next_task_ms", delay_to_next_task.InMillisecondsF());
  }
  if (main_thread_only().current_fence) {
    Value::Dict fence_state;
    fence_state.Set(
        "enqueue_order",
        static_cast<int>(
            main_thread_only().current_fence->task_order().enqueue_order()));
    fence_state.Set("activated_in_wake_up", !main_thread_only()
                                                 .current_fence->task_order()
                                                 .delayed_run_time()
                                                 .is_null());
    state.Set("current_fence", std::move(fence_state));
  }
  if (main_thread_only().delayed_fence) {
    state.Set("delayed_fence_seconds_from_now",
              (main_thread_only().delayed_fence.value() - now).InSecondsF());
  }

  bool verbose = false;
  TRACE_EVENT_CATEGORY_GROUP_ENABLED(
      TRACE_DISABLED_BY_DEFAULT("sequence_manager.verbose_snapshots"),
      &verbose);

  // Per-task dumps are expensive, so they are gated behind the verbose
  // tracing category unless explicitly forced by the caller.
  if (verbose || force_verbose) {
    state.Set("immediate_incoming_queue",
              QueueAsValue(any_thread_.immediate_incoming_queue, now));
    state.Set("delayed_work_queue",
              main_thread_only().delayed_work_queue->AsValue(now));
    state.Set("immediate_work_queue",
              main_thread_only().immediate_work_queue->AsValue(now));
    state.Set("delayed_incoming_queue",
              main_thread_only().delayed_incoming_queue.AsValue(now));
  }
  state.Set("priority", GetQueuePriority());
  return state;
}
889 
// Registers an observer notified before/after each task on this queue runs.
void TaskQueueImpl::AddTaskObserver(TaskObserver* task_observer) {
  main_thread_only().task_observers.AddObserver(task_observer);
}
893 
// Unregisters an observer previously added with AddTaskObserver().
void TaskQueueImpl::RemoveTaskObserver(TaskObserver* task_observer) {
  main_thread_only().task_observers.RemoveObserver(task_observer);
}
897 
// Informs every registered task observer that |task| is about to run.
void TaskQueueImpl::NotifyWillProcessTask(const Task& task,
                                          bool was_blocked_or_low_priority) {
  DCHECK(should_notify_observers_);

  for (auto& observer : main_thread_only().task_observers)
    observer.WillProcessTask(task, was_blocked_or_low_priority);
}
905 
// Informs every registered task observer that |task| has finished running.
void TaskQueueImpl::NotifyDidProcessTask(const Task& task) {
  DCHECK(should_notify_observers_);
  for (auto& observer : main_thread_only().task_observers)
    observer.DidProcessTask(task);
}
911 
InsertFence(TaskQueue::InsertFencePosition position)912 void TaskQueueImpl::InsertFence(TaskQueue::InsertFencePosition position) {
913   Fence new_fence = position == TaskQueue::InsertFencePosition::kNow
914                         ? Fence::CreateWithEnqueueOrder(
915                               sequence_manager_->GetNextSequenceNumber())
916                         : Fence::BlockingFence();
917   InsertFence(new_fence);
918 }
919 
// Installs |current_fence| on this queue, replacing any previous fence.
// Tasks ordered after the fence are blocked; if replacing the fence unblocks
// the front task, work is scheduled so it can run.
void TaskQueueImpl::InsertFence(Fence current_fence) {
  // Only one fence may be present at a time.
  main_thread_only().delayed_fence = absl::nullopt;

  absl::optional<Fence> previous_fence = main_thread_only().current_fence;

  // Tasks posted after this point will have a strictly higher enqueue order
  // and will be blocked from running.
  main_thread_only().current_fence = current_fence;
  bool front_task_unblocked =
      main_thread_only().immediate_work_queue->InsertFence(current_fence);
  front_task_unblocked |=
      main_thread_only().delayed_work_queue->InsertFence(current_fence);

  {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    // The work queues may not see a task that is still sitting in the
    // cross-thread incoming queue: if the new fence is later than the old
    // one, a task ordered between the two fences is now unblocked.
    if (!front_task_unblocked && previous_fence &&
        previous_fence->task_order() < current_fence.task_order()) {
      if (!any_thread_.immediate_incoming_queue.empty() &&
          any_thread_.immediate_incoming_queue.front().task_order() >
              previous_fence->task_order() &&
          any_thread_.immediate_incoming_queue.front().task_order() <
              current_fence.task_order()) {
        front_task_unblocked = true;
      }
    }

    UpdateCrossThreadQueueStateLocked();
  }

  if (IsQueueEnabled() && front_task_unblocked) {
    OnQueueUnblocked();
    sequence_manager_->ScheduleWork();
  }
}
955 
// Arranges for a fence to activate at |time|. The actual fence is installed
// later by ActivateDelayedFenceIfNeeded() when a task crosses that run time.
void TaskQueueImpl::InsertFenceAt(TimeTicks time) {
  DCHECK(delayed_fence_allowed_)
      << "Delayed fences are not supported for this queue. Enable them "
         "explicitly in TaskQueue::Spec when creating the queue";

  // Task queue can have only one fence, delayed or not.
  RemoveFence();
  main_thread_only().delayed_fence = time;
}
965 
// Removes any installed or pending fence from this queue and, if that
// unblocks the front task, schedules work so it can run.
void TaskQueueImpl::RemoveFence() {
  absl::optional<Fence> previous_fence = main_thread_only().current_fence;
  main_thread_only().current_fence = absl::nullopt;
  main_thread_only().delayed_fence = absl::nullopt;

  bool front_task_unblocked =
      main_thread_only().immediate_work_queue->RemoveFence();
  front_task_unblocked |= main_thread_only().delayed_work_queue->RemoveFence();

  {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    // A task still in the cross-thread incoming queue that was ordered after
    // the old fence is also unblocked by removing it.
    if (!front_task_unblocked && previous_fence) {
      if (!any_thread_.immediate_incoming_queue.empty() &&
          any_thread_.immediate_incoming_queue.front().task_order() >
              previous_fence->task_order()) {
        front_task_unblocked = true;
      }
    }

    UpdateCrossThreadQueueStateLocked();
  }

  if (IsQueueEnabled() && front_task_unblocked) {
    OnQueueUnblocked();
    sequence_manager_->ScheduleWork();
  }
}
993 
BlockedByFence() const994 bool TaskQueueImpl::BlockedByFence() const {
995   if (!main_thread_only().current_fence)
996     return false;
997 
998   if (!main_thread_only().immediate_work_queue->BlockedByFence() ||
999       !main_thread_only().delayed_work_queue->BlockedByFence()) {
1000     return false;
1001   }
1002 
1003   base::internal::CheckedAutoLock lock(any_thread_lock_);
1004   if (any_thread_.immediate_incoming_queue.empty())
1005     return true;
1006 
1007   return any_thread_.immediate_incoming_queue.front().task_order() >
1008          main_thread_only().current_fence->task_order();
1009 }
1010 
HasActiveFence()1011 bool TaskQueueImpl::HasActiveFence() {
1012   if (main_thread_only().delayed_fence &&
1013       sequence_manager_->main_thread_clock()->NowTicks() >
1014           main_thread_only().delayed_fence.value()) {
1015     return true;
1016   }
1017   return !!main_thread_only().current_fence;
1018 }
1019 
// Returns whether a task with |enqueue_order| would be allowed to run given
// the queue's enabled state and any installed fence.
bool TaskQueueImpl::CouldTaskRun(EnqueueOrder enqueue_order) const {
  if (!IsQueueEnabled())
    return false;

  if (!main_thread_only().current_fence)
    return true;

  // TODO(crbug.com/1249857): This should use TaskOrder. This is currently only
  // used for tests and is fine as-is, but we should be using `TaskOrder` for
  // task comparisons. Also this test should be renamed with a testing suffix as
  // it is not used in production.
  return enqueue_order <
         main_thread_only().current_fence->task_order().enqueue_order();
}
1034 
// Returns true if a task with |enqueue_order| was posted before the queue
// last became unblocked at normal (or higher) priority — i.e. it spent time
// blocked or at low priority.
bool TaskQueueImpl::WasBlockedOrLowPriority(EnqueueOrder enqueue_order) const {
  return enqueue_order <
         main_thread_only()
             .enqueue_order_at_which_we_became_unblocked_with_normal_priority;
}
1040 
// static
// Serializes every task in |queue| for tracing, in queue order.
Value::List TaskQueueImpl::QueueAsValue(const TaskDeque& queue, TimeTicks now) {
  Value::List state;
  for (const Task& task : queue)
    state.Append(TaskAsValue(task, now));
  return state;
}
1048 
// static
// Serializes a single task's metadata (posting site, ordering, run time,
// cancellation state) for tracing.
Value::Dict TaskQueueImpl::TaskAsValue(const Task& task, TimeTicks now) {
  Value::Dict state;
  state.Set("posted_from", task.posted_from.ToString());
  if (task.enqueue_order_set())
    state.Set("enqueue_order", static_cast<int>(task.enqueue_order()));
  state.Set("sequence_num", task.sequence_num);
  state.Set("nestable", task.nestable == Nestable::kNestable);
  state.Set("is_high_res", task.is_high_res);
  state.Set("is_cancelled", task.task.IsCancelled());
  // Absolute run time, expressed as milliseconds since the TimeTicks origin.
  state.Set("delayed_run_time",
            (task.delayed_run_time - TimeTicks()).InMillisecondsF());
  // A null delayed_run_time means the task is immediate: report zero delay.
  const TimeDelta delayed_run_time_milliseconds_from_now =
      task.delayed_run_time.is_null() ? TimeDelta()
                                      : (task.delayed_run_time - now);
  state.Set("delayed_run_time_milliseconds_from_now",
            delayed_run_time_milliseconds_from_now.InMillisecondsF());
  return state;
}
1068 
// Converts a PostedTask carrying either a relative delay or an absolute run
// time into a Task with a resolved run time, sequence number and (on Windows)
// the required wake-up timer resolution. May be called from any thread.
Task TaskQueueImpl::MakeDelayedTask(PostedTask delayed_task,
                                    LazyNow* lazy_now) const {
  EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber();
  base::TimeDelta delay;
  WakeUpResolution resolution = WakeUpResolution::kLow;
#if BUILDFLAG(IS_WIN)
  const bool explicit_high_resolution_timer_win =
      g_explicit_high_resolution_timer_win.load(std::memory_order_relaxed);
#endif  // BUILDFLAG(IS_WIN)
  // Normalize: if the caller provided a relative delay, resolve it to an
  // absolute run time now.
  if (absl::holds_alternative<base::TimeDelta>(
          delayed_task.delay_or_delayed_run_time)) {
    delay = absl::get<base::TimeDelta>(delayed_task.delay_or_delayed_run_time);
    delayed_task.delay_or_delayed_run_time = lazy_now->Now() + delay;
  }
#if BUILDFLAG(IS_WIN)
  else if (!explicit_high_resolution_timer_win) {
    // Caller provided an absolute time; recover the relative delay so the
    // resolution heuristic below can apply.
    delay = absl::get<base::TimeTicks>(delayed_task.delay_or_delayed_run_time) -
            lazy_now->Now();
  }
  if (!explicit_high_resolution_timer_win &&
      delay < (2 * base::Milliseconds(Time::kMinLowResolutionThresholdMs))) {
    // Outside the kExplicitHighResolutionTimerWin experiment, We consider the
    // task needs a high resolution timer if the delay is more than 0 and less
    // than 32ms. This caps the relative error to less than 50% : a 33ms wait
    // can wake at 48ms since the default resolution on Windows is between 10
    // and 15ms.
    resolution = WakeUpResolution::kHigh;
  }
#endif  // BUILDFLAG(IS_WIN)
  // leeway isn't specified yet since this may be called from any thread.
  return Task(std::move(delayed_task), sequence_number, EnqueueOrder(),
              lazy_now->Now(), resolution);
}
1102 
// Returns whether the queue is currently enabled (main-thread state).
bool TaskQueueImpl::IsQueueEnabled() const {
  return main_thread_only().is_enabled;
}
1106 
// Enables or disables the queue: updates the main-thread state, the wake-up,
// the mirrored cross-thread state, and finally the selector (which may post
// a DoWork when enabling).
void TaskQueueImpl::SetQueueEnabled(bool enabled) {
  if (main_thread_only().is_enabled == enabled)
    return;

  // Update the |main_thread_only_| struct.
  main_thread_only().is_enabled = enabled;
  main_thread_only().disabled_time = absl::nullopt;

  // |sequence_manager_| can be null in tests.
  if (!sequence_manager_)
    return;

  LazyNow lazy_now(sequence_manager_->main_thread_clock());

  if (!enabled) {
    // NOTE(review): |tracing_enabled| is queried here but not read afterwards
    // — presumably a leftover from removed reporting code; confirm before
    // cleaning up.
    bool tracing_enabled = false;
    TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
                                       &tracing_enabled);
    main_thread_only().disabled_time = lazy_now.Now();
  } else {
    // Override reporting if the queue is becoming enabled again.
    main_thread_only().should_report_posted_tasks_when_disabled = false;
  }

  // If there is a throttler, it will be notified of pending delayed and
  // immediate tasks inside UpdateWakeUp().
  UpdateWakeUp(&lazy_now);

  {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    UpdateCrossThreadQueueStateLocked();

    // Copy over the task-reporting related state.
    any_thread_.tracing_only.is_enabled = enabled;
    any_thread_.tracing_only.disabled_time = main_thread_only().disabled_time;
    any_thread_.tracing_only.should_report_posted_tasks_when_disabled =
        main_thread_only().should_report_posted_tasks_when_disabled;
  }

  // Finally, enable or disable the queue with the selector.
  if (enabled) {
    // Note the selector calls SequenceManager::OnTaskQueueEnabled which posts
    // a DoWork if needed.
    sequence_manager_->main_thread_only().selector.EnableQueue(this);

    if (!BlockedByFence())
      OnQueueUnblocked();
  } else {
    sequence_manager_->main_thread_only().selector.DisableQueue(this);
  }
}
1158 
// Toggles reporting of tasks posted while the queue is disabled. Turning the
// reporting on is ignored unless the "lifecycles" tracing category is active.
void TaskQueueImpl::SetShouldReportPostedTasksWhenDisabled(bool should_report) {
  if (main_thread_only().should_report_posted_tasks_when_disabled ==
      should_report)
    return;

  // Only observe transitions turning the reporting on if tracing is enabled.
  if (should_report) {
    bool tracing_enabled = false;
    TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
                                       &tracing_enabled);
    if (!tracing_enabled)
      return;
  }

  main_thread_only().should_report_posted_tasks_when_disabled = should_report;

  // Mirror the state to the AnyThread struct as well.
  {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    any_thread_.tracing_only.should_report_posted_tasks_when_disabled =
        should_report;
  }
}
1182 
// Refreshes the cross-thread mirror of main-thread state. Must be called
// with |any_thread_lock_| held.
void TaskQueueImpl::UpdateCrossThreadQueueStateLocked() {
  any_thread_.immediate_work_queue_empty =
      main_thread_only().immediate_work_queue->Empty();

  if (main_thread_only().throttler) {
    // If there's a Throttler, always ScheduleWork() when immediate work is
    // posted and the queue is enabled, to ensure that
    // Throttler::OnHasImmediateTask() is invoked.
    any_thread_.post_immediate_task_should_schedule_work = IsQueueEnabled();
  } else {
    // Otherwise, ScheduleWork() only if the queue is enabled and there isn't a
    // fence to prevent the task from being executed.
    any_thread_.post_immediate_task_should_schedule_work =
        IsQueueEnabled() && !main_thread_only().current_fence;
  }

#if DCHECK_IS_ON()
  any_thread_.queue_set_index =
      main_thread_only().immediate_work_queue->work_queue_set_index();
#endif
}
1204 
// Periodic memory housekeeping: sweeps cancelled delayed tasks (behind the
// kSweepCancelledTasks feature, cached in g_is_sweep_cancelled_tasks_enabled)
// and shrinks over-allocated queues.
void TaskQueueImpl::ReclaimMemory(TimeTicks now) {
  if (main_thread_only().delayed_incoming_queue.empty())
    return;

  if (g_is_sweep_cancelled_tasks_enabled) {
    main_thread_only().delayed_incoming_queue.SweepCancelledTasks(
        sequence_manager_);

    // If deleting one of the cancelled tasks shut down this queue, bail out.
    // Note that in this scenario |this| is still valid, but some fields of the
    // queue have been cleared out by |UnregisterTaskQueue|.
    if (!main_thread_only().delayed_work_queue)
      return;

    // The front task may have changed; re-derive the wake-up.
    LazyNow lazy_now(now);
    UpdateWakeUp(&lazy_now);
  }

  // Also consider shrinking the work queue if it's wasting memory.
  main_thread_only().delayed_work_queue->MaybeShrinkQueue();
  main_thread_only().immediate_work_queue->MaybeShrinkQueue();

  {
    base::internal::CheckedAutoLock lock(any_thread_lock_);
    any_thread_.immediate_incoming_queue.MaybeShrinkQueue();
  }
}
1232 
// Test-only helper: appends a task directly to the cross-thread immediate
// incoming queue, bypassing the normal posting path.
void TaskQueueImpl::PushImmediateIncomingTaskForTest(Task task) {
  base::internal::CheckedAutoLock lock(any_thread_lock_);
  any_thread_.immediate_incoming_queue.push_back(std::move(task));
}
1237 
// Re-queues a non-nestable task that was deferred while running inside a
// nested loop, placing it at the FRONT of the appropriate work queue so its
// original enqueue order is respected.
void TaskQueueImpl::RequeueDeferredNonNestableTask(
    DeferredNonNestableTask task) {
  DCHECK(task.task.nestable == Nestable::kNonNestable);

  // It's possible that the queue was unregistered since the task was posted.
  // Skip the task in that case.
  if (!main_thread_only().delayed_work_queue)
    return;

  // The re-queued tasks have to be pushed onto the front because we'd otherwise
  // violate the strict monotonically increasing enqueue order within the
  // WorkQueue.  We can't assign them a new enqueue order here because that will
  // not behave correctly with fences and things will break (e.g Idle TQ).
  if (task.work_queue_type == WorkQueueType::kDelayed) {
    main_thread_only().delayed_work_queue->PushNonNestableTaskToFront(
        std::move(task.task));
  } else {
    // We're about to push |task| onto an empty |immediate_work_queue|
    // (bypassing |immediate_incoming_queue_|). As such, we no longer need to
    // reload if we were planning to. The flag must be cleared while holding
    // the lock to avoid a cross-thread post task setting it again before
    // we actually make |immediate_work_queue| non-empty.
    if (main_thread_only().immediate_work_queue->Empty()) {
      base::internal::CheckedAutoLock lock(any_thread_lock_);
      empty_queues_to_reload_handle_.SetActive(false);

      any_thread_.immediate_work_queue_empty = false;
      // Push while still holding the lock — see the comment above.
      main_thread_only().immediate_work_queue->PushNonNestableTaskToFront(
          std::move(task.task));

    } else {
      main_thread_only().immediate_work_queue->PushNonNestableTaskToFront(
          std::move(task.task));
    }
  }
}
1274 
// Installs the queue's throttler. Only one throttler may be set at a time.
void TaskQueueImpl::SetThrottler(TaskQueue::Throttler* throttler) {
  DCHECK(throttler);
  DCHECK(!main_thread_only().throttler)
      << "Can't assign two different throttlers to "
         "base::sequence_manager:TaskQueue";

  main_thread_only().throttler = throttler;
}
1283 
// Removes the throttler and recomputes the wake-up without it.
void TaskQueueImpl::ResetThrottler() {
  main_thread_only().throttler = nullptr;
  LazyNow lazy_now(sequence_manager_->main_thread_clock());
  // The current delayed wake up may have been determined by the Throttler.
  // Update it now that there is no Throttler.
  UpdateWakeUp(&lazy_now);
}
1291 
// Recomputes the queue's desired wake-up, lets an installed throttler adjust
// it, and pushes the result to the wake-up queue.
void TaskQueueImpl::UpdateWakeUp(LazyNow* lazy_now) {
  absl::optional<WakeUp> wake_up = GetNextDesiredWakeUp();
  if (main_thread_only().throttler && IsQueueEnabled()) {
    // GetNextAllowedWakeUp() may return a non-null wake_up even if |wake_up| is
    // nullopt, e.g. to throttle immediate tasks.
    wake_up = main_thread_only().throttler->GetNextAllowedWakeUp(
        lazy_now, wake_up, HasTaskToRunImmediatelyOrReadyDelayedTask());
  }
  SetNextWakeUp(lazy_now, wake_up);
}
1302 
// Records |wake_up| as the scheduled wake-up and forwards it to the wake-up
// queue, skipping the update if nothing changed.
void TaskQueueImpl::SetNextWakeUp(LazyNow* lazy_now,
                                  absl::optional<WakeUp> wake_up) {
  if (main_thread_only().scheduled_wake_up == wake_up)
    return;
  main_thread_only().scheduled_wake_up = wake_up;
  main_thread_only().wake_up_queue->SetNextWakeUpForQueue(this, lazy_now,
                                                          wake_up);
}
1311 
HasTaskToRunImmediately() const1312 bool TaskQueueImpl::HasTaskToRunImmediately() const {
1313   // Any work queue tasks count as immediate work.
1314   if (!main_thread_only().delayed_work_queue->Empty() ||
1315       !main_thread_only().immediate_work_queue->Empty()) {
1316     return true;
1317   }
1318 
1319   // Finally tasks on |immediate_incoming_queue| count as immediate work.
1320   base::internal::CheckedAutoLock lock(any_thread_lock_);
1321   return !any_thread_.immediate_incoming_queue.empty();
1322 }
1323 
// Same as HasTaskToRunImmediately() but expects |any_thread_lock_| to be
// already held by the caller.
bool TaskQueueImpl::HasTaskToRunImmediatelyLocked() const {
  return !main_thread_only().delayed_work_queue->Empty() ||
         !main_thread_only().immediate_work_queue->Empty() ||
         !any_thread_.immediate_incoming_queue.empty();
}
1329 
// Installs (or clears, with a null handler) the callback run when a task on
// this queue starts executing.
void TaskQueueImpl::SetOnTaskStartedHandler(
    TaskQueueImpl::OnTaskStartedHandler handler) {
  DCHECK(should_notify_observers_ || handler.is_null());
  main_thread_only().on_task_started_handler = std::move(handler);
}
1335 
// Invokes the on-task-started handler, if one is installed.
void TaskQueueImpl::OnTaskStarted(const Task& task,
                                  const TaskQueue::TaskTiming& task_timing) {
  if (!main_thread_only().on_task_started_handler.is_null())
    main_thread_only().on_task_started_handler.Run(task, task_timing);
}
1341 
// Installs (or clears, with a null handler) the callback run when a task on
// this queue finishes executing.
void TaskQueueImpl::SetOnTaskCompletedHandler(
    TaskQueueImpl::OnTaskCompletedHandler handler) {
  DCHECK(should_notify_observers_ || handler.is_null());
  main_thread_only().on_task_completed_handler = std::move(handler);
}
1347 
// Invokes the on-task-completed handler, if one is installed.
void TaskQueueImpl::OnTaskCompleted(const Task& task,
                                    TaskQueue::TaskTiming* task_timing,
                                    LazyNow* lazy_now) {
  if (!main_thread_only().on_task_completed_handler.is_null()) {
    main_thread_only().on_task_completed_handler.Run(task, task_timing,
                                                     lazy_now);
  }
}
1356 
RequiresTaskTiming() const1357 bool TaskQueueImpl::RequiresTaskTiming() const {
1358   return !main_thread_only().on_task_started_handler.is_null() ||
1359          !main_thread_only().on_task_completed_handler.is_null();
1360 }
1361 
// Registers a handler invoked whenever a task is posted to this queue. The
// returned handle unregisters the handler when destroyed.
std::unique_ptr<TaskQueue::OnTaskPostedCallbackHandle>
TaskQueueImpl::AddOnTaskPostedHandler(OnTaskPostedHandler handler) {
  DCHECK(should_notify_observers_ && !handler.is_null());
  std::unique_ptr<OnTaskPostedCallbackHandleImpl> handle =
      std::make_unique<OnTaskPostedCallbackHandleImpl>(this,
                                                       associated_thread_);
  // The handler map is consulted from posting threads, so guard it with the
  // cross-thread lock.
  base::internal::CheckedAutoLock lock(any_thread_lock_);
  any_thread_.on_task_posted_handlers.insert(
      {handle.get(), std::move(handler)});
  return handle;
}
1373 
// Unregisters a handler previously added by AddOnTaskPostedHandler(); called
// from the handle's destructor.
void TaskQueueImpl::RemoveOnTaskPostedHandler(
    TaskQueueImpl::OnTaskPostedCallbackHandleImpl*
        on_task_posted_callback_handle) {
  base::internal::CheckedAutoLock lock(any_thread_lock_);
  any_thread_.on_task_posted_handlers.erase(on_task_posted_callback_handle);
}
1380 
// Installs (or clears, with a null logger) the trace logger used when a task
// from this queue is executed.
void TaskQueueImpl::SetTaskExecutionTraceLogger(
    TaskExecutionTraceLogger logger) {
  DCHECK(should_notify_observers_ || logger.is_null());
  main_thread_only().task_execution_trace_logger = std::move(logger);
}
1386 
// Returns whether the queue has been unregistered. Thread-safe.
bool TaskQueueImpl::IsUnregistered() const {
  base::internal::CheckedAutoLock lock(any_thread_lock_);
  return any_thread_.unregistered;
}
1391 
// Returns a weak pointer to the owning SequenceManagerImpl.
WeakPtr<SequenceManagerImpl> TaskQueueImpl::GetSequenceManagerWeakPtr() {
  return sequence_manager_->GetWeakPtr();
}
1395 
ActivateDelayedFenceIfNeeded(const Task & task)1396 void TaskQueueImpl::ActivateDelayedFenceIfNeeded(const Task& task) {
1397   if (!main_thread_only().delayed_fence)
1398     return;
1399   if (main_thread_only().delayed_fence.value() > task.delayed_run_time)
1400     return;
1401   InsertFence(Fence(task.task_order()));
1402   main_thread_only().delayed_fence = absl::nullopt;
1403 }
1404 
MaybeReportIpcTaskQueuedFromMainThread(const Task & pending_task)1405 void TaskQueueImpl::MaybeReportIpcTaskQueuedFromMainThread(
1406     const Task& pending_task) {
1407   if (!pending_task.ipc_hash)
1408     return;
1409 
1410   // It's possible that tracing was just enabled and no disabled time has been
1411   // stored. In that case, skip emitting the event.
1412   if (!main_thread_only().disabled_time)
1413     return;
1414 
1415   bool tracing_enabled = false;
1416   TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1417                                      &tracing_enabled);
1418   if (!tracing_enabled)
1419     return;
1420 
1421   if (main_thread_only().is_enabled ||
1422       !main_thread_only().should_report_posted_tasks_when_disabled) {
1423     return;
1424   }
1425 
1426   base::TimeDelta time_since_disabled =
1427       sequence_manager_->main_thread_clock()->NowTicks() -
1428       main_thread_only().disabled_time.value();
1429 
1430   ReportIpcTaskQueued(pending_task, time_since_disabled);
1431 }
1432 
ShouldReportIpcTaskQueuedFromAnyThreadLocked(base::TimeDelta * time_since_disabled)1433 bool TaskQueueImpl::ShouldReportIpcTaskQueuedFromAnyThreadLocked(
1434     base::TimeDelta* time_since_disabled) {
1435   // It's possible that tracing was just enabled and no disabled time has been
1436   // stored. In that case, skip emitting the event.
1437   if (!any_thread_.tracing_only.disabled_time)
1438     return false;
1439 
1440   if (any_thread_.tracing_only.is_enabled ||
1441       any_thread_.tracing_only.should_report_posted_tasks_when_disabled) {
1442     return false;
1443   }
1444 
1445   *time_since_disabled = sequence_manager_->any_thread_clock()->NowTicks() -
1446                          any_thread_.tracing_only.disabled_time.value();
1447   return true;
1448 }
1449 
MaybeReportIpcTaskQueuedFromAnyThreadLocked(const Task & pending_task)1450 void TaskQueueImpl::MaybeReportIpcTaskQueuedFromAnyThreadLocked(
1451     const Task& pending_task) {
1452   if (!pending_task.ipc_hash)
1453     return;
1454 
1455   bool tracing_enabled = false;
1456   TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1457                                      &tracing_enabled);
1458   if (!tracing_enabled)
1459     return;
1460 
1461   base::TimeDelta time_since_disabled;
1462   if (ShouldReportIpcTaskQueuedFromAnyThreadLocked(&time_since_disabled))
1463     ReportIpcTaskQueued(pending_task, time_since_disabled);
1464 }
1465 
MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(const Task & pending_task)1466 void TaskQueueImpl::MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(
1467     const Task& pending_task) {
1468   if (!pending_task.ipc_hash)
1469     return;
1470 
1471   bool tracing_enabled = false;
1472   TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"),
1473                                      &tracing_enabled);
1474   if (!tracing_enabled)
1475     return;
1476 
1477   base::TimeDelta time_since_disabled;
1478   bool should_report = false;
1479   {
1480     base::internal::CheckedAutoLock lock(any_thread_lock_);
1481     should_report =
1482         ShouldReportIpcTaskQueuedFromAnyThreadLocked(&time_since_disabled);
1483   }
1484 
1485   if (should_report)
1486     ReportIpcTaskQueued(pending_task, time_since_disabled);
1487 }
1488 
// Emits an instant trace event recording that a task carrying an IPC hash
// was posted to this queue while the queue was disabled, including how long
// the queue has been disabled and the posting source location.
void TaskQueueImpl::ReportIpcTaskQueued(
    const Task& pending_task,
    const base::TimeDelta& time_since_disabled) {
  TRACE_EVENT_INSTANT(
      TRACE_DISABLED_BY_DEFAULT("lifecycles"), "task_posted_to_disabled_queue",
      [&](perfetto::EventContext ctx) {
        auto* proto = ctx.event<perfetto::protos::pbzero::ChromeTrackEvent>()
                          ->set_chrome_task_posted_to_disabled_queue();
        // checked_cast crashes on a negative delta rather than wrapping.
        proto->set_time_since_disabled_ms(
            checked_cast<uint64_t>(time_since_disabled.InMilliseconds()));
        proto->set_ipc_hash(pending_task.ipc_hash);
        proto->set_source_location_iid(
            base::trace_event::InternedSourceLocation::Get(
                &ctx, pending_task.posted_from));
      });
}
1505 
OnQueueUnblocked()1506 void TaskQueueImpl::OnQueueUnblocked() {
1507   DCHECK(IsQueueEnabled());
1508   DCHECK(!BlockedByFence());
1509 
1510   main_thread_only().enqueue_order_at_which_we_became_unblocked =
1511       sequence_manager_->GetNextSequenceNumber();
1512   if (GetQueuePriority() <= DefaultPriority()) {
1513     // We are default priority or more important so update
1514     // |enqueue_order_at_which_we_became_unblocked_with_normal_priority|.
1515     main_thread_only()
1516         .enqueue_order_at_which_we_became_unblocked_with_normal_priority =
1517         main_thread_only().enqueue_order_at_which_we_became_unblocked;
1518   }
1519 }
1520 
// DelayedIncomingQueue has no raw resources; member defaults suffice.
TaskQueueImpl::DelayedIncomingQueue::DelayedIncomingQueue() = default;
TaskQueueImpl::DelayedIncomingQueue::~DelayedIncomingQueue() = default;
1523 
push(Task task)1524 void TaskQueueImpl::DelayedIncomingQueue::push(Task task) {
1525   // TODO(crbug.com/1247285): Remove this once the cause of corrupted tasks in
1526   // the queue is understood.
1527   CHECK(task.task);
1528   if (task.is_high_res)
1529     pending_high_res_tasks_++;
1530   queue_.insert(std::move(task));
1531 }
1532 
remove(HeapHandle heap_handle)1533 void TaskQueueImpl::DelayedIncomingQueue::remove(HeapHandle heap_handle) {
1534   DCHECK(!empty());
1535   DCHECK_LT(heap_handle.index(), queue_.size());
1536   Task task = queue_.take(heap_handle);
1537   if (task.is_high_res) {
1538     pending_high_res_tasks_--;
1539     DCHECK_GE(pending_high_res_tasks_, 0);
1540   }
1541 }
1542 
take_top()1543 Task TaskQueueImpl::DelayedIncomingQueue::take_top() {
1544   DCHECK(!empty());
1545   if (queue_.top().is_high_res) {
1546     pending_high_res_tasks_--;
1547     DCHECK_GE(pending_high_res_tasks_, 0);
1548   }
1549   return queue_.take_top();
1550 }
1551 
swap(DelayedIncomingQueue * rhs)1552 void TaskQueueImpl::DelayedIncomingQueue::swap(DelayedIncomingQueue* rhs) {
1553   std::swap(pending_high_res_tasks_, rhs->pending_high_res_tasks_);
1554   std::swap(queue_, rhs->queue_);
1555 }
1556 
SweepCancelledTasks(SequenceManagerImpl * sequence_manager)1557 void TaskQueueImpl::DelayedIncomingQueue::SweepCancelledTasks(
1558     SequenceManagerImpl* sequence_manager) {
1559   // Note: IntrusiveHeap::EraseIf() is safe against re-entrancy caused by
1560   // deleted tasks posting new tasks.
1561   queue_.EraseIf([this](const Task& task) {
1562     if (task.task.IsCancelled()) {
1563       if (task.is_high_res) {
1564         --pending_high_res_tasks_;
1565         DCHECK_GE(pending_high_res_tasks_, 0);
1566       }
1567       return true;
1568     }
1569     return false;
1570   });
1571 }
1572 
AsValue(TimeTicks now) const1573 Value::List TaskQueueImpl::DelayedIncomingQueue::AsValue(TimeTicks now) const {
1574   Value::List state;
1575   for (const Task& task : queue_)
1576     state.Append(TaskAsValue(task, now));
1577   return state;
1578 }
1579 
operator ()(const Task & lhs,const Task & rhs) const1580 bool TaskQueueImpl::DelayedIncomingQueue::Compare::operator()(
1581     const Task& lhs,
1582     const Task& rhs) const {
1583   // Delayed tasks are ordered by latest_delayed_run_time(). The top task may
1584   // not be the first task eligible to run, but tasks will always become ripe
1585   // before their latest_delayed_run_time().
1586   const TimeTicks lhs_latest_delayed_run_time = lhs.latest_delayed_run_time();
1587   const TimeTicks rhs_latest_delayed_run_time = rhs.latest_delayed_run_time();
1588   if (lhs_latest_delayed_run_time == rhs_latest_delayed_run_time)
1589     return lhs.sequence_num > rhs.sequence_num;
1590   return lhs_latest_delayed_run_time > rhs_latest_delayed_run_time;
1591 }
1592 
// RAII handle returned by AddOnTaskPostedHandler(); must be created on the
// queue's associated (main) thread.
TaskQueueImpl::OnTaskPostedCallbackHandleImpl::OnTaskPostedCallbackHandleImpl(
    TaskQueueImpl* task_queue_impl,
    scoped_refptr<const AssociatedThreadId> associated_thread)
    : task_queue_impl_(task_queue_impl),
      associated_thread_(std::move(associated_thread)) {
  DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
}
1600 
// Unregisters the handler on destruction. |task_queue_impl_| may be null —
// presumably cleared when the queue itself is torn down first (not visible in
// this chunk; TODO confirm against UnregisterTaskQueue) — in which case there
// is nothing to remove.
TaskQueueImpl::OnTaskPostedCallbackHandleImpl::
    ~OnTaskPostedCallbackHandleImpl() {
  DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
  if (task_queue_impl_)
    task_queue_impl_->RemoveOnTaskPostedHandler(this);
}
1607 
DefaultPriority() const1608 TaskQueue::QueuePriority TaskQueueImpl::DefaultPriority() const {
1609   return sequence_manager()->settings().priority_settings.default_priority();
1610 }
1611 
1612 }  // namespace internal
1613 }  // namespace sequence_manager
1614 }  // namespace base
1615