1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/threading/sequenced_worker_pool.h"
6
7 #include <list>
8 #include <map>
9 #include <set>
10 #include <utility>
11 #include <vector>
12
13 #include "base/atomic_sequence_num.h"
14 #include "base/callback.h"
15 #include "base/compiler_specific.h"
16 #include "base/critical_closure.h"
17 #include "base/debug/trace_event.h"
18 #include "base/lazy_instance.h"
19 #include "base/logging.h"
20 #include "base/memory/linked_ptr.h"
21 #include "base/message_loop/message_loop_proxy.h"
22 #include "base/stl_util.h"
23 #include "base/strings/stringprintf.h"
24 #include "base/synchronization/condition_variable.h"
25 #include "base/synchronization/lock.h"
26 #include "base/threading/platform_thread.h"
27 #include "base/threading/simple_thread.h"
28 #include "base/threading/thread_local.h"
29 #include "base/threading/thread_restrictions.h"
30 #include "base/time/time.h"
31 #include "base/tracked_objects.h"
32
33 #if defined(OS_MACOSX)
34 #include "base/mac/scoped_nsautorelease_pool.h"
35 #elif defined(OS_WIN)
36 #include "base/win/scoped_com_initializer.h"
37 #endif
38
39 #if !defined(OS_NACL)
40 #include "base/metrics/histogram.h"
41 #endif
42
43 namespace base {
44
45 namespace {
46
47 struct SequencedTask : public TrackingInfo {
SequencedTaskbase::__anonf5c1db210111::SequencedTask48 SequencedTask()
49 : sequence_token_id(0),
50 trace_id(0),
51 sequence_task_number(0),
52 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
53
SequencedTaskbase::__anonf5c1db210111::SequencedTask54 explicit SequencedTask(const tracked_objects::Location& from_here)
55 : base::TrackingInfo(from_here, TimeTicks()),
56 sequence_token_id(0),
57 trace_id(0),
58 sequence_task_number(0),
59 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
60
~SequencedTaskbase::__anonf5c1db210111::SequencedTask61 ~SequencedTask() {}
62
63 int sequence_token_id;
64 int trace_id;
65 int64 sequence_task_number;
66 SequencedWorkerPool::WorkerShutdown shutdown_behavior;
67 tracked_objects::Location posted_from;
68 Closure task;
69
70 // Non-delayed tasks and delayed tasks are managed together by time-to-run
71 // order. We calculate the time by adding the posted time and the given delay.
72 TimeTicks time_to_run;
73 };
74
75 struct SequencedTaskLessThan {
76 public:
operator ()base::__anonf5c1db210111::SequencedTaskLessThan77 bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
78 if (lhs.time_to_run < rhs.time_to_run)
79 return true;
80
81 if (lhs.time_to_run > rhs.time_to_run)
82 return false;
83
84 // If the time happen to match, then we use the sequence number to decide.
85 return lhs.sequence_task_number < rhs.sequence_task_number;
86 }
87 };
88
89 // SequencedWorkerPoolTaskRunner ---------------------------------------------
90 // A TaskRunner which posts tasks to a SequencedWorkerPool with a
91 // fixed ShutdownBehavior.
92 //
93 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
94 class SequencedWorkerPoolTaskRunner : public TaskRunner {
95 public:
96 SequencedWorkerPoolTaskRunner(
97 const scoped_refptr<SequencedWorkerPool>& pool,
98 SequencedWorkerPool::WorkerShutdown shutdown_behavior);
99
100 // TaskRunner implementation
101 virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
102 const Closure& task,
103 TimeDelta delay) OVERRIDE;
104 virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
105
106 private:
107 virtual ~SequencedWorkerPoolTaskRunner();
108
109 const scoped_refptr<SequencedWorkerPool> pool_;
110
111 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
112
113 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
114 };
115
SequencedWorkerPoolTaskRunner(const scoped_refptr<SequencedWorkerPool> & pool,SequencedWorkerPool::WorkerShutdown shutdown_behavior)116 SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
117 const scoped_refptr<SequencedWorkerPool>& pool,
118 SequencedWorkerPool::WorkerShutdown shutdown_behavior)
119 : pool_(pool),
120 shutdown_behavior_(shutdown_behavior) {
121 }
122
~SequencedWorkerPoolTaskRunner()123 SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
124 }
125
PostDelayedTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)126 bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
127 const tracked_objects::Location& from_here,
128 const Closure& task,
129 TimeDelta delay) {
130 if (delay == TimeDelta()) {
131 return pool_->PostWorkerTaskWithShutdownBehavior(
132 from_here, task, shutdown_behavior_);
133 }
134 return pool_->PostDelayedWorkerTask(from_here, task, delay);
135 }
136
RunsTasksOnCurrentThread() const137 bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
138 return pool_->RunsTasksOnCurrentThread();
139 }
140
141 // SequencedWorkerPoolSequencedTaskRunner ------------------------------------
142 // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
143 // fixed sequence token.
144 //
145 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
146 class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
147 public:
148 SequencedWorkerPoolSequencedTaskRunner(
149 const scoped_refptr<SequencedWorkerPool>& pool,
150 SequencedWorkerPool::SequenceToken token,
151 SequencedWorkerPool::WorkerShutdown shutdown_behavior);
152
153 // TaskRunner implementation
154 virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
155 const Closure& task,
156 TimeDelta delay) OVERRIDE;
157 virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
158
159 // SequencedTaskRunner implementation
160 virtual bool PostNonNestableDelayedTask(
161 const tracked_objects::Location& from_here,
162 const Closure& task,
163 TimeDelta delay) OVERRIDE;
164
165 private:
166 virtual ~SequencedWorkerPoolSequencedTaskRunner();
167
168 const scoped_refptr<SequencedWorkerPool> pool_;
169
170 const SequencedWorkerPool::SequenceToken token_;
171
172 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
173
174 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
175 };
176
SequencedWorkerPoolSequencedTaskRunner(const scoped_refptr<SequencedWorkerPool> & pool,SequencedWorkerPool::SequenceToken token,SequencedWorkerPool::WorkerShutdown shutdown_behavior)177 SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
178 const scoped_refptr<SequencedWorkerPool>& pool,
179 SequencedWorkerPool::SequenceToken token,
180 SequencedWorkerPool::WorkerShutdown shutdown_behavior)
181 : pool_(pool),
182 token_(token),
183 shutdown_behavior_(shutdown_behavior) {
184 }
185
186 SequencedWorkerPoolSequencedTaskRunner::
~SequencedWorkerPoolSequencedTaskRunner()187 ~SequencedWorkerPoolSequencedTaskRunner() {
188 }
189
PostDelayedTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)190 bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
191 const tracked_objects::Location& from_here,
192 const Closure& task,
193 TimeDelta delay) {
194 if (delay == TimeDelta()) {
195 return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
196 token_, from_here, task, shutdown_behavior_);
197 }
198 return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
199 }
200
RunsTasksOnCurrentThread() const201 bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
202 return pool_->IsRunningSequenceOnCurrentThread(token_);
203 }
204
PostNonNestableDelayedTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)205 bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
206 const tracked_objects::Location& from_here,
207 const Closure& task,
208 TimeDelta delay) {
209 // There's no way to run nested tasks, so simply forward to
210 // PostDelayedTask.
211 return PostDelayedTask(from_here, task, delay);
212 }
213
214 // Create a process-wide unique ID to represent this task in trace events. This
215 // will be mangled with a Process ID hash to reduce the likelyhood of colliding
216 // with MessageLoop pointers on other processes.
GetTaskTraceID(const SequencedTask & task,void * pool)217 uint64 GetTaskTraceID(const SequencedTask& task,
218 void* pool) {
219 return (static_cast<uint64>(task.trace_id) << 32) |
220 static_cast<uint64>(reinterpret_cast<intptr_t>(pool));
221 }
222
// Lazily created thread-local slot holding a pointer to a SequenceToken.
// Worker::Run() stores the address of its |running_sequence_| member here so
// static functions can find the sequence running on the current thread.
base::LazyInstance<base::ThreadLocalPointer<
    SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr =
        LAZY_INSTANCE_INITIALIZER;
