1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_ 6 #define BASE_THREADING_SEQUENCED_WORKER_POOL_H_ 7 8 #include <cstddef> 9 #include <string> 10 11 #include "base/base_export.h" 12 #include "base/basictypes.h" 13 #include "base/callback_forward.h" 14 #include "base/memory/ref_counted.h" 15 #include "base/memory/scoped_ptr.h" 16 #include "base/task_runner.h" 17 18 namespace tracked_objects { 19 class Location; 20 } // namespace tracked_objects 21 22 namespace base { 23 24 class MessageLoopProxy; 25 26 template <class T> class DeleteHelper; 27 28 class SequencedTaskRunner; 29 30 // A worker thread pool that enforces ordering between sets of tasks. It also 31 // allows you to specify what should happen to your tasks on shutdown. 32 // 33 // To enforce ordering, get a unique sequence token from the pool and post all 34 // tasks you want to order with the token. All tasks with the same token are 35 // guaranteed to execute serially, though not necessarily on the same thread. 36 // This means that: 37 // 38 // - No two tasks with the same token will run at the same time. 39 // 40 // - Given two tasks T1 and T2 with the same token such that T2 will 41 // run after T1, then T2 will start after T1 is destroyed. 42 // 43 // - If T2 will run after T1, then all memory changes in T1 and T1's 44 // destruction will be visible to T2. 45 // 46 // Example: 47 // SequencedWorkerPool::SequenceToken token = pool.GetSequenceToken(); 48 // pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN, 49 // FROM_HERE, base::Bind(...)); 50 // pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN, 51 // FROM_HERE, base::Bind(...)); 52 // 53 // You can make named sequence tokens to make it easier to share a token 54 // across different components. 55 // 56 // You can also post tasks to the pool without ordering using PostWorkerTask. 57 // These will be executed in an unspecified order. The order of execution 58 // between tasks with different sequence tokens is also unspecified. 59 // 60 // This class may be leaked on shutdown to facilitate fast shutdown. The 61 // expected usage, however, is to call Shutdown(), which correctly accounts 62 // for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN 63 // behavior. 64 // 65 // Implementation note: This does not use a base::WorkerPool since that does 66 // not enforce shutdown semantics or allow us to specify how many worker 67 // threads to run. For the typical use case of random background work, we don't 68 // necessarily want to be super aggressive about creating threads. 69 // 70 // Note that SequencedWorkerPool is RefCountedThreadSafe (inherited 71 // from TaskRunner). 72 // 73 // Test-only code should wrap this in a base::SequencedWorkerPoolOwner to avoid 74 // memory leaks. See http://crbug.com/273800 75 class BASE_EXPORT SequencedWorkerPool : public TaskRunner { 76 public: 77 // Defines what should happen to a task posted to the worker pool on 78 // shutdown. 79 enum WorkerShutdown { 80 // Tasks posted with this mode which have not run at shutdown will be 81 // deleted rather than run, and any tasks with this mode running at 82 // shutdown will be ignored (the worker thread will not be joined). 83 // 84 // This option provides a nice way to post stuff you don't want blocking 85 // shutdown. For example, you might be doing a slow DNS lookup and if it's 86 // blocked on the OS, you may not want to stop shutdown, since the result 87 // doesn't really matter at that point. 88 // 89 // However, you need to be very careful what you do in your callback when 90 // you use this option. Since the thread will continue to run until the OS 91 // terminates the process, the app can be in the process of tearing down 92 // when you're running. This means any singletons or global objects you 93 // use may suddenly become invalid out from under you. For this reason, 94 // it's best to use this only for slow but simple operations like the DNS 95 // example. 96 CONTINUE_ON_SHUTDOWN, 97 98 // Tasks posted with this mode that have not started executing at 99 // shutdown will be deleted rather than executed. However, any tasks that 100 // have already begun executing when shutdown is called will be allowed 101 // to continue, and will block shutdown until completion. 102 // 103 // Note: Because Shutdown() may block while these tasks are executing, 104 // care must be taken to ensure that they do not block on the thread that 105 // called Shutdown(), as this may lead to deadlock. 106 SKIP_ON_SHUTDOWN, 107 108 // Tasks posted with this mode will block shutdown until they're 109 // executed. Since this can have significant performance implications, 110 // use sparingly. 111 // 112 // Generally, this should be used only for user data, for example, a task 113 // writing a preference file. 114 // 115 // If a task is posted during shutdown, it will not get run since the 116 // workers may already be stopped. In this case, the post operation will 117 // fail (return false) and the task will be deleted. 118 BLOCK_SHUTDOWN, 119 }; 120 121 // Opaque identifier that defines sequencing of tasks posted to the worker 122 // pool. 123 class SequenceToken { 124 public: SequenceToken()125 SequenceToken() : id_(0) {} ~SequenceToken()126 ~SequenceToken() {} 127 Equals(const SequenceToken & other)128 bool Equals(const SequenceToken& other) const { 129 return id_ == other.id_; 130 } 131 132 // Returns false if current thread is executing an unsequenced task. IsValid()133 bool IsValid() const { 134 return id_ != 0; 135 } 136 137 private: 138 friend class SequencedWorkerPool; 139 SequenceToken(int id)140 explicit SequenceToken(int id) : id_(id) {} 141 142 int id_; 143 }; 144 145 // Allows tests to perform certain actions. 146 class TestingObserver { 147 public: ~TestingObserver()148 virtual ~TestingObserver() {} 149 virtual void OnHasWork() = 0; 150 virtual void WillWaitForShutdown() = 0; 151 virtual void OnDestruct() = 0; 152 }; 153 154 // Gets the SequencedToken of the current thread. 155 // If current thread is not a SequencedWorkerPool worker thread or is running 156 // an unsequenced task, returns an invalid SequenceToken. 157 static SequenceToken GetSequenceTokenForCurrentThread(); 158 159 // When constructing a SequencedWorkerPool, there must be a 160 // MessageLoop on the current thread unless you plan to deliberately 161 // leak it. 162 163 // Pass the maximum number of threads (they will be lazily created as needed) 164 // and a prefix for the thread name to aid in debugging. 165 SequencedWorkerPool(size_t max_threads, 166 const std::string& thread_name_prefix); 167 168 // Like above, but with |observer| for testing. Does not take 169 // ownership of |observer|. 170 SequencedWorkerPool(size_t max_threads, 171 const std::string& thread_name_prefix, 172 TestingObserver* observer); 173 174 // Returns a unique token that can be used to sequence tasks posted to 175 // PostSequencedWorkerTask(). Valid tokens are always nonzero. 176 SequenceToken GetSequenceToken(); 177 178 // Returns the sequence token associated with the given name. Calling this 179 // function multiple times with the same string will always produce the 180 // same sequence token. If the name has not been used before, a new token 181 // will be created. 182 SequenceToken GetNamedSequenceToken(const std::string& name); 183 184 // Returns a SequencedTaskRunner wrapper which posts to this 185 // SequencedWorkerPool using the given sequence token. Tasks with nonzero 186 // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay 187 // are posted with BLOCK_SHUTDOWN behavior. 188 scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner( 189 SequenceToken token); 190 191 // Returns a SequencedTaskRunner wrapper which posts to this 192 // SequencedWorkerPool using the given sequence token. Tasks with nonzero 193 // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay 194 // are posted with the given shutdown behavior. 195 scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunnerWithShutdownBehavior( 196 SequenceToken token, 197 WorkerShutdown shutdown_behavior); 198 199 // Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using 200 // the given shutdown behavior. Tasks with nonzero delay are posted with 201 // SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the 202 // given shutdown behavior. 203 scoped_refptr<TaskRunner> GetTaskRunnerWithShutdownBehavior( 204 WorkerShutdown shutdown_behavior); 205 206 // Posts the given task for execution in the worker pool. Tasks posted with 207 // this function will execute in an unspecified order on a background thread. 208 // Returns true if the task was posted. If your tasks have ordering 209 // requirements, see PostSequencedWorkerTask(). 210 // 211 // This class will attempt to delete tasks that aren't run 212 // (non-block-shutdown semantics) but can't guarantee that this happens. If 213 // all worker threads are busy running CONTINUE_ON_SHUTDOWN tasks, there 214 // will be no workers available to delete these tasks. And there may be 215 // tasks with the same sequence token behind those CONTINUE_ON_SHUTDOWN 216 // tasks. Deleting those tasks before the previous one has completed could 217 // cause nondeterministic crashes because the task could be keeping some 218 // objects alive which do work in their destructor, which could voilate the 219 // assumptions of the running task. 220 // 221 // The task will be guaranteed to run to completion before shutdown 222 // (BLOCK_SHUTDOWN semantics). 223 // 224 // Returns true if the task was posted successfully. This may fail during 225 // shutdown regardless of the specified ShutdownBehavior. 226 bool PostWorkerTask(const tracked_objects::Location& from_here, 227 const Closure& task); 228 229 // Same as PostWorkerTask but allows a delay to be specified (although doing 230 // so changes the shutdown behavior). The task will be run after the given 231 // delay has elapsed. 232 // 233 // If the delay is nonzero, the task won't be guaranteed to run to completion 234 // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs. 235 // If the delay is zero, this behaves exactly like PostWorkerTask, i.e. the 236 // task will be guaranteed to run to completion before shutdown 237 // (BLOCK_SHUTDOWN semantics). 238 bool PostDelayedWorkerTask(const tracked_objects::Location& from_here, 239 const Closure& task, 240 TimeDelta delay); 241 242 // Same as PostWorkerTask but allows specification of the shutdown behavior. 243 bool PostWorkerTaskWithShutdownBehavior( 244 const tracked_objects::Location& from_here, 245 const Closure& task, 246 WorkerShutdown shutdown_behavior); 247 248 // Like PostWorkerTask above, but provides sequencing semantics. This means 249 // that tasks posted with the same sequence token (see GetSequenceToken()) 250 // are guaranteed to execute in order. This is useful in cases where you're 251 // doing operations that may depend on previous ones, like appending to a 252 // file. 253 // 254 // The task will be guaranteed to run to completion before shutdown 255 // (BLOCK_SHUTDOWN semantics). 256 // 257 // Returns true if the task was posted successfully. This may fail during 258 // shutdown regardless of the specified ShutdownBehavior. 259 bool PostSequencedWorkerTask(SequenceToken sequence_token, 260 const tracked_objects::Location& from_here, 261 const Closure& task); 262 263 // Like PostSequencedWorkerTask above, but allows you to specify a named 264 // token, which saves an extra call to GetNamedSequenceToken. 265 bool PostNamedSequencedWorkerTask(const std::string& token_name, 266 const tracked_objects::Location& from_here, 267 const Closure& task); 268 269 // Same as PostSequencedWorkerTask but allows a delay to be specified 270 // (although doing so changes the shutdown behavior). The task will be run 271 // after the given delay has elapsed. 272 // 273 // If the delay is nonzero, the task won't be guaranteed to run to completion 274 // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs. 275 // If the delay is zero, this behaves exactly like PostSequencedWorkerTask, 276 // i.e. the task will be guaranteed to run to completion before shutdown 277 // (BLOCK_SHUTDOWN semantics). 278 bool PostDelayedSequencedWorkerTask( 279 SequenceToken sequence_token, 280 const tracked_objects::Location& from_here, 281 const Closure& task, 282 TimeDelta delay); 283 284 // Same as PostSequencedWorkerTask but allows specification of the shutdown 285 // behavior. 286 bool PostSequencedWorkerTaskWithShutdownBehavior( 287 SequenceToken sequence_token, 288 const tracked_objects::Location& from_here, 289 const Closure& task, 290 WorkerShutdown shutdown_behavior); 291 292 // TaskRunner implementation. Forwards to PostDelayedWorkerTask(). 293 virtual bool PostDelayedTask(const tracked_objects::Location& from_here, 294 const Closure& task, 295 TimeDelta delay) OVERRIDE; 296 virtual bool RunsTasksOnCurrentThread() const OVERRIDE; 297 298 // Returns true if the current thread is processing a task with the given 299 // sequence_token. 300 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 301 302 // Blocks until all pending tasks are complete. This should only be called in 303 // unit tests when you want to validate something that should have happened. 304 // This will not flush delayed tasks; delayed tasks get deleted. 305 // 306 // Note that calling this will not prevent other threads from posting work to 307 // the queue while the calling thread is waiting on Flush(). In this case, 308 // Flush will return only when there's no more work in the queue. Normally, 309 // this doesn't come up since in a test, all the work is being posted from 310 // the main thread. 311 void FlushForTesting(); 312 313 // Spuriously signal that there is work to be done. 314 void SignalHasWorkForTesting(); 315 316 // Implements the worker pool shutdown. This should be called during app 317 // shutdown, and will discard/join with appropriate tasks before returning. 318 // After this call, subsequent calls to post tasks will fail. 319 // 320 // Must be called from the same thread this object was constructed on. Shutdown()321 void Shutdown() { Shutdown(0); } 322 323 // A variant that allows an arbitrary number of new blocking tasks to 324 // be posted during shutdown from within tasks that execute during shutdown. 325 // Only tasks designated as BLOCKING_SHUTDOWN will be allowed, and only if 326 // posted by tasks that are not designated as CONTINUE_ON_SHUTDOWN. Once 327 // the limit is reached, subsequent calls to post task fail in all cases. 328 // 329 // Must be called from the same thread this object was constructed on. 330 void Shutdown(int max_new_blocking_tasks_after_shutdown); 331 332 // Check if Shutdown was called for given threading pool. This method is used 333 // for aborting time consuming operation to avoid blocking shutdown. 334 // 335 // Can be called from any thread. 336 bool IsShutdownInProgress(); 337 338 protected: 339 virtual ~SequencedWorkerPool(); 340 341 virtual void OnDestruct() const OVERRIDE; 342 343 private: 344 friend class RefCountedThreadSafe<SequencedWorkerPool>; 345 friend class DeleteHelper<SequencedWorkerPool>; 346 347 class Inner; 348 class Worker; 349 350 const scoped_refptr<MessageLoopProxy> constructor_message_loop_; 351 352 // Avoid pulling in too many headers by putting (almost) everything 353 // into |inner_|. 354 const scoped_ptr<Inner> inner_; 355 356 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool); 357 }; 358 359 } // namespace base 360 361 #endif // BASE_THREADING_SEQUENCED_WORKER_POOL_H_ 362