// Copyright 2015 The Chromium Authors // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/task/sequence_manager/task_queue_impl.h" #include #include #include #include #include "base/check.h" #include "base/compiler_specific.h" #include "base/feature_list.h" #include "base/logging.h" #include "base/memory/scoped_refptr.h" #include "base/metrics/histogram_macros.h" #include "base/notreached.h" #include "base/observer_list.h" #include "base/ranges/algorithm.h" #include "base/sequence_token.h" #include "base/strings/stringprintf.h" #include "base/task/common/scoped_defer_task_posting.h" #include "base/task/default_delayed_task_handle_delegate.h" #include "base/task/sequence_manager/associated_thread_id.h" #include "base/task/sequence_manager/delayed_task_handle_delegate.h" #include "base/task/sequence_manager/fence.h" #include "base/task/sequence_manager/sequence_manager_impl.h" #include "base/task/sequence_manager/task_order.h" #include "base/task/sequence_manager/wake_up_queue.h" #include "base/task/sequence_manager/work_queue.h" #include "base/task/single_thread_task_runner.h" #include "base/task/task_features.h" #include "base/task/task_observer.h" #include "base/threading/thread_restrictions.h" #include "base/time/time.h" #include "base/trace_event/base_tracing.h" #include "build/build_config.h" #include "third_party/abseil-cpp/absl/container/inlined_vector.h" namespace base { namespace sequence_manager { namespace internal { // This class outside the anonymous namespace exists to allow being a friend of // `SingleThreadTaskRunner::CurrentDefaultHandle` in order to access // `SingleThreadTaskRunner::CurrentDefaultHandle::MayAlreadyExist`. class CurrentDefaultHandleOverrideForRunOrPostTask { public: explicit CurrentDefaultHandleOverrideForRunOrPostTask( scoped_refptr task_runner) : sttr_override_( nullptr, SingleThreadTaskRunner::CurrentDefaultHandle::MayAlreadyExist{}), str_override_(std::move(task_runner)) {} private: SingleThreadTaskRunner::CurrentDefaultHandle sttr_override_; SequencedTaskRunner::CurrentDefaultHandle str_override_; }; namespace { // An atomic is used here because the value is queried from other threads when // tasks are posted cross-thread, which can race with its initialization. std::atomic g_max_precise_delay{kDefaultMaxPreciseDelay}; #if BUILDFLAG(IS_WIN) // An atomic is used here because the flag is queried from other threads when // tasks are posted cross-thread, which can race with its initialization. std::atomic_bool g_explicit_high_resolution_timer_win{true}; #endif // BUILDFLAG(IS_WIN) void RunTaskSynchronously(AssociatedThreadId* associated_thread, scoped_refptr task_runner, OnceClosure closure) { base::internal::TaskScope sequence_scope( associated_thread->GetBoundSequenceToken(), /* is_thread_bound=*/false, /* is_running_synchronously=*/true); CurrentDefaultHandleOverrideForRunOrPostTask task_runner_override( std::move(task_runner)); associated_thread->StartInSequenceWithCurrentThread(); std::move(closure).Run(); associated_thread->StopInSequenceWithCurrentThread(); } } // namespace TaskQueueImpl::GuardedTaskPoster::GuardedTaskPoster(TaskQueueImpl* outer) : outer_(outer) {} TaskQueueImpl::GuardedTaskPoster::~GuardedTaskPoster() = default; bool TaskQueueImpl::GuardedTaskPoster::PostTask(PostedTask task) { // Do not process new PostTasks while we are handling a PostTask (tracing // has to do this) as it can lead to a deadlock and defer it instead. ScopedDeferTaskPosting disallow_task_posting; auto token = operations_controller_.TryBeginOperation(); if (!token) return false; outer_->PostTask(std::move(task)); return true; } DelayedTaskHandle TaskQueueImpl::GuardedTaskPoster::PostCancelableTask( PostedTask task) { // Do not process new PostTasks while we are handling a PostTask (tracing // has to do this) as it can lead to a deadlock and defer it instead. ScopedDeferTaskPosting disallow_task_posting; auto token = operations_controller_.TryBeginOperation(); if (!token) return DelayedTaskHandle(); auto delayed_task_handle_delegate = std::make_unique(outer_); task.delayed_task_handle_delegate = delayed_task_handle_delegate->AsWeakPtr(); outer_->PostTask(std::move(task)); DCHECK(delayed_task_handle_delegate->IsValid()); return DelayedTaskHandle(std::move(delayed_task_handle_delegate)); } bool TaskQueueImpl::GuardedTaskPoster::RunOrPostTask(PostedTask task) { auto token = operations_controller_.TryBeginOperation(); if (!token) { return false; } auto sync_work_auth = outer_->sequence_manager_->TryAcquireSyncWorkAuthorization(); // The queue may be disabled immediately after checking // `IsQueueEnabledFromAnyThread()`. That won't prevent the task from running. if (sync_work_auth.IsValid() && outer_->IsQueueEnabledFromAnyThread()) { RunTaskSynchronously(outer_->associated_thread_.get(), outer_->sequence_manager_->GetTaskRunner(), std::move(task.callback)); return true; } return PostTask(std::move(task)); } TaskQueueImpl::TaskRunner::TaskRunner( scoped_refptr task_poster, scoped_refptr associated_thread, TaskType task_type) : task_poster_(std::move(task_poster)), associated_thread_(std::move(associated_thread)), task_type_(task_type) {} TaskQueueImpl::TaskRunner::~TaskRunner() = default; bool TaskQueueImpl::TaskRunner::PostDelayedTask(const Location& location, OnceClosure callback, TimeDelta delay) { return task_poster_->PostTask(PostedTask(this, std::move(callback), location, delay, Nestable::kNestable, task_type_)); } bool TaskQueueImpl::TaskRunner::PostDelayedTaskAt( subtle::PostDelayedTaskPassKey, const Location& location, OnceClosure callback, TimeTicks delayed_run_time, base::subtle::DelayPolicy delay_policy) { return task_poster_->PostTask(PostedTask(this, std::move(callback), location, delayed_run_time, delay_policy, Nestable::kNestable, task_type_)); } DelayedTaskHandle TaskQueueImpl::TaskRunner::PostCancelableDelayedTaskAt( subtle::PostDelayedTaskPassKey pass_key, const Location& location, OnceClosure callback, TimeTicks delayed_run_time, base::subtle::DelayPolicy delay_policy) { return task_poster_->PostCancelableTask( PostedTask(this, std::move(callback), location, delayed_run_time, delay_policy, Nestable::kNestable, task_type_)); } DelayedTaskHandle TaskQueueImpl::TaskRunner::PostCancelableDelayedTask( subtle::PostDelayedTaskPassKey pass_key, const Location& location, OnceClosure callback, TimeDelta delay) { return task_poster_->PostCancelableTask( PostedTask(this, std::move(callback), location, delay, Nestable::kNestable, task_type_)); } bool TaskQueueImpl::TaskRunner::PostNonNestableDelayedTask( const Location& location, OnceClosure callback, TimeDelta delay) { return task_poster_->PostTask(PostedTask(this, std::move(callback), location, delay, Nestable::kNonNestable, task_type_)); } bool TaskQueueImpl::TaskRunner::RunOrPostTask(subtle::RunOrPostTaskPassKey, const Location& location, OnceClosure callback) { return task_poster_->RunOrPostTask( PostedTask(this, std::move(callback), location, TimeDelta(), Nestable::kNestable, task_type_)); } bool TaskQueueImpl::TaskRunner::BelongsToCurrentThread() const { return associated_thread_->IsBoundToCurrentThread(); } bool TaskQueueImpl::TaskRunner::RunsTasksInCurrentSequence() const { // Return true on the bound thread. This works even after `thread_local` // destruction. if (BelongsToCurrentThread()) { return true; } // Return true in a `RunOrPostTask` callback running synchronously on a // different thread. if (associated_thread_->IsBound() && associated_thread_->GetBoundSequenceToken() == base::internal::SequenceToken::GetForCurrentThread()) { return true; } return false; } // static void TaskQueueImpl::InitializeFeatures() { g_max_precise_delay = kMaxPreciseDelay.Get(); #if BUILDFLAG(IS_WIN) g_explicit_high_resolution_timer_win.store( FeatureList::IsEnabled(kExplicitHighResolutionTimerWin), std::memory_order_relaxed); #endif // BUILDFLAG(IS_WIN) } TaskQueueImpl::TaskQueueImpl(SequenceManagerImpl* sequence_manager, WakeUpQueue* wake_up_queue, const TaskQueue::Spec& spec) : name_(spec.name), sequence_manager_(sequence_manager), associated_thread_(sequence_manager ? sequence_manager->associated_thread() : AssociatedThreadId::CreateBound()), task_poster_(MakeRefCounted(this)), main_thread_only_(this, wake_up_queue), empty_queues_to_reload_handle_( sequence_manager ? sequence_manager->GetFlagToRequestReloadForEmptyQueue(this) : AtomicFlagSet::AtomicFlag()), should_monitor_quiescence_(spec.should_monitor_quiescence), should_notify_observers_(spec.should_notify_observers), delayed_fence_allowed_(spec.delayed_fence_allowed), default_task_runner_(CreateTaskRunner(kTaskTypeNone)) { UpdateCrossThreadQueueStateLocked(); // SequenceManager can't be set later, so we need to prevent task runners // from posting any tasks. if (sequence_manager_) task_poster_->StartAcceptingOperations(); } TaskQueueImpl::~TaskQueueImpl() { #if DCHECK_IS_ON() base::internal::CheckedAutoLock lock(any_thread_lock_); // NOTE this check shouldn't fire because |SequenceManagerImpl::queues_| // contains a strong reference to this TaskQueueImpl and the // SequenceManagerImpl destructor calls UnregisterTaskQueue on all task // queues. DCHECK(any_thread_.unregistered) << "UnregisterTaskQueue must be called first!"; #endif } TaskQueueImpl::AnyThread::AnyThread() = default; TaskQueueImpl::AnyThread::~AnyThread() = default; TaskQueueImpl::AnyThread::TracingOnly::TracingOnly() = default; TaskQueueImpl::AnyThread::TracingOnly::~TracingOnly() = default; TaskQueueImpl::MainThreadOnly::MainThreadOnly(TaskQueueImpl* task_queue, WakeUpQueue* wake_up_queue) : wake_up_queue(wake_up_queue), delayed_work_queue( new WorkQueue(task_queue, "delayed", WorkQueue::QueueType::kDelayed)), immediate_work_queue(new WorkQueue(task_queue, "immediate", WorkQueue::QueueType::kImmediate)) {} TaskQueueImpl::MainThreadOnly::~MainThreadOnly() = default; scoped_refptr TaskQueueImpl::CreateTaskRunner( TaskType task_type) const { DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); return MakeRefCounted(task_poster_, associated_thread_, task_type); } const scoped_refptr& TaskQueueImpl::task_runner() const { return default_task_runner_; } void TaskQueueImpl::UnregisterTaskQueue() { TRACE_EVENT0("base", "TaskQueueImpl::UnregisterTaskQueue"); // Invalidate weak pointers now so no voters reference this in a partially // torn down state. voter_weak_ptr_factory_.InvalidateWeakPtrs(); // Detach task runners. { ScopedAllowBaseSyncPrimitivesOutsideBlockingScope allow_wait; task_poster_->ShutdownAndWaitForZeroOperations(); } TaskDeque immediate_incoming_queue; base::flat_map, OnTaskPostedHandler> on_task_posted_handlers; { base::internal::CheckedAutoLock lock(any_thread_lock_); any_thread_.unregistered = true; immediate_incoming_queue.swap(any_thread_.immediate_incoming_queue); for (auto& handler : any_thread_.on_task_posted_handlers) handler.first->UnregisterTaskQueue(); any_thread_.on_task_posted_handlers.swap(on_task_posted_handlers); } if (main_thread_only().wake_up_queue) { main_thread_only().wake_up_queue->UnregisterQueue(this); } main_thread_only().on_task_started_handler = OnTaskStartedHandler(); main_thread_only().on_task_completed_handler = OnTaskCompletedHandler(); main_thread_only().wake_up_queue = nullptr; main_thread_only().throttler = nullptr; empty_queues_to_reload_handle_.ReleaseAtomicFlag(); // It is possible for a task to hold a scoped_refptr to this, which // will lead to TaskQueueImpl destructor being called when deleting a task. // To avoid use-after-free, we need to clear all fields of a task queue // before starting to delete the tasks. // All work queues and priority queues containing tasks should be moved to // local variables on stack (std::move for unique_ptrs and swap for queues) // before clearing them and deleting tasks. // Flush the queues outside of the lock because TSAN complains about a lock // order inversion for tasks that are posted from within a lock, with a // destructor that acquires the same lock. DelayedIncomingQueue delayed_incoming_queue; delayed_incoming_queue.swap(&main_thread_only().delayed_incoming_queue); std::unique_ptr immediate_work_queue = std::move(main_thread_only().immediate_work_queue); std::unique_ptr delayed_work_queue = std::move(main_thread_only().delayed_work_queue); } const char* TaskQueueImpl::GetName() const { return perfetto::protos::pbzero::SequenceManagerTask::QueueName_Name(name_); } QueueName TaskQueueImpl::GetProtoName() const { return name_; } void TaskQueueImpl::PostTask(PostedTask task) { CurrentThread current_thread = associated_thread_->IsBoundToCurrentThread() ? TaskQueueImpl::CurrentThread::kMainThread : TaskQueueImpl::CurrentThread::kNotMainThread; #if DCHECK_IS_ON() TimeDelta delay = GetTaskDelayAdjustment(current_thread); if (absl::holds_alternative( task.delay_or_delayed_run_time)) { absl::get(task.delay_or_delayed_run_time) += delay; } else { absl::get(task.delay_or_delayed_run_time) += delay; } #endif // DCHECK_IS_ON() if (!task.is_delayed()) { PostImmediateTaskImpl(std::move(task), current_thread); } else { PostDelayedTaskImpl(std::move(task), current_thread); } } void TaskQueueImpl::RemoveCancelableTask(HeapHandle heap_handle) { associated_thread_->AssertInSequenceWithCurrentThread(); DCHECK(heap_handle.IsValid()); main_thread_only().delayed_incoming_queue.remove(heap_handle); // Only update the delayed wake up if the top task is removed and we're // running on the main thread (a `RunOrPostTask` callback may run outside the // main thread, but in sequence with it -- it's not safe to invoke // `MessagePump::ScheduleDelayedWork()` in that context). if (heap_handle.index() == 0u && associated_thread_->IsBoundToCurrentThread()) { LazyNow lazy_now(sequence_manager_->main_thread_clock()); UpdateWakeUp(&lazy_now); } } TimeDelta TaskQueueImpl::GetTaskDelayAdjustment(CurrentThread current_thread) { #if DCHECK_IS_ON() if (current_thread == TaskQueueImpl::CurrentThread::kNotMainThread) { base::internal::CheckedAutoLock lock(any_thread_lock_); // Add a per-priority delay to cross thread tasks. This can help diagnose // scheduler induced flakiness by making things flake most of the time. return sequence_manager_->settings() .priority_settings .per_priority_cross_thread_task_delay()[any_thread_.queue_set_index]; } else { return sequence_manager_->settings() .priority_settings.per_priority_same_thread_task_delay() [main_thread_only().immediate_work_queue->work_queue_set_index()]; } #else // No delay adjustment. return TimeDelta(); #endif // DCHECK_IS_ON() } void TaskQueueImpl::PostImmediateTaskImpl(PostedTask task, CurrentThread current_thread) { // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167 // for details. CHECK(task.callback); bool should_schedule_work = false; { // TODO(alexclarke): Maybe add a main thread only immediate_incoming_queue // See https://crbug.com/901800 base::internal::CheckedAutoLock lock(any_thread_lock_); bool add_queue_time_to_tasks = sequence_manager_->GetAddQueueTimeToTasks(); TimeTicks queue_time; bool config_category_enabled = false; #if BUILDFLAG(ENABLE_BASE_TRACING) config_category_enabled = TRACE_EVENT_CATEGORY_ENABLED("config.scheduler.record_task_post_time"); #endif if (config_category_enabled || add_queue_time_to_tasks || delayed_fence_allowed_) { queue_time = sequence_manager_->any_thread_clock()->NowTicks(); } // The sequence number must be incremented atomically with pushing onto the // incoming queue. Otherwise if there are several threads posting task we // risk breaking the assumption that sequence numbers increase monotonically // within a queue. EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber(); bool was_immediate_incoming_queue_empty = any_thread_.immediate_incoming_queue.empty(); any_thread_.immediate_incoming_queue.push_back( Task(std::move(task), sequence_number, sequence_number, queue_time)); #if DCHECK_IS_ON() any_thread_.immediate_incoming_queue.back().cross_thread_ = (current_thread == TaskQueueImpl::CurrentThread::kNotMainThread); #endif sequence_manager_->WillQueueTask( &any_thread_.immediate_incoming_queue.back()); MaybeReportIpcTaskQueuedFromAnyThreadLocked( any_thread_.immediate_incoming_queue.back()); for (auto& handler : any_thread_.on_task_posted_handlers) { DCHECK(!handler.second.is_null()); handler.second.Run(any_thread_.immediate_incoming_queue.back()); } // If this queue was completely empty, then the SequenceManager needs to be // informed so it can reload the work queue and add us to the // TaskQueueSelector which can only be done from the main thread. In // addition it may need to schedule a DoWork if this queue isn't blocked. if (was_immediate_incoming_queue_empty && any_thread_.immediate_work_queue_empty) { sequence_manager_->WillRequestReloadImmediateWorkQueue(); empty_queues_to_reload_handle_.SetActive(true); should_schedule_work = any_thread_.post_immediate_task_should_schedule_work; } } // On windows it's important to call this outside of a lock because calling a // pump while holding a lock can result in priority inversions. See // http://shortn/_ntnKNqjDQT for a discussion. // // Calling ScheduleWork outside the lock should be safe, only the main thread // can mutate |any_thread_.post_immediate_task_should_schedule_work|. If it // transitions to false we call ScheduleWork redundantly that's harmless. If // it transitions to true, the side effect of // |empty_queues_to_reload_handle_SetActive(true)| is guaranteed to be picked // up by the ThreadController's call to SequenceManagerImpl::DelayTillNextTask // when it computes what continuation (if any) is needed. if (should_schedule_work) sequence_manager_->ScheduleWork(); TraceQueueSize(); } void TaskQueueImpl::PostDelayedTaskImpl(PostedTask posted_task, CurrentThread current_thread) { // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167 // for details. CHECK(posted_task.callback); if (current_thread == CurrentThread::kMainThread) { LazyNow lazy_now(sequence_manager_->main_thread_clock()); Task pending_task = MakeDelayedTask(std::move(posted_task), &lazy_now); sequence_manager_->MaybeAddLeewayToTask(pending_task); PushOntoDelayedIncomingQueueFromMainThread( std::move(pending_task), &lazy_now, /* notify_task_annotator */ true); } else { LazyNow lazy_now(sequence_manager_->any_thread_clock()); PushOntoDelayedIncomingQueue( MakeDelayedTask(std::move(posted_task), &lazy_now)); } } void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread( Task pending_task, LazyNow* lazy_now, bool notify_task_annotator) { #if DCHECK_IS_ON() pending_task.cross_thread_ = false; #endif if (notify_task_annotator) { sequence_manager_->WillQueueTask(&pending_task); MaybeReportIpcTaskQueuedFromMainThread(pending_task); } main_thread_only().delayed_incoming_queue.push(std::move(pending_task)); UpdateWakeUp(lazy_now); TraceQueueSize(); } void TaskQueueImpl::PushOntoDelayedIncomingQueue(Task pending_task) { sequence_manager_->WillQueueTask(&pending_task); MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(pending_task); #if DCHECK_IS_ON() pending_task.cross_thread_ = true; #endif // TODO(altimin): Add a copy method to Task to capture metadata here. auto task_runner = pending_task.task_runner; const auto task_type = pending_task.task_type; PostImmediateTaskImpl( PostedTask(std::move(task_runner), BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask, Unretained(this), std::move(pending_task)), FROM_HERE, TimeDelta(), Nestable::kNonNestable, task_type), CurrentThread::kNotMainThread); } void TaskQueueImpl::ScheduleDelayedWorkTask(Task pending_task) { DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); sequence_manager_->MaybeAddLeewayToTask(pending_task); TimeTicks now = sequence_manager_->main_thread_clock()->NowTicks(); LazyNow lazy_now(now); // A delayed task is ready to run as soon as earliest_delayed_run_time() is // reached. if (pending_task.earliest_delayed_run_time() <= now) { // If |delayed_run_time| is in the past then push it onto the work queue // immediately. To ensure the right task ordering we need to temporarily // push it onto the |delayed_incoming_queue|. pending_task.delayed_run_time = now; main_thread_only().delayed_incoming_queue.push(std::move(pending_task)); MoveReadyDelayedTasksToWorkQueue( &lazy_now, sequence_manager_->GetNextSequenceNumber()); } else { // If |delayed_run_time| is in the future we can queue it as normal. PushOntoDelayedIncomingQueueFromMainThread(std::move(pending_task), &lazy_now, false); } TraceQueueSize(); } void TaskQueueImpl::ReloadEmptyImmediateWorkQueue() { DCHECK(main_thread_only().immediate_work_queue->Empty()); main_thread_only().immediate_work_queue->TakeImmediateIncomingQueueTasks(); if (main_thread_only().throttler && IsQueueEnabled()) { main_thread_only().throttler->OnHasImmediateTask(); } } void TaskQueueImpl::TakeImmediateIncomingQueueTasks(TaskDeque* queue) { DCHECK(queue->empty()); // Now is a good time to consider reducing the empty queue's capacity if we're // wasting memory, before we make it the `immediate_incoming_queue`. queue->MaybeShrinkQueue(); base::internal::CheckedAutoLock lock(any_thread_lock_); queue->swap(any_thread_.immediate_incoming_queue); // Activate delayed fence if necessary. This is ideologically similar to // ActivateDelayedFenceIfNeeded, but due to immediate tasks being posted // from any thread we can't generate an enqueue order for the fence there, // so we have to check all immediate tasks and use their enqueue order for // a fence. if (main_thread_only().delayed_fence) { for (const Task& task : *queue) { DCHECK(!task.queue_time.is_null()); DCHECK(task.delayed_run_time.is_null()); if (task.queue_time >= main_thread_only().delayed_fence.value()) { main_thread_only().delayed_fence = std::nullopt; DCHECK(!main_thread_only().current_fence); main_thread_only().current_fence = Fence(task.task_order()); // Do not trigger WorkQueueSets notification when taking incoming // immediate queue. main_thread_only().immediate_work_queue->InsertFenceSilently( *main_thread_only().current_fence); main_thread_only().delayed_work_queue->InsertFenceSilently( *main_thread_only().current_fence); break; } } } UpdateCrossThreadQueueStateLocked(); } bool TaskQueueImpl::IsEmpty() const { if (!main_thread_only().delayed_work_queue->Empty() || !main_thread_only().delayed_incoming_queue.empty() || !main_thread_only().immediate_work_queue->Empty()) { return false; } base::internal::CheckedAutoLock lock(any_thread_lock_); return any_thread_.immediate_incoming_queue.empty(); } size_t TaskQueueImpl::GetNumberOfPendingTasks() const { size_t task_count = 0; task_count += main_thread_only().delayed_work_queue->Size(); task_count += main_thread_only().delayed_incoming_queue.size(); task_count += main_thread_only().immediate_work_queue->Size(); base::internal::CheckedAutoLock lock(any_thread_lock_); task_count += any_thread_.immediate_incoming_queue.size(); return task_count; } bool TaskQueueImpl::HasTaskToRunImmediatelyOrReadyDelayedTask() const { // Any work queue tasks count as immediate work. if (!main_thread_only().delayed_work_queue->Empty() || !main_thread_only().immediate_work_queue->Empty()) { return true; } // Tasks on |delayed_incoming_queue| that could run now, count as // immediate work. if (!main_thread_only().delayed_incoming_queue.empty() && main_thread_only().delayed_incoming_queue.top().delayed_run_time <= sequence_manager_->main_thread_clock()->NowTicks()) { return true; } // Finally tasks on |immediate_incoming_queue| count as immediate work. base::internal::CheckedAutoLock lock(any_thread_lock_); return !any_thread_.immediate_incoming_queue.empty(); } std::optional TaskQueueImpl::GetNextDesiredWakeUp() { // Note we don't scheduled a wake-up for disabled queues. if (main_thread_only().delayed_incoming_queue.empty() || !IsQueueEnabled()) return std::nullopt; const auto& top_task = main_thread_only().delayed_incoming_queue.top(); // High resolution is needed if the queue contains high resolution tasks and // has a priority index <= kNormalPriority (precise execution time is // unnecessary for a low priority queue). WakeUpResolution resolution = has_pending_high_resolution_tasks() && GetQueuePriority() <= DefaultPriority() ? WakeUpResolution::kHigh : WakeUpResolution::kLow; subtle::DelayPolicy delay_policy = top_task.delay_policy; if (GetQueuePriority() > DefaultPriority() && delay_policy == subtle::DelayPolicy::kPrecise) { delay_policy = subtle::DelayPolicy::kFlexibleNoSooner; } return WakeUp{top_task.delayed_run_time, top_task.leeway, resolution, delay_policy}; } void TaskQueueImpl::OnWakeUp(LazyNow* lazy_now, EnqueueOrder enqueue_order) { MoveReadyDelayedTasksToWorkQueue(lazy_now, enqueue_order); if (main_thread_only().throttler) { main_thread_only().throttler->OnWakeUp(lazy_now); } } bool TaskQueueImpl::RemoveAllCanceledDelayedTasksFromFront(LazyNow* lazy_now) { // Because task destructors could have a side-effect of posting new tasks, we // move all the cancelled tasks into a temporary container before deleting // them. This is to avoid the queue from changing while iterating over it. absl::InlinedVector tasks_to_delete; while (!main_thread_only().delayed_incoming_queue.empty()) { const Task& task = main_thread_only().delayed_incoming_queue.top(); CHECK(task.task); if (!task.task.IsCancelled()) break; tasks_to_delete.push_back( main_thread_only().delayed_incoming_queue.take_top()); } if (!tasks_to_delete.empty()) { UpdateWakeUp(lazy_now); return true; } return false; } void TaskQueueImpl::MoveReadyDelayedTasksToWorkQueue( LazyNow* lazy_now, EnqueueOrder enqueue_order) { // Enqueue all delayed tasks that should be running now, skipping any that // have been canceled. WorkQueue::TaskPusher delayed_work_queue_task_pusher( main_thread_only().delayed_work_queue->CreateTaskPusher()); // Because task destructors could have a side-effect of posting new tasks, we // move all the cancelled tasks into a temporary container before deleting // them. This is to avoid the queue from changing while iterating over it. absl::InlinedVector tasks_to_delete; while (!main_thread_only().delayed_incoming_queue.empty()) { const Task& task = main_thread_only().delayed_incoming_queue.top(); CHECK(task.task); // Leave the top task alone if it hasn't been canceled and it is not ready. const bool is_cancelled = task.task.IsCancelled(); if (!is_cancelled && task.earliest_delayed_run_time() > lazy_now->Now()) break; Task ready_task = main_thread_only().delayed_incoming_queue.take_top(); if (is_cancelled) { tasks_to_delete.push_back(std::move(ready_task)); continue; } // The top task is ready to run. Move it to the delayed work queue. #if DCHECK_IS_ON() if (sequence_manager_->settings().log_task_delay_expiry) VLOG(0) << GetName() << " Delay expired for " << ready_task.posted_from.ToString(); #endif // DCHECK_IS_ON() DCHECK(!ready_task.delayed_run_time.is_null()); DCHECK(!ready_task.enqueue_order_set()); ready_task.set_enqueue_order(enqueue_order); ActivateDelayedFenceIfNeeded(ready_task); delayed_work_queue_task_pusher.Push(std::move(ready_task)); } // Explicitly delete tasks last. tasks_to_delete.clear(); UpdateWakeUp(lazy_now); } void TaskQueueImpl::TraceQueueSize() const { bool is_tracing; TRACE_EVENT_CATEGORY_GROUP_ENABLED( TRACE_DISABLED_BY_DEFAULT("sequence_manager"), &is_tracing); if (!is_tracing) return; // It's only safe to access the work queues from the main thread. // TODO(alexclarke): We should find another way of tracing this if (!associated_thread_->IsBoundToCurrentThread()) return; size_t total_task_count; { base::internal::CheckedAutoLock lock(any_thread_lock_); total_task_count = any_thread_.immediate_incoming_queue.size() + main_thread_only().immediate_work_queue->Size() + main_thread_only().delayed_work_queue->Size() + main_thread_only().delayed_incoming_queue.size(); } TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), GetName(), total_task_count); } void TaskQueueImpl::SetQueuePriority(TaskQueue::QueuePriority priority) { const TaskQueue::QueuePriority previous_priority = GetQueuePriority(); if (priority == previous_priority) return; sequence_manager_->main_thread_only().selector.SetQueuePriority(this, priority); #if BUILDFLAG(IS_WIN) // Updating queue priority can change whether high resolution timer is needed. LazyNow lazy_now(sequence_manager_->main_thread_clock()); UpdateWakeUp(&lazy_now); #endif if (priority > DefaultPriority()) { // |priority| is now lower than the default, so update accordingly. main_thread_only() .enqueue_order_at_which_we_became_unblocked_with_normal_priority = EnqueueOrder::max(); } else if (previous_priority > DefaultPriority()) { // |priority| is no longer lower than the default, so record current // sequence number. DCHECK_EQ( main_thread_only() .enqueue_order_at_which_we_became_unblocked_with_normal_priority, EnqueueOrder::max()); main_thread_only() .enqueue_order_at_which_we_became_unblocked_with_normal_priority = sequence_manager_->GetNextSequenceNumber(); } } TaskQueue::QueuePriority TaskQueueImpl::GetQueuePriority() const { size_t set_index = immediate_work_queue()->work_queue_set_index(); DCHECK_EQ(set_index, delayed_work_queue()->work_queue_set_index()); return static_cast(set_index); } Value::Dict TaskQueueImpl::AsValue(TimeTicks now, bool force_verbose) const { base::internal::CheckedAutoLock lock(any_thread_lock_); Value::Dict state; state.Set("name", GetName()); if (any_thread_.unregistered) { state.Set("unregistered", true); return state; } DCHECK(main_thread_only().delayed_work_queue); DCHECK(main_thread_only().immediate_work_queue); state.Set("task_queue_id", StringPrintf("0x%" PRIx64, static_cast( reinterpret_cast(this)))); state.Set("enabled", IsQueueEnabled()); // TODO(crbug.com/40228085): Make base::Value able to store an int64_t and // remove the various static_casts below. state.Set("any_thread_.immediate_incoming_queuesize", static_cast(any_thread_.immediate_incoming_queue.size())); state.Set("delayed_incoming_queue_size", static_cast(main_thread_only().delayed_incoming_queue.size())); state.Set("immediate_work_queue_size", static_cast(main_thread_only().immediate_work_queue->Size())); state.Set("delayed_work_queue_size", static_cast(main_thread_only().delayed_work_queue->Size())); state.Set("any_thread_.immediate_incoming_queuecapacity", static_cast(any_thread_.immediate_incoming_queue.capacity())); state.Set("immediate_work_queue_capacity", static_cast(immediate_work_queue()->Capacity())); state.Set("delayed_work_queue_capacity", static_cast(delayed_work_queue()->Capacity())); if (!main_thread_only().delayed_incoming_queue.empty()) { TimeDelta delay_to_next_task = (main_thread_only().delayed_incoming_queue.top().delayed_run_time - sequence_manager_->main_thread_clock()->NowTicks()); state.Set("delay_to_next_task_ms", delay_to_next_task.InMillisecondsF()); } if (main_thread_only().current_fence) { Value::Dict fence_state; fence_state.Set( "enqueue_order", static_cast( main_thread_only().current_fence->task_order().enqueue_order())); fence_state.Set("activated_in_wake_up", !main_thread_only() .current_fence->task_order() .delayed_run_time() .is_null()); state.Set("current_fence", std::move(fence_state)); } if (main_thread_only().delayed_fence) { state.Set("delayed_fence_seconds_from_now", (main_thread_only().delayed_fence.value() - now).InSecondsF()); } bool verbose = false; TRACE_EVENT_CATEGORY_GROUP_ENABLED( TRACE_DISABLED_BY_DEFAULT("sequence_manager.verbose_snapshots"), &verbose); if (verbose || force_verbose) { state.Set("immediate_incoming_queue", QueueAsValue(any_thread_.immediate_incoming_queue, now)); state.Set("delayed_work_queue", main_thread_only().delayed_work_queue->AsValue(now)); state.Set("immediate_work_queue", main_thread_only().immediate_work_queue->AsValue(now)); state.Set("delayed_incoming_queue", main_thread_only().delayed_incoming_queue.AsValue(now)); } state.Set("priority", GetQueuePriority()); return state; } void TaskQueueImpl::AddTaskObserver(TaskObserver* task_observer) { main_thread_only().task_observers.AddObserver(task_observer); } void TaskQueueImpl::RemoveTaskObserver(TaskObserver* task_observer) { main_thread_only().task_observers.RemoveObserver(task_observer); } void TaskQueueImpl::NotifyWillProcessTask(const Task& task, bool was_blocked_or_low_priority) { DCHECK(should_notify_observers_); for (auto& observer : main_thread_only().task_observers) observer.WillProcessTask(task, was_blocked_or_low_priority); } void TaskQueueImpl::NotifyDidProcessTask(const Task& task) { DCHECK(should_notify_observers_); for (auto& observer : main_thread_only().task_observers) observer.DidProcessTask(task); } void TaskQueueImpl::InsertFence(TaskQueue::InsertFencePosition position) { Fence new_fence = position == TaskQueue::InsertFencePosition::kNow ? Fence::CreateWithEnqueueOrder( sequence_manager_->GetNextSequenceNumber()) : Fence::BlockingFence(); InsertFence(new_fence); } void TaskQueueImpl::InsertFence(Fence current_fence) { // Only one fence may be present at a time. main_thread_only().delayed_fence = std::nullopt; std::optional previous_fence = main_thread_only().current_fence; // Tasks posted after this point will have a strictly higher enqueue order // and will be blocked from running. main_thread_only().current_fence = current_fence; bool front_task_unblocked = main_thread_only().immediate_work_queue->InsertFence(current_fence); front_task_unblocked |= main_thread_only().delayed_work_queue->InsertFence(current_fence); { base::internal::CheckedAutoLock lock(any_thread_lock_); if (!front_task_unblocked && previous_fence && previous_fence->task_order() < current_fence.task_order()) { if (!any_thread_.immediate_incoming_queue.empty() && any_thread_.immediate_incoming_queue.front().task_order() > previous_fence->task_order() && any_thread_.immediate_incoming_queue.front().task_order() < current_fence.task_order()) { front_task_unblocked = true; } } UpdateCrossThreadQueueStateLocked(); } if (IsQueueEnabled() && front_task_unblocked) { OnQueueUnblocked(); sequence_manager_->ScheduleWork(); } } void TaskQueueImpl::InsertFenceAt(TimeTicks time) { DCHECK(delayed_fence_allowed_) << "Delayed fences are not supported for this queue. Enable them " "explicitly in TaskQueue::Spec when creating the queue"; // Task queue can have only one fence, delayed or not. RemoveFence(); main_thread_only().delayed_fence = time; } void TaskQueueImpl::RemoveFence() { std::optional previous_fence = main_thread_only().current_fence; main_thread_only().current_fence = std::nullopt; main_thread_only().delayed_fence = std::nullopt; bool front_task_unblocked = main_thread_only().immediate_work_queue->RemoveFence(); front_task_unblocked |= main_thread_only().delayed_work_queue->RemoveFence(); { base::internal::CheckedAutoLock lock(any_thread_lock_); if (!front_task_unblocked && previous_fence) { if (!any_thread_.immediate_incoming_queue.empty() && any_thread_.immediate_incoming_queue.front().task_order() > previous_fence->task_order()) { front_task_unblocked = true; } } UpdateCrossThreadQueueStateLocked(); } if (IsQueueEnabled() && front_task_unblocked) { OnQueueUnblocked(); sequence_manager_->ScheduleWork(); } } bool TaskQueueImpl::BlockedByFence() const { if (!main_thread_only().current_fence) return false; if (!main_thread_only().immediate_work_queue->BlockedByFence() || !main_thread_only().delayed_work_queue->BlockedByFence()) { return false; } base::internal::CheckedAutoLock lock(any_thread_lock_); if (any_thread_.immediate_incoming_queue.empty()) return true; return any_thread_.immediate_incoming_queue.front().task_order() > main_thread_only().current_fence->task_order(); } bool TaskQueueImpl::HasActiveFence() { if (main_thread_only().delayed_fence && sequence_manager_->main_thread_clock()->NowTicks() > main_thread_only().delayed_fence.value()) { return true; } return !!main_thread_only().current_fence; } bool TaskQueueImpl::CouldTaskRun(EnqueueOrder enqueue_order) const { if (!IsQueueEnabled()) return false; if (!main_thread_only().current_fence) return true; // TODO(crbug.com/40791504): This should use TaskOrder. This is currently only // used for tests and is fine as-is, but we should be using `TaskOrder` for // task comparisons. Also this test should be renamed with a testing suffix as // it is not used in production. return enqueue_order < main_thread_only().current_fence->task_order().enqueue_order(); } bool TaskQueueImpl::WasBlockedOrLowPriority(EnqueueOrder enqueue_order) const { return enqueue_order < main_thread_only() .enqueue_order_at_which_we_became_unblocked_with_normal_priority; } // static Value::List TaskQueueImpl::QueueAsValue(const TaskDeque& queue, TimeTicks now) { Value::List state; for (const Task& task : queue) state.Append(TaskAsValue(task, now)); return state; } // static Value::Dict TaskQueueImpl::TaskAsValue(const Task& task, TimeTicks now) { Value::Dict state; state.Set("posted_from", task.posted_from.ToString()); if (task.enqueue_order_set()) state.Set("enqueue_order", static_cast(task.enqueue_order())); state.Set("sequence_num", task.sequence_num); state.Set("nestable", task.nestable == Nestable::kNestable); state.Set("is_high_res", task.is_high_res); state.Set("is_cancelled", task.task.IsCancelled()); state.Set("delayed_run_time", (task.delayed_run_time - TimeTicks()).InMillisecondsF()); const TimeDelta delayed_run_time_milliseconds_from_now = task.delayed_run_time.is_null() ? TimeDelta() : (task.delayed_run_time - now); state.Set("delayed_run_time_milliseconds_from_now", delayed_run_time_milliseconds_from_now.InMillisecondsF()); return state; } Task TaskQueueImpl::MakeDelayedTask(PostedTask delayed_task, LazyNow* lazy_now) const { EnqueueOrder sequence_number = sequence_manager_->GetNextSequenceNumber(); base::TimeDelta delay; WakeUpResolution resolution = WakeUpResolution::kLow; #if BUILDFLAG(IS_WIN) const bool explicit_high_resolution_timer_win = g_explicit_high_resolution_timer_win.load(std::memory_order_relaxed); #endif // BUILDFLAG(IS_WIN) if (absl::holds_alternative( delayed_task.delay_or_delayed_run_time)) { delay = absl::get(delayed_task.delay_or_delayed_run_time); delayed_task.delay_or_delayed_run_time = lazy_now->Now() + delay; } else { delay = absl::get(delayed_task.delay_or_delayed_run_time) - lazy_now->Now(); } #if BUILDFLAG(IS_WIN) if (!explicit_high_resolution_timer_win && delay < (2 * base::Milliseconds(Time::kMinLowResolutionThresholdMs))) { // Outside the kExplicitHighResolutionTimerWin experiment, We consider the // task needs a high resolution timer if the delay is more than 0 and less // than 32ms. This caps the relative error to less than 50% : a 33ms wait // can wake at 48ms since the default resolution on Windows is between 10 // and 15ms. resolution = WakeUpResolution::kHigh; } #endif // BUILDFLAG(IS_WIN) delayed_task.delay_policy = subtle::MaybeOverrideDelayPolicy( delayed_task.delay_policy, delay, g_max_precise_delay.load(std::memory_order_relaxed)); // leeway isn't specified yet since this may be called from any thread. return Task(std::move(delayed_task), sequence_number, EnqueueOrder(), lazy_now->Now(), resolution); } bool TaskQueueImpl::IsQueueEnabled() const { return main_thread_only().is_enabled; } void TaskQueueImpl::SetQueueEnabled(bool enabled) { if (main_thread_only().is_enabled == enabled) return; // Update the |main_thread_only_| struct. main_thread_only().is_enabled = enabled; main_thread_only().disabled_time = std::nullopt; // |sequence_manager_| can be null in tests. if (!sequence_manager_) return; LazyNow lazy_now(sequence_manager_->main_thread_clock()); if (!enabled) { bool tracing_enabled = false; TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"), &tracing_enabled); main_thread_only().disabled_time = lazy_now.Now(); } else { // Override reporting if the queue is becoming enabled again. main_thread_only().should_report_posted_tasks_when_disabled = false; } // If there is a throttler, it will be notified of pending delayed and // immediate tasks inside UpdateWakeUp(). UpdateWakeUp(&lazy_now); { base::internal::CheckedAutoLock lock(any_thread_lock_); UpdateCrossThreadQueueStateLocked(); // Copy over the task-reporting related state. any_thread_.is_enabled = enabled; any_thread_.tracing_only.disabled_time = main_thread_only().disabled_time; any_thread_.tracing_only.should_report_posted_tasks_when_disabled = main_thread_only().should_report_posted_tasks_when_disabled; } // Finally, enable or disable the queue with the selector. if (enabled) { // Note the selector calls SequenceManager::OnTaskQueueEnabled which posts // a DoWork if needed. sequence_manager_->main_thread_only().selector.EnableQueue(this); if (!BlockedByFence()) OnQueueUnblocked(); } else { sequence_manager_->main_thread_only().selector.DisableQueue(this); } } void TaskQueueImpl::SetShouldReportPostedTasksWhenDisabled(bool should_report) { if (main_thread_only().should_report_posted_tasks_when_disabled == should_report) return; // Only observe transitions turning the reporting on if tracing is enabled. if (should_report) { bool tracing_enabled = false; TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"), &tracing_enabled); if (!tracing_enabled) return; } main_thread_only().should_report_posted_tasks_when_disabled = should_report; // Mirror the state to the AnyThread struct as well. { base::internal::CheckedAutoLock lock(any_thread_lock_); any_thread_.tracing_only.should_report_posted_tasks_when_disabled = should_report; } } void TaskQueueImpl::UpdateCrossThreadQueueStateLocked() { any_thread_.immediate_work_queue_empty = main_thread_only().immediate_work_queue->Empty(); any_thread_.is_enabled = main_thread_only().is_enabled; if (main_thread_only().throttler) { // If there's a Throttler, always ScheduleWork() when immediate work is // posted and the queue is enabled, to ensure that // Throttler::OnHasImmediateTask() is invoked. any_thread_.post_immediate_task_should_schedule_work = IsQueueEnabled(); } else { // Otherwise, ScheduleWork() only if the queue is enabled and there isn't a // fence to prevent the task from being executed. any_thread_.post_immediate_task_should_schedule_work = IsQueueEnabled() && !main_thread_only().current_fence; } #if DCHECK_IS_ON() any_thread_.queue_set_index = main_thread_only().immediate_work_queue->work_queue_set_index(); #endif } void TaskQueueImpl::ReclaimMemory(TimeTicks now) { if (main_thread_only().delayed_incoming_queue.empty()) return; main_thread_only().delayed_incoming_queue.SweepCancelledTasks( sequence_manager_); // If deleting one of the cancelled tasks shut down this queue, bail out. // Note that in this scenario |this| is still valid, but some fields of the // queue have been cleared out by |UnregisterTaskQueue|. if (!main_thread_only().delayed_work_queue) { return; } LazyNow lazy_now(now); UpdateWakeUp(&lazy_now); // Also consider shrinking the work queue if it's wasting memory. main_thread_only().delayed_work_queue->MaybeShrinkQueue(); main_thread_only().immediate_work_queue->MaybeShrinkQueue(); { base::internal::CheckedAutoLock lock(any_thread_lock_); any_thread_.immediate_incoming_queue.MaybeShrinkQueue(); } } void TaskQueueImpl::PushImmediateIncomingTaskForTest(Task task) { base::internal::CheckedAutoLock lock(any_thread_lock_); any_thread_.immediate_incoming_queue.push_back(std::move(task)); } void TaskQueueImpl::RequeueDeferredNonNestableTask( DeferredNonNestableTask task) { DCHECK(task.task.nestable == Nestable::kNonNestable); // It's possible that the queue was unregistered since the task was posted. // Skip the task in that case. if (!main_thread_only().delayed_work_queue) return; // The re-queued tasks have to be pushed onto the front because we'd otherwise // violate the strict monotonically increasing enqueue order within the // WorkQueue. We can't assign them a new enqueue order here because that will // not behave correctly with fences and things will break (e.g Idle TQ). if (task.work_queue_type == WorkQueueType::kDelayed) { main_thread_only().delayed_work_queue->PushNonNestableTaskToFront( std::move(task.task)); } else { // We're about to push |task| onto an empty |immediate_work_queue| // (bypassing |immediate_incoming_queue_|). As such, we no longer need to // reload if we were planning to. The flag must be cleared while holding // the lock to avoid a cross-thread post task setting it again before // we actually make |immediate_work_queue| non-empty. if (main_thread_only().immediate_work_queue->Empty()) { base::internal::CheckedAutoLock lock(any_thread_lock_); empty_queues_to_reload_handle_.SetActive(false); any_thread_.immediate_work_queue_empty = false; main_thread_only().immediate_work_queue->PushNonNestableTaskToFront( std::move(task.task)); } else { main_thread_only().immediate_work_queue->PushNonNestableTaskToFront( std::move(task.task)); } } } void TaskQueueImpl::SetThrottler(TaskQueue::Throttler* throttler) { DCHECK(throttler); DCHECK(!main_thread_only().throttler) << "Can't assign two different throttlers to " "base::sequence_manager:TaskQueue"; // `throttler` is guaranteed to outlive this object. main_thread_only().throttler = throttler; } void TaskQueueImpl::ResetThrottler() { main_thread_only().throttler = nullptr; LazyNow lazy_now(sequence_manager_->main_thread_clock()); // The current delayed wake up may have been determined by the Throttler. // Update it now that there is no Throttler. UpdateWakeUp(&lazy_now); } void TaskQueueImpl::UpdateWakeUp(LazyNow* lazy_now) { std::optional wake_up = GetNextDesiredWakeUp(); if (main_thread_only().throttler && IsQueueEnabled()) { // GetNextAllowedWakeUp() may return a non-null wake_up even if |wake_up| is // nullopt, e.g. to throttle immediate tasks. wake_up = main_thread_only().throttler->GetNextAllowedWakeUp( lazy_now, wake_up, HasTaskToRunImmediatelyOrReadyDelayedTask()); } SetNextWakeUp(lazy_now, wake_up); } void TaskQueueImpl::SetNextWakeUp(LazyNow* lazy_now, std::optional wake_up) { if (main_thread_only().scheduled_wake_up == wake_up) return; main_thread_only().scheduled_wake_up = wake_up; main_thread_only().wake_up_queue->SetNextWakeUpForQueue(this, lazy_now, wake_up); } bool TaskQueueImpl::HasTaskToRunImmediately() const { // Any work queue tasks count as immediate work. if (!main_thread_only().delayed_work_queue->Empty() || !main_thread_only().immediate_work_queue->Empty()) { return true; } // Finally tasks on |immediate_incoming_queue| count as immediate work. base::internal::CheckedAutoLock lock(any_thread_lock_); return !any_thread_.immediate_incoming_queue.empty(); } bool TaskQueueImpl::HasTaskToRunImmediatelyLocked() const { return !main_thread_only().delayed_work_queue->Empty() || !main_thread_only().immediate_work_queue->Empty() || !any_thread_.immediate_incoming_queue.empty(); } void TaskQueueImpl::SetOnTaskStartedHandler( TaskQueueImpl::OnTaskStartedHandler handler) { DCHECK(should_notify_observers_ || handler.is_null()); main_thread_only().on_task_started_handler = std::move(handler); } void TaskQueueImpl::OnTaskStarted(const Task& task, const TaskQueue::TaskTiming& task_timing) { if (!main_thread_only().on_task_started_handler.is_null()) main_thread_only().on_task_started_handler.Run(task, task_timing); } void TaskQueueImpl::SetOnTaskCompletedHandler( TaskQueueImpl::OnTaskCompletedHandler handler) { DCHECK(should_notify_observers_ || handler.is_null()); main_thread_only().on_task_completed_handler = std::move(handler); } void TaskQueueImpl::OnTaskCompleted(const Task& task, TaskQueue::TaskTiming* task_timing, LazyNow* lazy_now) { if (!main_thread_only().on_task_completed_handler.is_null()) { main_thread_only().on_task_completed_handler.Run(task, task_timing, lazy_now); } } bool TaskQueueImpl::RequiresTaskTiming() const { return !main_thread_only().on_task_started_handler.is_null() || !main_thread_only().on_task_completed_handler.is_null(); } std::unique_ptr TaskQueueImpl::AddOnTaskPostedHandler(OnTaskPostedHandler handler) { DCHECK(should_notify_observers_ && !handler.is_null()); std::unique_ptr handle = std::make_unique(this, associated_thread_); base::internal::CheckedAutoLock lock(any_thread_lock_); any_thread_.on_task_posted_handlers.insert( {handle.get(), std::move(handler)}); return handle; } void TaskQueueImpl::RemoveOnTaskPostedHandler( TaskQueueImpl::OnTaskPostedCallbackHandleImpl* on_task_posted_callback_handle) { base::internal::CheckedAutoLock lock(any_thread_lock_); any_thread_.on_task_posted_handlers.erase(on_task_posted_callback_handle); } void TaskQueueImpl::SetTaskExecutionTraceLogger( TaskExecutionTraceLogger logger) { DCHECK(should_notify_observers_ || logger.is_null()); main_thread_only().task_execution_trace_logger = std::move(logger); } bool TaskQueueImpl::IsUnregistered() const { base::internal::CheckedAutoLock lock(any_thread_lock_); return any_thread_.unregistered; } WeakPtr TaskQueueImpl::GetSequenceManagerWeakPtr() { return sequence_manager_->GetWeakPtr(); } void TaskQueueImpl::ActivateDelayedFenceIfNeeded(const Task& task) { if (!main_thread_only().delayed_fence) return; if (main_thread_only().delayed_fence.value() > task.delayed_run_time) return; InsertFence(Fence(task.task_order())); main_thread_only().delayed_fence = std::nullopt; } void TaskQueueImpl::MaybeReportIpcTaskQueuedFromMainThread( const Task& pending_task) { if (!pending_task.ipc_hash) return; // It's possible that tracing was just enabled and no disabled time has been // stored. In that case, skip emitting the event. if (!main_thread_only().disabled_time) return; bool tracing_enabled = false; TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"), &tracing_enabled); if (!tracing_enabled) return; if (main_thread_only().is_enabled || !main_thread_only().should_report_posted_tasks_when_disabled) { return; } base::TimeDelta time_since_disabled = sequence_manager_->main_thread_clock()->NowTicks() - main_thread_only().disabled_time.value(); ReportIpcTaskQueued(pending_task, time_since_disabled); } bool TaskQueueImpl::ShouldReportIpcTaskQueuedFromAnyThreadLocked( base::TimeDelta* time_since_disabled) { // It's possible that tracing was just enabled and no disabled time has been // stored. In that case, skip emitting the event. if (!any_thread_.tracing_only.disabled_time) return false; if (any_thread_.is_enabled || any_thread_.tracing_only.should_report_posted_tasks_when_disabled) { return false; } *time_since_disabled = sequence_manager_->any_thread_clock()->NowTicks() - any_thread_.tracing_only.disabled_time.value(); return true; } void TaskQueueImpl::MaybeReportIpcTaskQueuedFromAnyThreadLocked( const Task& pending_task) { if (!pending_task.ipc_hash) return; bool tracing_enabled = false; TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"), &tracing_enabled); if (!tracing_enabled) return; base::TimeDelta time_since_disabled; if (ShouldReportIpcTaskQueuedFromAnyThreadLocked(&time_since_disabled)) ReportIpcTaskQueued(pending_task, time_since_disabled); } void TaskQueueImpl::MaybeReportIpcTaskQueuedFromAnyThreadUnlocked( const Task& pending_task) { if (!pending_task.ipc_hash) return; bool tracing_enabled = false; TRACE_EVENT_CATEGORY_GROUP_ENABLED(TRACE_DISABLED_BY_DEFAULT("lifecycles"), &tracing_enabled); if (!tracing_enabled) return; base::TimeDelta time_since_disabled; bool should_report = false; { base::internal::CheckedAutoLock lock(any_thread_lock_); should_report = ShouldReportIpcTaskQueuedFromAnyThreadLocked(&time_since_disabled); } if (should_report) ReportIpcTaskQueued(pending_task, time_since_disabled); } void TaskQueueImpl::ReportIpcTaskQueued( const Task& pending_task, const base::TimeDelta& time_since_disabled) { TRACE_EVENT_INSTANT( TRACE_DISABLED_BY_DEFAULT("lifecycles"), "task_posted_to_disabled_queue", [&](perfetto::EventContext ctx) { auto* proto = ctx.event() ->set_chrome_task_posted_to_disabled_queue(); proto->set_time_since_disabled_ms( checked_cast(time_since_disabled.InMilliseconds())); proto->set_ipc_hash(pending_task.ipc_hash); proto->set_source_location_iid( base::trace_event::InternedSourceLocation::Get( &ctx, pending_task.posted_from)); }); } void TaskQueueImpl::OnQueueUnblocked() { DCHECK(IsQueueEnabled()); DCHECK(!BlockedByFence()); main_thread_only().enqueue_order_at_which_we_became_unblocked = sequence_manager_->GetNextSequenceNumber(); if (GetQueuePriority() <= DefaultPriority()) { // We are default priority or more important so update // |enqueue_order_at_which_we_became_unblocked_with_normal_priority|. main_thread_only() .enqueue_order_at_which_we_became_unblocked_with_normal_priority = main_thread_only().enqueue_order_at_which_we_became_unblocked; } } std::unique_ptr TaskQueueImpl::CreateQueueEnabledVoter() { DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); return WrapUnique( new TaskQueue::QueueEnabledVoter(voter_weak_ptr_factory_.GetWeakPtr())); } void TaskQueueImpl::AddQueueEnabledVoter(bool voter_is_enabled, TaskQueue::QueueEnabledVoter& voter) { ++main_thread_only().voter_count; if (voter_is_enabled) { ++main_thread_only().enabled_voter_count; } } void TaskQueueImpl::RemoveQueueEnabledVoter( bool voter_is_enabled, TaskQueue::QueueEnabledVoter& voter) { bool was_enabled = AreAllQueueEnabledVotersEnabled(); if (voter_is_enabled) { --main_thread_only().enabled_voter_count; DCHECK_GE(main_thread_only().enabled_voter_count, 0); } --main_thread_only().voter_count; DCHECK_GE(main_thread_only().voter_count, 0); bool is_enabled = AreAllQueueEnabledVotersEnabled(); if (was_enabled != is_enabled) { SetQueueEnabled(is_enabled); } } void TaskQueueImpl::OnQueueEnabledVoteChanged(bool enabled) { bool was_enabled = AreAllQueueEnabledVotersEnabled(); if (enabled) { ++main_thread_only().enabled_voter_count; DCHECK_LE(main_thread_only().enabled_voter_count, main_thread_only().voter_count); } else { --main_thread_only().enabled_voter_count; DCHECK_GE(main_thread_only().enabled_voter_count, 0); } bool is_enabled = AreAllQueueEnabledVotersEnabled(); if (was_enabled != is_enabled) { SetQueueEnabled(is_enabled); } } void TaskQueueImpl::CompleteInitializationOnBoundThread() { voter_weak_ptr_factory_.BindToCurrentSequence( subtle::BindWeakPtrFactoryPassKey()); } TaskQueue::QueuePriority TaskQueueImpl::DefaultPriority() const { return sequence_manager()->settings().priority_settings.default_priority(); } bool TaskQueueImpl::IsQueueEnabledFromAnyThread() const { base::internal::CheckedAutoLock lock(any_thread_lock_); return any_thread_.is_enabled; } TaskQueueImpl::DelayedIncomingQueue::DelayedIncomingQueue() = default; TaskQueueImpl::DelayedIncomingQueue::~DelayedIncomingQueue() = default; void TaskQueueImpl::DelayedIncomingQueue::push(Task task) { // TODO(crbug.com/40789839): Remove this once the cause of corrupted tasks in // the queue is understood. CHECK(task.task); if (task.is_high_res) pending_high_res_tasks_++; queue_.insert(std::move(task)); } void TaskQueueImpl::DelayedIncomingQueue::remove(HeapHandle heap_handle) { DCHECK(!empty()); DCHECK_LT(heap_handle.index(), queue_.size()); Task task = queue_.take(heap_handle); if (task.is_high_res) { pending_high_res_tasks_--; DCHECK_GE(pending_high_res_tasks_, 0); } } Task TaskQueueImpl::DelayedIncomingQueue::take_top() { DCHECK(!empty()); if (queue_.top().is_high_res) { pending_high_res_tasks_--; DCHECK_GE(pending_high_res_tasks_, 0); } return queue_.take_top(); } void TaskQueueImpl::DelayedIncomingQueue::swap(DelayedIncomingQueue* rhs) { std::swap(pending_high_res_tasks_, rhs->pending_high_res_tasks_); std::swap(queue_, rhs->queue_); } void TaskQueueImpl::DelayedIncomingQueue::SweepCancelledTasks( SequenceManagerImpl* sequence_manager) { // Note: IntrusiveHeap::EraseIf() is safe against re-entrancy caused by // deleted tasks posting new tasks. queue_.EraseIf([this](const Task& task) { if (task.task.IsCancelled()) { if (task.is_high_res) { --pending_high_res_tasks_; DCHECK_GE(pending_high_res_tasks_, 0); } return true; } return false; }); } Value::List TaskQueueImpl::DelayedIncomingQueue::AsValue(TimeTicks now) const { Value::List state; for (const Task& task : queue_) state.Append(TaskAsValue(task, now)); return state; } bool TaskQueueImpl::DelayedIncomingQueue::Compare::operator()( const Task& lhs, const Task& rhs) const { // Delayed tasks are ordered by latest_delayed_run_time(). The top task may // not be the first task eligible to run, but tasks will always become ripe // before their latest_delayed_run_time(). const TimeTicks lhs_latest_delayed_run_time = lhs.latest_delayed_run_time(); const TimeTicks rhs_latest_delayed_run_time = rhs.latest_delayed_run_time(); if (lhs_latest_delayed_run_time == rhs_latest_delayed_run_time) return lhs.sequence_num > rhs.sequence_num; return lhs_latest_delayed_run_time > rhs_latest_delayed_run_time; } TaskQueueImpl::OnTaskPostedCallbackHandleImpl::OnTaskPostedCallbackHandleImpl( TaskQueueImpl* task_queue_impl, scoped_refptr associated_thread) : task_queue_impl_(task_queue_impl), associated_thread_(std::move(associated_thread)) { DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); } TaskQueueImpl::OnTaskPostedCallbackHandleImpl:: ~OnTaskPostedCallbackHandleImpl() { DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker); if (task_queue_impl_) task_queue_impl_->RemoveOnTaskPostedHandler(this); } } // namespace internal } // namespace sequence_manager } // namespace base