226
227 } // namespace
228
229 // Worker ---------------------------------------------------------------------
230
231 class SequencedWorkerPool::Worker : public SimpleThread {
232 public:
233 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
234 // around as long as we are running.
235 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
236 int thread_number,
237 const std::string& thread_name_prefix);
238 virtual ~Worker();
239
240 // SimpleThread implementation. This actually runs the background thread.
241 virtual void Run() OVERRIDE;
242
set_running_task_info(SequenceToken token,WorkerShutdown shutdown_behavior)243 void set_running_task_info(SequenceToken token,
244 WorkerShutdown shutdown_behavior) {
245 running_sequence_ = token;
246 running_shutdown_behavior_ = shutdown_behavior;
247 }
248
running_sequence() const249 SequenceToken running_sequence() const {
250 return running_sequence_;
251 }
252
running_shutdown_behavior() const253 WorkerShutdown running_shutdown_behavior() const {
254 return running_shutdown_behavior_;
255 }
256
257 private:
258 scoped_refptr<SequencedWorkerPool> worker_pool_;
259 SequenceToken running_sequence_;
260 WorkerShutdown running_shutdown_behavior_;
261
262 DISALLOW_COPY_AND_ASSIGN(Worker);
263 };
264
265 // Inner ----------------------------------------------------------------------
266
// Holds the pool's state (task queue, worker threads, shutdown and cleanup
// bookkeeping) and the logic that operates on it. Everything is guarded by
// |lock_|; see the comment on that member.
class SequencedWorkerPool::Inner {
 public:
  // Take a raw pointer to |worker_pool| to avoid cycles (since we're owned
  // by it).
  Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
        const std::string& thread_name_prefix,
        TestingObserver* observer);

  ~Inner();

  // Returns a fresh, unnamed sequence token.
  SequenceToken GetSequenceToken();

  // Returns the token registered for |name|, creating it if necessary.
  SequenceToken GetNamedSequenceToken(const std::string& name);

  // This function accepts a name and an ID. If the name is null, the
  // token ID is used. This allows us to implement the optional name lookup
  // from a single function without having to enter the lock a separate time.
  // Returns false when the task is rejected because shutdown has been called.
  bool PostTask(const std::string* optional_token_name,
                SequenceToken sequence_token,
                WorkerShutdown shutdown_behavior,
                const tracked_objects::Location& from_here,
                const Closure& task,
                TimeDelta delay);

  // Returns true if the calling thread is one of this pool's workers.
  bool RunsTasksOnCurrentThread() const;

  // Returns true if the calling thread is one of this pool's workers and is
  // currently recorded as running |sequence_token|.
  bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;

  void CleanupForTesting();

  void SignalHasWorkForTesting();

  int GetWorkSignalCountForTesting() const;

  // Marks the pool as shut down and blocks until CanShutdown() is true.
  // |max_blocking_tasks_after_shutdown| is how many new BLOCK_SHUTDOWN tasks
  // may still be posted afterwards.
  void Shutdown(int max_blocking_tasks_after_shutdown);

  bool IsShutdownInProgress();

  // Runs the worker loop on the background thread.
  void ThreadLoop(Worker* this_worker);

 private:
  // Result of GetWork(); see the comment on that function.
  enum GetWorkStatus {
    GET_WORK_FOUND,
    GET_WORK_NOT_FOUND,
    GET_WORK_WAIT,
  };

  // Stages of the testing-only cleanup handshake driven by
  // CleanupForTesting() and HandleCleanup().
  enum CleanupState {
    CLEANUP_REQUESTED,
    CLEANUP_STARTING,
    CLEANUP_RUNNING,
    CLEANUP_FINISHING,
    CLEANUP_DONE,
  };

  // Called from within the lock, this converts the given token name into a
  // token ID, creating a new one if necessary.
  int LockedGetNamedTokenID(const std::string& name);

  // Called from within the lock, this returns the next sequence task number.
  int64 LockedGetNextSequenceTaskNumber();

  // Called from within the lock, returns the shutdown behavior of the task
  // running on the currently executing worker thread. If invoked from a thread
  // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN.
  WorkerShutdown LockedCurrentThreadShutdownBehavior() const;

  // Gets new task. There are 3 cases depending on the return value:
  //
  // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
  //    be run immediately.
  // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
  //    and |task| is not filled in. In this case, the caller should wait until
  //    a task is posted.
  // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
  //    immediately, and |task| is not filled in. Instead, |wait_time| is
  //    filled in with the time to wait until the next task is due to run. In
  //    this case, the caller should wait for that long.
  //
  // In any case, the calling code should clear the given
  // delete_these_outside_lock vector the next time the lock is released.
  // See the implementation for a more detailed description.
  GetWorkStatus GetWork(SequencedTask* task,
                        TimeDelta* wait_time,
                        std::vector<Closure>* delete_these_outside_lock);

  // Takes part in the testing-only cleanup handshake. Must be called with the
  // lock held.
  void HandleCleanup();

  // Performs init and cleanup around running the given task. WillRun...
  // returns the value from PrepareToStartAdditionalThreadIfHelpful.
  // The calling code should call FinishStartingAdditionalThread once the
  // lock is released if the return value is nonzero.
  int WillRunWorkerTask(const SequencedTask& task);
  void DidRunWorkerTask(const SequencedTask& task);

  // Returns true if there are no threads currently running the given
  // sequence token.
  bool IsSequenceTokenRunnable(int sequence_token_id) const;

  // Checks if all threads are busy and the addition of one more could run an
  // additional task waiting in the queue. This must be called from within
  // the lock.
  //
  // If another thread is helpful, this will mark the thread as being in the
  // process of starting and returns the index of the new thread which will be
  // 0 or more. The caller should then call FinishStartingAdditionalThread to
  // complete initialization once the lock is released.
  //
  // If another thread is not necessary, returns 0.
  //
  // See the implementation for more.
  int PrepareToStartAdditionalThreadIfHelpful();

  // The second part of thread creation after
  // PrepareToStartAdditionalThreadIfHelpful with the thread number it
  // generated. This actually creates the thread and should be called outside
  // the lock to avoid blocking important work starting a thread in the lock.
  void FinishStartingAdditionalThread(int thread_number);

  // Signal |has_work_| and increment |has_work_signal_count_|.
  void SignalHasWork();

  // Checks whether there is work left that's blocking shutdown. Must be
  // called inside the lock.
  bool CanShutdown() const;

  SequencedWorkerPool* const worker_pool_;

  // The last sequence number used. Managed by GetSequenceToken, since this
  // only does threadsafe increment operations, you do not need to hold the
  // lock. This is class-static to make SequenceTokens issued by
  // GetSequenceToken unique across SequencedWorkerPool instances.
  static base::StaticAtomicSequenceNumber g_last_sequence_number_;

  // This lock protects |everything in this class|. Do not read or modify
  // anything without holding this lock. Do not block while holding this
  // lock.
  mutable Lock lock_;

  // Condition variable that is waited on by worker threads until new
  // tasks are posted or shutdown starts.
  ConditionVariable has_work_cv_;

  // Condition variable that is waited on by non-worker threads (in
  // Shutdown()) until CanShutdown() goes to true.
  ConditionVariable can_shutdown_cv_;

  // The maximum number of worker threads we'll create.
  const size_t max_threads_;

  // Prefix handed to each Worker for building its thread name.
  const std::string thread_name_prefix_;

  // Associates all known sequence token names with their IDs.
  std::map<std::string, int> named_sequence_tokens_;

  // Owning pointers to all threads we've created so far, indexed by
  // ID. Since we lazily create threads, this may be less than
  // max_threads_ and will be initially empty.
  typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap;
  ThreadMap threads_;

  // Set to true when we're in the process of creating another thread.
  // See PrepareToStartAdditionalThreadIfHelpful for more.
  bool thread_being_created_;

  // Number of threads currently waiting for work.
  size_t waiting_thread_count_;

  // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
  // or SKIP_ON_SHUTDOWN flag set.
  size_t blocking_shutdown_thread_count_;

  // A set of all pending tasks in time-to-run order. These are tasks that are
  // either waiting for a thread to run on, waiting for their time to run,
  // or blocked on a previous task in their sequence. We have to iterate over
  // the tasks by time-to-run order, so we use the set instead of the
  // traditional priority_queue.
  typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
  PendingTaskSet pending_tasks_;

  // The next sequence number for a new sequenced task.
  int64 next_sequence_task_number_;

  // Number of tasks in the pending_tasks_ list that are marked as blocking
  // shutdown.
  size_t blocking_shutdown_pending_task_count_;

  // Lists all sequence tokens currently executing.
  std::set<int> current_sequences_;

  // An ID for each posted task to distinguish the task from others in traces.
  int trace_id_;

  // Set when Shutdown is called and no further tasks should be
  // allowed, though we may still be running existing tasks.
  bool shutdown_called_;

  // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shutdown()
  // has been called.
  int max_blocking_tasks_after_shutdown_;

  // State used to cleanup for testing, all guarded by lock_.
  CleanupState cleanup_state_;
  size_t cleanup_idlers_;
  ConditionVariable cleanup_cv_;

  // Optional observer notified of selected events; may be NULL.
  TestingObserver* const testing_observer_;

  DISALLOW_COPY_AND_ASSIGN(Inner);
};
478
479 // Worker definitions ---------------------------------------------------------
480
Worker(const scoped_refptr<SequencedWorkerPool> & worker_pool,int thread_number,const std::string & prefix)481 SequencedWorkerPool::Worker::Worker(
482 const scoped_refptr<SequencedWorkerPool>& worker_pool,
483 int thread_number,
484 const std::string& prefix)
485 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
486 worker_pool_(worker_pool),
487 running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) {
488 Start();
489 }
490
~Worker()491 SequencedWorkerPool::Worker::~Worker() {
492 }
493
Run()494 void SequencedWorkerPool::Worker::Run() {
495 #if defined(OS_WIN)
496 win::ScopedCOMInitializer com_initializer;
497 #endif
498
499 // Store a pointer to the running sequence in thread local storage for
500 // static function access.
501 g_lazy_tls_ptr.Get().Set(&running_sequence_);
502
503 // Just jump back to the Inner object to run the thread, since it has all the
504 // tracking information and queues. It might be more natural to implement
505 // using DelegateSimpleThread and have Inner implement the Delegate to avoid
506 // having these worker objects at all, but that method lacks the ability to
507 // send thread-specific information easily to the thread loop.
508 worker_pool_->inner_->ThreadLoop(this);
509 // Release our cyclic reference once we're done.
510 worker_pool_ = NULL;
511 }
512
513 // Inner definitions ---------------------------------------------------------
514
Inner(SequencedWorkerPool * worker_pool,size_t max_threads,const std::string & thread_name_prefix,TestingObserver * observer)515 SequencedWorkerPool::Inner::Inner(
516 SequencedWorkerPool* worker_pool,
517 size_t max_threads,
518 const std::string& thread_name_prefix,
519 TestingObserver* observer)
520 : worker_pool_(worker_pool),
521 lock_(),
522 has_work_cv_(&lock_),
523 can_shutdown_cv_(&lock_),
524 max_threads_(max_threads),
525 thread_name_prefix_(thread_name_prefix),
526 thread_being_created_(false),
527 waiting_thread_count_(0),
528 blocking_shutdown_thread_count_(0),
529 next_sequence_task_number_(0),
530 blocking_shutdown_pending_task_count_(0),
531 trace_id_(0),
532 shutdown_called_(false),
533 max_blocking_tasks_after_shutdown_(0),
534 cleanup_state_(CLEANUP_DONE),
535 cleanup_idlers_(0),
536 cleanup_cv_(&lock_),
537 testing_observer_(observer) {}
538
~Inner()539 SequencedWorkerPool::Inner::~Inner() {
540 // You must call Shutdown() before destroying the pool.
541 DCHECK(shutdown_called_);
542
543 // Need to explicitly join with the threads before they're destroyed or else
544 // they will be running when our object is half torn down.
545 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
546 it->second->Join();
547 threads_.clear();
548
549 if (testing_observer_)
550 testing_observer_->OnDestruct();
551 }
552
553 SequencedWorkerPool::SequenceToken
GetSequenceToken()554 SequencedWorkerPool::Inner::GetSequenceToken() {
555 // Need to add one because StaticAtomicSequenceNumber starts at zero, which
556 // is used as a sentinel value in SequenceTokens.
557 return SequenceToken(g_last_sequence_number_.GetNext() + 1);
558 }
559
560 SequencedWorkerPool::SequenceToken
GetNamedSequenceToken(const std::string & name)561 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
562 AutoLock lock(lock_);
563 return SequenceToken(LockedGetNamedTokenID(name));
564 }
565
PostTask(const std::string * optional_token_name,SequenceToken sequence_token,WorkerShutdown shutdown_behavior,const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)566 bool SequencedWorkerPool::Inner::PostTask(
567 const std::string* optional_token_name,
568 SequenceToken sequence_token,
569 WorkerShutdown shutdown_behavior,
570 const tracked_objects::Location& from_here,
571 const Closure& task,
572 TimeDelta delay) {
573 DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN);
574 SequencedTask sequenced(from_here);
575 sequenced.sequence_token_id = sequence_token.id_;
576 sequenced.shutdown_behavior = shutdown_behavior;
577 sequenced.posted_from = from_here;
578 sequenced.task =
579 shutdown_behavior == BLOCK_SHUTDOWN ?
580 base::MakeCriticalClosure(task) : task;
581 sequenced.time_to_run = TimeTicks::Now() + delay;
582
583 int create_thread_id = 0;
584 {
585 AutoLock lock(lock_);
586 if (shutdown_called_) {
587 if (shutdown_behavior != BLOCK_SHUTDOWN ||
588 LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) {
589 return false;
590 }
591 if (max_blocking_tasks_after_shutdown_ <= 0) {
592 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
593 return false;
594 }
595 max_blocking_tasks_after_shutdown_ -= 1;
596 }
597
598 // The trace_id is used for identifying the task in about:tracing.
599 sequenced.trace_id = trace_id_++;
600
601 TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
602 "SequencedWorkerPool::PostTask",
603 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))));
604
605 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
606
607 // Now that we have the lock, apply the named token rules.
608 if (optional_token_name)
609 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
610
611 pending_tasks_.insert(sequenced);
612 if (shutdown_behavior == BLOCK_SHUTDOWN)
613 blocking_shutdown_pending_task_count_++;
614
615 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
616 }
617
618 // Actually start the additional thread or signal an existing one now that
619 // we're outside the lock.
620 if (create_thread_id)
621 FinishStartingAdditionalThread(create_thread_id);
622 else
623 SignalHasWork();
624
625 return true;
626 }
627
RunsTasksOnCurrentThread() const628 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
629 AutoLock lock(lock_);
630 return ContainsKey(threads_, PlatformThread::CurrentId());
631 }
632
IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const633 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
634 SequenceToken sequence_token) const {
635 AutoLock lock(lock_);
636 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
637 if (found == threads_.end())
638 return false;
639 return sequence_token.Equals(found->second->running_sequence());
640 }
641
642 // See https://code.google.com/p/chromium/issues/detail?id=168415
CleanupForTesting()643 void SequencedWorkerPool::Inner::CleanupForTesting() {
644 DCHECK(!RunsTasksOnCurrentThread());
645 base::ThreadRestrictions::ScopedAllowWait allow_wait;
646 AutoLock lock(lock_);
647 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
648 if (shutdown_called_)
649 return;
650 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
651 return;
652 cleanup_state_ = CLEANUP_REQUESTED;
653 cleanup_idlers_ = 0;
654 has_work_cv_.Signal();
655 while (cleanup_state_ != CLEANUP_DONE)
656 cleanup_cv_.Wait();
657 }
658
SignalHasWorkForTesting()659 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
660 SignalHasWork();
661 }
662
Shutdown(int max_new_blocking_tasks_after_shutdown)663 void SequencedWorkerPool::Inner::Shutdown(
664 int max_new_blocking_tasks_after_shutdown) {
665 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
666 {
667 AutoLock lock(lock_);
668 // Cleanup and Shutdown should not be called concurrently.
669 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
670 if (shutdown_called_)
671 return;
672 shutdown_called_ = true;
673 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
674
675 // Tickle the threads. This will wake up a waiting one so it will know that
676 // it can exit, which in turn will wake up any other waiting ones.
677 SignalHasWork();
678
679 // There are no pending or running tasks blocking shutdown, we're done.
680 if (CanShutdown())
681 return;
682 }
683
684 // If we're here, then something is blocking shutdown. So wait for
685 // CanShutdown() to go to true.
686
687 if (testing_observer_)
688 testing_observer_->WillWaitForShutdown();
689
690 #if !defined(OS_NACL)
691 TimeTicks shutdown_wait_begin = TimeTicks::Now();
692 #endif
693
694 {
695 base::ThreadRestrictions::ScopedAllowWait allow_wait;
696 AutoLock lock(lock_);
697 while (!CanShutdown())
698 can_shutdown_cv_.Wait();
699 }
700 #if !defined(OS_NACL)
701 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
702 TimeTicks::Now() - shutdown_wait_begin);
703 #endif
704 }
705
IsShutdownInProgress()706 bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
707 AutoLock lock(lock_);
708 return shutdown_called_;
709 }
710
// Body of every worker thread.
//
// Registers |this_worker| in |threads_| under its thread ID, then loops with
// |lock_| held: take part in any testing cleanup, ask GetWork() for a task,
// and either run it (with the lock temporarily released), advance the cleanup
// state machine, or wait on |has_work_cv_|. The loop ends once shutdown has
// been called and no pending task blocks shutdown; the function then wakes
// another worker and signals |can_shutdown_cv_|.
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
  {
    AutoLock lock(lock_);
    // This thread is the one whose creation was announced earlier; it has now
    // started, so clear the flag and register the worker.
    DCHECK(thread_being_created_);
    thread_being_created_ = false;
    std::pair<ThreadMap::iterator, bool> result =
        threads_.insert(
            std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
    DCHECK(result.second);

    while (true) {
#if defined(OS_MACOSX)
      base::mac::ScopedNSAutoreleasePool autorelease_pool;
#endif

      HandleCleanup();

      // See GetWork for what delete_these_outside_lock is doing.
      SequencedTask task;
      TimeDelta wait_time;
      std::vector<Closure> delete_these_outside_lock;
      GetWorkStatus status =
          GetWork(&task, &wait_time, &delete_these_outside_lock);
      if (status == GET_WORK_FOUND) {
        TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
            "SequencedWorkerPool::PostTask",
            TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))));
        TRACE_EVENT2("toplevel", "SequencedWorkerPool::ThreadLoop",
                     "src_file", task.posted_from.file_name(),
                     "src_func", task.posted_from.function_name());
        int new_thread_id = WillRunWorkerTask(task);
        {
          // Everything in this scope runs with the lock released.
          AutoUnlock unlock(lock_);
          // There may be more work available, so wake up another
          // worker thread. (Technically not required, since we
          // already get a signal for each new task, but it doesn't
          // hurt.)
          SignalHasWork();
          delete_these_outside_lock.clear();

          // Complete thread creation outside the lock if necessary.
          if (new_thread_id)
            FinishStartingAdditionalThread(new_thread_id);

          // Record what this worker is running before the task starts.
          this_worker->set_running_task_info(
              SequenceToken(task.sequence_token_id), task.shutdown_behavior);

          tracked_objects::TrackedTime start_time =
              tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally);

          task.task.Run();

          tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task,
              start_time, tracked_objects::ThreadData::NowForEndOfRun());

          // Make sure our task is erased outside the lock for the
          // same reason we do this with delete_these_outside_lock.
          // Also, do it before calling set_running_task_info() so
          // that sequence-checking from within the task's destructor
          // still works.
          task.task = Closure();

          // The worker is idle again: reset its running-task info.
          this_worker->set_running_task_info(
              SequenceToken(), CONTINUE_ON_SHUTDOWN);
        }
        DidRunWorkerTask(task);  // Must be done inside the lock.
      } else if (cleanup_state_ == CLEANUP_RUNNING) {
        // No runnable task was returned while this thread is performing the
        // testing cleanup.
        switch (status) {
          case GET_WORK_WAIT: {
              // Release the lock only to clear the closures outside of it.
              AutoUnlock unlock(lock_);
              delete_these_outside_lock.clear();
            }
            break;
          case GET_WORK_NOT_FOUND:
            // Nothing is left: move the cleanup on and wake the idlers.
            CHECK(delete_these_outside_lock.empty());
            cleanup_state_ = CLEANUP_FINISHING;
            cleanup_cv_.Broadcast();
            break;
          default:
            NOTREACHED();
        }
      } else {
        // When we're terminating and there's no more work, we can
        // shut down, other workers can complete any pending or new tasks.
        // We can get additional tasks posted after shutdown_called_ is set
        // but only worker threads are allowed to post tasks at that time, and
        // the workers responsible for posting those tasks will be available
        // to run them. Also, there may be some tasks stuck behind running
        // ones with the same sequence token, but additional threads won't
        // help this case.
        if (shutdown_called_ &&
            blocking_shutdown_pending_task_count_ == 0)
          break;
        waiting_thread_count_++;

        switch (status) {
          case GET_WORK_NOT_FOUND:
            // Nothing queued at all: sleep until signalled.
            has_work_cv_.Wait();
            break;
          case GET_WORK_WAIT:
            // Only delayed work is queued: sleep for at most |wait_time|.
            has_work_cv_.TimedWait(wait_time);
            break;
          default:
            NOTREACHED();
        }
        waiting_thread_count_--;
      }
    }
  }  // Release lock_.

  // We noticed we should exit. Wake up the next worker so it knows it should
  // exit as well (because the Shutdown() code only signals once).
  SignalHasWork();

  // Possibly unblock shutdown.
  can_shutdown_cv_.Signal();
}
828
HandleCleanup()829 void SequencedWorkerPool::Inner::HandleCleanup() {
830 lock_.AssertAcquired();
831 if (cleanup_state_ == CLEANUP_DONE)
832 return;
833 if (cleanup_state_ == CLEANUP_REQUESTED) {
834 // We win, we get to do the cleanup as soon as the others wise up and idle.
835 cleanup_state_ = CLEANUP_STARTING;
836 while (thread_being_created_ ||
837 cleanup_idlers_ != threads_.size() - 1) {
838 has_work_cv_.Signal();
839 cleanup_cv_.Wait();
840 }
841 cleanup_state_ = CLEANUP_RUNNING;
842 return;
843 }
844 if (cleanup_state_ == CLEANUP_STARTING) {
845 // Another worker thread is cleaning up, we idle here until thats done.
846 ++cleanup_idlers_;
847 cleanup_cv_.Broadcast();
848 while (cleanup_state_ != CLEANUP_FINISHING) {
849 cleanup_cv_.Wait();
850 }
851 --cleanup_idlers_;
852 cleanup_cv_.Broadcast();
853 return;
854 }
855 if (cleanup_state_ == CLEANUP_FINISHING) {
856 // We wait for all idlers to wake up prior to being DONE.
857 while (cleanup_idlers_ != 0) {
858 cleanup_cv_.Broadcast();
859 cleanup_cv_.Wait();
860 }
861 if (cleanup_state_ == CLEANUP_FINISHING) {
862 cleanup_state_ = CLEANUP_DONE;
863 cleanup_cv_.Signal();
864 }
865 return;
866 }
867 }
868
LockedGetNamedTokenID(const std::string & name)869 int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
870 const std::string& name) {
871 lock_.AssertAcquired();
872 DCHECK(!name.empty());
873
874 std::map<std::string, int>::const_iterator found =
875 named_sequence_tokens_.find(name);
876 if (found != named_sequence_tokens_.end())
877 return found->second; // Got an existing one.
878
879 // Create a new one for this name.
880 SequenceToken result = GetSequenceToken();
881 named_sequence_tokens_.insert(std::make_pair(name, result.id_));
882 return result.id_;
883 }
884
LockedGetNextSequenceTaskNumber()885 int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
886 lock_.AssertAcquired();
887 // We assume that we never create enough tasks to wrap around.
888 return next_sequence_task_number_++;
889 }
890
891 SequencedWorkerPool::WorkerShutdown
LockedCurrentThreadShutdownBehavior() const892 SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const {
893 lock_.AssertAcquired();
894 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
895 if (found == threads_.end())
896 return CONTINUE_ON_SHUTDOWN;
897 return found->second->running_shutdown_behavior();
898 }
899
// Scans pending_tasks_ for the next piece of work the calling worker may
// take. Must be called with lock_ held.
//
// |task| receives a copy of the selected task (which is removed from
//     pending_tasks_) when GET_WORK_FOUND is returned; otherwise it is left
//     untouched.
// |wait_time| receives the time remaining until the first candidate task is
//     due when GET_WORK_WAIT is returned; otherwise it is left untouched.
// |delete_these_outside_lock| collects the closures of tasks this call drops
//     from the queue, so that the caller can destroy them after releasing
//     lock_.
//
// Returns GET_WORK_FOUND, GET_WORK_WAIT, or GET_WORK_NOT_FOUND when the scan
// reaches the end of the queue without selecting anything.
SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
    SequencedTask* task,
    TimeDelta* wait_time,
    std::vector<Closure>* delete_these_outside_lock) {
  lock_.AssertAcquired();

  // Record how many tasks are queued each time a worker looks for work.
#if !defined(OS_NACL)
  UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
                           static_cast<int>(pending_tasks_.size()));
