• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_
6 #define BASE_THREADING_SEQUENCED_WORKER_POOL_H_
7 
8 #include <cstddef>
9 #include <string>
10 
11 #include "base/base_export.h"
12 #include "base/basictypes.h"
13 #include "base/callback_forward.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/memory/scoped_ptr.h"
16 #include "base/task_runner.h"
17 
18 namespace tracked_objects {
19 class Location;
20 }  // namespace tracked_objects
21 
22 namespace base {
23 
24 class MessageLoopProxy;
25 
26 template <class T> class DeleteHelper;
27 
28 class SequencedTaskRunner;
29 
30 // A worker thread pool that enforces ordering between sets of tasks. It also
31 // allows you to specify what should happen to your tasks on shutdown.
32 //
33 // To enforce ordering, get a unique sequence token from the pool and post all
34 // tasks you want to order with the token. All tasks with the same token are
35 // guaranteed to execute serially, though not necessarily on the same thread.
36 // This means that:
37 //
38 //   - No two tasks with the same token will run at the same time.
39 //
40 //   - Given two tasks T1 and T2 with the same token such that T2 will
41 //     run after T1, then T2 will start after T1 is destroyed.
42 //
43 //   - If T2 will run after T1, then all memory changes in T1 and T1's
44 //     destruction will be visible to T2.
45 //
46 // Example:
47 //   SequencedWorkerPool::SequenceToken token = pool.GetSequenceToken();
48 //   pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN,
49 //                                FROM_HERE, base::Bind(...));
50 //   pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN,
51 //                                FROM_HERE, base::Bind(...));
52 //
53 // You can make named sequence tokens to make it easier to share a token
54 // across different components.
55 //
56 // You can also post tasks to the pool without ordering using PostWorkerTask.
57 // These will be executed in an unspecified order. The order of execution
58 // between tasks with different sequence tokens is also unspecified.
59 //
60 // This class may be leaked on shutdown to facilitate fast shutdown. The
61 // expected usage, however, is to call Shutdown(), which correctly accounts
62 // for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN
63 // behavior.
64 //
65 // Implementation note: This does not use a base::WorkerPool since that does
66 // not enforce shutdown semantics or allow us to specify how many worker
67 // threads to run. For the typical use case of random background work, we don't
68 // necessarily want to be super aggressive about creating threads.
69 //
70 // Note that SequencedWorkerPool is RefCountedThreadSafe (inherited
71 // from TaskRunner).
72 //
73 // Test-only code should wrap this in a base::SequencedWorkerPoolOwner to avoid
74 // memory leaks. See http://crbug.com/273800
75 class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
76  public:
77   // Defines what should happen to a task posted to the worker pool on
78   // shutdown.
79   enum WorkerShutdown {
80     // Tasks posted with this mode which have not run at shutdown will be
81     // deleted rather than run, and any tasks with this mode running at
82     // shutdown will be ignored (the worker thread will not be joined).
83     //
84     // This option provides a nice way to post stuff you don't want blocking
85     // shutdown. For example, you might be doing a slow DNS lookup and if it's
86     // blocked on the OS, you may not want to stop shutdown, since the result
87     // doesn't really matter at that point.
88     //
89     // However, you need to be very careful what you do in your callback when
90     // you use this option. Since the thread will continue to run until the OS
91     // terminates the process, the app can be in the process of tearing down
92     // when you're running. This means any singletons or global objects you
93     // use may suddenly become invalid out from under you. For this reason,
94     // it's best to use this only for slow but simple operations like the DNS
95     // example.
96     CONTINUE_ON_SHUTDOWN,
97 
98     // Tasks posted with this mode that have not started executing at
99     // shutdown will be deleted rather than executed. However, any tasks that
100     // have already begun executing when shutdown is called will be allowed
101     // to continue, and will block shutdown until completion.
102     //
103     // Note: Because Shutdown() may block while these tasks are executing,
104     // care must be taken to ensure that they do not block on the thread that
105     // called Shutdown(), as this may lead to deadlock.
106     SKIP_ON_SHUTDOWN,
107 
108     // Tasks posted with this mode will block shutdown until they're
109     // executed. Since this can have significant performance implications,
110     // use sparingly.
111     //
112     // Generally, this should be used only for user data, for example, a task
113     // writing a preference file.
114     //
115     // If a task is posted during shutdown, it will not get run since the
116     // workers may already be stopped. In this case, the post operation will
117     // fail (return false) and the task will be deleted.
118     BLOCK_SHUTDOWN,
119   };
120 
121   // Opaque identifier that defines sequencing of tasks posted to the worker
122   // pool.
123   class SequenceToken {
124    public:
SequenceToken()125     SequenceToken() : id_(0) {}
~SequenceToken()126     ~SequenceToken() {}
127 
Equals(const SequenceToken & other)128     bool Equals(const SequenceToken& other) const {
129       return id_ == other.id_;
130     }
131 
132     // Returns false if current thread is executing an unsequenced task.
IsValid()133     bool IsValid() const {
134       return id_ != 0;
135     }
136 
137    private:
138     friend class SequencedWorkerPool;
139 
SequenceToken(int id)140     explicit SequenceToken(int id) : id_(id) {}
141 
142     int id_;
143   };
144 
145   // Allows tests to perform certain actions.
146   class TestingObserver {
147    public:
~TestingObserver()148     virtual ~TestingObserver() {}
149     virtual void OnHasWork() = 0;
150     virtual void WillWaitForShutdown() = 0;
151     virtual void OnDestruct() = 0;
152   };
153 
154   // Gets the SequencedToken of the current thread.
155   // If current thread is not a SequencedWorkerPool worker thread or is running
156   // an unsequenced task, returns an invalid SequenceToken.
157   static SequenceToken GetSequenceTokenForCurrentThread();
158 
159   // When constructing a SequencedWorkerPool, there must be a
160   // MessageLoop on the current thread unless you plan to deliberately
161   // leak it.
162 
163   // Pass the maximum number of threads (they will be lazily created as needed)
164   // and a prefix for the thread name to aid in debugging.
165   SequencedWorkerPool(size_t max_threads,
166                       const std::string& thread_name_prefix);
167 
168   // Like above, but with |observer| for testing.  Does not take
169   // ownership of |observer|.
170   SequencedWorkerPool(size_t max_threads,
171                       const std::string& thread_name_prefix,
172                       TestingObserver* observer);
173 
174   // Returns a unique token that can be used to sequence tasks posted to
175   // PostSequencedWorkerTask(). Valid tokens are always nonzero.
176   SequenceToken GetSequenceToken();
177 
178   // Returns the sequence token associated with the given name. Calling this
179   // function multiple times with the same string will always produce the
180   // same sequence token. If the name has not been used before, a new token
181   // will be created.
182   SequenceToken GetNamedSequenceToken(const std::string& name);
183 
184   // Returns a SequencedTaskRunner wrapper which posts to this
185   // SequencedWorkerPool using the given sequence token. Tasks with nonzero
186   // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
187   // are posted with BLOCK_SHUTDOWN behavior.
188   scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner(
189       SequenceToken token);
190 
191   // Returns a SequencedTaskRunner wrapper which posts to this
192   // SequencedWorkerPool using the given sequence token. Tasks with nonzero
193   // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
194   // are posted with the given shutdown behavior.
195   scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunnerWithShutdownBehavior(
196       SequenceToken token,
197       WorkerShutdown shutdown_behavior);
198 
199   // Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using
200   // the given shutdown behavior. Tasks with nonzero delay are posted with
201   // SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the
202   // given shutdown behavior.
203   scoped_refptr<TaskRunner> GetTaskRunnerWithShutdownBehavior(
204       WorkerShutdown shutdown_behavior);
205 
206   // Posts the given task for execution in the worker pool. Tasks posted with
207   // this function will execute in an unspecified order on a background thread.
208   // Returns true if the task was posted. If your tasks have ordering
209   // requirements, see PostSequencedWorkerTask().
210   //
211   // This class will attempt to delete tasks that aren't run
212   // (non-block-shutdown semantics) but can't guarantee that this happens. If
213   // all worker threads are busy running CONTINUE_ON_SHUTDOWN tasks, there
214   // will be no workers available to delete these tasks. And there may be
215   // tasks with the same sequence token behind those CONTINUE_ON_SHUTDOWN
216   // tasks. Deleting those tasks before the previous one has completed could
217   // cause nondeterministic crashes because the task could be keeping some
218   // objects alive which do work in their destructor, which could voilate the
219   // assumptions of the running task.
220   //
221   // The task will be guaranteed to run to completion before shutdown
222   // (BLOCK_SHUTDOWN semantics).
223   //
224   // Returns true if the task was posted successfully. This may fail during
225   // shutdown regardless of the specified ShutdownBehavior.
226   bool PostWorkerTask(const tracked_objects::Location& from_here,
227                       const Closure& task);
228 
229   // Same as PostWorkerTask but allows a delay to be specified (although doing
230   // so changes the shutdown behavior). The task will be run after the given
231   // delay has elapsed.
232   //
233   // If the delay is nonzero, the task won't be guaranteed to run to completion
234   // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
235   // If the delay is zero, this behaves exactly like PostWorkerTask, i.e. the
236   // task will be guaranteed to run to completion before shutdown
237   // (BLOCK_SHUTDOWN semantics).
238   bool PostDelayedWorkerTask(const tracked_objects::Location& from_here,
239                              const Closure& task,
240                              TimeDelta delay);
241 
242   // Same as PostWorkerTask but allows specification of the shutdown behavior.
243   bool PostWorkerTaskWithShutdownBehavior(
244       const tracked_objects::Location& from_here,
245       const Closure& task,
246       WorkerShutdown shutdown_behavior);
247 
248   // Like PostWorkerTask above, but provides sequencing semantics. This means
249   // that tasks posted with the same sequence token (see GetSequenceToken())
250   // are guaranteed to execute in order. This is useful in cases where you're
251   // doing operations that may depend on previous ones, like appending to a
252   // file.
253   //
254   // The task will be guaranteed to run to completion before shutdown
255   // (BLOCK_SHUTDOWN semantics).
256   //
257   // Returns true if the task was posted successfully. This may fail during
258   // shutdown regardless of the specified ShutdownBehavior.
259   bool PostSequencedWorkerTask(SequenceToken sequence_token,
260                                const tracked_objects::Location& from_here,
261                                const Closure& task);
262 
263   // Like PostSequencedWorkerTask above, but allows you to specify a named
264   // token, which saves an extra call to GetNamedSequenceToken.
265   bool PostNamedSequencedWorkerTask(const std::string& token_name,
266                                     const tracked_objects::Location& from_here,
267                                     const Closure& task);
268 
269   // Same as PostSequencedWorkerTask but allows a delay to be specified
270   // (although doing so changes the shutdown behavior). The task will be run
271   // after the given delay has elapsed.
272   //
273   // If the delay is nonzero, the task won't be guaranteed to run to completion
274   // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
275   // If the delay is zero, this behaves exactly like PostSequencedWorkerTask,
276   // i.e. the task will be guaranteed to run to completion before shutdown
277   // (BLOCK_SHUTDOWN semantics).
278   bool PostDelayedSequencedWorkerTask(
279       SequenceToken sequence_token,
280       const tracked_objects::Location& from_here,
281       const Closure& task,
282       TimeDelta delay);
283 
284   // Same as PostSequencedWorkerTask but allows specification of the shutdown
285   // behavior.
286   bool PostSequencedWorkerTaskWithShutdownBehavior(
287       SequenceToken sequence_token,
288       const tracked_objects::Location& from_here,
289       const Closure& task,
290       WorkerShutdown shutdown_behavior);
291 
292   // TaskRunner implementation. Forwards to PostDelayedWorkerTask().
293   virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
294                                const Closure& task,
295                                TimeDelta delay) OVERRIDE;
296   virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
297 
298   // Returns true if the current thread is processing a task with the given
299   // sequence_token.
300   bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
301 
302   // Blocks until all pending tasks are complete. This should only be called in
303   // unit tests when you want to validate something that should have happened.
304   // This will not flush delayed tasks; delayed tasks get deleted.
305   //
306   // Note that calling this will not prevent other threads from posting work to
307   // the queue while the calling thread is waiting on Flush(). In this case,
308   // Flush will return only when there's no more work in the queue. Normally,
309   // this doesn't come up since in a test, all the work is being posted from
310   // the main thread.
311   void FlushForTesting();
312 
313   // Spuriously signal that there is work to be done.
314   void SignalHasWorkForTesting();
315 
316   // Implements the worker pool shutdown. This should be called during app
317   // shutdown, and will discard/join with appropriate tasks before returning.
318   // After this call, subsequent calls to post tasks will fail.
319   //
320   // Must be called from the same thread this object was constructed on.
Shutdown()321   void Shutdown() { Shutdown(0); }
322 
323   // A variant that allows an arbitrary number of new blocking tasks to
324   // be posted during shutdown from within tasks that execute during shutdown.
325   // Only tasks designated as BLOCKING_SHUTDOWN will be allowed, and only if
326   // posted by tasks that are not designated as CONTINUE_ON_SHUTDOWN. Once
327   // the limit is reached, subsequent calls to post task fail in all cases.
328   //
329   // Must be called from the same thread this object was constructed on.
330   void Shutdown(int max_new_blocking_tasks_after_shutdown);
331 
332   // Check if Shutdown was called for given threading pool. This method is used
333   // for aborting time consuming operation to avoid blocking shutdown.
334   //
335   // Can be called from any thread.
336   bool IsShutdownInProgress();
337 
338  protected:
339   virtual ~SequencedWorkerPool();
340 
341   virtual void OnDestruct() const OVERRIDE;
342 
343  private:
344   friend class RefCountedThreadSafe<SequencedWorkerPool>;
345   friend class DeleteHelper<SequencedWorkerPool>;
346 
347   class Inner;
348   class Worker;
349 
350   const scoped_refptr<MessageLoopProxy> constructor_message_loop_;
351 
352   // Avoid pulling in too many headers by putting (almost) everything
353   // into |inner_|.
354   const scoped_ptr<Inner> inner_;
355 
356   DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool);
357 };
358 
359 }  // namespace base
360 
361 #endif  // BASE_THREADING_SEQUENCED_WORKER_POOL_H_
362