• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_
6 #define BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_
7 
8 #include <stddef.h>
9 
10 #include <memory>
11 #include <string>
12 #include <vector>
13 
14 #include "base/base_export.h"
15 #include "base/check.h"
16 #include "base/compiler_specific.h"
17 #include "base/dcheck_is_on.h"
18 #include "base/gtest_prod_util.h"
19 #include "base/memory/raw_ptr.h"
20 #include "base/strings/string_piece.h"
21 #include "base/synchronization/condition_variable.h"
22 #include "base/synchronization/waitable_event.h"
23 #include "base/task/sequenced_task_runner.h"
24 #include "base/task/task_features.h"
25 #include "base/task/thread_pool/task.h"
26 #include "base/task/thread_pool/task_source.h"
27 #include "base/task/thread_pool/thread_group.h"
28 #include "base/task/thread_pool/tracked_ref.h"
29 #include "base/task/thread_pool/worker_thread.h"
30 #include "base/task/thread_pool/worker_thread_set.h"
31 #include "base/time/time.h"
32 #include "third_party/abseil-cpp/absl/types/optional.h"
33 
34 namespace base {
35 
36 class WorkerThreadObserver;
37 
38 namespace internal {
39 
40 class TaskTracker;
41 
42 // A group of workers that run Tasks.
43 //
44 // The thread group doesn't create threads until Start() is called. Tasks can be
45 // posted at any time but will not run until after Start() is called.
46 //
47 // This class is thread-safe.
48 class BASE_EXPORT ThreadGroupImpl : public ThreadGroup {
49  public:
50   // Constructs a group without workers.
51   //
52   // |histogram_label| is used to label the thread group's histograms as
53   // "ThreadPool." + histogram_name + "." + |histogram_label| + extra suffixes.
54   // It must not be empty. |thread group_label| is used to label the thread
55   // group's threads, it must not be empty. |thread_type_hint| is the preferred
56   // thread type; the actual thread type depends on shutdown state and platform
57   // capabilities. |task_tracker| keeps track of tasks.
58   ThreadGroupImpl(StringPiece histogram_label,
59                   StringPiece thread_group_label,
60                   ThreadType thread_type_hint,
61                   TrackedRef<TaskTracker> task_tracker,
62                   TrackedRef<Delegate> delegate);
63 
64   // Creates threads, allowing existing and future tasks to run. The thread
65   // group runs at most |max_tasks| / `max_best_effort_tasks` unblocked task
66   // with any / BEST_EFFORT priority concurrently. It reclaims unused threads
67   // after `suggested_reclaim_time`. It uses `service_thread_task_runner` to
68   // monitor for blocked tasks, `service_thread_task_runner` is used to setup
69   // FileDescriptorWatcher on worker threads. It must refer to a Thread with
70   // MessagePumpType::IO. If specified, it notifies |worker_thread_observer|
71   // when a worker enters and exits its main function (the observer must not be
72   // destroyed before JoinForTesting() has returned). |worker_environment|
73   // specifies the environment in which tasks are executed.
74   // |may_block_threshold| is the timeout after which a task in a MAY_BLOCK
75   // ScopedBlockingCall is considered blocked (the thread group will choose an
76   // appropriate value if none is specified).
77   // `synchronous_thread_start_for_testing` is true if this ThreadGroupImpl
78   // should synchronously wait for OnMainEntry() after starting each worker. Can
79   // only be called once. CHECKs on failure.
80   void Start(size_t max_tasks,
81              size_t max_best_effort_tasks,
82              TimeDelta suggested_reclaim_time,
83              scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
84              WorkerThreadObserver* worker_thread_observer,
85              WorkerEnvironment worker_environment,
86              bool synchronous_thread_start_for_testing = false,
87              absl::optional<TimeDelta> may_block_threshold =
88                  absl::optional<TimeDelta>());
89 
90   ThreadGroupImpl(const ThreadGroupImpl&) = delete;
91   ThreadGroupImpl& operator=(const ThreadGroupImpl&) = delete;
92   // Destroying a ThreadGroupImpl is not allowed in production; it is always
93   // leaked. In tests, it can only be destroyed after JoinForTesting() has
94   // returned.
95   ~ThreadGroupImpl() override;
96 
97   // ThreadGroup:
98   void JoinForTesting() override;
99   size_t GetMaxConcurrentNonBlockedTasksDeprecated() const override;
100   void DidUpdateCanRunPolicy() override;
101   void OnShutdownStarted() override;
102 
103   // Waits until at least |n| workers are idle. Note that while workers are
104   // disallowed from cleaning up during this call: tests using a custom
105   // |suggested_reclaim_time_| need to be careful to invoke this swiftly after
106   // unblocking the waited upon workers as: if a worker is already detached by
107   // the time this is invoked, it will never make it onto the idle set and
108   // this call will hang.
109   void WaitForWorkersIdleForTesting(size_t n);
110 
111   // Waits until at least |n| workers are idle.
112   void WaitForWorkersIdleLockRequiredForTesting(size_t n)
113       EXCLUSIVE_LOCKS_REQUIRED(lock_);
114 
115   // Waits until all workers are idle.
116   void WaitForAllWorkersIdleForTesting();
117 
118   // Waits until |n| workers have cleaned up (went through
119   // WorkerThreadDelegateImpl::OnMainExit()) since the last call to
120   // WaitForWorkersCleanedUpForTesting() (or Start() if that wasn't called yet).
121   void WaitForWorkersCleanedUpForTesting(size_t n);
122 
123   // Returns the number of workers in this thread group.
124   size_t NumberOfWorkersForTesting() const;
125 
126   // Returns |max_tasks_|/|max_best_effort_tasks_|.
127   size_t GetMaxTasksForTesting() const;
128   size_t GetMaxBestEffortTasksForTesting() const;
129 
130   // Returns the number of workers that are idle (i.e. not running tasks).
131   size_t NumberOfIdleWorkersForTesting() const;
132 
133  private:
134   class ScopedCommandsExecutor;
135   class WorkerThreadDelegateImpl;
136 
137   // Friend tests so that they can access |blocked_workers_poll_period| and
138   // may_block_threshold().
139   friend class ThreadGroupImplBlockingTest;
140   friend class ThreadGroupImplMayBlockTest;
141   FRIEND_TEST_ALL_PREFIXES(ThreadGroupImplBlockingTest,
142                            ThreadBlockUnblockPremature);
143   FRIEND_TEST_ALL_PREFIXES(ThreadGroupImplBlockingTest,
144                            ThreadBlockUnblockPrematureBestEffort);
145 
146   // ThreadGroup:
147   void UpdateSortKey(TaskSource::Transaction transaction) override;
148   void PushTaskSourceAndWakeUpWorkers(
149       RegisteredTaskSourceAndTransaction transaction_with_task_source) override;
150   void EnsureEnoughWorkersLockRequired(BaseScopedCommandsExecutor* executor)
151       override EXCLUSIVE_LOCKS_REQUIRED(lock_);
152 
153   // Creates a worker and schedules its start, if needed, to maintain one idle
154   // worker, |max_tasks_| permitting.
155   void MaintainAtLeastOneIdleWorkerLockRequired(
156       ScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);
157 
158   // Creates a worker, adds it to the thread group, schedules its start and
159   // returns it. Cannot be called before Start().
160   scoped_refptr<WorkerThreadWaitableEvent> CreateAndRegisterWorkerLockRequired(
161       ScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);
162 
163   // Returns the number of workers that are awake (i.e. not on the idle set).
164   size_t GetNumAwakeWorkersLockRequired() const EXCLUSIVE_LOCKS_REQUIRED(lock_);
165 
166   // Returns the desired number of awake workers, given current workload and
167   // concurrency limits.
168   size_t GetDesiredNumAwakeWorkersLockRequired() const
169       EXCLUSIVE_LOCKS_REQUIRED(lock_);
170 
171   // Examines the list of WorkerThreads and increments |max_tasks_| for each
172   // worker that has been within the scope of a MAY_BLOCK ScopedBlockingCall for
173   // more than BlockedThreshold(). Reschedules a call if necessary.
174   void AdjustMaxTasks();
175 
176   // Returns the threshold after which the max tasks is increased to compensate
177   // for a worker that is within a MAY_BLOCK ScopedBlockingCall.
may_block_threshold_for_testing()178   TimeDelta may_block_threshold_for_testing() const {
179     return after_start().may_block_threshold;
180   }
181 
182   // Interval at which the service thread checks for workers in this thread
183   // group that have been in a MAY_BLOCK ScopedBlockingCall for more than
184   // may_block_threshold().
blocked_workers_poll_period_for_testing()185   TimeDelta blocked_workers_poll_period_for_testing() const {
186     return after_start().blocked_workers_poll_period;
187   }
188 
189   // Starts calling AdjustMaxTasks() periodically on
190   // |service_thread_task_runner_|.
191   void ScheduleAdjustMaxTasks();
192 
193   // Schedules AdjustMaxTasks() through |executor| if required.
194   void MaybeScheduleAdjustMaxTasksLockRequired(ScopedCommandsExecutor* executor)
195       EXCLUSIVE_LOCKS_REQUIRED(lock_);
196 
197   // Returns true if AdjustMaxTasks() should periodically be called on
198   // |service_thread_task_runner_|.
199   bool ShouldPeriodicallyAdjustMaxTasksLockRequired()
200       EXCLUSIVE_LOCKS_REQUIRED(lock_);
201 
202   // Updates the minimum priority allowed to run below which tasks should yield.
203   // This should be called whenever |num_running_tasks_| or |max_tasks| changes,
204   // or when a new task is added to |priority_queue_|.
205   void UpdateMinAllowedPriorityLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
206 
207   bool IsOnIdleSetLockRequired(WorkerThreadWaitableEvent* worker) const
208       EXCLUSIVE_LOCKS_REQUIRED(lock_);
209 
210   // Increments/decrements the number of tasks of |priority| that are currently
211   // running in this thread group. Must be invoked before/after running a task.
212   void DecrementTasksRunningLockRequired(TaskPriority priority)
213       EXCLUSIVE_LOCKS_REQUIRED(lock_);
214   void IncrementTasksRunningLockRequired(TaskPriority priority)
215       EXCLUSIVE_LOCKS_REQUIRED(lock_);
216 
217   // Increments/decrements the number of [best effort] tasks that can run in
218   // this thread group.
219   void DecrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
220   void IncrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
221   void DecrementMaxBestEffortTasksLockRequired()
222       EXCLUSIVE_LOCKS_REQUIRED(lock_);
223   void IncrementMaxBestEffortTasksLockRequired()
224       EXCLUSIVE_LOCKS_REQUIRED(lock_);
225 
226   // Values set at Start() and never modified afterwards.
227   struct InitializedInStart {
228     InitializedInStart();
229     ~InitializedInStart();
230 
231 #if DCHECK_IS_ON()
232     // Set after all members of this struct are set.
233     bool initialized = false;
234 #endif
235 
236     // Initial value of |max_tasks_|.
237     size_t initial_max_tasks = 0;
238 
239     // Suggested reclaim time for workers.
240     TimeDelta suggested_reclaim_time;
241     bool no_worker_reclaim = false;
242 
243     // Environment to be initialized per worker.
244     WorkerEnvironment worker_environment = WorkerEnvironment::NONE;
245 
246     scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner;
247 
248     // Optional observer notified when a worker enters and exits its main.
249     raw_ptr<WorkerThreadObserver> worker_thread_observer = nullptr;
250 
251     // Threshold after which the max tasks is increased to compensate for a
252     // worker that is within a MAY_BLOCK ScopedBlockingCall.
253     TimeDelta may_block_threshold;
254 
255     // The period between calls to AdjustMaxTasks() when the thread group is at
256     // capacity.
257     TimeDelta blocked_workers_poll_period;
258 
259     // Whether EnsureEnoughWorkersLockRequired() should be called at the end of
260     // GetWork() instead of at the beginning.
261     bool ensure_enough_workers_at_end_of_get_work = false;
262 
263   } initialized_in_start_;
264 
in_start()265   InitializedInStart& in_start() {
266 #if DCHECK_IS_ON()
267     DCHECK(!initialized_in_start_.initialized);
268 #endif
269     return initialized_in_start_;
270   }
after_start()271   const InitializedInStart& after_start() const {
272 #if DCHECK_IS_ON()
273     DCHECK(initialized_in_start_.initialized);
274 #endif
275     return initialized_in_start_;
276   }
277 
278   const std::string histogram_label_;
279   const std::string thread_group_label_;
280   const ThreadType thread_type_hint_;
281 
282   // All workers owned by this thread group.
283   std::vector<scoped_refptr<WorkerThreadWaitableEvent>> workers_
284       GUARDED_BY(lock_);
285   size_t worker_sequence_num_ GUARDED_BY(lock_) = 0;
286 
287   bool shutdown_started_ GUARDED_BY(lock_) = false;
288 
289   // Maximum number of tasks of any priority / BEST_EFFORT priority that can run
290   // concurrently in this thread group.
291   size_t max_tasks_ GUARDED_BY(lock_) = 0;
292   size_t max_best_effort_tasks_ GUARDED_BY(lock_) = 0;
293 
294   // Number of tasks of any priority / BEST_EFFORT priority that are currently
295   // running in this thread group.
296   size_t num_running_tasks_ GUARDED_BY(lock_) = 0;
297   size_t num_running_best_effort_tasks_ GUARDED_BY(lock_) = 0;
298 
299   // Number of workers running a task of any priority / BEST_EFFORT priority
300   // that are within the scope of a MAY_BLOCK ScopedBlockingCall but haven't
301   // caused a max tasks increase yet.
302   int num_unresolved_may_block_ GUARDED_BY(lock_) = 0;
303   int num_unresolved_best_effort_may_block_ GUARDED_BY(lock_) = 0;
304 
305   // Ordered set of idle workers; the order uses pointer comparison, this is
306   // arbitrary but stable. Initially, all workers are on this set. A worker is
307   // removed from the set before its WakeUp() function is called and when it
308   // receives work from GetWork() (a worker calls GetWork() when its sleep
309   // timeout expires, even if its WakeUp() method hasn't been called). A worker
310   // is inserted on this set when it receives nullptr from GetWork().
311   WorkerThreadSet idle_workers_set_ GUARDED_BY(lock_);
312 
313   // Signaled when a worker is added to the idle workers set.
314   std::unique_ptr<ConditionVariable> idle_workers_set_cv_for_testing_
315       GUARDED_BY(lock_);
316 
317   // Whether an AdjustMaxTasks() task was posted to the service thread.
318   bool adjust_max_tasks_posted_ GUARDED_BY(lock_) = false;
319 
320   // Indicates to the delegates that workers are not permitted to cleanup.
321   bool worker_cleanup_disallowed_for_testing_ GUARDED_BY(lock_) = false;
322 
323   // Counts the number of workers cleaned up (went through
324   // WorkerThreadDelegateImpl::OnMainExit()) since the last call to
325   // WaitForWorkersCleanedUpForTesting() (or Start() if that wasn't called yet).
326   // |some_workers_cleaned_up_for_testing_| is true if this was ever
327   // incremented. Tests with a custom |suggested_reclaim_time_| can wait on a
328   // specific number of workers being cleaned up via
329   // WaitForWorkersCleanedUpForTesting().
330   size_t num_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = 0;
331 #if DCHECK_IS_ON()
332   bool some_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = false;
333 #endif
334 
335   // Signaled, if non-null, when |num_workers_cleaned_up_for_testing_| is
336   // incremented.
337   std::unique_ptr<ConditionVariable> num_workers_cleaned_up_for_testing_cv_
338       GUARDED_BY(lock_);
339 
340   // Set at the start of JoinForTesting().
341   bool join_for_testing_started_ GUARDED_BY(lock_) = false;
342 
343   // Null-opt unless |synchronous_thread_start_for_testing| was true at
344   // construction. In that case, it's signaled each time
345   // WorkerThreadDelegateImpl::OnMainEntry() completes.
346   absl::optional<WaitableEvent> worker_started_for_testing_;
347 
348   // Ensures recently cleaned up workers (ref.
349   // WorkerThreadDelegateImpl::CleanupLockRequired()) had time to exit as
350   // they have a raw reference to |this| (and to TaskTracker) which can
351   // otherwise result in racy use-after-frees per no longer being part of
352   // |workers_| and hence not being explicitly joined in JoinForTesting():
353   // https://crbug.com/810464. Uses AtomicRefCount to make its only public
354   // method thread-safe.
355   TrackedRefFactory<ThreadGroupImpl> tracked_ref_factory_;
356 };
357 
358 }  // namespace internal
359 }  // namespace base
360 
361 #endif  // BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_
362