1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
6
7 #include <stddef.h>
8
9 #include <algorithm>
10 #include <utility>
11
12 #include "base/atomicops.h"
13 #include "base/auto_reset.h"
14 #include "base/bind.h"
15 #include "base/bind_helpers.h"
16 #include "base/compiler_specific.h"
17 #include "base/location.h"
18 #include "base/memory/ptr_util.h"
19 #include "base/metrics/histogram.h"
20 #include "base/sequence_token.h"
21 #include "base/strings/string_util.h"
22 #include "base/strings/stringprintf.h"
23 #include "base/task_scheduler/scheduler_worker_pool_params.h"
24 #include "base/task_scheduler/task_tracker.h"
25 #include "base/task_scheduler/task_traits.h"
26 #include "base/threading/platform_thread.h"
27 #include "base/threading/scoped_blocking_call.h"
28 #include "base/threading/thread_checker.h"
29 #include "base/threading/thread_restrictions.h"
30
31 #if defined(OS_WIN)
32 #include "base/win/scoped_com_initializer.h"
33 #include "base/win/scoped_windows_thread_environment.h"
34 #include "base/win/scoped_winrt_initializer.h"
35 #include "base/win/windows_version.h"
36 #endif // defined(OS_WIN)
37
38 namespace base {
39 namespace internal {
40
41 constexpr TimeDelta SchedulerWorkerPoolImpl::kBlockedWorkersPollPeriod;
42
43 namespace {
44
45 constexpr char kPoolNameSuffix[] = "Pool";
46 constexpr char kDetachDurationHistogramPrefix[] =
47 "TaskScheduler.DetachDuration.";
48 constexpr char kNumTasksBeforeDetachHistogramPrefix[] =
49 "TaskScheduler.NumTasksBeforeDetach.";
50 constexpr char kNumTasksBetweenWaitsHistogramPrefix[] =
51 "TaskScheduler.NumTasksBetweenWaits.";
52 constexpr size_t kMaxNumberOfWorkers = 256;
53
54 // Only used in DCHECKs.
ContainsWorker(const std::vector<scoped_refptr<SchedulerWorker>> & workers,const SchedulerWorker * worker)55 bool ContainsWorker(const std::vector<scoped_refptr<SchedulerWorker>>& workers,
56 const SchedulerWorker* worker) {
57 auto it = std::find_if(workers.begin(), workers.end(),
58 [worker](const scoped_refptr<SchedulerWorker>& i) {
59 return i.get() == worker;
60 });
61 return it != workers.end();
62 }
63
64 } // namespace
65
66 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
67 : public SchedulerWorker::Delegate,
68 public BlockingObserver {
69 public:
70 // |outer| owns the worker for which this delegate is constructed.
71 SchedulerWorkerDelegateImpl(TrackedRef<SchedulerWorkerPoolImpl> outer);
72 ~SchedulerWorkerDelegateImpl() override;
73
74 // SchedulerWorker::Delegate:
75 void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) override;
76 SchedulerWorker::ThreadLabel GetThreadLabel() const override;
77 void OnMainEntry(const SchedulerWorker* worker) override;
78 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override;
79 void DidRunTask() override;
80 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
81 TimeDelta GetSleepTimeout() override;
82 void OnMainExit(SchedulerWorker* worker) override;
83
84 // BlockingObserver:
85 void BlockingStarted(BlockingType blocking_type) override;
86 void BlockingTypeUpgraded() override;
87 void BlockingEnded() override;
88
89 void MayBlockEntered();
90 void WillBlockEntered();
91
92 // Returns true iff this worker has been within a MAY_BLOCK ScopedBlockingCall
93 // for more than |outer_->MayBlockThreshold()|. The max tasks must be
94 // incremented if this returns true.
95 bool MustIncrementMaxTasksLockRequired();
96
is_running_background_task_lock_required() const97 bool is_running_background_task_lock_required() const {
98 outer_->lock_.AssertAcquired();
99 return is_running_background_task_;
100 }
101
102 private:
103 // Returns true if |worker| is allowed to cleanup and remove itself from the
104 // pool. Called from GetWork() when no work is available.
105 bool CanCleanupLockRequired(const SchedulerWorker* worker) const;
106
107 // Calls cleanup on |worker| and removes it from the pool. Called from
108 // GetWork() when no work is available and CanCleanupLockRequired() returns
109 // true.
110 void CleanupLockRequired(SchedulerWorker* worker);
111
112 // Called in GetWork() when a worker becomes idle.
113 void OnWorkerBecomesIdleLockRequired(SchedulerWorker* worker);
114
115 const TrackedRef<SchedulerWorkerPoolImpl> outer_;
116
117 // Time of the last detach.
118 TimeTicks last_detach_time_;
119
120 // Number of tasks executed since the last time the
121 // TaskScheduler.NumTasksBetweenWaits histogram was recorded.
122 size_t num_tasks_since_last_wait_ = 0;
123
124 // Number of tasks executed since the last time the
125 // TaskScheduler.NumTasksBeforeDetach histogram was recorded.
126 size_t num_tasks_since_last_detach_ = 0;
127
128 // Whether |outer_->max_tasks_| was incremented due to a ScopedBlockingCall on
129 // the thread. Access synchronized by |outer_->lock_|.
130 bool incremented_max_tasks_since_blocked_ = false;
131
132 // Time when MayBlockScopeEntered() was last called. Reset when
133 // BlockingScopeExited() is called. Access synchronized by |outer_->lock_|.
134 TimeTicks may_block_start_time_;
135
136 // Whether this worker is currently running a task (i.e. GetWork() has
137 // returned a non-empty sequence and DidRunTask() hasn't been called yet).
138 bool is_running_task_ = false;
139
140 // Whether this worker is currently running a TaskPriority::BACKGROUND task.
141 // Writes are made from the worker thread and are protected by
142 // |outer_->lock_|. Reads are made from any thread, they are protected by
143 // |outer_->lock_| when made outside of the worker thread.
144 bool is_running_background_task_ = false;
145
146 #if defined(OS_WIN)
147 std::unique_ptr<win::ScopedWindowsThreadEnvironment> win_thread_environment_;
148 #endif // defined(OS_WIN)
149
150 // Verifies that specific calls are always made from the worker thread.
151 THREAD_CHECKER(worker_thread_checker_);
152
153 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl);
154 };
155
SchedulerWorkerPoolImpl(StringPiece histogram_label,StringPiece pool_label,ThreadPriority priority_hint,TrackedRef<TaskTracker> task_tracker,DelayedTaskManager * delayed_task_manager)156 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
157 StringPiece histogram_label,
158 StringPiece pool_label,
159 ThreadPriority priority_hint,
160 TrackedRef<TaskTracker> task_tracker,
161 DelayedTaskManager* delayed_task_manager)
162 : SchedulerWorkerPool(std::move(task_tracker), delayed_task_manager),
163 pool_label_(pool_label.as_string()),
164 priority_hint_(priority_hint),
165 lock_(shared_priority_queue_.container_lock()),
166 idle_workers_stack_cv_for_testing_(lock_.CreateConditionVariable()),
167 // Mimics the UMA_HISTOGRAM_LONG_TIMES macro.
168 detach_duration_histogram_(Histogram::FactoryTimeGet(
169 JoinString({kDetachDurationHistogramPrefix, histogram_label,
170 kPoolNameSuffix},
171 ""),
172 TimeDelta::FromMilliseconds(1),
173 TimeDelta::FromHours(1),
174 50,
175 HistogramBase::kUmaTargetedHistogramFlag)),
176 // Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more
177 // than 1000 tasks before detaching, there is no need to know the exact
178 // number of tasks that ran.
179 num_tasks_before_detach_histogram_(Histogram::FactoryGet(
180 JoinString({kNumTasksBeforeDetachHistogramPrefix, histogram_label,
181 kPoolNameSuffix},
182 ""),
183 1,
184 1000,
185 50,
186 HistogramBase::kUmaTargetedHistogramFlag)),
187 // Mimics the UMA_HISTOGRAM_COUNTS_100 macro. A SchedulerWorker is
188 // expected to run between zero and a few tens of tasks between waits.
189 // When it runs more than 100 tasks, there is no need to know the exact
190 // number of tasks that ran.
191 num_tasks_between_waits_histogram_(Histogram::FactoryGet(
192 JoinString({kNumTasksBetweenWaitsHistogramPrefix, histogram_label,
193 kPoolNameSuffix},
194 ""),
195 1,
196 100,
197 50,
198 HistogramBase::kUmaTargetedHistogramFlag)),
199 tracked_ref_factory_(this) {
200 DCHECK(!histogram_label.empty());
201 DCHECK(!pool_label_.empty());
202 }
203
Start(const SchedulerWorkerPoolParams & params,int max_background_tasks,scoped_refptr<TaskRunner> service_thread_task_runner,SchedulerWorkerObserver * scheduler_worker_observer,WorkerEnvironment worker_environment)204 void SchedulerWorkerPoolImpl::Start(
205 const SchedulerWorkerPoolParams& params,
206 int max_background_tasks,
207 scoped_refptr<TaskRunner> service_thread_task_runner,
208 SchedulerWorkerObserver* scheduler_worker_observer,
209 WorkerEnvironment worker_environment) {
210 AutoSchedulerLock auto_lock(lock_);
211
212 DCHECK(workers_.empty());
213
214 max_tasks_ = params.max_tasks();
215 DCHECK_GE(max_tasks_, 1U);
216 initial_max_tasks_ = max_tasks_;
217 DCHECK_LE(initial_max_tasks_, kMaxNumberOfWorkers);
218 max_background_tasks_ = max_background_tasks;
219 suggested_reclaim_time_ = params.suggested_reclaim_time();
220 backward_compatibility_ = params.backward_compatibility();
221 worker_environment_ = worker_environment;
222
223 service_thread_task_runner_ = std::move(service_thread_task_runner);
224
225 DCHECK(!scheduler_worker_observer_);
226 scheduler_worker_observer_ = scheduler_worker_observer;
227
228 // The initial number of workers is |num_wake_ups_before_start_| + 1 to try to
229 // keep one at least one standby thread at all times (capacity permitting).
230 const int num_initial_workers =
231 std::min(num_wake_ups_before_start_ + 1, static_cast<int>(max_tasks_));
232 workers_.reserve(num_initial_workers);
233
234 for (int index = 0; index < num_initial_workers; ++index) {
235 SchedulerWorker* worker =
236 CreateRegisterAndStartSchedulerWorkerLockRequired();
237
238 // CHECK that the first worker can be started (assume that failure means
239 // that threads can't be created on this machine).
240 CHECK(worker || index > 0);
241
242 if (worker) {
243 if (index < num_wake_ups_before_start_) {
244 worker->WakeUp();
245 } else {
246 idle_workers_stack_.Push(worker);
247 }
248 }
249 }
250 }
251
~SchedulerWorkerPoolImpl()252 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() {
253 // SchedulerWorkerPool should only ever be deleted:
254 // 1) In tests, after JoinForTesting().
255 // 2) In production, iff initialization failed.
256 // In both cases |workers_| should be empty.
257 DCHECK(workers_.empty());
258 }
259
OnCanScheduleSequence(scoped_refptr<Sequence> sequence)260 void SchedulerWorkerPoolImpl::OnCanScheduleSequence(
261 scoped_refptr<Sequence> sequence) {
262 const auto sequence_sort_key = sequence->GetSortKey();
263 shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
264 sequence_sort_key);
265
266 WakeUpOneWorker();
267 }
268
GetHistograms(std::vector<const HistogramBase * > * histograms) const269 void SchedulerWorkerPoolImpl::GetHistograms(
270 std::vector<const HistogramBase*>* histograms) const {
271 histograms->push_back(detach_duration_histogram_);
272 histograms->push_back(num_tasks_between_waits_histogram_);
273 }
274
GetMaxConcurrentNonBlockedTasksDeprecated() const275 int SchedulerWorkerPoolImpl::GetMaxConcurrentNonBlockedTasksDeprecated() const {
276 #if DCHECK_IS_ON()
277 AutoSchedulerLock auto_lock(lock_);
278 DCHECK_NE(initial_max_tasks_, 0U)
279 << "GetMaxConcurrentTasksDeprecated() should only be called after the "
280 << "worker pool has started.";
281 #endif
282 return initial_max_tasks_;
283 }
284
WaitForWorkersIdleForTesting(size_t n)285 void SchedulerWorkerPoolImpl::WaitForWorkersIdleForTesting(size_t n) {
286 AutoSchedulerLock auto_lock(lock_);
287
288 #if DCHECK_IS_ON()
289 DCHECK(!some_workers_cleaned_up_for_testing_)
290 << "Workers detached prior to waiting for a specific number of idle "
291 "workers. Doing the wait under such conditions is flaky. Consider "
292 "using |suggested_reclaim_time_ = TimeDelta::Max()| for this test.";
293 #endif
294
295 WaitForWorkersIdleLockRequiredForTesting(n);
296 }
297
WaitForAllWorkersIdleForTesting()298 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() {
299 AutoSchedulerLock auto_lock(lock_);
300 WaitForWorkersIdleLockRequiredForTesting(workers_.size());
301 }
302
WaitForWorkersCleanedUpForTesting(size_t n)303 void SchedulerWorkerPoolImpl::WaitForWorkersCleanedUpForTesting(size_t n) {
304 AutoSchedulerLock auto_lock(lock_);
305
306 if (!num_workers_cleaned_up_for_testing_cv_)
307 num_workers_cleaned_up_for_testing_cv_ = lock_.CreateConditionVariable();
308
309 while (num_workers_cleaned_up_for_testing_ < n)
310 num_workers_cleaned_up_for_testing_cv_->Wait();
311
312 num_workers_cleaned_up_for_testing_ = 0;
313 }
314
JoinForTesting()315 void SchedulerWorkerPoolImpl::JoinForTesting() {
316 #if DCHECK_IS_ON()
317 join_for_testing_started_.Set();
318 #endif
319
320 decltype(workers_) workers_copy;
321 {
322 AutoSchedulerLock auto_lock(lock_);
323
324 DCHECK_GT(workers_.size(), size_t(0)) << "Joined an unstarted worker pool.";
325
326 // Ensure SchedulerWorkers in |workers_| do not attempt to cleanup while
327 // being joined.
328 worker_cleanup_disallowed_for_testing_ = true;
329
330 // Make a copy of the SchedulerWorkers so that we can call
331 // SchedulerWorker::JoinForTesting() without holding |lock_| since
332 // SchedulerWorkers may need to access |workers_|.
333 workers_copy = workers_;
334 }
335 for (const auto& worker : workers_copy)
336 worker->JoinForTesting();
337
338 AutoSchedulerLock auto_lock(lock_);
339 DCHECK(workers_ == workers_copy);
340 // Release |workers_| to clear their TrackedRef against |this|.
341 workers_.clear();
342 }
343
NumberOfWorkersForTesting() const344 size_t SchedulerWorkerPoolImpl::NumberOfWorkersForTesting() const {
345 AutoSchedulerLock auto_lock(lock_);
346 return workers_.size();
347 }
348
GetMaxTasksForTesting() const349 size_t SchedulerWorkerPoolImpl::GetMaxTasksForTesting() const {
350 AutoSchedulerLock auto_lock(lock_);
351 return max_tasks_;
352 }
353
NumberOfIdleWorkersForTesting() const354 size_t SchedulerWorkerPoolImpl::NumberOfIdleWorkersForTesting() const {
355 AutoSchedulerLock auto_lock(lock_);
356 return idle_workers_stack_.Size();
357 }
358
MaximizeMayBlockThresholdForTesting()359 void SchedulerWorkerPoolImpl::MaximizeMayBlockThresholdForTesting() {
360 maximum_blocked_threshold_for_testing_.Set();
361 }
362
363 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
SchedulerWorkerDelegateImpl(TrackedRef<SchedulerWorkerPoolImpl> outer)364 SchedulerWorkerDelegateImpl(TrackedRef<SchedulerWorkerPoolImpl> outer)
365 : outer_(std::move(outer)) {
366 // Bound in OnMainEntry().
367 DETACH_FROM_THREAD(worker_thread_checker_);
368 }
369
370 // OnMainExit() handles the thread-affine cleanup; SchedulerWorkerDelegateImpl
371 // can thereafter safely be deleted from any thread.
372 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
373 ~SchedulerWorkerDelegateImpl() = default;
374
375 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
OnCanScheduleSequence(scoped_refptr<Sequence> sequence)376 OnCanScheduleSequence(scoped_refptr<Sequence> sequence) {
377 outer_->OnCanScheduleSequence(std::move(sequence));
378 }
379
380 SchedulerWorker::ThreadLabel
GetThreadLabel() const381 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetThreadLabel() const {
382 return SchedulerWorker::ThreadLabel::POOLED;
383 }
384
OnMainEntry(const SchedulerWorker * worker)385 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry(
386 const SchedulerWorker* worker) {
387 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
388
389 {
390 #if DCHECK_IS_ON()
391 AutoSchedulerLock auto_lock(outer_->lock_);
392 DCHECK(ContainsWorker(outer_->workers_, worker));
393 #endif
394 }
395
396 #if defined(OS_WIN)
397 if (outer_->worker_environment_ == WorkerEnvironment::COM_MTA) {
398 if (win::GetVersion() >= win::VERSION_WIN8) {
399 win_thread_environment_ = std::make_unique<win::ScopedWinrtInitializer>();
400 } else {
401 win_thread_environment_ = std::make_unique<win::ScopedCOMInitializer>(
402 win::ScopedCOMInitializer::kMTA);
403 }
404 DCHECK(win_thread_environment_->Succeeded());
405 }
406 #endif // defined(OS_WIN)
407
408 DCHECK_EQ(num_tasks_since_last_wait_, 0U);
409
410 PlatformThread::SetName(
411 StringPrintf("TaskScheduler%sWorker", outer_->pool_label_.c_str()));
412
413 outer_->BindToCurrentThread();
414 SetBlockingObserverForCurrentThread(this);
415 }
416
417 scoped_refptr<Sequence>
GetWork(SchedulerWorker * worker)418 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
419 SchedulerWorker* worker) {
420 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
421 DCHECK(!is_running_task_);
422 DCHECK(!is_running_background_task_);
423
424 {
425 AutoSchedulerLock auto_lock(outer_->lock_);
426
427 DCHECK(ContainsWorker(outer_->workers_, worker));
428
429 // Calling GetWork() while on the idle worker stack indicates that we
430 // must've reached GetWork() because of the WaitableEvent timing out. In
431 // which case, we return no work and possibly cleanup the worker. To avoid
432 // searching through the idle stack : use GetLastUsedTime() not being null
433 // (or being directly on top of the idle stack) as a proxy for being on the
434 // idle stack.
435 const bool is_on_idle_workers_stack =
436 outer_->idle_workers_stack_.Peek() == worker ||
437 !worker->GetLastUsedTime().is_null();
438 DCHECK_EQ(is_on_idle_workers_stack,
439 outer_->idle_workers_stack_.Contains(worker));
440 if (is_on_idle_workers_stack) {
441 if (CanCleanupLockRequired(worker))
442 CleanupLockRequired(worker);
443 return nullptr;
444 }
445
446 // Excess workers should not get work, until they are no longer excess (i.e.
447 // max tasks increases or another worker cleans up). This ensures that if we
448 // have excess workers in the pool, they get a chance to no longer be excess
449 // before being cleaned up.
450 if (outer_->NumberOfExcessWorkersLockRequired() >
451 outer_->idle_workers_stack_.Size()) {
452 OnWorkerBecomesIdleLockRequired(worker);
453 return nullptr;
454 }
455 }
456 scoped_refptr<Sequence> sequence;
457 {
458 std::unique_ptr<PriorityQueue::Transaction> transaction(
459 outer_->shared_priority_queue_.BeginTransaction());
460
461 if (transaction->IsEmpty()) {
462 // |transaction| is kept alive while |worker| is added to
463 // |idle_workers_stack_| to avoid this race:
464 // 1. This thread creates a Transaction, finds |shared_priority_queue_|
465 // empty and ends the Transaction.
466 // 2. Other thread creates a Transaction, inserts a Sequence into
467 // |shared_priority_queue_| and ends the Transaction. This can't happen
468 // if the Transaction of step 1 is still active because because there
469 // can only be one active Transaction per PriorityQueue at a time.
470 // 3. Other thread calls WakeUpOneWorker(). No thread is woken up because
471 // |idle_workers_stack_| is empty.
472 // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep.
473 // No thread runs the Sequence inserted in step 2.
474 AutoSchedulerLock auto_lock(outer_->lock_);
475 OnWorkerBecomesIdleLockRequired(worker);
476 return nullptr;
477 }
478
479 // Enforce that no more than |max_background_tasks_| run concurrently.
480 const TaskPriority priority = transaction->PeekSortKey().priority();
481 if (priority == TaskPriority::BACKGROUND) {
482 AutoSchedulerLock auto_lock(outer_->lock_);
483 if (outer_->num_running_background_tasks_ <
484 outer_->max_background_tasks_) {
485 ++outer_->num_running_background_tasks_;
486 is_running_background_task_ = true;
487 } else {
488 OnWorkerBecomesIdleLockRequired(worker);
489 return nullptr;
490 }
491 }
492
493 sequence = transaction->PopSequence();
494 }
495 DCHECK(sequence);
496 #if DCHECK_IS_ON()
497 {
498 AutoSchedulerLock auto_lock(outer_->lock_);
499 DCHECK(!outer_->idle_workers_stack_.Contains(worker));
500 }
501 #endif
502
503 is_running_task_ = true;
504 return sequence;
505 }
506
DidRunTask()507 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() {
508 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
509 DCHECK(may_block_start_time_.is_null());
510 DCHECK(!incremented_max_tasks_since_blocked_);
511 DCHECK(is_running_task_);
512
513 is_running_task_ = false;
514
515 if (is_running_background_task_) {
516 AutoSchedulerLock auto_lock(outer_->lock_);
517 --outer_->num_running_background_tasks_;
518 is_running_background_task_ = false;
519 }
520
521 ++num_tasks_since_last_wait_;
522 ++num_tasks_since_last_detach_;
523 }
524
525 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
ReEnqueueSequence(scoped_refptr<Sequence> sequence)526 ReEnqueueSequence(scoped_refptr<Sequence> sequence) {
527 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
528
529 const SequenceSortKey sequence_sort_key = sequence->GetSortKey();
530 outer_->shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
531 sequence_sort_key);
532 // This worker will soon call GetWork(). Therefore, there is no need to wake
533 // up a worker to run the sequence that was just inserted into
534 // |outer_->shared_priority_queue_|.
535 }
536
537 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
GetSleepTimeout()538 GetSleepTimeout() {
539 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
540 // Sleep for an extra 10% to avoid the following pathological case:
541 // 0) A task is running on a timer which matches |suggested_reclaim_time_|.
542 // 1) The timer fires and this worker is created by
543 // MaintainAtLeastOneIdleWorkerLockRequired() because the last idle
544 // worker was assigned the task.
545 // 2) This worker begins sleeping |suggested_reclaim_time_| (on top of the
546 // idle stack).
547 // 3) The task assigned to the other worker completes and the worker goes
548 // back on the idle stack (this worker is now second on the idle stack;
549 // its GetLastUsedTime() is set to Now()).
550 // 4) The sleep in (2) expires. Since (3) was fast this worker is likely to
551 // have been second on the idle stack long enough for
552 // CanCleanupLockRequired() to be satisfied in which case this worker is
553 // cleaned up.
554 // 5) The timer fires at roughly the same time and we're back to (1) if (4)
555 // resulted in a clean up; causing thread churn.
556 //
557 // Sleeping 10% longer in (2) makes it much less likely that (4) occurs
558 // before (5). In that case (5) will cause (3) and refresh this worker's
559 // GetLastUsedTime(), making CanCleanupLockRequired() return false in (4)
560 // and avoiding churn.
561 //
562 // Of course the same problem arises if in (0) the timer matches
563 // |suggested_reclaim_time_ * 1.1| but it's expected that any timer slower
564 // than |suggested_reclaim_time_| will cause such churn during long idle
565 // periods. If this is a problem in practice, the standby thread
566 // configuration and algorithm should be revisited.
567 return outer_->suggested_reclaim_time_ * 1.1;
568 }
569
570 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
CanCleanupLockRequired(const SchedulerWorker * worker) const571 CanCleanupLockRequired(const SchedulerWorker* worker) const {
572 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
573
574 const TimeTicks last_used_time = worker->GetLastUsedTime();
575 return !last_used_time.is_null() &&
576 TimeTicks::Now() - last_used_time >= outer_->suggested_reclaim_time_ &&
577 LIKELY(!outer_->worker_cleanup_disallowed_for_testing_);
578 }
579
CleanupLockRequired(SchedulerWorker * worker)580 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CleanupLockRequired(
581 SchedulerWorker* worker) {
582 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
583
584 outer_->lock_.AssertAcquired();
585 outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_);
586 outer_->cleanup_timestamps_.push(TimeTicks::Now());
587 worker->Cleanup();
588 outer_->RemoveFromIdleWorkersStackLockRequired(worker);
589
590 // Remove the worker from |workers_|.
591 auto worker_iter =
592 std::find(outer_->workers_.begin(), outer_->workers_.end(), worker);
593 DCHECK(worker_iter != outer_->workers_.end());
594 outer_->workers_.erase(worker_iter);
595
596 ++outer_->num_workers_cleaned_up_for_testing_;
597 #if DCHECK_IS_ON()
598 outer_->some_workers_cleaned_up_for_testing_ = true;
599 #endif
600 if (outer_->num_workers_cleaned_up_for_testing_cv_)
601 outer_->num_workers_cleaned_up_for_testing_cv_->Signal();
602 }
603
604 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
OnWorkerBecomesIdleLockRequired(SchedulerWorker * worker)605 OnWorkerBecomesIdleLockRequired(SchedulerWorker* worker) {
606 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
607
608 outer_->lock_.AssertAcquired();
609 // Record the TaskScheduler.NumTasksBetweenWaits histogram. After GetWork()
610 // returns nullptr, the SchedulerWorker will perform a wait on its
611 // WaitableEvent, so we record how many tasks were ran since the last wait
612 // here.
613 outer_->num_tasks_between_waits_histogram_->Add(num_tasks_since_last_wait_);
614 num_tasks_since_last_wait_ = 0;
615 outer_->AddToIdleWorkersStackLockRequired(worker);
616 }
617
OnMainExit(SchedulerWorker * worker)618 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainExit(
619 SchedulerWorker* worker) {
620 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
621
622 #if DCHECK_IS_ON()
623 {
624 bool shutdown_complete = outer_->task_tracker_->IsShutdownComplete();
625 AutoSchedulerLock auto_lock(outer_->lock_);
626
627 // |worker| should already have been removed from the idle workers stack and
628 // |workers_| by the time the thread is about to exit. (except in the cases
629 // where the pool is no longer going to be used - in which case, it's fine
630 // for there to be invalid workers in the pool.
631 if (!shutdown_complete && !outer_->join_for_testing_started_.IsSet()) {
632 DCHECK(!outer_->idle_workers_stack_.Contains(worker));
633 DCHECK(!ContainsWorker(outer_->workers_, worker));
634 }
635 }
636 #endif
637
638 #if defined(OS_WIN)
639 win_thread_environment_.reset();
640 #endif // defined(OS_WIN)
641 }
642
BlockingStarted(BlockingType blocking_type)643 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::BlockingStarted(
644 BlockingType blocking_type) {
645 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
646
647 // Blocking calls made outside of tasks should not influence the max tasks.
648 if (!is_running_task_)
649 return;
650
651 switch (blocking_type) {
652 case BlockingType::MAY_BLOCK:
653 MayBlockEntered();
654 break;
655 case BlockingType::WILL_BLOCK:
656 WillBlockEntered();
657 break;
658 }
659 }
660
661 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
BlockingTypeUpgraded()662 BlockingTypeUpgraded() {
663 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
664
665 {
666 AutoSchedulerLock auto_lock(outer_->lock_);
667
668 // Don't do anything if a MAY_BLOCK ScopedBlockingCall instantiated in the
669 // same scope already caused the max tasks to be incremented.
670 if (incremented_max_tasks_since_blocked_)
671 return;
672
673 // Cancel the effect of a MAY_BLOCK ScopedBlockingCall instantiated in the
674 // same scope.
675 if (!may_block_start_time_.is_null()) {
676 may_block_start_time_ = TimeTicks();
677 --outer_->num_pending_may_block_workers_;
678 if (is_running_background_task_)
679 --outer_->num_pending_background_may_block_workers_;
680 }
681 }
682
683 WillBlockEntered();
684 }
685
BlockingEnded()686 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::BlockingEnded() {
687 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
688
689 // Ignore blocking calls made outside of tasks.
690 if (!is_running_task_)
691 return;
692
693 AutoSchedulerLock auto_lock(outer_->lock_);
694 if (incremented_max_tasks_since_blocked_) {
695 outer_->DecrementMaxTasksLockRequired(is_running_background_task_);
696 } else {
697 DCHECK(!may_block_start_time_.is_null());
698 --outer_->num_pending_may_block_workers_;
699 if (is_running_background_task_)
700 --outer_->num_pending_background_may_block_workers_;
701 }
702
703 incremented_max_tasks_since_blocked_ = false;
704 may_block_start_time_ = TimeTicks();
705 }
706
MayBlockEntered()707 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::MayBlockEntered() {
708 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
709
710 {
711 AutoSchedulerLock auto_lock(outer_->lock_);
712
713 DCHECK(!incremented_max_tasks_since_blocked_);
714 DCHECK(may_block_start_time_.is_null());
715 may_block_start_time_ = TimeTicks::Now();
716 ++outer_->num_pending_may_block_workers_;
717 if (is_running_background_task_)
718 ++outer_->num_pending_background_may_block_workers_;
719 }
720 outer_->ScheduleAdjustMaxTasksIfNeeded();
721 }
722
WillBlockEntered()723 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::WillBlockEntered() {
724 DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
725
726 bool wake_up_allowed = false;
727 {
728 std::unique_ptr<PriorityQueue::Transaction> transaction(
729 outer_->shared_priority_queue_.BeginTransaction());
730 AutoSchedulerLock auto_lock(outer_->lock_);
731
732 DCHECK(!incremented_max_tasks_since_blocked_);
733 DCHECK(may_block_start_time_.is_null());
734 incremented_max_tasks_since_blocked_ = true;
735 outer_->IncrementMaxTasksLockRequired(is_running_background_task_);
736
737 // If the number of workers was less than the old max tasks, PostTask
738 // would've handled creating extra workers during WakeUpOneWorker.
739 // Therefore, we don't need to do anything here.
740 if (outer_->workers_.size() < outer_->max_tasks_ - 1)
741 return;
742
743 if (transaction->IsEmpty()) {
744 outer_->MaintainAtLeastOneIdleWorkerLockRequired();
745 } else {
746 // TODO(crbug.com/757897): We may create extra workers in this case:
747 // |workers.size()| was equal to the old |max_tasks_|, we had multiple
748 // ScopedBlockingCalls in parallel and we had work on the PQ.
749 wake_up_allowed = outer_->WakeUpOneWorkerLockRequired();
750 // |wake_up_allowed| is true when the pool is started, and a WILL_BLOCK
751 // scope cannot be entered before the pool starts.
752 DCHECK(wake_up_allowed);
753 }
754 }
755 // TODO(crbug.com/813857): This can be better handled in the PostTask()
756 // codepath. We really only should do this if there are tasks pending.
757 if (wake_up_allowed)
758 outer_->ScheduleAdjustMaxTasksIfNeeded();
759 }
760
761 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
MustIncrementMaxTasksLockRequired()762 MustIncrementMaxTasksLockRequired() {
763 outer_->lock_.AssertAcquired();
764
765 if (!incremented_max_tasks_since_blocked_ &&
766 !may_block_start_time_.is_null() &&
767 TimeTicks::Now() - may_block_start_time_ >= outer_->MayBlockThreshold()) {
768 incremented_max_tasks_since_blocked_ = true;
769
770 // Reset |may_block_start_time_| so that BlockingScopeExited() knows that it
771 // doesn't have to decrement the number of pending MAY_BLOCK workers.
772 may_block_start_time_ = TimeTicks();
773 --outer_->num_pending_may_block_workers_;
774 if (is_running_background_task_)
775 --outer_->num_pending_background_may_block_workers_;
776
777 return true;
778 }
779
780 return false;
781 }
782
WaitForWorkersIdleLockRequiredForTesting(size_t n)783 void SchedulerWorkerPoolImpl::WaitForWorkersIdleLockRequiredForTesting(
784 size_t n) {
785 lock_.AssertAcquired();
786
787 // Make sure workers do not cleanup while watching the idle count.
788 AutoReset<bool> ban_cleanups(&worker_cleanup_disallowed_for_testing_, true);
789
790 while (idle_workers_stack_.Size() < n)
791 idle_workers_stack_cv_for_testing_->Wait();
792 }
793
WakeUpOneWorkerLockRequired()794 bool SchedulerWorkerPoolImpl::WakeUpOneWorkerLockRequired() {
795 lock_.AssertAcquired();
796
797 if (workers_.empty()) {
798 ++num_wake_ups_before_start_;
799 return false;
800 }
801
802 // Ensure that there is one worker that can run tasks on top of the idle
803 // stack, capacity permitting.
804 MaintainAtLeastOneIdleWorkerLockRequired();
805
806 // If the worker on top of the idle stack can run tasks, wake it up.
807 if (NumberOfExcessWorkersLockRequired() < idle_workers_stack_.Size()) {
808 SchedulerWorker* worker = idle_workers_stack_.Pop();
809 if (worker) {
810 worker->WakeUp();
811 }
812 }
813
814 // Ensure that there is one worker that can run tasks on top of the idle
815 // stack, capacity permitting.
816 MaintainAtLeastOneIdleWorkerLockRequired();
817
818 return true;
819 }
820
WakeUpOneWorker()821 void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
822 bool wake_up_allowed;
823 {
824 AutoSchedulerLock auto_lock(lock_);
825 wake_up_allowed = WakeUpOneWorkerLockRequired();
826 }
827 if (wake_up_allowed)
828 ScheduleAdjustMaxTasksIfNeeded();
829 }
830
MaintainAtLeastOneIdleWorkerLockRequired()831 void SchedulerWorkerPoolImpl::MaintainAtLeastOneIdleWorkerLockRequired() {
832 lock_.AssertAcquired();
833
834 if (workers_.size() == kMaxNumberOfWorkers)
835 return;
836 DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
837
838 if (idle_workers_stack_.IsEmpty() && workers_.size() < max_tasks_) {
839 SchedulerWorker* new_worker =
840 CreateRegisterAndStartSchedulerWorkerLockRequired();
841 if (new_worker)
842 idle_workers_stack_.Push(new_worker);
843 }
844 }
845
AddToIdleWorkersStackLockRequired(SchedulerWorker * worker)846 void SchedulerWorkerPoolImpl::AddToIdleWorkersStackLockRequired(
847 SchedulerWorker* worker) {
848 lock_.AssertAcquired();
849
850 DCHECK(!idle_workers_stack_.Contains(worker));
851 idle_workers_stack_.Push(worker);
852
853 DCHECK_LE(idle_workers_stack_.Size(), workers_.size());
854
855 idle_workers_stack_cv_for_testing_->Broadcast();
856 }
857
RemoveFromIdleWorkersStackLockRequired(SchedulerWorker * worker)858 void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStackLockRequired(
859 SchedulerWorker* worker) {
860 lock_.AssertAcquired();
861 idle_workers_stack_.Remove(worker);
862 }
863
864 SchedulerWorker*
CreateRegisterAndStartSchedulerWorkerLockRequired()865 SchedulerWorkerPoolImpl::CreateRegisterAndStartSchedulerWorkerLockRequired() {
866 lock_.AssertAcquired();
867
868 DCHECK_LT(workers_.size(), max_tasks_);
869 DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
870 // SchedulerWorker needs |lock_| as a predecessor for its thread lock
871 // because in WakeUpOneWorker, |lock_| is first acquired and then
872 // the thread lock is acquired when WakeUp is called on the worker.
873 scoped_refptr<SchedulerWorker> worker = MakeRefCounted<SchedulerWorker>(
874 priority_hint_,
875 std::make_unique<SchedulerWorkerDelegateImpl>(
876 tracked_ref_factory_.GetTrackedRef()),
877 task_tracker_, &lock_, backward_compatibility_);
878
879 if (!worker->Start(scheduler_worker_observer_))
880 return nullptr;
881
882 workers_.push_back(worker);
883 DCHECK_LE(workers_.size(), max_tasks_);
884
885 if (!cleanup_timestamps_.empty()) {
886 detach_duration_histogram_->AddTime(TimeTicks::Now() -
887 cleanup_timestamps_.top());
888 cleanup_timestamps_.pop();
889 }
890 return worker.get();
891 }
892
NumberOfExcessWorkersLockRequired() const893 size_t SchedulerWorkerPoolImpl::NumberOfExcessWorkersLockRequired() const {
894 lock_.AssertAcquired();
895 return std::max<int>(0, workers_.size() - max_tasks_);
896 }
897
AdjustMaxTasks()898 void SchedulerWorkerPoolImpl::AdjustMaxTasks() {
899 DCHECK(service_thread_task_runner_->RunsTasksInCurrentSequence());
900
901 std::unique_ptr<PriorityQueue::Transaction> transaction(
902 shared_priority_queue_.BeginTransaction());
903 AutoSchedulerLock auto_lock(lock_);
904
905 const size_t previous_max_tasks = max_tasks_;
906
907 // Increment max tasks for each worker that has been within a MAY_BLOCK
908 // ScopedBlockingCall for more than MayBlockThreshold().
909 for (scoped_refptr<SchedulerWorker> worker : workers_) {
910 // The delegates of workers inside a SchedulerWorkerPoolImpl should be
911 // SchedulerWorkerDelegateImpls.
912 SchedulerWorkerDelegateImpl* delegate =
913 static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate());
914 if (delegate->MustIncrementMaxTasksLockRequired()) {
915 IncrementMaxTasksLockRequired(
916 delegate->is_running_background_task_lock_required());
917 }
918 }
919
920 // Wake up a worker per pending sequence, capacity permitting.
921 const size_t num_pending_sequences = transaction->Size();
922 const size_t num_wake_ups_needed =
923 std::min(max_tasks_ - previous_max_tasks, num_pending_sequences);
924
925 for (size_t i = 0; i < num_wake_ups_needed; ++i) {
926 // No need to call ScheduleAdjustMaxTasksIfNeeded() as the caller will
927 // take care of that for us.
928 WakeUpOneWorkerLockRequired();
929 }
930
931 MaintainAtLeastOneIdleWorkerLockRequired();
932 }
933
MayBlockThreshold() const934 TimeDelta SchedulerWorkerPoolImpl::MayBlockThreshold() const {
935 if (maximum_blocked_threshold_for_testing_.IsSet())
936 return TimeDelta::Max();
937 // This value was set unscientifically based on intuition and may be adjusted
938 // in the future. This value is smaller than |kBlockedWorkersPollPeriod|
939 // because we hope than when multiple workers block around the same time, a
940 // single AdjustMaxTasks() call will perform all the necessary max tasks
941 // adjustments.
942 return TimeDelta::FromMilliseconds(10);
943 }
944
ScheduleAdjustMaxTasksIfNeeded()945 void SchedulerWorkerPoolImpl::ScheduleAdjustMaxTasksIfNeeded() {
946 {
947 AutoSchedulerLock auto_lock(lock_);
948 if (polling_max_tasks_ || !ShouldPeriodicallyAdjustMaxTasksLockRequired()) {
949 return;
950 }
951 polling_max_tasks_ = true;
952 }
953 service_thread_task_runner_->PostDelayedTask(
954 FROM_HERE,
955 BindOnce(&SchedulerWorkerPoolImpl::AdjustMaxTasksFunction,
956 Unretained(this)),
957 kBlockedWorkersPollPeriod);
958 }
959
AdjustMaxTasksFunction()960 void SchedulerWorkerPoolImpl::AdjustMaxTasksFunction() {
961 DCHECK(service_thread_task_runner_->RunsTasksInCurrentSequence());
962
963 AdjustMaxTasks();
964 {
965 AutoSchedulerLock auto_lock(lock_);
966 DCHECK(polling_max_tasks_);
967
968 if (!ShouldPeriodicallyAdjustMaxTasksLockRequired()) {
969 polling_max_tasks_ = false;
970 return;
971 }
972 }
973 service_thread_task_runner_->PostDelayedTask(
974 FROM_HERE,
975 BindOnce(&SchedulerWorkerPoolImpl::AdjustMaxTasksFunction,
976 Unretained(this)),
977 kBlockedWorkersPollPeriod);
978 }
979
ShouldPeriodicallyAdjustMaxTasksLockRequired()980 bool SchedulerWorkerPoolImpl::ShouldPeriodicallyAdjustMaxTasksLockRequired() {
981 lock_.AssertAcquired();
982
983 // The maximum number of background tasks that can run concurrently must be
984 // adjusted periodically when (1) the number of background tasks that are
985 // currently running is equal to it and (2) there are workers running
986 // background tasks within the scope of a MAY_BLOCK ScopedBlockingCall but
987 // haven't cause a max background tasks increment yet.
988 // - When (1) is false: A newly posted background task will be allowed to run
989 // normally. There is no hurry to increase max background tasks.
990 // - When (2) is false: AdjustMaxTasks() wouldn't affect
991 // |max_background_tasks_|.
992 if (num_running_background_tasks_ >= max_background_tasks_ &&
993 num_pending_background_may_block_workers_ > 0) {
994 return true;
995 }
996
997 // The maximum number of tasks that can run concurrently must be adjusted
998 // periodically when (1) there are no idle workers that can do work (2) there
999 // are workers that are within the scope of a MAY_BLOCK ScopedBlockingCall but
1000 // haven't cause a max tasks increment yet.
1001 // - When (1) is false: A newly posted task will run on one of the idle
1002 // workers that are allowed to do work. There is no hurry to increase max
1003 // tasks.
1004 // - When (2) is false: AdjustMaxTasks() wouldn't affect |max_tasks_|.
1005 const int idle_workers_that_can_do_work =
1006 idle_workers_stack_.Size() - NumberOfExcessWorkersLockRequired();
1007 return idle_workers_that_can_do_work <= 0 &&
1008 num_pending_may_block_workers_ > 0;
1009 }
1010
DecrementMaxTasksLockRequired(bool is_running_background_task)1011 void SchedulerWorkerPoolImpl::DecrementMaxTasksLockRequired(
1012 bool is_running_background_task) {
1013 lock_.AssertAcquired();
1014 --max_tasks_;
1015 if (is_running_background_task)
1016 --max_background_tasks_;
1017 }
1018
IncrementMaxTasksLockRequired(bool is_running_background_task)1019 void SchedulerWorkerPoolImpl::IncrementMaxTasksLockRequired(
1020 bool is_running_background_task) {
1021 lock_.AssertAcquired();
1022 ++max_tasks_;
1023 if (is_running_background_task)
1024 ++max_background_tasks_;
1025 }
1026
1027 } // namespace internal
1028 } // namespace base
1029