// Copyright 2015 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/task/sequence_manager/task_queue_impl.h" #include #include #include "base/strings/stringprintf.h" #include "base/task/sequence_manager/sequence_manager_impl.h" #include "base/task/sequence_manager/time_domain.h" #include "base/task/sequence_manager/work_queue.h" #include "base/time/time.h" #include "base/trace_event/blame_context.h" namespace base { namespace sequence_manager { // static const char* TaskQueue::PriorityToString(TaskQueue::QueuePriority priority) { switch (priority) { case kControlPriority: return "control"; case kHighestPriority: return "highest"; case kHighPriority: return "high"; case kNormalPriority: return "normal"; case kLowPriority: return "low"; case kBestEffortPriority: return "best_effort"; default: NOTREACHED(); return nullptr; } } namespace internal { TaskQueueImpl::TaskQueueImpl(SequenceManagerImpl* sequence_manager, TimeDomain* time_domain, const TaskQueue::Spec& spec) : name_(spec.name), thread_id_(PlatformThread::CurrentId()), any_thread_(sequence_manager, time_domain), main_thread_only_(sequence_manager, this, time_domain), should_monitor_quiescence_(spec.should_monitor_quiescence), should_notify_observers_(spec.should_notify_observers) { DCHECK(time_domain); } TaskQueueImpl::~TaskQueueImpl() { #if DCHECK_IS_ON() AutoLock lock(any_thread_lock_); // NOTE this check shouldn't fire because |SequenceManagerImpl::queues_| // contains a strong reference to this TaskQueueImpl and the // SequenceManagerImpl destructor calls UnregisterTaskQueue on all task // queues. DCHECK(!any_thread().sequence_manager) << "UnregisterTaskQueue must be called first!"; #endif } TaskQueueImpl::PostTaskResult::PostTaskResult() : success(false), task(OnceClosure(), Location()) {} TaskQueueImpl::PostTaskResult::PostTaskResult(bool success, TaskQueue::PostedTask task) : success(success), task(std::move(task)) {} TaskQueueImpl::PostTaskResult::PostTaskResult(PostTaskResult&& move_from) : success(move_from.success), task(std::move(move_from.task)) {} TaskQueueImpl::PostTaskResult::~PostTaskResult() = default; TaskQueueImpl::PostTaskResult TaskQueueImpl::PostTaskResult::Success() { return PostTaskResult(true, TaskQueue::PostedTask(OnceClosure(), Location())); } TaskQueueImpl::PostTaskResult TaskQueueImpl::PostTaskResult::Fail( TaskQueue::PostedTask task) { return PostTaskResult(false, std::move(task)); } TaskQueueImpl::Task::Task(TaskQueue::PostedTask task, TimeTicks desired_run_time, EnqueueOrder sequence_number) : TaskQueue::Task(std::move(task), desired_run_time) { // It might wrap around to a negative number but it's handled properly. sequence_num = static_cast(sequence_number); } TaskQueueImpl::Task::Task(TaskQueue::PostedTask task, TimeTicks desired_run_time, EnqueueOrder sequence_number, EnqueueOrder enqueue_order) : TaskQueue::Task(std::move(task), desired_run_time), enqueue_order_(enqueue_order) { // It might wrap around to a negative number but it's handled properly. sequence_num = static_cast(sequence_number); } TaskQueueImpl::AnyThread::AnyThread(SequenceManagerImpl* sequence_manager, TimeDomain* time_domain) : sequence_manager(sequence_manager), time_domain(time_domain) {} TaskQueueImpl::AnyThread::~AnyThread() = default; TaskQueueImpl::MainThreadOnly::MainThreadOnly( SequenceManagerImpl* sequence_manager, TaskQueueImpl* task_queue, TimeDomain* time_domain) : sequence_manager(sequence_manager), time_domain(time_domain), delayed_work_queue( new WorkQueue(task_queue, "delayed", WorkQueue::QueueType::kDelayed)), immediate_work_queue(new WorkQueue(task_queue, "immediate", WorkQueue::QueueType::kImmediate)), set_index(0), is_enabled_refcount(0), voter_refcount(0), blame_context(nullptr), is_enabled_for_test(true) {} TaskQueueImpl::MainThreadOnly::~MainThreadOnly() = default; void TaskQueueImpl::UnregisterTaskQueue() { TaskDeque immediate_incoming_queue; { AutoLock lock(any_thread_lock_); AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_); if (main_thread_only().time_domain) main_thread_only().time_domain->UnregisterQueue(this); if (!any_thread().sequence_manager) return; main_thread_only().on_task_completed_handler = OnTaskCompletedHandler(); any_thread().time_domain = nullptr; main_thread_only().time_domain = nullptr; any_thread().sequence_manager = nullptr; main_thread_only().sequence_manager = nullptr; any_thread().on_next_wake_up_changed_callback = OnNextWakeUpChangedCallback(); main_thread_only().on_next_wake_up_changed_callback = OnNextWakeUpChangedCallback(); immediate_incoming_queue.swap(immediate_incoming_queue_); } // It is possible for a task to hold a scoped_refptr to this, which // will lead to TaskQueueImpl destructor being called when deleting a task. // To avoid use-after-free, we need to clear all fields of a task queue // before starting to delete the tasks. // All work queues and priority queues containing tasks should be moved to // local variables on stack (std::move for unique_ptrs and swap for queues) // before clearing them and deleting tasks. // Flush the queues outside of the lock because TSAN complains about a lock // order inversion for tasks that are posted from within a lock, with a // destructor that acquires the same lock. std::priority_queue delayed_incoming_queue; delayed_incoming_queue.swap(main_thread_only().delayed_incoming_queue); std::unique_ptr immediate_work_queue = std::move(main_thread_only().immediate_work_queue); std::unique_ptr delayed_work_queue = std::move(main_thread_only().delayed_work_queue); } const char* TaskQueueImpl::GetName() const { return name_; } bool TaskQueueImpl::RunsTasksInCurrentSequence() const { return PlatformThread::CurrentId() == thread_id_; } TaskQueueImpl::PostTaskResult TaskQueueImpl::PostDelayedTask( TaskQueue::PostedTask task) { if (task.delay.is_zero()) return PostImmediateTaskImpl(std::move(task)); return PostDelayedTaskImpl(std::move(task)); } TaskQueueImpl::PostTaskResult TaskQueueImpl::PostImmediateTaskImpl( TaskQueue::PostedTask task) { // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167 // for details. CHECK(task.callback); AutoLock lock(any_thread_lock_); if (!any_thread().sequence_manager) return PostTaskResult::Fail(std::move(task)); EnqueueOrder sequence_number = any_thread().sequence_manager->GetNextSequenceNumber(); PushOntoImmediateIncomingQueueLocked(Task(std::move(task), any_thread().time_domain->Now(), sequence_number, sequence_number)); return PostTaskResult::Success(); } TaskQueueImpl::PostTaskResult TaskQueueImpl::PostDelayedTaskImpl( TaskQueue::PostedTask task) { // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167 // for details. CHECK(task.callback); DCHECK_GT(task.delay, TimeDelta()); if (PlatformThread::CurrentId() == thread_id_) { // Lock-free fast path for delayed tasks posted from the main thread. if (!main_thread_only().sequence_manager) return PostTaskResult::Fail(std::move(task)); EnqueueOrder sequence_number = main_thread_only().sequence_manager->GetNextSequenceNumber(); TimeTicks time_domain_now = main_thread_only().time_domain->Now(); TimeTicks time_domain_delayed_run_time = time_domain_now + task.delay; PushOntoDelayedIncomingQueueFromMainThread( Task(std::move(task), time_domain_delayed_run_time, sequence_number), time_domain_now); } else { // NOTE posting a delayed task from a different thread is not expected to // be common. This pathway is less optimal than perhaps it could be // because it causes two main thread tasks to be run. Should this // assumption prove to be false in future, we may need to revisit this. AutoLock lock(any_thread_lock_); if (!any_thread().sequence_manager) return PostTaskResult::Fail(std::move(task)); EnqueueOrder sequence_number = any_thread().sequence_manager->GetNextSequenceNumber(); TimeTicks time_domain_now = any_thread().time_domain->Now(); TimeTicks time_domain_delayed_run_time = time_domain_now + task.delay; PushOntoDelayedIncomingQueueLocked( Task(std::move(task), time_domain_delayed_run_time, sequence_number)); } return PostTaskResult::Success(); } void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread( Task pending_task, TimeTicks now) { main_thread_only().sequence_manager->WillQueueTask(&pending_task); main_thread_only().delayed_incoming_queue.push(std::move(pending_task)); LazyNow lazy_now(now); UpdateDelayedWakeUp(&lazy_now); TraceQueueSize(); } void TaskQueueImpl::PushOntoDelayedIncomingQueueLocked(Task pending_task) { any_thread().sequence_manager->WillQueueTask(&pending_task); EnqueueOrder thread_hop_task_sequence_number = any_thread().sequence_manager->GetNextSequenceNumber(); // TODO(altimin): Add a copy method to Task to capture metadata here. PushOntoImmediateIncomingQueueLocked(Task( TaskQueue::PostedTask(BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask, Unretained(this), std::move(pending_task)), FROM_HERE, TimeDelta(), Nestable::kNonNestable, pending_task.task_type()), TimeTicks(), thread_hop_task_sequence_number, thread_hop_task_sequence_number)); } void TaskQueueImpl::ScheduleDelayedWorkTask(Task pending_task) { DCHECK(main_thread_checker_.CalledOnValidThread()); TimeTicks delayed_run_time = pending_task.delayed_run_time; TimeTicks time_domain_now = main_thread_only().time_domain->Now(); if (delayed_run_time <= time_domain_now) { // If |delayed_run_time| is in the past then push it onto the work queue // immediately. To ensure the right task ordering we need to temporarily // push it onto the |delayed_incoming_queue|. delayed_run_time = time_domain_now; pending_task.delayed_run_time = time_domain_now; main_thread_only().delayed_incoming_queue.push(std::move(pending_task)); LazyNow lazy_now(time_domain_now); WakeUpForDelayedWork(&lazy_now); } else { // If |delayed_run_time| is in the future we can queue it as normal. PushOntoDelayedIncomingQueueFromMainThread(std::move(pending_task), time_domain_now); } TraceQueueSize(); } void TaskQueueImpl::PushOntoImmediateIncomingQueueLocked(Task task) { // If the |immediate_incoming_queue| is empty we need a DoWork posted to make // it run. bool was_immediate_incoming_queue_empty; EnqueueOrder sequence_number = task.enqueue_order(); TimeTicks desired_run_time = task.delayed_run_time; { AutoLock lock(immediate_incoming_queue_lock_); was_immediate_incoming_queue_empty = immediate_incoming_queue().empty(); any_thread().sequence_manager->WillQueueTask(&task); immediate_incoming_queue().push_back(std::move(task)); } if (was_immediate_incoming_queue_empty) { // However there's no point posting a DoWork for a blocked queue. NB we can // only tell if it's disabled from the main thread. bool queue_is_blocked = RunsTasksInCurrentSequence() && (!IsQueueEnabled() || main_thread_only().current_fence); any_thread().sequence_manager->OnQueueHasIncomingImmediateWork( this, sequence_number, queue_is_blocked); if (!any_thread().on_next_wake_up_changed_callback.is_null()) any_thread().on_next_wake_up_changed_callback.Run(desired_run_time); } TraceQueueSize(); } void TaskQueueImpl::ReloadImmediateWorkQueueIfEmpty() { if (!main_thread_only().immediate_work_queue->Empty()) return; main_thread_only().immediate_work_queue->ReloadEmptyImmediateQueue(); } void TaskQueueImpl::ReloadEmptyImmediateQueue(TaskDeque* queue) { DCHECK(queue->empty()); AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_); queue->swap(immediate_incoming_queue()); // Activate delayed fence if necessary. This is ideologically similar to // ActivateDelayedFenceIfNeeded, but due to immediate tasks being posted // from any thread we can't generate an enqueue order for the fence there, // so we have to check all immediate tasks and use their enqueue order for // a fence. if (main_thread_only().delayed_fence) { for (const Task& task : *queue) { if (task.delayed_run_time >= main_thread_only().delayed_fence.value()) { main_thread_only().delayed_fence = nullopt; DCHECK(!main_thread_only().current_fence); main_thread_only().current_fence = task.enqueue_order(); // Do not trigger WorkQueueSets notification when taking incoming // immediate queue. main_thread_only().immediate_work_queue->InsertFenceSilently( main_thread_only().current_fence); main_thread_only().delayed_work_queue->InsertFenceSilently( main_thread_only().current_fence); break; } } } } bool TaskQueueImpl::IsEmpty() const { if (!main_thread_only().delayed_work_queue->Empty() || !main_thread_only().delayed_incoming_queue.empty() || !main_thread_only().immediate_work_queue->Empty()) { return false; } AutoLock lock(immediate_incoming_queue_lock_); return immediate_incoming_queue().empty(); } size_t TaskQueueImpl::GetNumberOfPendingTasks() const { size_t task_count = 0; task_count += main_thread_only().delayed_work_queue->Size(); task_count += main_thread_only().delayed_incoming_queue.size(); task_count += main_thread_only().immediate_work_queue->Size(); AutoLock lock(immediate_incoming_queue_lock_); task_count += immediate_incoming_queue().size(); return task_count; } bool TaskQueueImpl::HasTaskToRunImmediately() const { // Any work queue tasks count as immediate work. if (!main_thread_only().delayed_work_queue->Empty() || !main_thread_only().immediate_work_queue->Empty()) { return true; } // Tasks on |delayed_incoming_queue| that could run now, count as // immediate work. if (!main_thread_only().delayed_incoming_queue.empty() && main_thread_only().delayed_incoming_queue.top().delayed_run_time <= main_thread_only().time_domain->CreateLazyNow().Now()) { return true; } // Finally tasks on |immediate_incoming_queue| count as immediate work. AutoLock lock(immediate_incoming_queue_lock_); return !immediate_incoming_queue().empty(); } Optional TaskQueueImpl::GetNextScheduledWakeUpImpl() { // Note we don't scheduled a wake-up for disabled queues. if (main_thread_only().delayed_incoming_queue.empty() || !IsQueueEnabled()) return nullopt; return main_thread_only().delayed_incoming_queue.top().delayed_wake_up(); } Optional TaskQueueImpl::GetNextScheduledWakeUp() { Optional wake_up = GetNextScheduledWakeUpImpl(); if (!wake_up) return nullopt; return wake_up->time; } void TaskQueueImpl::WakeUpForDelayedWork(LazyNow* lazy_now) { // Enqueue all delayed tasks that should be running now, skipping any that // have been canceled. while (!main_thread_only().delayed_incoming_queue.empty()) { Task& task = const_cast(main_thread_only().delayed_incoming_queue.top()); if (!task.task || task.task.IsCancelled()) { main_thread_only().delayed_incoming_queue.pop(); continue; } if (task.delayed_run_time > lazy_now->Now()) break; ActivateDelayedFenceIfNeeded(task.delayed_run_time); task.set_enqueue_order( main_thread_only().sequence_manager->GetNextSequenceNumber()); main_thread_only().delayed_work_queue->Push(std::move(task)); main_thread_only().delayed_incoming_queue.pop(); // Normally WakeUpForDelayedWork is called inside DoWork, but it also // can be called elsewhere (e.g. tests and fast-path for posting // delayed tasks). Ensure that there is a DoWork posting. No-op inside // existing DoWork due to DoWork deduplication. if (IsQueueEnabled() || !main_thread_only().current_fence) { main_thread_only().sequence_manager->MaybeScheduleImmediateWork( FROM_HERE); } } UpdateDelayedWakeUp(lazy_now); } void TaskQueueImpl::TraceQueueSize() const { bool is_tracing; TRACE_EVENT_CATEGORY_GROUP_ENABLED( TRACE_DISABLED_BY_DEFAULT("sequence_manager"), &is_tracing); if (!is_tracing) return; // It's only safe to access the work queues from the main thread. // TODO(alexclarke): We should find another way of tracing this if (PlatformThread::CurrentId() != thread_id_) return; AutoLock lock(immediate_incoming_queue_lock_); TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), GetName(), immediate_incoming_queue().size() + main_thread_only().immediate_work_queue->Size() + main_thread_only().delayed_work_queue->Size() + main_thread_only().delayed_incoming_queue.size()); } void TaskQueueImpl::SetQueuePriority(TaskQueue::QueuePriority priority) { if (!main_thread_only().sequence_manager || priority == GetQueuePriority()) return; main_thread_only() .sequence_manager->main_thread_only() .selector.SetQueuePriority(this, priority); } TaskQueue::QueuePriority TaskQueueImpl::GetQueuePriority() const { size_t set_index = immediate_work_queue()->work_queue_set_index(); DCHECK_EQ(set_index, delayed_work_queue()->work_queue_set_index()); return static_cast(set_index); } void TaskQueueImpl::AsValueInto(TimeTicks now, trace_event::TracedValue* state) const { AutoLock lock(any_thread_lock_); AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_); state->BeginDictionary(); state->SetString("name", GetName()); if (!main_thread_only().sequence_manager) { state->SetBoolean("unregistered", true); state->EndDictionary(); return; } DCHECK(main_thread_only().time_domain); DCHECK(main_thread_only().delayed_work_queue); DCHECK(main_thread_only().immediate_work_queue); state->SetString( "task_queue_id", StringPrintf("0x%" PRIx64, static_cast(reinterpret_cast(this)))); state->SetBoolean("enabled", IsQueueEnabled()); state->SetString("time_domain_name", main_thread_only().time_domain->GetName()); state->SetInteger("immediate_incoming_queue_size", immediate_incoming_queue().size()); state->SetInteger("delayed_incoming_queue_size", main_thread_only().delayed_incoming_queue.size()); state->SetInteger("immediate_work_queue_size", main_thread_only().immediate_work_queue->Size()); state->SetInteger("delayed_work_queue_size", main_thread_only().delayed_work_queue->Size()); if (!main_thread_only().delayed_incoming_queue.empty()) { TimeDelta delay_to_next_task = (main_thread_only().delayed_incoming_queue.top().delayed_run_time - main_thread_only().time_domain->CreateLazyNow().Now()); state->SetDouble("delay_to_next_task_ms", delay_to_next_task.InMillisecondsF()); } if (main_thread_only().current_fence) state->SetInteger("current_fence", main_thread_only().current_fence); if (main_thread_only().delayed_fence) { state->SetDouble( "delayed_fence_seconds_from_now", (main_thread_only().delayed_fence.value() - now).InSecondsF()); } bool verbose = false; TRACE_EVENT_CATEGORY_GROUP_ENABLED( TRACE_DISABLED_BY_DEFAULT("sequence_manager.verbose_snapshots"), &verbose); if (verbose) { state->BeginArray("immediate_incoming_queue"); QueueAsValueInto(immediate_incoming_queue(), now, state); state->EndArray(); state->BeginArray("delayed_work_queue"); main_thread_only().delayed_work_queue->AsValueInto(now, state); state->EndArray(); state->BeginArray("immediate_work_queue"); main_thread_only().immediate_work_queue->AsValueInto(now, state); state->EndArray(); state->BeginArray("delayed_incoming_queue"); QueueAsValueInto(main_thread_only().delayed_incoming_queue, now, state); state->EndArray(); } state->SetString("priority", TaskQueue::PriorityToString(GetQueuePriority())); state->EndDictionary(); } void TaskQueueImpl::AddTaskObserver(MessageLoop::TaskObserver* task_observer) { main_thread_only().task_observers.AddObserver(task_observer); } void TaskQueueImpl::RemoveTaskObserver( MessageLoop::TaskObserver* task_observer) { main_thread_only().task_observers.RemoveObserver(task_observer); } void TaskQueueImpl::NotifyWillProcessTask(const PendingTask& pending_task) { DCHECK(should_notify_observers_); if (main_thread_only().blame_context) main_thread_only().blame_context->Enter(); for (auto& observer : main_thread_only().task_observers) observer.WillProcessTask(pending_task); } void TaskQueueImpl::NotifyDidProcessTask(const PendingTask& pending_task) { DCHECK(should_notify_observers_); for (auto& observer : main_thread_only().task_observers) observer.DidProcessTask(pending_task); if (main_thread_only().blame_context) main_thread_only().blame_context->Leave(); } void TaskQueueImpl::SetTimeDomain(TimeDomain* time_domain) { { AutoLock lock(any_thread_lock_); DCHECK(time_domain); // NOTE this is similar to checking |any_thread().sequence_manager| but // the TaskQueueSelectorTests constructs TaskQueueImpl directly with a null // sequence_manager. Instead we check |any_thread().time_domain| which is // another way of asserting that UnregisterTaskQueue has not been called. DCHECK(any_thread().time_domain); if (!any_thread().time_domain) return; DCHECK(main_thread_checker_.CalledOnValidThread()); if (time_domain == main_thread_only().time_domain) return; any_thread().time_domain = time_domain; } main_thread_only().time_domain->UnregisterQueue(this); main_thread_only().time_domain = time_domain; LazyNow lazy_now = time_domain->CreateLazyNow(); // Clear scheduled wake up to ensure that new notifications are issued // correctly. // TODO(altimin): Remove this when we won't have to support changing time // domains. main_thread_only().scheduled_wake_up = nullopt; UpdateDelayedWakeUp(&lazy_now); } TimeDomain* TaskQueueImpl::GetTimeDomain() const { if (PlatformThread::CurrentId() == thread_id_) return main_thread_only().time_domain; AutoLock lock(any_thread_lock_); return any_thread().time_domain; } void TaskQueueImpl::SetBlameContext(trace_event::BlameContext* blame_context) { main_thread_only().blame_context = blame_context; } void TaskQueueImpl::InsertFence(TaskQueue::InsertFencePosition position) { if (!main_thread_only().sequence_manager) return; // Only one fence may be present at a time. main_thread_only().delayed_fence = nullopt; EnqueueOrder previous_fence = main_thread_only().current_fence; EnqueueOrder current_fence = position == TaskQueue::InsertFencePosition::kNow ? main_thread_only().sequence_manager->GetNextSequenceNumber() : EnqueueOrder::blocking_fence(); // Tasks posted after this point will have a strictly higher enqueue order // and will be blocked from running. main_thread_only().current_fence = current_fence; bool task_unblocked = main_thread_only().immediate_work_queue->InsertFence(current_fence); task_unblocked |= main_thread_only().delayed_work_queue->InsertFence(current_fence); if (!task_unblocked && previous_fence && previous_fence < current_fence) { AutoLock lock(immediate_incoming_queue_lock_); if (!immediate_incoming_queue().empty() && immediate_incoming_queue().front().enqueue_order() > previous_fence && immediate_incoming_queue().front().enqueue_order() < current_fence) { task_unblocked = true; } } if (IsQueueEnabled() && task_unblocked) { main_thread_only().sequence_manager->MaybeScheduleImmediateWork(FROM_HERE); } } void TaskQueueImpl::InsertFenceAt(TimeTicks time) { // Task queue can have only one fence, delayed or not. RemoveFence(); main_thread_only().delayed_fence = time; } void TaskQueueImpl::RemoveFence() { if (!main_thread_only().sequence_manager) return; EnqueueOrder previous_fence = main_thread_only().current_fence; main_thread_only().current_fence = EnqueueOrder::none(); main_thread_only().delayed_fence = nullopt; bool task_unblocked = main_thread_only().immediate_work_queue->RemoveFence(); task_unblocked |= main_thread_only().delayed_work_queue->RemoveFence(); if (!task_unblocked && previous_fence) { AutoLock lock(immediate_incoming_queue_lock_); if (!immediate_incoming_queue().empty() && immediate_incoming_queue().front().enqueue_order() > previous_fence) { task_unblocked = true; } } if (IsQueueEnabled() && task_unblocked) { main_thread_only().sequence_manager->MaybeScheduleImmediateWork(FROM_HERE); } } bool TaskQueueImpl::BlockedByFence() const { if (!main_thread_only().current_fence) return false; if (!main_thread_only().immediate_work_queue->BlockedByFence() || !main_thread_only().delayed_work_queue->BlockedByFence()) { return false; } AutoLock lock(immediate_incoming_queue_lock_); if (immediate_incoming_queue().empty()) return true; return immediate_incoming_queue().front().enqueue_order() > main_thread_only().current_fence; } bool TaskQueueImpl::HasActiveFence() { if (main_thread_only().delayed_fence && main_thread_only().time_domain->Now() > main_thread_only().delayed_fence.value()) { return true; } return !!main_thread_only().current_fence; } bool TaskQueueImpl::CouldTaskRun(EnqueueOrder enqueue_order) const { if (!IsQueueEnabled()) return false; if (!main_thread_only().current_fence) return true; return enqueue_order < main_thread_only().current_fence; } // static void TaskQueueImpl::QueueAsValueInto(const TaskDeque& queue, TimeTicks now, trace_event::TracedValue* state) { for (const Task& task : queue) { TaskAsValueInto(task, now, state); } } // static void TaskQueueImpl::QueueAsValueInto(const std::priority_queue& queue, TimeTicks now, trace_event::TracedValue* state) { // Remove const to search |queue| in the destructive manner. Restore the // content from |visited| later. std::priority_queue* mutable_queue = const_cast*>(&queue); std::priority_queue visited; while (!mutable_queue->empty()) { TaskAsValueInto(mutable_queue->top(), now, state); visited.push(std::move(const_cast(mutable_queue->top()))); mutable_queue->pop(); } *mutable_queue = std::move(visited); } // static void TaskQueueImpl::TaskAsValueInto(const Task& task, TimeTicks now, trace_event::TracedValue* state) { state->BeginDictionary(); state->SetString("posted_from", task.posted_from.ToString()); if (task.enqueue_order_set()) state->SetInteger("enqueue_order", task.enqueue_order()); state->SetInteger("sequence_num", task.sequence_num); state->SetBoolean("nestable", task.nestable == Nestable::kNestable); state->SetBoolean("is_high_res", task.is_high_res); state->SetBoolean("is_cancelled", task.task.IsCancelled()); state->SetDouble("delayed_run_time", (task.delayed_run_time - TimeTicks()).InMillisecondsF()); state->SetDouble("delayed_run_time_milliseconds_from_now", (task.delayed_run_time - now).InMillisecondsF()); state->EndDictionary(); } TaskQueueImpl::QueueEnabledVoterImpl::QueueEnabledVoterImpl( scoped_refptr task_queue) : task_queue_(task_queue), enabled_(true) {} TaskQueueImpl::QueueEnabledVoterImpl::~QueueEnabledVoterImpl() { if (task_queue_->GetTaskQueueImpl()) task_queue_->GetTaskQueueImpl()->RemoveQueueEnabledVoter(this); } void TaskQueueImpl::QueueEnabledVoterImpl::SetQueueEnabled(bool enabled) { if (enabled_ == enabled) return; task_queue_->GetTaskQueueImpl()->OnQueueEnabledVoteChanged(enabled); enabled_ = enabled; } void TaskQueueImpl::RemoveQueueEnabledVoter( const QueueEnabledVoterImpl* voter) { // Bail out if we're being called from TaskQueueImpl::UnregisterTaskQueue. if (!main_thread_only().time_domain) return; bool was_enabled = IsQueueEnabled(); if (voter->enabled_) { main_thread_only().is_enabled_refcount--; DCHECK_GE(main_thread_only().is_enabled_refcount, 0); } main_thread_only().voter_refcount--; DCHECK_GE(main_thread_only().voter_refcount, 0); bool is_enabled = IsQueueEnabled(); if (was_enabled != is_enabled) EnableOrDisableWithSelector(is_enabled); } bool TaskQueueImpl::IsQueueEnabled() const { // By default is_enabled_refcount and voter_refcount both equal zero. return (main_thread_only().is_enabled_refcount == main_thread_only().voter_refcount) && main_thread_only().is_enabled_for_test; } void TaskQueueImpl::OnQueueEnabledVoteChanged(bool enabled) { bool was_enabled = IsQueueEnabled(); if (enabled) { main_thread_only().is_enabled_refcount++; DCHECK_LE(main_thread_only().is_enabled_refcount, main_thread_only().voter_refcount); } else { main_thread_only().is_enabled_refcount--; DCHECK_GE(main_thread_only().is_enabled_refcount, 0); } bool is_enabled = IsQueueEnabled(); if (was_enabled != is_enabled) EnableOrDisableWithSelector(is_enabled); } void TaskQueueImpl::EnableOrDisableWithSelector(bool enable) { if (!main_thread_only().sequence_manager) return; LazyNow lazy_now = main_thread_only().time_domain->CreateLazyNow(); UpdateDelayedWakeUp(&lazy_now); if (enable) { if (HasPendingImmediateWork() && !main_thread_only().on_next_wake_up_changed_callback.is_null()) { // Delayed work notification will be issued via time domain. main_thread_only().on_next_wake_up_changed_callback.Run(TimeTicks()); } // Note the selector calls SequenceManager::OnTaskQueueEnabled which posts // a DoWork if needed. main_thread_only() .sequence_manager->main_thread_only() .selector.EnableQueue(this); } else { main_thread_only() .sequence_manager->main_thread_only() .selector.DisableQueue(this); } } std::unique_ptr TaskQueueImpl::CreateQueueEnabledVoter(scoped_refptr task_queue) { DCHECK_EQ(task_queue->GetTaskQueueImpl(), this); main_thread_only().voter_refcount++; main_thread_only().is_enabled_refcount++; return std::make_unique(task_queue); } void TaskQueueImpl::SweepCanceledDelayedTasks(TimeTicks now) { if (main_thread_only().delayed_incoming_queue.empty()) return; // Remove canceled tasks. std::priority_queue remaining_tasks; while (!main_thread_only().delayed_incoming_queue.empty()) { if (!main_thread_only().delayed_incoming_queue.top().task.IsCancelled()) { remaining_tasks.push(std::move( const_cast(main_thread_only().delayed_incoming_queue.top()))); } main_thread_only().delayed_incoming_queue.pop(); } main_thread_only().delayed_incoming_queue = std::move(remaining_tasks); LazyNow lazy_now(now); UpdateDelayedWakeUp(&lazy_now); } void TaskQueueImpl::PushImmediateIncomingTaskForTest( TaskQueueImpl::Task&& task) { AutoLock lock(immediate_incoming_queue_lock_); immediate_incoming_queue().push_back(std::move(task)); } void TaskQueueImpl::RequeueDeferredNonNestableTask( DeferredNonNestableTask task) { DCHECK(task.task.nestable == Nestable::kNonNestable); // The re-queued tasks have to be pushed onto the front because we'd otherwise // violate the strict monotonically increasing enqueue order within the // WorkQueue. We can't assign them a new enqueue order here because that will // not behave correctly with fences and things will break (e.g Idle TQ). if (task.work_queue_type == WorkQueueType::kDelayed) { main_thread_only().delayed_work_queue->PushNonNestableTaskToFront( std::move(task.task)); } else { main_thread_only().immediate_work_queue->PushNonNestableTaskToFront( std::move(task.task)); } } void TaskQueueImpl::SetOnNextWakeUpChangedCallback( TaskQueueImpl::OnNextWakeUpChangedCallback callback) { #if DCHECK_IS_ON() if (callback) { DCHECK(main_thread_only().on_next_wake_up_changed_callback.is_null()) << "Can't assign two different observers to " "blink::scheduler::TaskQueue"; } #endif AutoLock lock(any_thread_lock_); any_thread().on_next_wake_up_changed_callback = callback; main_thread_only().on_next_wake_up_changed_callback = callback; } void TaskQueueImpl::UpdateDelayedWakeUp(LazyNow* lazy_now) { return UpdateDelayedWakeUpImpl(lazy_now, GetNextScheduledWakeUpImpl()); } void TaskQueueImpl::UpdateDelayedWakeUpImpl( LazyNow* lazy_now, Optional wake_up) { if (main_thread_only().scheduled_wake_up == wake_up) return; main_thread_only().scheduled_wake_up = wake_up; if (wake_up && !main_thread_only().on_next_wake_up_changed_callback.is_null() && !HasPendingImmediateWork()) { main_thread_only().on_next_wake_up_changed_callback.Run(wake_up->time); } main_thread_only().time_domain->SetNextWakeUpForQueue(this, wake_up, lazy_now); } void TaskQueueImpl::SetDelayedWakeUpForTesting( Optional wake_up) { LazyNow lazy_now = main_thread_only().time_domain->CreateLazyNow(); UpdateDelayedWakeUpImpl(&lazy_now, wake_up); } bool TaskQueueImpl::HasPendingImmediateWork() { // Any work queue tasks count as immediate work. if (!main_thread_only().delayed_work_queue->Empty() || !main_thread_only().immediate_work_queue->Empty()) { return true; } // Finally tasks on |immediate_incoming_queue| count as immediate work. AutoLock lock(immediate_incoming_queue_lock_); return !immediate_incoming_queue().empty(); } void TaskQueueImpl::SetOnTaskStartedHandler( TaskQueueImpl::OnTaskStartedHandler handler) { main_thread_only().on_task_started_handler = std::move(handler); } void TaskQueueImpl::OnTaskStarted(const TaskQueue::Task& task, const TaskQueue::TaskTiming& task_timing) { if (!main_thread_only().on_task_started_handler.is_null()) main_thread_only().on_task_started_handler.Run(task, task_timing); } void TaskQueueImpl::SetOnTaskCompletedHandler( TaskQueueImpl::OnTaskCompletedHandler handler) { main_thread_only().on_task_completed_handler = std::move(handler); } void TaskQueueImpl::OnTaskCompleted(const TaskQueue::Task& task, const TaskQueue::TaskTiming& task_timing) { if (!main_thread_only().on_task_completed_handler.is_null()) main_thread_only().on_task_completed_handler.Run(task, task_timing); } bool TaskQueueImpl::RequiresTaskTiming() const { return !main_thread_only().on_task_started_handler.is_null() || !main_thread_only().on_task_completed_handler.is_null(); } bool TaskQueueImpl::IsUnregistered() const { AutoLock lock(any_thread_lock_); return !any_thread().sequence_manager; } WeakPtr TaskQueueImpl::GetSequenceManagerWeakPtr() { return main_thread_only().sequence_manager->GetWeakPtr(); } scoped_refptr TaskQueueImpl::GetGracefulQueueShutdownHelper() { return main_thread_only().sequence_manager->GetGracefulQueueShutdownHelper(); } void TaskQueueImpl::SetQueueEnabledForTest(bool enabled) { main_thread_only().is_enabled_for_test = enabled; EnableOrDisableWithSelector(IsQueueEnabled()); } void TaskQueueImpl::ActivateDelayedFenceIfNeeded(TimeTicks now) { if (!main_thread_only().delayed_fence) return; if (main_thread_only().delayed_fence.value() > now) return; InsertFence(TaskQueue::InsertFencePosition::kNow); main_thread_only().delayed_fence = nullopt; } } // namespace internal } // namespace sequence_manager } // namespace base