#endif

  // Find the next task with a sequence token that's not currently in use.
  // If the token is in use, that means another thread is running something
  // in that sequence, and we can't run it without going out-of-order.
  //
  // This algorithm is simple and fair, but inefficient in some cases. For
  // example, say somebody schedules 1000 slow tasks with the same sequence
  // number. We'll have to go through all those tasks each time we feel like
  // there might be work to schedule. If this proves to be a problem, we
  // should make this more efficient.
  //
  // One possible enhancement would be to keep a map from sequence ID to a
  // list of pending but currently blocked SequencedTasks for that ID.
  // When a worker finishes a task of one sequence token, it can pick up the
  // next one from that token right away.
  //
  // This may lead to starvation if there are sufficient numbers of sequences
  // in use. To alleviate this, we could add an incrementing priority counter
  // to each SequencedTask. Then maintain a priority_queue of all runnable
  // tasks, sorted by priority counter. When a sequenced task is completed
  // we would pop the head element off of that task's pending list and add it
  // to the priority queue. Then we would run the first item in the priority
  // queue.

  GetWorkStatus status = GET_WORK_NOT_FOUND;
  int unrunnable_tasks = 0;
  PendingTaskSet::iterator i = pending_tasks_.begin();
  // We assume that the loop below doesn't take too long and so we can just do
  // a single call to TimeTicks::Now().
  const TimeTicks current_time = TimeTicks::Now();
  while (i != pending_tasks_.end()) {
    if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
      // Another worker is running this sequence; leave the task queued and
      // count it for the histogram below.
      unrunnable_tasks++;
      ++i;
      continue;
    }

    if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
      // We're shutting down and the task we just found isn't blocking
      // shutdown. Delete it and get more work.
      //
      // Note that we do not want to delete unrunnable tasks. Deleting a task
      // can have side effects (like freeing some objects) and deleting a
      // task that's supposed to run after one that's currently running could
      // cause an obscure crash.
      //
      // We really want to delete these tasks outside the lock in case the
      // closures are holding refs to objects that want to post work from
      // their destructors (which would deadlock). The closures are
      // internally refcounted, so we just need to keep a copy of them alive
      // until the lock is exited. The calling code can just clear() the
      // vector they passed to us once the lock is exited to make this
      // happen.
      delete_these_outside_lock->push_back(i->task);
      // Post-increment so |i| already points at the next element when the
      // current one is erased.
      pending_tasks_.erase(i++);
      continue;
    }

    if (i->time_to_run > current_time) {
      // The time to run has not come yet.
      *wait_time = i->time_to_run - current_time;
      status = GET_WORK_WAIT;
      if (cleanup_state_ == CLEANUP_RUNNING) {
        // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
        delete_these_outside_lock->push_back(i->task);
        // |i| is invalid after this erase; the break below ensures it is
        // not used again.
        pending_tasks_.erase(i);
      }
      break;
    }

    // Found a runnable task.
    *task = *i;
    pending_tasks_.erase(i);
    if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
      // The task is leaving the pending queue, so it no longer counts as a
      // pending shutdown-blocking task.
      blocking_shutdown_pending_task_count_--;
    }

    status = GET_WORK_FOUND;
    break;
  }

  // Track the number of tasks we had to skip over to see if we should be
  // making this more efficient. If this number ever becomes large or is
  // frequently "some", we should consider the optimization above.
