// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4 
5 #ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_
6 #define BASE_THREADING_SEQUENCED_WORKER_POOL_H_
7 
8 #include <stddef.h>
9 
10 #include <cstddef>
11 #include <memory>
12 #include <string>
13 
14 #include "base/base_export.h"
15 #include "base/callback.h"
16 #include "base/compiler_specific.h"
17 #include "base/macros.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/task_runner.h"
20 #include "base/task_scheduler/task_traits.h"
21 
// Forward declaration only: this header refers to tracked_objects::Location
// exclusively by const reference (the |from_here| parameters), so the full
// definition is not needed here.
namespace tracked_objects {
class Location;
}  // namespace tracked_objects
25 
26 namespace base {
27 
// Forward declarations. SequencedTaskRunner is only named inside
// scoped_refptr<> in this header, and DeleteHelper is only named in a friend
// declaration, so neither needs a complete type here.
class SequencedTaskRunner;

template <class T> class DeleteHelper;
31 
32 // A worker thread pool that enforces ordering between sets of tasks. It also
33 // allows you to specify what should happen to your tasks on shutdown.
34 //
35 // To enforce ordering, get a unique sequence token from the pool and post all
36 // tasks you want to order with the token. All tasks with the same token are
37 // guaranteed to execute serially, though not necessarily on the same thread.
38 // This means that:
39 //
40 //   - No two tasks with the same token will run at the same time.
41 //
42 //   - Given two tasks T1 and T2 with the same token such that T2 will
43 //     run after T1, then T2 will start after T1 is destroyed.
44 //
45 //   - If T2 will run after T1, then all memory changes in T1 and T1's
46 //     destruction will be visible to T2.
47 //
48 // Example:
49 //   SequencedWorkerPool::SequenceToken token = pool.GetSequenceToken();
50 //   pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN,
51 //                                FROM_HERE, base::Bind(...));
52 //   pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN,
53 //                                FROM_HERE, base::Bind(...));
54 //
55 // You can make named sequence tokens to make it easier to share a token
56 // across different components.
57 //
58 // You can also post tasks to the pool without ordering using PostWorkerTask.
59 // These will be executed in an unspecified order. The order of execution
60 // between tasks with different sequence tokens is also unspecified.
61 //
62 // You must call EnableForProcess() or
63 // EnableWithRedirectionToTaskSchedulerForProcess() before starting to post
64 // tasks to a process' SequencedWorkerPools.
65 //
66 // This class may be leaked on shutdown to facilitate fast shutdown. The
67 // expected usage, however, is to call Shutdown(), which correctly accounts
68 // for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN
69 // behavior.
70 //
71 // Implementation note: This does not use a base::WorkerPool since that does
72 // not enforce shutdown semantics or allow us to specify how many worker
73 // threads to run. For the typical use case of random background work, we don't
74 // necessarily want to be super aggressive about creating threads.
75 //
76 // Note that SequencedWorkerPool is RefCountedThreadSafe (inherited
77 // from TaskRunner).
78 //
79 // Test-only code should wrap this in a base::SequencedWorkerPoolOwner to avoid
80 // memory leaks. See http://crbug.com/273800
81 class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
82  public:
83   // Defines what should happen to a task posted to the worker pool on
84   // shutdown.
85   enum WorkerShutdown {
86     // Tasks posted with this mode which have not run at shutdown will be
87     // deleted rather than run, and any tasks with this mode running at
88     // shutdown will be ignored (the worker thread will not be joined).
89     //
90     // This option provides a nice way to post stuff you don't want blocking
91     // shutdown. For example, you might be doing a slow DNS lookup and if it's
92     // blocked on the OS, you may not want to stop shutdown, since the result
93     // doesn't really matter at that point.
94     //
95     // However, you need to be very careful what you do in your callback when
96     // you use this option. Since the thread will continue to run until the OS
97     // terminates the process, the app can be in the process of tearing down
98     // when you're running. This means any singletons or global objects you
99     // use may suddenly become invalid out from under you. For this reason,
100     // it's best to use this only for slow but simple operations like the DNS
101     // example.
102     CONTINUE_ON_SHUTDOWN,
103 
104     // Tasks posted with this mode that have not started executing at
105     // shutdown will be deleted rather than executed. However, any tasks that
106     // have already begun executing when shutdown is called will be allowed
107     // to continue, and will block shutdown until completion.
108     //
109     // Note: Because Shutdown() may block while these tasks are executing,
110     // care must be taken to ensure that they do not block on the thread that
111     // called Shutdown(), as this may lead to deadlock.
112     SKIP_ON_SHUTDOWN,
113 
114     // Tasks posted with this mode will block shutdown until they're
115     // executed. Since this can have significant performance implications,
116     // use sparingly.
117     //
118     // Generally, this should be used only for user data, for example, a task
119     // writing a preference file.
120     //
121     // If a task is posted during shutdown, it will not get run since the
122     // workers may already be stopped. In this case, the post operation will
123     // fail (return false) and the task will be deleted.
124     BLOCK_SHUTDOWN,
125   };
126 
127   // Opaque identifier that defines sequencing of tasks posted to the worker
128   // pool.
129   class BASE_EXPORT SequenceToken {
130    public:
SequenceToken()131     SequenceToken() : id_(0) {}
~SequenceToken()132     ~SequenceToken() {}
133 
Equals(const SequenceToken & other)134     bool Equals(const SequenceToken& other) const {
135       return id_ == other.id_;
136     }
137 
138     // Returns false if current thread is executing an unsequenced task.
IsValid()139     bool IsValid() const {
140       return id_ != 0;
141     }
142 
143     // Returns a string representation of this token. This method should only be
144     // used for debugging.
145     std::string ToString() const;
146 
147    private:
148     friend class SequencedWorkerPool;
149 
SequenceToken(int id)150     explicit SequenceToken(int id) : id_(id) {}
151 
152     int id_;
153   };
154 
155   // Allows tests to perform certain actions.
156   class TestingObserver {
157    public:
~TestingObserver()158     virtual ~TestingObserver() {}
159     virtual void OnHasWork() = 0;
160     virtual void WillWaitForShutdown() = 0;
161     virtual void OnDestruct() = 0;
162   };
163 
164   // Gets the SequencedToken of the current thread.
165   // If current thread is not a SequencedWorkerPool worker thread or is running
166   // an unsequenced task, returns an invalid SequenceToken.
167   static SequenceToken GetSequenceTokenForCurrentThread();
168 
169   // Returns the SequencedWorkerPool that owns this thread, or null if the
170   // current thread is not a SequencedWorkerPool worker thread.
171   //
172   // Always returns nullptr when SequencedWorkerPool is redirected to
173   // TaskScheduler.
174   //
175   // DEPRECATED. Use SequencedTaskRunnerHandle::Get() instead. Consequentially
176   // the only remaining use case is in sequenced_task_runner_handle.cc to
177   // implement that and will soon be removed along with SequencedWorkerPool:
178   // http://crbug.com/622400.
179   static scoped_refptr<SequencedWorkerPool> GetWorkerPoolForCurrentThread();
180 
181   // Returns a unique token that can be used to sequence tasks posted to
182   // PostSequencedWorkerTask(). Valid tokens are always nonzero.
183   static SequenceToken GetSequenceToken();
184 
185   // Enables posting tasks to this process' SequencedWorkerPools. Cannot be
186   // called if already enabled. This is not thread-safe; proper synchronization
187   // is required to use any SequencedWorkerPool method after calling this.
188   static void EnableForProcess();
189 
190   // Same as EnableForProcess(), but tasks are redirected to the registered
191   // TaskScheduler. All redirections' TaskPriority will be capped to
192   // |max_task_priority|. There must be a registered TaskScheduler when this is
193   // called.
194   // TODO(gab): Remove this if http://crbug.com/622400 fails
195   // (SequencedWorkerPool will be phased out completely otherwise).
196   static void EnableWithRedirectionToTaskSchedulerForProcess(
197       TaskPriority max_task_priority = TaskPriority::HIGHEST);
198 
199   // Disables posting tasks to this process' SequencedWorkerPools. Calling this
200   // while there are active SequencedWorkerPools is not supported. This is not
201   // thread-safe; proper synchronization is required to use any
202   // SequencedWorkerPool method after calling this.
203   static void DisableForProcessForTesting();
204 
205   // Returns true if posting tasks to this process' SequencedWorkerPool is
206   // enabled (with or without redirection to TaskScheduler).
207   static bool IsEnabled();
208 
209   // When constructing a SequencedWorkerPool, there must be a
210   // ThreadTaskRunnerHandle on the current thread unless you plan to
211   // deliberately leak it.
212 
213   // Constructs a SequencedWorkerPool which will lazily create up to
214   // |max_threads| and a prefix for the thread name to aid in debugging.
215   // |max_threads| must be greater than 1. |task_priority| will be used to hint
216   // base::TaskScheduler for an experiment in which all SequencedWorkerPool
217   // tasks will be redirected to it in processes where a base::TaskScheduler was
218   // instantiated.
219   SequencedWorkerPool(size_t max_threads,
220                       const std::string& thread_name_prefix,
221                       base::TaskPriority task_priority);
222 
223   // Like above, but with |observer| for testing.  Does not take ownership of
224   // |observer|.
225   SequencedWorkerPool(size_t max_threads,
226                       const std::string& thread_name_prefix,
227                       base::TaskPriority task_priority,
228                       TestingObserver* observer);
229 
230   // Returns the sequence token associated with the given name. Calling this
231   // function multiple times with the same string will always produce the
232   // same sequence token. If the name has not been used before, a new token
233   // will be created.
234   SequenceToken GetNamedSequenceToken(const std::string& name);
235 
236   // Returns a SequencedTaskRunner wrapper which posts to this
237   // SequencedWorkerPool using the given sequence token. Tasks with nonzero
238   // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
239   // are posted with BLOCK_SHUTDOWN behavior.
240   scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner(
241       SequenceToken token) WARN_UNUSED_RESULT;
242 
243   // Returns a SequencedTaskRunner wrapper which posts to this
244   // SequencedWorkerPool using the given sequence token. Tasks with nonzero
245   // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
246   // are posted with the given shutdown behavior.
247   scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunnerWithShutdownBehavior(
248       SequenceToken token,
249       WorkerShutdown shutdown_behavior) WARN_UNUSED_RESULT;
250 
251   // Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using
252   // the given shutdown behavior. Tasks with nonzero delay are posted with
253   // SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the
254   // given shutdown behavior.
255   scoped_refptr<TaskRunner> GetTaskRunnerWithShutdownBehavior(
256       WorkerShutdown shutdown_behavior) WARN_UNUSED_RESULT;
257 
258   // Posts the given task for execution in the worker pool. Tasks posted with
259   // this function will execute in an unspecified order on a background thread.
260   // Returns true if the task was posted. If your tasks have ordering
261   // requirements, see PostSequencedWorkerTask().
262   //
263   // This class will attempt to delete tasks that aren't run
264   // (non-block-shutdown semantics) but can't guarantee that this happens. If
265   // all worker threads are busy running CONTINUE_ON_SHUTDOWN tasks, there
266   // will be no workers available to delete these tasks. And there may be
267   // tasks with the same sequence token behind those CONTINUE_ON_SHUTDOWN
268   // tasks. Deleting those tasks before the previous one has completed could
269   // cause nondeterministic crashes because the task could be keeping some
270   // objects alive which do work in their destructor, which could voilate the
271   // assumptions of the running task.
272   //
273   // The task will be guaranteed to run to completion before shutdown
274   // (BLOCK_SHUTDOWN semantics).
275   //
276   // Returns true if the task was posted successfully. This may fail during
277   // shutdown regardless of the specified ShutdownBehavior.
278   bool PostWorkerTask(const tracked_objects::Location& from_here,
279                       OnceClosure task);
280 
281   // Same as PostWorkerTask but allows a delay to be specified (although doing
282   // so changes the shutdown behavior). The task will be run after the given
283   // delay has elapsed.
284   //
285   // If the delay is nonzero, the task won't be guaranteed to run to completion
286   // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
287   // If the delay is zero, this behaves exactly like PostWorkerTask, i.e. the
288   // task will be guaranteed to run to completion before shutdown
289   // (BLOCK_SHUTDOWN semantics).
290   bool PostDelayedWorkerTask(const tracked_objects::Location& from_here,
291                              OnceClosure task,
292                              TimeDelta delay);
293 
294   // Same as PostWorkerTask but allows specification of the shutdown behavior.
295   bool PostWorkerTaskWithShutdownBehavior(
296       const tracked_objects::Location& from_here,
297       OnceClosure task,
298       WorkerShutdown shutdown_behavior);
299 
300   // Like PostWorkerTask above, but provides sequencing semantics. This means
301   // that tasks posted with the same sequence token (see GetSequenceToken())
302   // are guaranteed to execute in order. This is useful in cases where you're
303   // doing operations that may depend on previous ones, like appending to a
304   // file.
305   //
306   // The task will be guaranteed to run to completion before shutdown
307   // (BLOCK_SHUTDOWN semantics).
308   //
309   // Returns true if the task was posted successfully. This may fail during
310   // shutdown regardless of the specified ShutdownBehavior.
311   bool PostSequencedWorkerTask(SequenceToken sequence_token,
312                                const tracked_objects::Location& from_here,
313                                OnceClosure task);
314 
315   // Like PostSequencedWorkerTask above, but allows you to specify a named
316   // token, which saves an extra call to GetNamedSequenceToken.
317   bool PostNamedSequencedWorkerTask(const std::string& token_name,
318                                     const tracked_objects::Location& from_here,
319                                     OnceClosure task);
320 
321   // Same as PostSequencedWorkerTask but allows a delay to be specified
322   // (although doing so changes the shutdown behavior). The task will be run
323   // after the given delay has elapsed.
324   //
325   // If the delay is nonzero, the task won't be guaranteed to run to completion
326   // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
327   // If the delay is zero, this behaves exactly like PostSequencedWorkerTask,
328   // i.e. the task will be guaranteed to run to completion before shutdown
329   // (BLOCK_SHUTDOWN semantics).
330   bool PostDelayedSequencedWorkerTask(
331       SequenceToken sequence_token,
332       const tracked_objects::Location& from_here,
333       OnceClosure task,
334       TimeDelta delay);
335 
336   // Same as PostSequencedWorkerTask but allows specification of the shutdown
337   // behavior.
338   bool PostSequencedWorkerTaskWithShutdownBehavior(
339       SequenceToken sequence_token,
340       const tracked_objects::Location& from_here,
341       OnceClosure task,
342       WorkerShutdown shutdown_behavior);
343 
344   // TaskRunner implementation. Forwards to PostDelayedWorkerTask().
345   bool PostDelayedTask(const tracked_objects::Location& from_here,
346                        OnceClosure task,
347                        TimeDelta delay) override;
348   bool RunsTasksOnCurrentThread() const override;
349 
350   // Blocks until all pending tasks are complete. This should only be called in
351   // unit tests when you want to validate something that should have happened.
352   // Does not wait for delayed tasks. If redirection to TaskScheduler is
353   // disabled, delayed tasks are deleted. If redirection to TaskScheduler is
354   // enabled, this will wait for all tasks posted to TaskScheduler (not just
355   // tasks posted to this SequencedWorkerPool).
356   //
357   // Note that calling this will not prevent other threads from posting work to
358   // the queue while the calling thread is waiting on Flush(). In this case,
359   // Flush will return only when there's no more work in the queue. Normally,
360   // this doesn't come up since in a test, all the work is being posted from
361   // the main thread.
362   //
363   // TODO(gab): Remove mentions of TaskScheduler in this comment if
364   // http://crbug.com/622400 fails.
365   void FlushForTesting();
366 
367   // Spuriously signal that there is work to be done.
368   void SignalHasWorkForTesting();
369 
370   // Implements the worker pool shutdown. This should be called during app
371   // shutdown, and will discard/join with appropriate tasks before returning.
372   // After this call, subsequent calls to post tasks will fail.
373   //
374   // Must be called from the same thread this object was constructed on.
Shutdown()375   void Shutdown() { Shutdown(0); }
376 
377   // A variant that allows an arbitrary number of new blocking tasks to be
378   // posted during shutdown. The tasks cannot be posted within the execution
379   // context of tasks whose shutdown behavior is not BLOCKING_SHUTDOWN. Once
380   // the limit is reached, subsequent calls to post task fail in all cases.
381   // Must be called from the same thread this object was constructed on.
382   void Shutdown(int max_new_blocking_tasks_after_shutdown);
383 
384   // Check if Shutdown was called for given threading pool. This method is used
385   // for aborting time consuming operation to avoid blocking shutdown.
386   //
387   // Can be called from any thread.
388   bool IsShutdownInProgress();
389 
390  protected:
391   ~SequencedWorkerPool() override;
392 
393   void OnDestruct() const override;
394 
395  private:
396   friend class RefCountedThreadSafe<SequencedWorkerPool>;
397   friend class DeleteHelper<SequencedWorkerPool>;
398 
399   class Inner;
400   class PoolSequencedTaskRunner;
401   class Worker;
402 
403   // Returns true if the current thread is processing a task with the given
404   // sequence_token.
405   bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
406 
407   const scoped_refptr<SequencedTaskRunner> constructor_task_runner_;
408 
409   // Avoid pulling in too many headers by putting (almost) everything
410   // into |inner_|.
411   const std::unique_ptr<Inner> inner_;
412 
413   DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool);
414 };
415 
416 }  // namespace base
417 
418 #endif  // BASE_THREADING_SEQUENCED_WORKER_POOL_H_
419