• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_
6 #define BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_
7 
8 #include <stddef.h>
9 
10 #include <memory>
11 #include <string>
12 #include <vector>
13 
14 #include "base/base_export.h"
15 #include "base/check.h"
16 #include "base/compiler_specific.h"
17 #include "base/dcheck_is_on.h"
18 #include "base/gtest_prod_util.h"
19 #include "base/memory/raw_ptr.h"
20 #include "base/strings/string_piece.h"
21 #include "base/synchronization/condition_variable.h"
22 #include "base/synchronization/waitable_event.h"
23 #include "base/task/sequenced_task_runner.h"
24 #include "base/task/task_features.h"
25 #include "base/task/thread_pool/task.h"
26 #include "base/task/thread_pool/task_source.h"
27 #include "base/task/thread_pool/thread_group.h"
28 #include "base/task/thread_pool/tracked_ref.h"
29 #include "base/task/thread_pool/worker_thread.h"
30 #include "base/task/thread_pool/worker_thread_set.h"
31 #include "base/time/time.h"
32 #include "third_party/abseil-cpp/absl/types/optional.h"
33 
34 namespace base {
35 
36 class WorkerThreadObserver;
37 
38 namespace internal {
39 
40 class TaskTracker;
41 
42 // A group of workers that run Tasks.
43 //
44 // The thread group doesn't create threads until Start() is called. Tasks can be
45 // posted at any time but will not run until after Start() is called.
46 //
47 // This class is thread-safe.
48 class BASE_EXPORT ThreadGroupImpl : public ThreadGroup {
49  public:
50   // Constructs a group without workers.
51   //
52   // |histogram_label| is used to label the thread group's histograms as
53   // "ThreadPool." + histogram_name + "." + |histogram_label| + extra suffixes.
54   // It must not be empty. |thread_group_label| is used to label the thread
55   // group's threads; it must not be empty. |thread_type_hint| is the preferred
56   // thread type; the actual thread type depends on shutdown state and platform
57   // capabilities. |task_tracker| keeps track of tasks.
58   ThreadGroupImpl(StringPiece histogram_label,
59                   StringPiece thread_group_label,
60                   ThreadType thread_type_hint,
61                   TrackedRef<TaskTracker> task_tracker,
62                   TrackedRef<Delegate> delegate,
63                   ThreadGroup* predecessor_thread_group = nullptr);
64 
65   // Creates threads, allowing existing and future tasks to run. The thread
66   // group runs at most |max_tasks| / |max_best_effort_tasks| unblocked tasks
67   // of any / BEST_EFFORT priority concurrently. It reclaims unused threads
68   // after |suggested_reclaim_time|. |service_thread_task_runner| is used both
69   // to monitor for blocked tasks and to set up FileDescriptorWatcher on
70   // worker threads; it must refer to a Thread with MessagePumpType::IO. If
71   // specified, |worker_thread_observer| is notified when a worker enters and
72   // exits its main function (the observer must not be destroyed before
73   // JoinForTesting() has returned). |worker_environment| specifies the
74   // environment in which tasks are executed. |may_block_threshold| is the
75   // timeout after which a task in a MAY_BLOCK ScopedBlockingCall is considered
76   // blocked (the thread group chooses an appropriate value if none is
77   // specified). |synchronous_thread_start_for_testing| is true if this
78   // ThreadGroupImpl should synchronously wait for OnMainEntry() after starting
79   // each worker.
80   // Can only be called once. CHECKs on failure.
81   void Start(size_t max_tasks,
82              size_t max_best_effort_tasks,
83              TimeDelta suggested_reclaim_time,
84              scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
85              WorkerThreadObserver* worker_thread_observer,
86              WorkerEnvironment worker_environment,
87              bool synchronous_thread_start_for_testing = false,
88              absl::optional<TimeDelta> may_block_threshold =
89                  absl::optional<TimeDelta>());
90 
91   ThreadGroupImpl(const ThreadGroupImpl&) = delete;
92   ThreadGroupImpl& operator=(const ThreadGroupImpl&) = delete;
93   // Destroying a ThreadGroupImpl is not allowed in production; it is always
94   // leaked. In tests, it can only be destroyed after JoinForTesting() has
95   // returned.
96   ~ThreadGroupImpl() override;
97 
98   // ThreadGroup:
99   void JoinForTesting() override;
100   size_t GetMaxConcurrentNonBlockedTasksDeprecated() const override;
101   void DidUpdateCanRunPolicy() override;
102   void OnShutdownStarted() override;
103 
104   // Waits until at least |n| workers are idle. Workers are disallowed from
105   // cleaning up during this call. Even so, tests using a custom
106   // |suggested_reclaim_time| need to be careful to invoke this swiftly after
107   // unblocking the waited-upon workers: if a worker is already detached by
108   // the time this is invoked, it will never make it onto the idle set and
109   // this call will hang.
110   void WaitForWorkersIdleForTesting(size_t n);
111 
112   // Waits until at least |n| workers are idle. |lock_| must be held.
113   void WaitForWorkersIdleLockRequiredForTesting(size_t n)
114       EXCLUSIVE_LOCKS_REQUIRED(lock_);
115 
116   // Waits until all workers are idle.
117   void WaitForAllWorkersIdleForTesting();
118 
119   // Waits until |n| workers have cleaned up (went through
120   // WorkerThreadDelegateImpl::OnMainExit()) since the last call to
121   // WaitForWorkersCleanedUpForTesting() (or Start() if that wasn't called yet).
122   void WaitForWorkersCleanedUpForTesting(size_t n);
123 
124   // Returns the number of workers in this thread group.
125   size_t NumberOfWorkersForTesting() const;
126 
127   // Returns |max_tasks_|/|max_best_effort_tasks_|.
128   size_t GetMaxTasksForTesting() const;
129   size_t GetMaxBestEffortTasksForTesting() const;
130 
131   // Returns the number of workers that are idle (i.e. not running tasks).
132   size_t NumberOfIdleWorkersForTesting() const;
133 
134  private:
135   class ScopedCommandsExecutor;
136   class WorkerThreadDelegateImpl;
137 
138   // Friend tests so that they can access may_block_threshold_for_testing() and
139   // blocked_workers_poll_period_for_testing().
140   friend class ThreadGroupImplBlockingTest;
141   friend class ThreadGroupImplMayBlockTest;
142   FRIEND_TEST_ALL_PREFIXES(ThreadGroupImplBlockingTest,
143                            ThreadBlockUnblockPremature);
144   FRIEND_TEST_ALL_PREFIXES(ThreadGroupImplBlockingTest,
145                            ThreadBlockUnblockPrematureBestEffort);
146 
147   // ThreadGroup:
148   void UpdateSortKey(TaskSource::Transaction transaction) override;
149   void PushTaskSourceAndWakeUpWorkers(
150       TransactionWithRegisteredTaskSource transaction_with_task_source)
151       override;
152   void EnsureEnoughWorkersLockRequired(BaseScopedCommandsExecutor* executor)
153       override EXCLUSIVE_LOCKS_REQUIRED(lock_);
154 
155   // Creates a worker and schedules its start, if needed, to maintain one idle
156   // worker, |max_tasks_| permitting.
157   void MaintainAtLeastOneIdleWorkerLockRequired(
158       ScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);
159 
160   // Returns true if worker cleanup is permitted.
161   bool CanWorkerCleanupForTestingLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
162 
163   // Creates a worker, adds it to the thread group, schedules its start and
164   // returns it. Cannot be called before Start().
165   scoped_refptr<WorkerThread> CreateAndRegisterWorkerLockRequired(
166       ScopedCommandsExecutor* executor) EXCLUSIVE_LOCKS_REQUIRED(lock_);
167 
168   // Returns the number of workers that are awake (i.e. not on the idle set).
169   size_t GetNumAwakeWorkersLockRequired() const EXCLUSIVE_LOCKS_REQUIRED(lock_);
170 
171   // Returns the desired number of awake workers, given current workload and
172   // concurrency limits.
173   size_t GetDesiredNumAwakeWorkersLockRequired() const
174       EXCLUSIVE_LOCKS_REQUIRED(lock_);
175 
176   // Examines the list of WorkerThreads and increments |max_tasks_| for each
177   // worker that has been within the scope of a MAY_BLOCK ScopedBlockingCall for
178   // more than |may_block_threshold|. Reschedules a call if necessary.
179   void AdjustMaxTasks();
180 
181   // Returns the threshold after which the max tasks is increased to compensate
182   // for a worker that is within a MAY_BLOCK ScopedBlockingCall.
may_block_threshold_for_testing()183   TimeDelta may_block_threshold_for_testing() const {
184     return after_start().may_block_threshold;
185   }
186 
187   // Interval at which the service thread checks for workers in this thread
188   // group that have been in a MAY_BLOCK ScopedBlockingCall for more than
189   // |may_block_threshold|.
blocked_workers_poll_period_for_testing()190   TimeDelta blocked_workers_poll_period_for_testing() const {
191     return after_start().blocked_workers_poll_period;
192   }
193 
194   // Starts calling AdjustMaxTasks() periodically on
195   // |service_thread_task_runner|.
196   void ScheduleAdjustMaxTasks();
197 
198   // Schedules AdjustMaxTasks() through |executor| if required.
199   void MaybeScheduleAdjustMaxTasksLockRequired(ScopedCommandsExecutor* executor)
200       EXCLUSIVE_LOCKS_REQUIRED(lock_);
201 
202   // Returns true if AdjustMaxTasks() should periodically be called on
203   // |service_thread_task_runner|.
204   bool ShouldPeriodicallyAdjustMaxTasksLockRequired()
205       EXCLUSIVE_LOCKS_REQUIRED(lock_);
206 
207   // Updates the minimum priority allowed to run below which tasks should yield.
208   // This should be called whenever |num_running_tasks_| or |max_tasks_|
209   // changes, or when a new task is added to |priority_queue_|.
210   void UpdateMinAllowedPriorityLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
211 
212   bool IsOnIdleSetLockRequired(WorkerThread* worker) const
213       EXCLUSIVE_LOCKS_REQUIRED(lock_);
214 
215   // Increments/decrements the number of tasks of |priority| that are currently
216   // running in this thread group. Must be invoked before/after running a task.
217   void DecrementTasksRunningLockRequired(TaskPriority priority)
218       EXCLUSIVE_LOCKS_REQUIRED(lock_);
219   void IncrementTasksRunningLockRequired(TaskPriority priority)
220       EXCLUSIVE_LOCKS_REQUIRED(lock_);
221 
222   // Increments/decrements the number of [best effort] tasks that can run in
223   // this thread group.
224   void DecrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
225   void IncrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(lock_);
226   void DecrementMaxBestEffortTasksLockRequired()
227       EXCLUSIVE_LOCKS_REQUIRED(lock_);
228   void IncrementMaxBestEffortTasksLockRequired()
229       EXCLUSIVE_LOCKS_REQUIRED(lock_);
230 
231   // Values set at Start() and never modified afterwards.
232   struct InitializedInStart {
233     InitializedInStart();
234     ~InitializedInStart();
235 
236 #if DCHECK_IS_ON()
237     // Set after all members of this struct are set.
238     bool initialized = false;
239 #endif
240 
241     // Initial value of |max_tasks_|.
242     size_t initial_max_tasks = 0;
243 
244     // Suggested reclaim time for workers.
245     TimeDelta suggested_reclaim_time;
246     bool no_worker_reclaim = false;
247 
248     // Environment to be initialized per worker.
249     WorkerEnvironment worker_environment = WorkerEnvironment::NONE;
250 
251     scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner;
252 
253     // Optional observer notified when a worker enters and exits its main.
254     raw_ptr<WorkerThreadObserver> worker_thread_observer = nullptr;
255 
256     // Threshold after which the max tasks is increased to compensate for a
257     // worker that is within a MAY_BLOCK ScopedBlockingCall.
258     TimeDelta may_block_threshold;
259 
260     // The period between calls to AdjustMaxTasks() when the thread group is at
261     // capacity.
262     TimeDelta blocked_workers_poll_period;
263   } initialized_in_start_;
264 
  // Returns |initialized_in_start_| for mutation. DCHECKs that its
  // |initialized| flag is not set yet.
in_start()265   InitializedInStart& in_start() {
266 #if DCHECK_IS_ON()
267     DCHECK(!initialized_in_start_.initialized);
268 #endif
269     return initialized_in_start_;
270   }
  // Returns |initialized_in_start_| for read-only access. DCHECKs that its
  // |initialized| flag is set.
after_start()271   const InitializedInStart& after_start() const {
272 #if DCHECK_IS_ON()
273     DCHECK(initialized_in_start_.initialized);
274 #endif
275     return initialized_in_start_;
276   }
277 
278   const std::string histogram_label_;
279   const std::string thread_group_label_;
280   const ThreadType thread_type_hint_;
281 
282   // All workers owned by this thread group.
283   std::vector<scoped_refptr<WorkerThread>> workers_ GUARDED_BY(lock_);
284   size_t worker_sequence_num_ GUARDED_BY(lock_) = 0;
285 
286   bool shutdown_started_ GUARDED_BY(lock_) = false;
287 
288   // Maximum number of tasks of any priority / BEST_EFFORT priority that can run
289   // concurrently in this thread group.
290   size_t max_tasks_ GUARDED_BY(lock_) = 0;
291   size_t max_best_effort_tasks_ GUARDED_BY(lock_) = 0;
292 
293   // Number of tasks of any priority / BEST_EFFORT priority that are currently
294   // running in this thread group.
295   size_t num_running_tasks_ GUARDED_BY(lock_) = 0;
296   size_t num_running_best_effort_tasks_ GUARDED_BY(lock_) = 0;
297 
298   // Number of workers running a task of any priority / BEST_EFFORT priority
299   // that are within the scope of a MAY_BLOCK ScopedBlockingCall but haven't
300   // caused a max tasks increase yet.
301   int num_unresolved_may_block_ GUARDED_BY(lock_) = 0;
302   int num_unresolved_best_effort_may_block_ GUARDED_BY(lock_) = 0;
303 
304   // Ordered set of idle workers; the order uses pointer comparison, this is
305   // arbitrary but stable. Initially, all workers are on this set. A worker is
306   // removed from the set before its WakeUp() function is called and when it
307   // receives work from GetWork() (a worker calls GetWork() when its sleep
308   // timeout expires, even if its WakeUp() method hasn't been called). A worker
309   // is inserted on this set when it receives nullptr from GetWork().
310   WorkerThreadSet idle_workers_set_ GUARDED_BY(lock_);
311 
312   // Signaled when a worker is added to the idle workers set.
313   std::unique_ptr<ConditionVariable> idle_workers_set_cv_for_testing_
314       GUARDED_BY(lock_);
315 
316   // Whether an AdjustMaxTasks() task was posted to the service thread.
317   bool adjust_max_tasks_posted_ GUARDED_BY(lock_) = false;
318 
319   // Indicates to the delegates that workers are not permitted to clean up.
320   bool worker_cleanup_disallowed_for_testing_ GUARDED_BY(lock_) = false;
321 
322   // Counts the number of workers cleaned up (went through
323   // WorkerThreadDelegateImpl::OnMainExit()) since the last call to
324   // WaitForWorkersCleanedUpForTesting() (or Start() if that wasn't called yet).
325   // |some_workers_cleaned_up_for_testing_| is true if this was ever
326   // incremented. Tests with a custom |suggested_reclaim_time| can wait on a
327   // specific number of workers being cleaned up via
328   // WaitForWorkersCleanedUpForTesting().
329   size_t num_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = 0;
330 #if DCHECK_IS_ON()
331   bool some_workers_cleaned_up_for_testing_ GUARDED_BY(lock_) = false;
332 #endif
333 
334   // Signaled, if non-null, when |num_workers_cleaned_up_for_testing_| is
335   // incremented.
336   std::unique_ptr<ConditionVariable> num_workers_cleaned_up_for_testing_cv_
337       GUARDED_BY(lock_);
338 
339   // Set at the start of JoinForTesting().
340   bool join_for_testing_started_ GUARDED_BY(lock_) = false;
341 
342   // absl::nullopt unless |synchronous_thread_start_for_testing| was true when
343   // Start() was called. In that case, it's signaled each time
344   // WorkerThreadDelegateImpl::OnMainEntry() completes.
345   absl::optional<WaitableEvent> worker_started_for_testing_;
346 
347   // Ensures recently cleaned up workers (ref.
348   // WorkerThreadDelegateImpl::CleanupLockRequired()) had time to exit as
349   // they have a raw reference to |this| (and to TaskTracker) which can
350   // otherwise result in racy use-after-frees per no longer being part of
351   // |workers_| and hence not being explicitly joined in JoinForTesting():
352   // https://crbug.com/810464. Uses AtomicRefCount to make its only public
353   // method thread-safe.
354   TrackedRefFactory<ThreadGroupImpl> tracked_ref_factory_;
355 };
356 
357 }  // namespace internal
358 }  // namespace base
359 
360 #endif  // BASE_TASK_THREAD_POOL_THREAD_GROUP_IMPL_H_
361