#if !defined(OS_NACL)
  UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount",
                           unrunnable_tasks);
#endif
  return status;
}
1000
WillRunWorkerTask(const SequencedTask & task)1001 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
1002 lock_.AssertAcquired();
1003
1004 // Mark the task's sequence number as in use.
1005 if (task.sequence_token_id)
1006 current_sequences_.insert(task.sequence_token_id);
1007
1008 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
1009 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
1010 // completes.
1011 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
1012 blocking_shutdown_thread_count_++;
1013
1014 // We just picked up a task. Since StartAdditionalThreadIfHelpful only
1015 // creates a new thread if there is no free one, there is a race when posting
1016 // tasks that many tasks could have been posted before a thread started
1017 // running them, so only one thread would have been created. So we also check
1018 // whether we should create more threads after removing our task from the
1019 // queue, which also has the nice side effect of creating the workers from
1020 // background threads rather than the main thread of the app.
1021 //
1022 // If another thread wasn't created, we want to wake up an existing thread
1023 // if there is one waiting to pick up the next task.
1024 //
1025 // Note that we really need to do this *before* running the task, not
1026 // after. Otherwise, if more than one task is posted, the creation of the
1027 // second thread (since we only create one at a time) will be blocked by
1028 // the execution of the first task, which could be arbitrarily long.
1029 return PrepareToStartAdditionalThreadIfHelpful();
1030 }
1031
DidRunWorkerTask(const SequencedTask & task)1032 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
1033 lock_.AssertAcquired();
1034
1035 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
1036 DCHECK_GT(blocking_shutdown_thread_count_, 0u);
1037 blocking_shutdown_thread_count_--;
1038 }
1039
1040 if (task.sequence_token_id)
1041 current_sequences_.erase(task.sequence_token_id);
1042 }
1043
IsSequenceTokenRunnable(int sequence_token_id) const1044 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1045 int sequence_token_id) const {
1046 lock_.AssertAcquired();
1047 return !sequence_token_id ||
1048 current_sequences_.find(sequence_token_id) ==
1049 current_sequences_.end();
1050 }
1051
PrepareToStartAdditionalThreadIfHelpful()1052 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1053 lock_.AssertAcquired();
1054 // How thread creation works:
1055 //
1056 // We'de like to avoid creating threads with the lock held. However, we
1057 // need to be sure that we have an accurate accounting of the threads for
1058 // proper Joining and deltion on shutdown.
1059 //
1060 // We need to figure out if we need another thread with the lock held, which
1061 // is what this function does. It then marks us as in the process of creating
1062 // a thread. When we do shutdown, we wait until the thread_being_created_
1063 // flag is cleared, which ensures that the new thread is properly added to
1064 // all the data structures and we can't leak it. Once shutdown starts, we'll
1065 // refuse to create more threads or they would be leaked.
1066 //
1067 // Note that this creates a mostly benign race condition on shutdown that
1068 // will cause fewer workers to be created than one would expect. It isn't
1069 // much of an issue in real life, but affects some tests. Since we only spawn
1070 // one worker at a time, the following sequence of events can happen:
1071 //
1072 // 1. Main thread posts a bunch of unrelated tasks that would normally be
1073 // run on separate threads.
1074 // 2. The first task post causes us to start a worker. Other tasks do not
1075 // cause a worker to start since one is pending.
1076 // 3. Main thread initiates shutdown.
1077 // 4. No more threads are created since the shutdown_called_ flag is set.
1078 //
1079 // The result is that one may expect that max_threads_ workers to be created
1080 // given the workload, but in reality fewer may be created because the
1081 // sequence of thread creation on the background threads is racing with the
1082 // shutdown call.
1083 if (!shutdown_called_ &&
1084 !thread_being_created_ &&
1085 cleanup_state_ == CLEANUP_DONE &&
1086 threads_.size() < max_threads_ &&
1087 waiting_thread_count_ == 0) {
1088 // We could use an additional thread if there's work to be done.
1089 for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
1090 i != pending_tasks_.end(); ++i) {
1091 if (IsSequenceTokenRunnable(i->sequence_token_id)) {
1092 // Found a runnable task, mark the thread as being started.
1093 thread_being_created_ = true;
1094 return static_cast<int>(threads_.size() + 1);
1095 }
1096 }
1097 }
1098 return 0;
1099 }
1100
// Creates the Worker for |thread_number|. |thread_number| must be positive;
// presumably it is the non-zero value returned by
// PrepareToStartAdditionalThreadIfHelpful() -- confirm against callers.
void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
    int thread_number) {
  // Called outside of the lock.
  DCHECK(thread_number > 0);

  // The worker is assigned to the list when the thread actually starts, which
  // will manage the memory of the pointer. The result of |new| is therefore
  // intentionally not stored here.
  new Worker(worker_pool_, thread_number, thread_name_prefix_);
}
1110
SignalHasWork()1111 void SequencedWorkerPool::Inner::SignalHasWork() {
1112 has_work_cv_.Signal();
1113 if (testing_observer_) {
1114 testing_observer_->OnHasWork();
1115 }
1116 }
1117
CanShutdown() const1118 bool SequencedWorkerPool::Inner::CanShutdown() const {
1119 lock_.AssertAcquired();
1120 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1121 return !thread_being_created_ &&
1122 blocking_shutdown_thread_count_ == 0 &&
1123 blocking_shutdown_pending_task_count_ == 0;
1124 }
1125
// Out-of-class definition of Inner's static atomic sequence counter.
// NOTE(review): presumably the source of new sequence token ids handed out
// by GetSequenceToken() -- confirm against its definition earlier in the
// file.
base::StaticAtomicSequenceNumber
SequencedWorkerPool::Inner::g_last_sequence_number_;
1128
1129 // SequencedWorkerPool --------------------------------------------------------
1130
1131 // static
1132 SequencedWorkerPool::SequenceToken
GetSequenceTokenForCurrentThread()1133 SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1134 // Don't construct lazy instance on check.
1135 if (g_lazy_tls_ptr == NULL)
1136 return SequenceToken();
1137
1138 SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get();
1139 if (!token)
1140 return SequenceToken();
1141 return *token;
1142 }
1143
// Constructs a pool without a testing observer. Records
// MessageLoopProxy::current() of the constructing thread in
// constructor_message_loop_ and creates the Inner implementation object,
// forwarding |max_threads| and |thread_name_prefix| to it and passing NULL
// as the observer.
SequencedWorkerPool::SequencedWorkerPool(
    size_t max_threads,
    const std::string& thread_name_prefix)
    : constructor_message_loop_(MessageLoopProxy::current()),
      inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
}
1150
// Same as the two-argument constructor, except that |observer| is forwarded
// to the Inner implementation object as its testing observer instead of
// NULL.
SequencedWorkerPool::SequencedWorkerPool(
    size_t max_threads,
    const std::string& thread_name_prefix,
    TestingObserver* observer)
    : constructor_message_loop_(MessageLoopProxy::current()),
      inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
}
1158
// The destructor body is intentionally empty; it performs no explicit
// teardown beyond what the members' own destructors do.
SequencedWorkerPool::~SequencedWorkerPool() {}
1160
OnDestruct() const1161 void SequencedWorkerPool::OnDestruct() const {
1162 DCHECK(constructor_message_loop_.get());
1163 // Avoid deleting ourselves on a worker thread (which would
1164 // deadlock).
1165 if (RunsTasksOnCurrentThread()) {
1166 constructor_message_loop_->DeleteSoon(FROM_HERE, this);
1167 } else {
1168 delete this;
1169 }
1170 }
1171
GetSequenceToken()1172 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
1173 return inner_->GetSequenceToken();
1174 }
1175
GetNamedSequenceToken(const std::string & name)1176 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
1177 const std::string& name) {
1178 return inner_->GetNamedSequenceToken(name);
1179 }
1180
GetSequencedTaskRunner(SequenceToken token)1181 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
1182 SequenceToken token) {
1183 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
1184 }
1185
1186 scoped_refptr<SequencedTaskRunner>
GetSequencedTaskRunnerWithShutdownBehavior(SequenceToken token,WorkerShutdown shutdown_behavior)1187 SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
1188 SequenceToken token, WorkerShutdown shutdown_behavior) {
1189 return new SequencedWorkerPoolSequencedTaskRunner(
1190 this, token, shutdown_behavior);
1191 }
1192
1193 scoped_refptr<TaskRunner>
GetTaskRunnerWithShutdownBehavior(WorkerShutdown shutdown_behavior)1194 SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
1195 WorkerShutdown shutdown_behavior) {
1196 return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
1197 }
1198
PostWorkerTask(const tracked_objects::Location & from_here,const Closure & task)1199 bool SequencedWorkerPool::PostWorkerTask(
1200 const tracked_objects::Location& from_here,
1201 const Closure& task) {
1202 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
1203 from_here, task, TimeDelta());
1204 }
1205
PostDelayedWorkerTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)1206 bool SequencedWorkerPool::PostDelayedWorkerTask(
1207 const tracked_objects::Location& from_here,
1208 const Closure& task,
1209 TimeDelta delay) {
1210 WorkerShutdown shutdown_behavior =
1211 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1212 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1213 from_here, task, delay);
1214 }
1215
PostWorkerTaskWithShutdownBehavior(const tracked_objects::Location & from_here,const Closure & task,WorkerShutdown shutdown_behavior)1216 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
1217 const tracked_objects::Location& from_here,
1218 const Closure& task,
1219 WorkerShutdown shutdown_behavior) {
1220 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1221 from_here, task, TimeDelta());
1222 }
1223
PostSequencedWorkerTask(SequenceToken sequence_token,const tracked_objects::Location & from_here,const Closure & task)1224 bool SequencedWorkerPool::PostSequencedWorkerTask(
1225 SequenceToken sequence_token,
1226 const tracked_objects::Location& from_here,
1227 const Closure& task) {
1228 return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
1229 from_here, task, TimeDelta());
1230 }
1231
PostDelayedSequencedWorkerTask(SequenceToken sequence_token,const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)1232 bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
1233 SequenceToken sequence_token,
1234 const tracked_objects::Location& from_here,
1235 const Closure& task,
1236 TimeDelta delay) {
1237 WorkerShutdown shutdown_behavior =
1238 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1239 return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1240 from_here, task, delay);
1241 }
1242
PostNamedSequencedWorkerTask(const std::string & token_name,const tracked_objects::Location & from_here,const Closure & task)1243 bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
1244 const std::string& token_name,
1245 const tracked_objects::Location& from_here,
1246 const Closure& task) {
1247 DCHECK(!token_name.empty());
1248 return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
1249 from_here, task, TimeDelta());
1250 }
1251
PostSequencedWorkerTaskWithShutdownBehavior(SequenceToken sequence_token,const tracked_objects::Location & from_here,const Closure & task,WorkerShutdown shutdown_behavior)1252 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
1253 SequenceToken sequence_token,
1254 const tracked_objects::Location& from_here,
1255 const Closure& task,
1256 WorkerShutdown shutdown_behavior) {
1257 return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1258 from_here, task, TimeDelta());
1259 }
1260
PostDelayedTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)1261 bool SequencedWorkerPool::PostDelayedTask(
1262 const tracked_objects::Location& from_here,
1263 const Closure& task,
1264 TimeDelta delay) {
1265 return PostDelayedWorkerTask(from_here, task, delay);
1266 }
1267
RunsTasksOnCurrentThread() const1268 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1269 return inner_->RunsTasksOnCurrentThread();
1270 }
1271
IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const1272 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1273 SequenceToken sequence_token) const {
1274 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1275 }
1276
// Test-only entry point. Note that this forwards to
// Inner::CleanupForTesting(); the public name differs from the
// implementation's.
void SequencedWorkerPool::FlushForTesting() {
  inner_->CleanupForTesting();
}
1280
// Test-only entry point that forwards to Inner::SignalHasWorkForTesting().
void SequencedWorkerPool::SignalHasWorkForTesting() {
  inner_->SignalHasWorkForTesting();
}
1284
// Forwards the shutdown request, together with
// |max_new_blocking_tasks_after_shutdown|, to the Inner implementation
// object.
void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
  // Debug-only check that the call is made on the thread that
  // constructor_message_loop_ belongs to.
  DCHECK(constructor_message_loop_->BelongsToCurrentThread());
  inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
}
1289
IsShutdownInProgress()1290 bool SequencedWorkerPool::IsShutdownInProgress() {
1291 return inner_->IsShutdownInProgress();
1292 }
1293
1294 } // namespace base
1295