// Copyright 2016 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "base/task_scheduler/scheduler_worker_pool_impl.h" #include #include #include #include "base/atomicops.h" #include "base/auto_reset.h" #include "base/bind.h" #include "base/bind_helpers.h" #include "base/compiler_specific.h" #include "base/location.h" #include "base/memory/ptr_util.h" #include "base/metrics/histogram.h" #include "base/sequence_token.h" #include "base/strings/string_util.h" #include "base/strings/stringprintf.h" #include "base/task_scheduler/scheduler_worker_pool_params.h" #include "base/task_scheduler/task_tracker.h" #include "base/task_scheduler/task_traits.h" #include "base/threading/platform_thread.h" #include "base/threading/scoped_blocking_call.h" #include "base/threading/thread_checker.h" #include "base/threading/thread_restrictions.h" #if defined(OS_WIN) #include "base/win/scoped_com_initializer.h" #include "base/win/scoped_windows_thread_environment.h" #include "base/win/scoped_winrt_initializer.h" #include "base/win/windows_version.h" #endif // defined(OS_WIN) namespace base { namespace internal { constexpr TimeDelta SchedulerWorkerPoolImpl::kBlockedWorkersPollPeriod; namespace { constexpr char kPoolNameSuffix[] = "Pool"; constexpr char kDetachDurationHistogramPrefix[] = "TaskScheduler.DetachDuration."; constexpr char kNumTasksBeforeDetachHistogramPrefix[] = "TaskScheduler.NumTasksBeforeDetach."; constexpr char kNumTasksBetweenWaitsHistogramPrefix[] = "TaskScheduler.NumTasksBetweenWaits."; constexpr size_t kMaxNumberOfWorkers = 256; // Only used in DCHECKs. bool ContainsWorker(const std::vector>& workers, const SchedulerWorker* worker) { auto it = std::find_if(workers.begin(), workers.end(), [worker](const scoped_refptr& i) { return i.get() == worker; }); return it != workers.end(); } } // namespace class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl : public SchedulerWorker::Delegate, public BlockingObserver { public: // |outer| owns the worker for which this delegate is constructed. SchedulerWorkerDelegateImpl(TrackedRef outer); ~SchedulerWorkerDelegateImpl() override; // SchedulerWorker::Delegate: void OnCanScheduleSequence(scoped_refptr sequence) override; SchedulerWorker::ThreadLabel GetThreadLabel() const override; void OnMainEntry(const SchedulerWorker* worker) override; scoped_refptr GetWork(SchedulerWorker* worker) override; void DidRunTask() override; void ReEnqueueSequence(scoped_refptr sequence) override; TimeDelta GetSleepTimeout() override; void OnMainExit(SchedulerWorker* worker) override; // BlockingObserver: void BlockingStarted(BlockingType blocking_type) override; void BlockingTypeUpgraded() override; void BlockingEnded() override; void MayBlockEntered(); void WillBlockEntered(); // Returns true iff this worker has been within a MAY_BLOCK ScopedBlockingCall // for more than |outer_->MayBlockThreshold()|. The max tasks must be // incremented if this returns true. bool MustIncrementMaxTasksLockRequired(); bool is_running_background_task_lock_required() const { outer_->lock_.AssertAcquired(); return is_running_background_task_; } private: // Returns true if |worker| is allowed to cleanup and remove itself from the // pool. Called from GetWork() when no work is available. bool CanCleanupLockRequired(const SchedulerWorker* worker) const; // Calls cleanup on |worker| and removes it from the pool. Called from // GetWork() when no work is available and CanCleanupLockRequired() returns // true. void CleanupLockRequired(SchedulerWorker* worker); // Called in GetWork() when a worker becomes idle. void OnWorkerBecomesIdleLockRequired(SchedulerWorker* worker); const TrackedRef outer_; // Time of the last detach. TimeTicks last_detach_time_; // Number of tasks executed since the last time the // TaskScheduler.NumTasksBetweenWaits histogram was recorded. size_t num_tasks_since_last_wait_ = 0; // Number of tasks executed since the last time the // TaskScheduler.NumTasksBeforeDetach histogram was recorded. size_t num_tasks_since_last_detach_ = 0; // Whether |outer_->max_tasks_| was incremented due to a ScopedBlockingCall on // the thread. Access synchronized by |outer_->lock_|. bool incremented_max_tasks_since_blocked_ = false; // Time when MayBlockScopeEntered() was last called. Reset when // BlockingScopeExited() is called. Access synchronized by |outer_->lock_|. TimeTicks may_block_start_time_; // Whether this worker is currently running a task (i.e. GetWork() has // returned a non-empty sequence and DidRunTask() hasn't been called yet). bool is_running_task_ = false; // Whether this worker is currently running a TaskPriority::BACKGROUND task. // Writes are made from the worker thread and are protected by // |outer_->lock_|. Reads are made from any thread, they are protected by // |outer_->lock_| when made outside of the worker thread. bool is_running_background_task_ = false; #if defined(OS_WIN) std::unique_ptr win_thread_environment_; #endif // defined(OS_WIN) // Verifies that specific calls are always made from the worker thread. THREAD_CHECKER(worker_thread_checker_); DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl); }; SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl( StringPiece histogram_label, StringPiece pool_label, ThreadPriority priority_hint, TrackedRef task_tracker, DelayedTaskManager* delayed_task_manager) : SchedulerWorkerPool(std::move(task_tracker), delayed_task_manager), pool_label_(pool_label.as_string()), priority_hint_(priority_hint), lock_(shared_priority_queue_.container_lock()), idle_workers_stack_cv_for_testing_(lock_.CreateConditionVariable()), // Mimics the UMA_HISTOGRAM_LONG_TIMES macro. detach_duration_histogram_(Histogram::FactoryTimeGet( JoinString({kDetachDurationHistogramPrefix, histogram_label, kPoolNameSuffix}, ""), TimeDelta::FromMilliseconds(1), TimeDelta::FromHours(1), 50, HistogramBase::kUmaTargetedHistogramFlag)), // Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more // than 1000 tasks before detaching, there is no need to know the exact // number of tasks that ran. num_tasks_before_detach_histogram_(Histogram::FactoryGet( JoinString({kNumTasksBeforeDetachHistogramPrefix, histogram_label, kPoolNameSuffix}, ""), 1, 1000, 50, HistogramBase::kUmaTargetedHistogramFlag)), // Mimics the UMA_HISTOGRAM_COUNTS_100 macro. A SchedulerWorker is // expected to run between zero and a few tens of tasks between waits. // When it runs more than 100 tasks, there is no need to know the exact // number of tasks that ran. num_tasks_between_waits_histogram_(Histogram::FactoryGet( JoinString({kNumTasksBetweenWaitsHistogramPrefix, histogram_label, kPoolNameSuffix}, ""), 1, 100, 50, HistogramBase::kUmaTargetedHistogramFlag)), tracked_ref_factory_(this) { DCHECK(!histogram_label.empty()); DCHECK(!pool_label_.empty()); } void SchedulerWorkerPoolImpl::Start( const SchedulerWorkerPoolParams& params, int max_background_tasks, scoped_refptr service_thread_task_runner, SchedulerWorkerObserver* scheduler_worker_observer, WorkerEnvironment worker_environment) { AutoSchedulerLock auto_lock(lock_); DCHECK(workers_.empty()); max_tasks_ = params.max_tasks(); DCHECK_GE(max_tasks_, 1U); initial_max_tasks_ = max_tasks_; DCHECK_LE(initial_max_tasks_, kMaxNumberOfWorkers); max_background_tasks_ = max_background_tasks; suggested_reclaim_time_ = params.suggested_reclaim_time(); backward_compatibility_ = params.backward_compatibility(); worker_environment_ = worker_environment; service_thread_task_runner_ = std::move(service_thread_task_runner); DCHECK(!scheduler_worker_observer_); scheduler_worker_observer_ = scheduler_worker_observer; // The initial number of workers is |num_wake_ups_before_start_| + 1 to try to // keep one at least one standby thread at all times (capacity permitting). const int num_initial_workers = std::min(num_wake_ups_before_start_ + 1, static_cast(max_tasks_)); workers_.reserve(num_initial_workers); for (int index = 0; index < num_initial_workers; ++index) { SchedulerWorker* worker = CreateRegisterAndStartSchedulerWorkerLockRequired(); // CHECK that the first worker can be started (assume that failure means // that threads can't be created on this machine). CHECK(worker || index > 0); if (worker) { if (index < num_wake_ups_before_start_) { worker->WakeUp(); } else { idle_workers_stack_.Push(worker); } } } } SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() { // SchedulerWorkerPool should only ever be deleted: // 1) In tests, after JoinForTesting(). // 2) In production, iff initialization failed. // In both cases |workers_| should be empty. DCHECK(workers_.empty()); } void SchedulerWorkerPoolImpl::OnCanScheduleSequence( scoped_refptr sequence) { const auto sequence_sort_key = sequence->GetSortKey(); shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), sequence_sort_key); WakeUpOneWorker(); } void SchedulerWorkerPoolImpl::GetHistograms( std::vector* histograms) const { histograms->push_back(detach_duration_histogram_); histograms->push_back(num_tasks_between_waits_histogram_); } int SchedulerWorkerPoolImpl::GetMaxConcurrentNonBlockedTasksDeprecated() const { #if DCHECK_IS_ON() AutoSchedulerLock auto_lock(lock_); DCHECK_NE(initial_max_tasks_, 0U) << "GetMaxConcurrentTasksDeprecated() should only be called after the " << "worker pool has started."; #endif return initial_max_tasks_; } void SchedulerWorkerPoolImpl::WaitForWorkersIdleForTesting(size_t n) { AutoSchedulerLock auto_lock(lock_); #if DCHECK_IS_ON() DCHECK(!some_workers_cleaned_up_for_testing_) << "Workers detached prior to waiting for a specific number of idle " "workers. Doing the wait under such conditions is flaky. Consider " "using |suggested_reclaim_time_ = TimeDelta::Max()| for this test."; #endif WaitForWorkersIdleLockRequiredForTesting(n); } void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() { AutoSchedulerLock auto_lock(lock_); WaitForWorkersIdleLockRequiredForTesting(workers_.size()); } void SchedulerWorkerPoolImpl::WaitForWorkersCleanedUpForTesting(size_t n) { AutoSchedulerLock auto_lock(lock_); if (!num_workers_cleaned_up_for_testing_cv_) num_workers_cleaned_up_for_testing_cv_ = lock_.CreateConditionVariable(); while (num_workers_cleaned_up_for_testing_ < n) num_workers_cleaned_up_for_testing_cv_->Wait(); num_workers_cleaned_up_for_testing_ = 0; } void SchedulerWorkerPoolImpl::JoinForTesting() { #if DCHECK_IS_ON() join_for_testing_started_.Set(); #endif decltype(workers_) workers_copy; { AutoSchedulerLock auto_lock(lock_); DCHECK_GT(workers_.size(), size_t(0)) << "Joined an unstarted worker pool."; // Ensure SchedulerWorkers in |workers_| do not attempt to cleanup while // being joined. worker_cleanup_disallowed_for_testing_ = true; // Make a copy of the SchedulerWorkers so that we can call // SchedulerWorker::JoinForTesting() without holding |lock_| since // SchedulerWorkers may need to access |workers_|. workers_copy = workers_; } for (const auto& worker : workers_copy) worker->JoinForTesting(); AutoSchedulerLock auto_lock(lock_); DCHECK(workers_ == workers_copy); // Release |workers_| to clear their TrackedRef against |this|. workers_.clear(); } size_t SchedulerWorkerPoolImpl::NumberOfWorkersForTesting() const { AutoSchedulerLock auto_lock(lock_); return workers_.size(); } size_t SchedulerWorkerPoolImpl::GetMaxTasksForTesting() const { AutoSchedulerLock auto_lock(lock_); return max_tasks_; } size_t SchedulerWorkerPoolImpl::NumberOfIdleWorkersForTesting() const { AutoSchedulerLock auto_lock(lock_); return idle_workers_stack_.Size(); } void SchedulerWorkerPoolImpl::MaximizeMayBlockThresholdForTesting() { maximum_blocked_threshold_for_testing_.Set(); } SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: SchedulerWorkerDelegateImpl(TrackedRef outer) : outer_(std::move(outer)) { // Bound in OnMainEntry(). DETACH_FROM_THREAD(worker_thread_checker_); } // OnMainExit() handles the thread-affine cleanup; SchedulerWorkerDelegateImpl // can thereafter safely be deleted from any thread. SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: ~SchedulerWorkerDelegateImpl() = default; void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: OnCanScheduleSequence(scoped_refptr sequence) { outer_->OnCanScheduleSequence(std::move(sequence)); } SchedulerWorker::ThreadLabel SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetThreadLabel() const { return SchedulerWorker::ThreadLabel::POOLED; } void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry( const SchedulerWorker* worker) { DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_); { #if DCHECK_IS_ON() AutoSchedulerLock auto_lock(outer_->lock_); DCHECK(ContainsWorker(outer_->workers_, worker)); #endif } #if defined(OS_WIN) if (outer_->worker_environment_ == WorkerEnvironment::COM_MTA) { if (win::GetVersion() >= win::VERSION_WIN8) { win_thread_environment_ = std::make_unique(); } else { win_thread_environment_ = std::make_unique( win::ScopedCOMInitializer::kMTA); } DCHECK(win_thread_environment_->Succeeded()); } #endif // defined(OS_WIN) DCHECK_EQ(num_tasks_since_last_wait_, 0U); PlatformThread::SetName( StringPrintf("TaskScheduler%sWorker", outer_->pool_label_.c_str())); outer_->BindToCurrentThread(); SetBlockingObserverForCurrentThread(this); } scoped_refptr SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork( SchedulerWorker* worker) { DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_); DCHECK(!is_running_task_); DCHECK(!is_running_background_task_); { AutoSchedulerLock auto_lock(outer_->lock_); DCHECK(ContainsWorker(outer_->workers_, worker)); // Calling GetWork() while on the idle worker stack indicates that we // must've reached GetWork() because of the WaitableEvent timing out. In // which case, we return no work and possibly cleanup the worker. To avoid // searching through the idle stack : use GetLastUsedTime() not being null // (or being directly on top of the idle stack) as a proxy for being on the // idle stack. const bool is_on_idle_workers_stack = outer_->idle_workers_stack_.Peek() == worker || !worker->GetLastUsedTime().is_null(); DCHECK_EQ(is_on_idle_workers_stack, outer_->idle_workers_stack_.Contains(worker)); if (is_on_idle_workers_stack) { if (CanCleanupLockRequired(worker)) CleanupLockRequired(worker); return nullptr; } // Excess workers should not get work, until they are no longer excess (i.e. // max tasks increases or another worker cleans up). This ensures that if we // have excess workers in the pool, they get a chance to no longer be excess // before being cleaned up. if (outer_->NumberOfExcessWorkersLockRequired() > outer_->idle_workers_stack_.Size()) { OnWorkerBecomesIdleLockRequired(worker); return nullptr; } } scoped_refptr sequence; { std::unique_ptr transaction( outer_->shared_priority_queue_.BeginTransaction()); if (transaction->IsEmpty()) { // |transaction| is kept alive while |worker| is added to // |idle_workers_stack_| to avoid this race: // 1. This thread creates a Transaction, finds |shared_priority_queue_| // empty and ends the Transaction. // 2. Other thread creates a Transaction, inserts a Sequence into // |shared_priority_queue_| and ends the Transaction. This can't happen // if the Transaction of step 1 is still active because because there // can only be one active Transaction per PriorityQueue at a time. // 3. Other thread calls WakeUpOneWorker(). No thread is woken up because // |idle_workers_stack_| is empty. // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep. // No thread runs the Sequence inserted in step 2. AutoSchedulerLock auto_lock(outer_->lock_); OnWorkerBecomesIdleLockRequired(worker); return nullptr; } // Enforce that no more than |max_background_tasks_| run concurrently. const TaskPriority priority = transaction->PeekSortKey().priority(); if (priority == TaskPriority::BACKGROUND) { AutoSchedulerLock auto_lock(outer_->lock_); if (outer_->num_running_background_tasks_ < outer_->max_background_tasks_) { ++outer_->num_running_background_tasks_; is_running_background_task_ = true; } else { OnWorkerBecomesIdleLockRequired(worker); return nullptr; } } sequence = transaction->PopSequence(); } DCHECK(sequence); #if DCHECK_IS_ON() { AutoSchedulerLock auto_lock(outer_->lock_); DCHECK(!outer_->idle_workers_stack_.Contains(worker)); } #endif is_running_task_ = true; return sequence; } void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() { DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_); DCHECK(may_block_start_time_.is_null()); DCHECK(!incremented_max_tasks_since_blocked_); DCHECK(is_running_task_); is_running_task_ = false; if (is_running_background_task_) { AutoSchedulerLock auto_lock(outer_->lock_); --outer_->num_running_background_tasks_; is_running_background_task_ = false; } ++num_tasks_since_last_wait_; ++num_tasks_since_last_detach_; } void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: ReEnqueueSequence(scoped_refptr sequence) { DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_); const SequenceSortKey sequence_sort_key = sequence->GetSortKey(); outer_->shared_priority_queue_.BeginTransaction()->Push(std::move(sequence), sequence_sort_key); // This worker will soon call GetWork(). Therefore, there is no need to wake // up a worker to run the sequence that was just inserted into // |outer_->shared_priority_queue_|. } TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: GetSleepTimeout() { DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_); // Sleep for an extra 10% to avoid the following pathological case: // 0) A task is running on a timer which matches |suggested_reclaim_time_|. // 1) The timer fires and this worker is created by // MaintainAtLeastOneIdleWorkerLockRequired() because the last idle // worker was assigned the task. // 2) This worker begins sleeping |suggested_reclaim_time_| (on top of the // idle stack). // 3) The task assigned to the other worker completes and the worker goes // back on the idle stack (this worker is now second on the idle stack; // its GetLastUsedTime() is set to Now()). // 4) The sleep in (2) expires. Since (3) was fast this worker is likely to // have been second on the idle stack long enough for // CanCleanupLockRequired() to be satisfied in which case this worker is // cleaned up. // 5) The timer fires at roughly the same time and we're back to (1) if (4) // resulted in a clean up; causing thread churn. // // Sleeping 10% longer in (2) makes it much less likely that (4) occurs // before (5). In that case (5) will cause (3) and refresh this worker's // GetLastUsedTime(), making CanCleanupLockRequired() return false in (4) // and avoiding churn. // // Of course the same problem arises if in (0) the timer matches // |suggested_reclaim_time_ * 1.1| but it's expected that any timer slower // than |suggested_reclaim_time_| will cause such churn during long idle // periods. If this is a problem in practice, the standby thread // configuration and algorithm should be revisited. return outer_->suggested_reclaim_time_ * 1.1; } bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: CanCleanupLockRequired(const SchedulerWorker* worker) const { DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_); const TimeTicks last_used_time = worker->GetLastUsedTime(); return !last_used_time.is_null() && TimeTicks::Now() - last_used_time >= outer_->suggested_reclaim_time_ && LIKELY(!outer_->worker_cleanup_disallowed_for_testing_); } void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CleanupLockRequired( SchedulerWorker* worker) { DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_); outer_->lock_.AssertAcquired(); outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_); outer_->cleanup_timestamps_.push(TimeTicks::Now()); worker->Cleanup(); outer_->RemoveFromIdleWorkersStackLockRequired(worker); // Remove the worker from |workers_|. auto worker_iter = std::find(outer_->workers_.begin(), outer_->workers_.end(), worker); DCHECK(worker_iter != outer_->workers_.end()); outer_->workers_.erase(worker_iter); ++outer_->num_workers_cleaned_up_for_testing_; #if DCHECK_IS_ON() outer_->some_workers_cleaned_up_for_testing_ = true; #endif if (outer_->num_workers_cleaned_up_for_testing_cv_) outer_->num_workers_cleaned_up_for_testing_cv_->Signal(); } void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: OnWorkerBecomesIdleLockRequired(SchedulerWorker* worker) { DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_); outer_->lock_.AssertAcquired(); // Record the TaskScheduler.NumTasksBetweenWaits histogram. After GetWork() // returns nullptr, the SchedulerWorker will perform a wait on its // WaitableEvent, so we record how many tasks were ran since the last wait // here. outer_->num_tasks_between_waits_histogram_->Add(num_tasks_since_last_wait_); num_tasks_since_last_wait_ = 0; outer_->AddToIdleWorkersStackLockRequired(worker); } void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainExit( SchedulerWorker* worker) { DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_); #if DCHECK_IS_ON() { bool shutdown_complete = outer_->task_tracker_->IsShutdownComplete(); AutoSchedulerLock auto_lock(outer_->lock_); // |worker| should already have been removed from the idle workers stack and // |workers_| by the time the thread is about to exit. (except in the cases // where the pool is no longer going to be used - in which case, it's fine // for there to be invalid workers in the pool. if (!shutdown_complete && !outer_->join_for_testing_started_.IsSet()) { DCHECK(!outer_->idle_workers_stack_.Contains(worker)); DCHECK(!ContainsWorker(outer_->workers_, worker)); } } #endif #if defined(OS_WIN) win_thread_environment_.reset(); #endif // defined(OS_WIN) } void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::BlockingStarted( BlockingType blocking_type) { DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_); // Blocking calls made outside of tasks should not influence the max tasks. if (!is_running_task_) return; switch (blocking_type) { case BlockingType::MAY_BLOCK: MayBlockEntered(); break; case BlockingType::WILL_BLOCK: WillBlockEntered(); break; } } void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: BlockingTypeUpgraded() { DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_); { AutoSchedulerLock auto_lock(outer_->lock_); // Don't do anything if a MAY_BLOCK ScopedBlockingCall instantiated in the // same scope already caused the max tasks to be incremented. if (incremented_max_tasks_since_blocked_) return; // Cancel the effect of a MAY_BLOCK ScopedBlockingCall instantiated in the // same scope. if (!may_block_start_time_.is_null()) { may_block_start_time_ = TimeTicks(); --outer_->num_pending_may_block_workers_; if (is_running_background_task_) --outer_->num_pending_background_may_block_workers_; } } WillBlockEntered(); } void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::BlockingEnded() { DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_); // Ignore blocking calls made outside of tasks. if (!is_running_task_) return; AutoSchedulerLock auto_lock(outer_->lock_); if (incremented_max_tasks_since_blocked_) { outer_->DecrementMaxTasksLockRequired(is_running_background_task_); } else { DCHECK(!may_block_start_time_.is_null()); --outer_->num_pending_may_block_workers_; if (is_running_background_task_) --outer_->num_pending_background_may_block_workers_; } incremented_max_tasks_since_blocked_ = false; may_block_start_time_ = TimeTicks(); } void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::MayBlockEntered() { DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_); { AutoSchedulerLock auto_lock(outer_->lock_); DCHECK(!incremented_max_tasks_since_blocked_); DCHECK(may_block_start_time_.is_null()); may_block_start_time_ = TimeTicks::Now(); ++outer_->num_pending_may_block_workers_; if (is_running_background_task_) ++outer_->num_pending_background_may_block_workers_; } outer_->ScheduleAdjustMaxTasksIfNeeded(); } void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::WillBlockEntered() { DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_); bool wake_up_allowed = false; { std::unique_ptr transaction( outer_->shared_priority_queue_.BeginTransaction()); AutoSchedulerLock auto_lock(outer_->lock_); DCHECK(!incremented_max_tasks_since_blocked_); DCHECK(may_block_start_time_.is_null()); incremented_max_tasks_since_blocked_ = true; outer_->IncrementMaxTasksLockRequired(is_running_background_task_); // If the number of workers was less than the old max tasks, PostTask // would've handled creating extra workers during WakeUpOneWorker. // Therefore, we don't need to do anything here. if (outer_->workers_.size() < outer_->max_tasks_ - 1) return; if (transaction->IsEmpty()) { outer_->MaintainAtLeastOneIdleWorkerLockRequired(); } else { // TODO(crbug.com/757897): We may create extra workers in this case: // |workers.size()| was equal to the old |max_tasks_|, we had multiple // ScopedBlockingCalls in parallel and we had work on the PQ. wake_up_allowed = outer_->WakeUpOneWorkerLockRequired(); // |wake_up_allowed| is true when the pool is started, and a WILL_BLOCK // scope cannot be entered before the pool starts. DCHECK(wake_up_allowed); } } // TODO(crbug.com/813857): This can be better handled in the PostTask() // codepath. We really only should do this if there are tasks pending. if (wake_up_allowed) outer_->ScheduleAdjustMaxTasksIfNeeded(); } bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl:: MustIncrementMaxTasksLockRequired() { outer_->lock_.AssertAcquired(); if (!incremented_max_tasks_since_blocked_ && !may_block_start_time_.is_null() && TimeTicks::Now() - may_block_start_time_ >= outer_->MayBlockThreshold()) { incremented_max_tasks_since_blocked_ = true; // Reset |may_block_start_time_| so that BlockingScopeExited() knows that it // doesn't have to decrement the number of pending MAY_BLOCK workers. may_block_start_time_ = TimeTicks(); --outer_->num_pending_may_block_workers_; if (is_running_background_task_) --outer_->num_pending_background_may_block_workers_; return true; } return false; } void SchedulerWorkerPoolImpl::WaitForWorkersIdleLockRequiredForTesting( size_t n) { lock_.AssertAcquired(); // Make sure workers do not cleanup while watching the idle count. AutoReset ban_cleanups(&worker_cleanup_disallowed_for_testing_, true); while (idle_workers_stack_.Size() < n) idle_workers_stack_cv_for_testing_->Wait(); } bool SchedulerWorkerPoolImpl::WakeUpOneWorkerLockRequired() { lock_.AssertAcquired(); if (workers_.empty()) { ++num_wake_ups_before_start_; return false; } // Ensure that there is one worker that can run tasks on top of the idle // stack, capacity permitting. MaintainAtLeastOneIdleWorkerLockRequired(); // If the worker on top of the idle stack can run tasks, wake it up. if (NumberOfExcessWorkersLockRequired() < idle_workers_stack_.Size()) { SchedulerWorker* worker = idle_workers_stack_.Pop(); if (worker) { worker->WakeUp(); } } // Ensure that there is one worker that can run tasks on top of the idle // stack, capacity permitting. MaintainAtLeastOneIdleWorkerLockRequired(); return true; } void SchedulerWorkerPoolImpl::WakeUpOneWorker() { bool wake_up_allowed; { AutoSchedulerLock auto_lock(lock_); wake_up_allowed = WakeUpOneWorkerLockRequired(); } if (wake_up_allowed) ScheduleAdjustMaxTasksIfNeeded(); } void SchedulerWorkerPoolImpl::MaintainAtLeastOneIdleWorkerLockRequired() { lock_.AssertAcquired(); if (workers_.size() == kMaxNumberOfWorkers) return; DCHECK_LT(workers_.size(), kMaxNumberOfWorkers); if (idle_workers_stack_.IsEmpty() && workers_.size() < max_tasks_) { SchedulerWorker* new_worker = CreateRegisterAndStartSchedulerWorkerLockRequired(); if (new_worker) idle_workers_stack_.Push(new_worker); } } void SchedulerWorkerPoolImpl::AddToIdleWorkersStackLockRequired( SchedulerWorker* worker) { lock_.AssertAcquired(); DCHECK(!idle_workers_stack_.Contains(worker)); idle_workers_stack_.Push(worker); DCHECK_LE(idle_workers_stack_.Size(), workers_.size()); idle_workers_stack_cv_for_testing_->Broadcast(); } void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStackLockRequired( SchedulerWorker* worker) { lock_.AssertAcquired(); idle_workers_stack_.Remove(worker); } SchedulerWorker* SchedulerWorkerPoolImpl::CreateRegisterAndStartSchedulerWorkerLockRequired() { lock_.AssertAcquired(); DCHECK_LT(workers_.size(), max_tasks_); DCHECK_LT(workers_.size(), kMaxNumberOfWorkers); // SchedulerWorker needs |lock_| as a predecessor for its thread lock // because in WakeUpOneWorker, |lock_| is first acquired and then // the thread lock is acquired when WakeUp is called on the worker. scoped_refptr worker = MakeRefCounted( priority_hint_, std::make_unique( tracked_ref_factory_.GetTrackedRef()), task_tracker_, &lock_, backward_compatibility_); if (!worker->Start(scheduler_worker_observer_)) return nullptr; workers_.push_back(worker); DCHECK_LE(workers_.size(), max_tasks_); if (!cleanup_timestamps_.empty()) { detach_duration_histogram_->AddTime(TimeTicks::Now() - cleanup_timestamps_.top()); cleanup_timestamps_.pop(); } return worker.get(); } size_t SchedulerWorkerPoolImpl::NumberOfExcessWorkersLockRequired() const { lock_.AssertAcquired(); return std::max(0, workers_.size() - max_tasks_); } void SchedulerWorkerPoolImpl::AdjustMaxTasks() { DCHECK(service_thread_task_runner_->RunsTasksInCurrentSequence()); std::unique_ptr transaction( shared_priority_queue_.BeginTransaction()); AutoSchedulerLock auto_lock(lock_); const size_t previous_max_tasks = max_tasks_; // Increment max tasks for each worker that has been within a MAY_BLOCK // ScopedBlockingCall for more than MayBlockThreshold(). for (scoped_refptr worker : workers_) { // The delegates of workers inside a SchedulerWorkerPoolImpl should be // SchedulerWorkerDelegateImpls. SchedulerWorkerDelegateImpl* delegate = static_cast(worker->delegate()); if (delegate->MustIncrementMaxTasksLockRequired()) { IncrementMaxTasksLockRequired( delegate->is_running_background_task_lock_required()); } } // Wake up a worker per pending sequence, capacity permitting. const size_t num_pending_sequences = transaction->Size(); const size_t num_wake_ups_needed = std::min(max_tasks_ - previous_max_tasks, num_pending_sequences); for (size_t i = 0; i < num_wake_ups_needed; ++i) { // No need to call ScheduleAdjustMaxTasksIfNeeded() as the caller will // take care of that for us. WakeUpOneWorkerLockRequired(); } MaintainAtLeastOneIdleWorkerLockRequired(); } TimeDelta SchedulerWorkerPoolImpl::MayBlockThreshold() const { if (maximum_blocked_threshold_for_testing_.IsSet()) return TimeDelta::Max(); // This value was set unscientifically based on intuition and may be adjusted // in the future. This value is smaller than |kBlockedWorkersPollPeriod| // because we hope than when multiple workers block around the same time, a // single AdjustMaxTasks() call will perform all the necessary max tasks // adjustments. return TimeDelta::FromMilliseconds(10); } void SchedulerWorkerPoolImpl::ScheduleAdjustMaxTasksIfNeeded() { { AutoSchedulerLock auto_lock(lock_); if (polling_max_tasks_ || !ShouldPeriodicallyAdjustMaxTasksLockRequired()) { return; } polling_max_tasks_ = true; } service_thread_task_runner_->PostDelayedTask( FROM_HERE, BindOnce(&SchedulerWorkerPoolImpl::AdjustMaxTasksFunction, Unretained(this)), kBlockedWorkersPollPeriod); } void SchedulerWorkerPoolImpl::AdjustMaxTasksFunction() { DCHECK(service_thread_task_runner_->RunsTasksInCurrentSequence()); AdjustMaxTasks(); { AutoSchedulerLock auto_lock(lock_); DCHECK(polling_max_tasks_); if (!ShouldPeriodicallyAdjustMaxTasksLockRequired()) { polling_max_tasks_ = false; return; } } service_thread_task_runner_->PostDelayedTask( FROM_HERE, BindOnce(&SchedulerWorkerPoolImpl::AdjustMaxTasksFunction, Unretained(this)), kBlockedWorkersPollPeriod); } bool SchedulerWorkerPoolImpl::ShouldPeriodicallyAdjustMaxTasksLockRequired() { lock_.AssertAcquired(); // The maximum number of background tasks that can run concurrently must be // adjusted periodically when (1) the number of background tasks that are // currently running is equal to it and (2) there are workers running // background tasks within the scope of a MAY_BLOCK ScopedBlockingCall but // haven't cause a max background tasks increment yet. // - When (1) is false: A newly posted background task will be allowed to run // normally. There is no hurry to increase max background tasks. // - When (2) is false: AdjustMaxTasks() wouldn't affect // |max_background_tasks_|. if (num_running_background_tasks_ >= max_background_tasks_ && num_pending_background_may_block_workers_ > 0) { return true; } // The maximum number of tasks that can run concurrently must be adjusted // periodically when (1) there are no idle workers that can do work (2) there // are workers that are within the scope of a MAY_BLOCK ScopedBlockingCall but // haven't cause a max tasks increment yet. // - When (1) is false: A newly posted task will run on one of the idle // workers that are allowed to do work. There is no hurry to increase max // tasks. // - When (2) is false: AdjustMaxTasks() wouldn't affect |max_tasks_|. const int idle_workers_that_can_do_work = idle_workers_stack_.Size() - NumberOfExcessWorkersLockRequired(); return idle_workers_that_can_do_work <= 0 && num_pending_may_block_workers_ > 0; } void SchedulerWorkerPoolImpl::DecrementMaxTasksLockRequired( bool is_running_background_task) { lock_.AssertAcquired(); --max_tasks_; if (is_running_background_task) --max_background_tasks_; } void SchedulerWorkerPoolImpl::IncrementMaxTasksLockRequired( bool is_running_background_task) { lock_.AssertAcquired(); ++max_tasks_; if (is_running_background_task) ++max_background_tasks_; } } // namespace internal } // namespace base