1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_ 6 #define BASE_THREADING_SEQUENCED_WORKER_POOL_H_ 7 8 #include <stddef.h> 9 10 #include <cstddef> 11 #include <memory> 12 #include <string> 13 14 #include "base/base_export.h" 15 #include "base/callback.h" 16 #include "base/compiler_specific.h" 17 #include "base/macros.h" 18 #include "base/memory/ref_counted.h" 19 #include "base/task_runner.h" 20 #include "base/task_scheduler/task_traits.h" 21 22 namespace tracked_objects { 23 class Location; 24 } // namespace tracked_objects 25 26 namespace base { 27 28 class SequencedTaskRunner; 29 30 template <class T> class DeleteHelper; 31 32 // A worker thread pool that enforces ordering between sets of tasks. It also 33 // allows you to specify what should happen to your tasks on shutdown. 34 // 35 // To enforce ordering, get a unique sequence token from the pool and post all 36 // tasks you want to order with the token. All tasks with the same token are 37 // guaranteed to execute serially, though not necessarily on the same thread. 38 // This means that: 39 // 40 // - No two tasks with the same token will run at the same time. 41 // 42 // - Given two tasks T1 and T2 with the same token such that T2 will 43 // run after T1, then T2 will start after T1 is destroyed. 44 // 45 // - If T2 will run after T1, then all memory changes in T1 and T1's 46 // destruction will be visible to T2. 47 // 48 // Example: 49 // SequencedWorkerPool::SequenceToken token = pool.GetSequenceToken(); 50 // pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN, 51 // FROM_HERE, base::Bind(...)); 52 // pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN, 53 // FROM_HERE, base::Bind(...)); 54 // 55 // You can make named sequence tokens to make it easier to share a token 56 // across different components. 57 // 58 // You can also post tasks to the pool without ordering using PostWorkerTask. 59 // These will be executed in an unspecified order. The order of execution 60 // between tasks with different sequence tokens is also unspecified. 61 // 62 // You must call EnableForProcess() or 63 // EnableWithRedirectionToTaskSchedulerForProcess() before starting to post 64 // tasks to a process' SequencedWorkerPools. 65 // 66 // This class may be leaked on shutdown to facilitate fast shutdown. The 67 // expected usage, however, is to call Shutdown(), which correctly accounts 68 // for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN 69 // behavior. 70 // 71 // Implementation note: This does not use a base::WorkerPool since that does 72 // not enforce shutdown semantics or allow us to specify how many worker 73 // threads to run. For the typical use case of random background work, we don't 74 // necessarily want to be super aggressive about creating threads. 75 // 76 // Note that SequencedWorkerPool is RefCountedThreadSafe (inherited 77 // from TaskRunner). 78 // 79 // Test-only code should wrap this in a base::SequencedWorkerPoolOwner to avoid 80 // memory leaks. See http://crbug.com/273800 81 class BASE_EXPORT SequencedWorkerPool : public TaskRunner { 82 public: 83 // Defines what should happen to a task posted to the worker pool on 84 // shutdown. 85 enum WorkerShutdown { 86 // Tasks posted with this mode which have not run at shutdown will be 87 // deleted rather than run, and any tasks with this mode running at 88 // shutdown will be ignored (the worker thread will not be joined). 89 // 90 // This option provides a nice way to post stuff you don't want blocking 91 // shutdown. For example, you might be doing a slow DNS lookup and if it's 92 // blocked on the OS, you may not want to stop shutdown, since the result 93 // doesn't really matter at that point. 94 // 95 // However, you need to be very careful what you do in your callback when 96 // you use this option. Since the thread will continue to run until the OS 97 // terminates the process, the app can be in the process of tearing down 98 // when you're running. This means any singletons or global objects you 99 // use may suddenly become invalid out from under you. For this reason, 100 // it's best to use this only for slow but simple operations like the DNS 101 // example. 102 CONTINUE_ON_SHUTDOWN, 103 104 // Tasks posted with this mode that have not started executing at 105 // shutdown will be deleted rather than executed. However, any tasks that 106 // have already begun executing when shutdown is called will be allowed 107 // to continue, and will block shutdown until completion. 108 // 109 // Note: Because Shutdown() may block while these tasks are executing, 110 // care must be taken to ensure that they do not block on the thread that 111 // called Shutdown(), as this may lead to deadlock. 112 SKIP_ON_SHUTDOWN, 113 114 // Tasks posted with this mode will block shutdown until they're 115 // executed. Since this can have significant performance implications, 116 // use sparingly. 117 // 118 // Generally, this should be used only for user data, for example, a task 119 // writing a preference file. 120 // 121 // If a task is posted during shutdown, it will not get run since the 122 // workers may already be stopped. In this case, the post operation will 123 // fail (return false) and the task will be deleted. 124 BLOCK_SHUTDOWN, 125 }; 126 127 // Opaque identifier that defines sequencing of tasks posted to the worker 128 // pool. 129 class BASE_EXPORT SequenceToken { 130 public: SequenceToken()131 SequenceToken() : id_(0) {} ~SequenceToken()132 ~SequenceToken() {} 133 Equals(const SequenceToken & other)134 bool Equals(const SequenceToken& other) const { 135 return id_ == other.id_; 136 } 137 138 // Returns false if current thread is executing an unsequenced task. IsValid()139 bool IsValid() const { 140 return id_ != 0; 141 } 142 143 // Returns a string representation of this token. This method should only be 144 // used for debugging. 145 std::string ToString() const; 146 147 private: 148 friend class SequencedWorkerPool; 149 SequenceToken(int id)150 explicit SequenceToken(int id) : id_(id) {} 151 152 int id_; 153 }; 154 155 // Allows tests to perform certain actions. 156 class TestingObserver { 157 public: ~TestingObserver()158 virtual ~TestingObserver() {} 159 virtual void OnHasWork() = 0; 160 virtual void WillWaitForShutdown() = 0; 161 virtual void OnDestruct() = 0; 162 }; 163 164 // Gets the SequencedToken of the current thread. 165 // If current thread is not a SequencedWorkerPool worker thread or is running 166 // an unsequenced task, returns an invalid SequenceToken. 167 static SequenceToken GetSequenceTokenForCurrentThread(); 168 169 // Returns the SequencedWorkerPool that owns this thread, or null if the 170 // current thread is not a SequencedWorkerPool worker thread. 171 // 172 // Always returns nullptr when SequencedWorkerPool is redirected to 173 // TaskScheduler. 174 // 175 // DEPRECATED. Use SequencedTaskRunnerHandle::Get() instead. Consequentially 176 // the only remaining use case is in sequenced_task_runner_handle.cc to 177 // implement that and will soon be removed along with SequencedWorkerPool: 178 // http://crbug.com/622400. 179 static scoped_refptr<SequencedWorkerPool> GetWorkerPoolForCurrentThread(); 180 181 // Returns a unique token that can be used to sequence tasks posted to 182 // PostSequencedWorkerTask(). Valid tokens are always nonzero. 183 static SequenceToken GetSequenceToken(); 184 185 // Enables posting tasks to this process' SequencedWorkerPools. Cannot be 186 // called if already enabled. This is not thread-safe; proper synchronization 187 // is required to use any SequencedWorkerPool method after calling this. 188 static void EnableForProcess(); 189 190 // Same as EnableForProcess(), but tasks are redirected to the registered 191 // TaskScheduler. All redirections' TaskPriority will be capped to 192 // |max_task_priority|. There must be a registered TaskScheduler when this is 193 // called. 194 // TODO(gab): Remove this if http://crbug.com/622400 fails 195 // (SequencedWorkerPool will be phased out completely otherwise). 196 static void EnableWithRedirectionToTaskSchedulerForProcess( 197 TaskPriority max_task_priority = TaskPriority::HIGHEST); 198 199 // Disables posting tasks to this process' SequencedWorkerPools. Calling this 200 // while there are active SequencedWorkerPools is not supported. This is not 201 // thread-safe; proper synchronization is required to use any 202 // SequencedWorkerPool method after calling this. 203 static void DisableForProcessForTesting(); 204 205 // Returns true if posting tasks to this process' SequencedWorkerPool is 206 // enabled (with or without redirection to TaskScheduler). 207 static bool IsEnabled(); 208 209 // When constructing a SequencedWorkerPool, there must be a 210 // ThreadTaskRunnerHandle on the current thread unless you plan to 211 // deliberately leak it. 212 213 // Constructs a SequencedWorkerPool which will lazily create up to 214 // |max_threads| and a prefix for the thread name to aid in debugging. 215 // |max_threads| must be greater than 1. |task_priority| will be used to hint 216 // base::TaskScheduler for an experiment in which all SequencedWorkerPool 217 // tasks will be redirected to it in processes where a base::TaskScheduler was 218 // instantiated. 219 SequencedWorkerPool(size_t max_threads, 220 const std::string& thread_name_prefix, 221 base::TaskPriority task_priority); 222 223 // Like above, but with |observer| for testing. Does not take ownership of 224 // |observer|. 225 SequencedWorkerPool(size_t max_threads, 226 const std::string& thread_name_prefix, 227 base::TaskPriority task_priority, 228 TestingObserver* observer); 229 230 // Returns the sequence token associated with the given name. Calling this 231 // function multiple times with the same string will always produce the 232 // same sequence token. If the name has not been used before, a new token 233 // will be created. 234 SequenceToken GetNamedSequenceToken(const std::string& name); 235 236 // Returns a SequencedTaskRunner wrapper which posts to this 237 // SequencedWorkerPool using the given sequence token. Tasks with nonzero 238 // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay 239 // are posted with BLOCK_SHUTDOWN behavior. 240 scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner( 241 SequenceToken token) WARN_UNUSED_RESULT; 242 243 // Returns a SequencedTaskRunner wrapper which posts to this 244 // SequencedWorkerPool using the given sequence token. Tasks with nonzero 245 // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay 246 // are posted with the given shutdown behavior. 247 scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunnerWithShutdownBehavior( 248 SequenceToken token, 249 WorkerShutdown shutdown_behavior) WARN_UNUSED_RESULT; 250 251 // Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using 252 // the given shutdown behavior. Tasks with nonzero delay are posted with 253 // SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the 254 // given shutdown behavior. 255 scoped_refptr<TaskRunner> GetTaskRunnerWithShutdownBehavior( 256 WorkerShutdown shutdown_behavior) WARN_UNUSED_RESULT; 257 258 // Posts the given task for execution in the worker pool. Tasks posted with 259 // this function will execute in an unspecified order on a background thread. 260 // Returns true if the task was posted. If your tasks have ordering 261 // requirements, see PostSequencedWorkerTask(). 262 // 263 // This class will attempt to delete tasks that aren't run 264 // (non-block-shutdown semantics) but can't guarantee that this happens. If 265 // all worker threads are busy running CONTINUE_ON_SHUTDOWN tasks, there 266 // will be no workers available to delete these tasks. And there may be 267 // tasks with the same sequence token behind those CONTINUE_ON_SHUTDOWN 268 // tasks. Deleting those tasks before the previous one has completed could 269 // cause nondeterministic crashes because the task could be keeping some 270 // objects alive which do work in their destructor, which could voilate the 271 // assumptions of the running task. 272 // 273 // The task will be guaranteed to run to completion before shutdown 274 // (BLOCK_SHUTDOWN semantics). 275 // 276 // Returns true if the task was posted successfully. This may fail during 277 // shutdown regardless of the specified ShutdownBehavior. 278 bool PostWorkerTask(const tracked_objects::Location& from_here, 279 OnceClosure task); 280 281 // Same as PostWorkerTask but allows a delay to be specified (although doing 282 // so changes the shutdown behavior). The task will be run after the given 283 // delay has elapsed. 284 // 285 // If the delay is nonzero, the task won't be guaranteed to run to completion 286 // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs. 287 // If the delay is zero, this behaves exactly like PostWorkerTask, i.e. the 288 // task will be guaranteed to run to completion before shutdown 289 // (BLOCK_SHUTDOWN semantics). 290 bool PostDelayedWorkerTask(const tracked_objects::Location& from_here, 291 OnceClosure task, 292 TimeDelta delay); 293 294 // Same as PostWorkerTask but allows specification of the shutdown behavior. 295 bool PostWorkerTaskWithShutdownBehavior( 296 const tracked_objects::Location& from_here, 297 OnceClosure task, 298 WorkerShutdown shutdown_behavior); 299 300 // Like PostWorkerTask above, but provides sequencing semantics. This means 301 // that tasks posted with the same sequence token (see GetSequenceToken()) 302 // are guaranteed to execute in order. This is useful in cases where you're 303 // doing operations that may depend on previous ones, like appending to a 304 // file. 305 // 306 // The task will be guaranteed to run to completion before shutdown 307 // (BLOCK_SHUTDOWN semantics). 308 // 309 // Returns true if the task was posted successfully. This may fail during 310 // shutdown regardless of the specified ShutdownBehavior. 311 bool PostSequencedWorkerTask(SequenceToken sequence_token, 312 const tracked_objects::Location& from_here, 313 OnceClosure task); 314 315 // Like PostSequencedWorkerTask above, but allows you to specify a named 316 // token, which saves an extra call to GetNamedSequenceToken. 317 bool PostNamedSequencedWorkerTask(const std::string& token_name, 318 const tracked_objects::Location& from_here, 319 OnceClosure task); 320 321 // Same as PostSequencedWorkerTask but allows a delay to be specified 322 // (although doing so changes the shutdown behavior). The task will be run 323 // after the given delay has elapsed. 324 // 325 // If the delay is nonzero, the task won't be guaranteed to run to completion 326 // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs. 327 // If the delay is zero, this behaves exactly like PostSequencedWorkerTask, 328 // i.e. the task will be guaranteed to run to completion before shutdown 329 // (BLOCK_SHUTDOWN semantics). 330 bool PostDelayedSequencedWorkerTask( 331 SequenceToken sequence_token, 332 const tracked_objects::Location& from_here, 333 OnceClosure task, 334 TimeDelta delay); 335 336 // Same as PostSequencedWorkerTask but allows specification of the shutdown 337 // behavior. 338 bool PostSequencedWorkerTaskWithShutdownBehavior( 339 SequenceToken sequence_token, 340 const tracked_objects::Location& from_here, 341 OnceClosure task, 342 WorkerShutdown shutdown_behavior); 343 344 // TaskRunner implementation. Forwards to PostDelayedWorkerTask(). 345 bool PostDelayedTask(const tracked_objects::Location& from_here, 346 OnceClosure task, 347 TimeDelta delay) override; 348 bool RunsTasksOnCurrentThread() const override; 349 350 // Blocks until all pending tasks are complete. This should only be called in 351 // unit tests when you want to validate something that should have happened. 352 // Does not wait for delayed tasks. If redirection to TaskScheduler is 353 // disabled, delayed tasks are deleted. If redirection to TaskScheduler is 354 // enabled, this will wait for all tasks posted to TaskScheduler (not just 355 // tasks posted to this SequencedWorkerPool). 356 // 357 // Note that calling this will not prevent other threads from posting work to 358 // the queue while the calling thread is waiting on Flush(). In this case, 359 // Flush will return only when there's no more work in the queue. Normally, 360 // this doesn't come up since in a test, all the work is being posted from 361 // the main thread. 362 // 363 // TODO(gab): Remove mentions of TaskScheduler in this comment if 364 // http://crbug.com/622400 fails. 365 void FlushForTesting(); 366 367 // Spuriously signal that there is work to be done. 368 void SignalHasWorkForTesting(); 369 370 // Implements the worker pool shutdown. This should be called during app 371 // shutdown, and will discard/join with appropriate tasks before returning. 372 // After this call, subsequent calls to post tasks will fail. 373 // 374 // Must be called from the same thread this object was constructed on. Shutdown()375 void Shutdown() { Shutdown(0); } 376 377 // A variant that allows an arbitrary number of new blocking tasks to be 378 // posted during shutdown. The tasks cannot be posted within the execution 379 // context of tasks whose shutdown behavior is not BLOCKING_SHUTDOWN. Once 380 // the limit is reached, subsequent calls to post task fail in all cases. 381 // Must be called from the same thread this object was constructed on. 382 void Shutdown(int max_new_blocking_tasks_after_shutdown); 383 384 // Check if Shutdown was called for given threading pool. This method is used 385 // for aborting time consuming operation to avoid blocking shutdown. 386 // 387 // Can be called from any thread. 388 bool IsShutdownInProgress(); 389 390 protected: 391 ~SequencedWorkerPool() override; 392 393 void OnDestruct() const override; 394 395 private: 396 friend class RefCountedThreadSafe<SequencedWorkerPool>; 397 friend class DeleteHelper<SequencedWorkerPool>; 398 399 class Inner; 400 class PoolSequencedTaskRunner; 401 class Worker; 402 403 // Returns true if the current thread is processing a task with the given 404 // sequence_token. 405 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 406 407 const scoped_refptr<SequencedTaskRunner> constructor_task_runner_; 408 409 // Avoid pulling in too many headers by putting (almost) everything 410 // into |inner_|. 411 const std::unique_ptr<Inner> inner_; 412 413 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool); 414 }; 415 416 } // namespace base 417 418 #endif // BASE_THREADING_SEQUENCED_WORKER_POOL_H_ 419