• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/threading/sequenced_worker_pool.h"
6 
7 #include <list>
8 #include <map>
9 #include <set>
10 #include <utility>
11 #include <vector>
12 
13 #include "base/atomic_sequence_num.h"
14 #include "base/callback.h"
15 #include "base/compiler_specific.h"
16 #include "base/critical_closure.h"
17 #include "base/debug/trace_event.h"
18 #include "base/lazy_instance.h"
19 #include "base/logging.h"
20 #include "base/memory/linked_ptr.h"
21 #include "base/message_loop/message_loop_proxy.h"
22 #include "base/stl_util.h"
23 #include "base/strings/stringprintf.h"
24 #include "base/synchronization/condition_variable.h"
25 #include "base/synchronization/lock.h"
26 #include "base/threading/platform_thread.h"
27 #include "base/threading/simple_thread.h"
28 #include "base/threading/thread_local.h"
29 #include "base/threading/thread_restrictions.h"
30 #include "base/time/time.h"
31 #include "base/tracked_objects.h"
32 
33 #if defined(OS_MACOSX)
34 #include "base/mac/scoped_nsautorelease_pool.h"
35 #elif defined(OS_WIN)
36 #include "base/win/scoped_com_initializer.h"
37 #endif
38 
39 #if !defined(OS_NACL)
40 #include "base/metrics/histogram.h"
41 #endif
42 
43 namespace base {
44 
45 namespace {
46 
47 struct SequencedTask : public TrackingInfo  {
SequencedTaskbase::__anonf5c1db210111::SequencedTask48   SequencedTask()
49       : sequence_token_id(0),
50         trace_id(0),
51         sequence_task_number(0),
52         shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
53 
SequencedTaskbase::__anonf5c1db210111::SequencedTask54   explicit SequencedTask(const tracked_objects::Location& from_here)
55       : base::TrackingInfo(from_here, TimeTicks()),
56         sequence_token_id(0),
57         trace_id(0),
58         sequence_task_number(0),
59         shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
60 
~SequencedTaskbase::__anonf5c1db210111::SequencedTask61   ~SequencedTask() {}
62 
63   int sequence_token_id;
64   int trace_id;
65   int64 sequence_task_number;
66   SequencedWorkerPool::WorkerShutdown shutdown_behavior;
67   tracked_objects::Location posted_from;
68   Closure task;
69 
70   // Non-delayed tasks and delayed tasks are managed together by time-to-run
71   // order. We calculate the time by adding the posted time and the given delay.
72   TimeTicks time_to_run;
73 };
74 
75 struct SequencedTaskLessThan {
76  public:
operator ()base::__anonf5c1db210111::SequencedTaskLessThan77   bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
78     if (lhs.time_to_run < rhs.time_to_run)
79       return true;
80 
81     if (lhs.time_to_run > rhs.time_to_run)
82       return false;
83 
84     // If the time happen to match, then we use the sequence number to decide.
85     return lhs.sequence_task_number < rhs.sequence_task_number;
86   }
87 };
88 
89 // SequencedWorkerPoolTaskRunner ---------------------------------------------
90 // A TaskRunner which posts tasks to a SequencedWorkerPool with a
91 // fixed ShutdownBehavior.
92 //
93 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
94 class SequencedWorkerPoolTaskRunner : public TaskRunner {
95  public:
96   SequencedWorkerPoolTaskRunner(
97       const scoped_refptr<SequencedWorkerPool>& pool,
98       SequencedWorkerPool::WorkerShutdown shutdown_behavior);
99 
100   // TaskRunner implementation
101   virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
102                                const Closure& task,
103                                TimeDelta delay) OVERRIDE;
104   virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
105 
106  private:
107   virtual ~SequencedWorkerPoolTaskRunner();
108 
109   const scoped_refptr<SequencedWorkerPool> pool_;
110 
111   const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
112 
113   DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
114 };
115 
SequencedWorkerPoolTaskRunner(const scoped_refptr<SequencedWorkerPool> & pool,SequencedWorkerPool::WorkerShutdown shutdown_behavior)116 SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
117     const scoped_refptr<SequencedWorkerPool>& pool,
118     SequencedWorkerPool::WorkerShutdown shutdown_behavior)
119     : pool_(pool),
120       shutdown_behavior_(shutdown_behavior) {
121 }
122 
~SequencedWorkerPoolTaskRunner()123 SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
124 }
125 
PostDelayedTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)126 bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
127     const tracked_objects::Location& from_here,
128     const Closure& task,
129     TimeDelta delay) {
130   if (delay == TimeDelta()) {
131     return pool_->PostWorkerTaskWithShutdownBehavior(
132         from_here, task, shutdown_behavior_);
133   }
134   return pool_->PostDelayedWorkerTask(from_here, task, delay);
135 }
136 
RunsTasksOnCurrentThread() const137 bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
138   return pool_->RunsTasksOnCurrentThread();
139 }
140 
141 // SequencedWorkerPoolSequencedTaskRunner ------------------------------------
142 // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
143 // fixed sequence token.
144 //
145 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
146 class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
147  public:
148   SequencedWorkerPoolSequencedTaskRunner(
149       const scoped_refptr<SequencedWorkerPool>& pool,
150       SequencedWorkerPool::SequenceToken token,
151       SequencedWorkerPool::WorkerShutdown shutdown_behavior);
152 
153   // TaskRunner implementation
154   virtual bool PostDelayedTask(const tracked_objects::Location& from_here,
155                                const Closure& task,
156                                TimeDelta delay) OVERRIDE;
157   virtual bool RunsTasksOnCurrentThread() const OVERRIDE;
158 
159   // SequencedTaskRunner implementation
160   virtual bool PostNonNestableDelayedTask(
161       const tracked_objects::Location& from_here,
162       const Closure& task,
163       TimeDelta delay) OVERRIDE;
164 
165  private:
166   virtual ~SequencedWorkerPoolSequencedTaskRunner();
167 
168   const scoped_refptr<SequencedWorkerPool> pool_;
169 
170   const SequencedWorkerPool::SequenceToken token_;
171 
172   const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
173 
174   DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
175 };
176 
SequencedWorkerPoolSequencedTaskRunner(const scoped_refptr<SequencedWorkerPool> & pool,SequencedWorkerPool::SequenceToken token,SequencedWorkerPool::WorkerShutdown shutdown_behavior)177 SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
178     const scoped_refptr<SequencedWorkerPool>& pool,
179     SequencedWorkerPool::SequenceToken token,
180     SequencedWorkerPool::WorkerShutdown shutdown_behavior)
181     : pool_(pool),
182       token_(token),
183       shutdown_behavior_(shutdown_behavior) {
184 }
185 
186 SequencedWorkerPoolSequencedTaskRunner::
~SequencedWorkerPoolSequencedTaskRunner()187 ~SequencedWorkerPoolSequencedTaskRunner() {
188 }
189 
PostDelayedTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)190 bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
191     const tracked_objects::Location& from_here,
192     const Closure& task,
193     TimeDelta delay) {
194   if (delay == TimeDelta()) {
195     return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
196         token_, from_here, task, shutdown_behavior_);
197   }
198   return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
199 }
200 
RunsTasksOnCurrentThread() const201 bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
202   return pool_->IsRunningSequenceOnCurrentThread(token_);
203 }
204 
PostNonNestableDelayedTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)205 bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
206     const tracked_objects::Location& from_here,
207     const Closure& task,
208     TimeDelta delay) {
209   // There's no way to run nested tasks, so simply forward to
210   // PostDelayedTask.
211   return PostDelayedTask(from_here, task, delay);
212 }
213 
214 // Create a process-wide unique ID to represent this task in trace events. This
215 // will be mangled with a Process ID hash to reduce the likelyhood of colliding
216 // with MessageLoop pointers on other processes.
GetTaskTraceID(const SequencedTask & task,void * pool)217 uint64 GetTaskTraceID(const SequencedTask& task,
218                       void* pool) {
219   return (static_cast<uint64>(task.trace_id) << 32) |
220          static_cast<uint64>(reinterpret_cast<intptr_t>(pool));
221 }
222 
223 base::LazyInstance<base::ThreadLocalPointer<
224     SequencedWorkerPool::SequenceToken> >::Leaky g_lazy_tls_ptr =
225         LAZY_INSTANCE_INITIALIZER;
226 
227 }  // namespace
228 
229 // Worker ---------------------------------------------------------------------
230 
231 class SequencedWorkerPool::Worker : public SimpleThread {
232  public:
233   // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
234   // around as long as we are running.
235   Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
236          int thread_number,
237          const std::string& thread_name_prefix);
238   virtual ~Worker();
239 
240   // SimpleThread implementation. This actually runs the background thread.
241   virtual void Run() OVERRIDE;
242 
set_running_task_info(SequenceToken token,WorkerShutdown shutdown_behavior)243   void set_running_task_info(SequenceToken token,
244                              WorkerShutdown shutdown_behavior) {
245     running_sequence_ = token;
246     running_shutdown_behavior_ = shutdown_behavior;
247   }
248 
running_sequence() const249   SequenceToken running_sequence() const {
250     return running_sequence_;
251   }
252 
running_shutdown_behavior() const253   WorkerShutdown running_shutdown_behavior() const {
254     return running_shutdown_behavior_;
255   }
256 
257  private:
258   scoped_refptr<SequencedWorkerPool> worker_pool_;
259   SequenceToken running_sequence_;
260   WorkerShutdown running_shutdown_behavior_;
261 
262   DISALLOW_COPY_AND_ASSIGN(Worker);
263 };
264 
265 // Inner ----------------------------------------------------------------------
266 
267 class SequencedWorkerPool::Inner {
268  public:
269   // Take a raw pointer to |worker| to avoid cycles (since we're owned
270   // by it).
271   Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
272         const std::string& thread_name_prefix,
273         TestingObserver* observer);
274 
275   ~Inner();
276 
277   SequenceToken GetSequenceToken();
278 
279   SequenceToken GetNamedSequenceToken(const std::string& name);
280 
281   // This function accepts a name and an ID. If the name is null, the
282   // token ID is used. This allows us to implement the optional name lookup
283   // from a single function without having to enter the lock a separate time.
284   bool PostTask(const std::string* optional_token_name,
285                 SequenceToken sequence_token,
286                 WorkerShutdown shutdown_behavior,
287                 const tracked_objects::Location& from_here,
288                 const Closure& task,
289                 TimeDelta delay);
290 
291   bool RunsTasksOnCurrentThread() const;
292 
293   bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
294 
295   void CleanupForTesting();
296 
297   void SignalHasWorkForTesting();
298 
299   int GetWorkSignalCountForTesting() const;
300 
301   void Shutdown(int max_blocking_tasks_after_shutdown);
302 
303   bool IsShutdownInProgress();
304 
305   // Runs the worker loop on the background thread.
306   void ThreadLoop(Worker* this_worker);
307 
308  private:
309   enum GetWorkStatus {
310     GET_WORK_FOUND,
311     GET_WORK_NOT_FOUND,
312     GET_WORK_WAIT,
313   };
314 
315   enum CleanupState {
316     CLEANUP_REQUESTED,
317     CLEANUP_STARTING,
318     CLEANUP_RUNNING,
319     CLEANUP_FINISHING,
320     CLEANUP_DONE,
321   };
322 
323   // Called from within the lock, this converts the given token name into a
324   // token ID, creating a new one if necessary.
325   int LockedGetNamedTokenID(const std::string& name);
326 
327   // Called from within the lock, this returns the next sequence task number.
328   int64 LockedGetNextSequenceTaskNumber();
329 
330   // Called from within the lock, returns the shutdown behavior of the task
331   // running on the currently executing worker thread. If invoked from a thread
332   // that is not one of the workers, returns CONTINUE_ON_SHUTDOWN.
333   WorkerShutdown LockedCurrentThreadShutdownBehavior() const;
334 
335   // Gets new task. There are 3 cases depending on the return value:
336   //
337   // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
338   //    be run immediately.
339   // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
340   //    and |task| is not filled in. In this case, the caller should wait until
341   //    a task is posted.
342   // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
343   //    immediately, and |task| is not filled in. Likewise, |wait_time| is
344   //    filled in the time to wait until the next task to run. In this case, the
345   //    caller should wait the time.
346   //
347   // In any case, the calling code should clear the given
348   // delete_these_outside_lock vector the next time the lock is released.
349   // See the implementation for a more detailed description.
350   GetWorkStatus GetWork(SequencedTask* task,
351                         TimeDelta* wait_time,
352                         std::vector<Closure>* delete_these_outside_lock);
353 
354   void HandleCleanup();
355 
356   // Peforms init and cleanup around running the given task. WillRun...
357   // returns the value from PrepareToStartAdditionalThreadIfNecessary.
358   // The calling code should call FinishStartingAdditionalThread once the
359   // lock is released if the return values is nonzero.
360   int WillRunWorkerTask(const SequencedTask& task);
361   void DidRunWorkerTask(const SequencedTask& task);
362 
363   // Returns true if there are no threads currently running the given
364   // sequence token.
365   bool IsSequenceTokenRunnable(int sequence_token_id) const;
366 
367   // Checks if all threads are busy and the addition of one more could run an
368   // additional task waiting in the queue. This must be called from within
369   // the lock.
370   //
371   // If another thread is helpful, this will mark the thread as being in the
372   // process of starting and returns the index of the new thread which will be
373   // 0 or more. The caller should then call FinishStartingAdditionalThread to
374   // complete initialization once the lock is released.
375   //
376   // If another thread is not necessary, returne 0;
377   //
378   // See the implementedion for more.
379   int PrepareToStartAdditionalThreadIfHelpful();
380 
381   // The second part of thread creation after
382   // PrepareToStartAdditionalThreadIfHelpful with the thread number it
383   // generated. This actually creates the thread and should be called outside
384   // the lock to avoid blocking important work starting a thread in the lock.
385   void FinishStartingAdditionalThread(int thread_number);
386 
387   // Signal |has_work_| and increment |has_work_signal_count_|.
388   void SignalHasWork();
389 
390   // Checks whether there is work left that's blocking shutdown. Must be
391   // called inside the lock.
392   bool CanShutdown() const;
393 
394   SequencedWorkerPool* const worker_pool_;
395 
396   // The last sequence number used. Managed by GetSequenceToken, since this
397   // only does threadsafe increment operations, you do not need to hold the
398   // lock. This is class-static to make SequenceTokens issued by
399   // GetSequenceToken unique across SequencedWorkerPool instances.
400   static base::StaticAtomicSequenceNumber g_last_sequence_number_;
401 
402   // This lock protects |everything in this class|. Do not read or modify
403   // anything without holding this lock. Do not block while holding this
404   // lock.
405   mutable Lock lock_;
406 
407   // Condition variable that is waited on by worker threads until new
408   // tasks are posted or shutdown starts.
409   ConditionVariable has_work_cv_;
410 
411   // Condition variable that is waited on by non-worker threads (in
412   // Shutdown()) until CanShutdown() goes to true.
413   ConditionVariable can_shutdown_cv_;
414 
415   // The maximum number of worker threads we'll create.
416   const size_t max_threads_;
417 
418   const std::string thread_name_prefix_;
419 
420   // Associates all known sequence token names with their IDs.
421   std::map<std::string, int> named_sequence_tokens_;
422 
423   // Owning pointers to all threads we've created so far, indexed by
424   // ID. Since we lazily create threads, this may be less than
425   // max_threads_ and will be initially empty.
426   typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap;
427   ThreadMap threads_;
428 
429   // Set to true when we're in the process of creating another thread.
430   // See PrepareToStartAdditionalThreadIfHelpful for more.
431   bool thread_being_created_;
432 
433   // Number of threads currently waiting for work.
434   size_t waiting_thread_count_;
435 
436   // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
437   // or SKIP_ON_SHUTDOWN flag set.
438   size_t blocking_shutdown_thread_count_;
439 
440   // A set of all pending tasks in time-to-run order. These are tasks that are
441   // either waiting for a thread to run on, waiting for their time to run,
442   // or blocked on a previous task in their sequence. We have to iterate over
443   // the tasks by time-to-run order, so we use the set instead of the
444   // traditional priority_queue.
445   typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
446   PendingTaskSet pending_tasks_;
447 
448   // The next sequence number for a new sequenced task.
449   int64 next_sequence_task_number_;
450 
451   // Number of tasks in the pending_tasks_ list that are marked as blocking
452   // shutdown.
453   size_t blocking_shutdown_pending_task_count_;
454 
455   // Lists all sequence tokens currently executing.
456   std::set<int> current_sequences_;
457 
458   // An ID for each posted task to distinguish the task from others in traces.
459   int trace_id_;
460 
461   // Set when Shutdown is called and no further tasks should be
462   // allowed, though we may still be running existing tasks.
463   bool shutdown_called_;
464 
465   // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown()
466   // has been called.
467   int max_blocking_tasks_after_shutdown_;
468 
469   // State used to cleanup for testing, all guarded by lock_.
470   CleanupState cleanup_state_;
471   size_t cleanup_idlers_;
472   ConditionVariable cleanup_cv_;
473 
474   TestingObserver* const testing_observer_;
475 
476   DISALLOW_COPY_AND_ASSIGN(Inner);
477 };
478 
479 // Worker definitions ---------------------------------------------------------
480 
Worker(const scoped_refptr<SequencedWorkerPool> & worker_pool,int thread_number,const std::string & prefix)481 SequencedWorkerPool::Worker::Worker(
482     const scoped_refptr<SequencedWorkerPool>& worker_pool,
483     int thread_number,
484     const std::string& prefix)
485     : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
486       worker_pool_(worker_pool),
487       running_shutdown_behavior_(CONTINUE_ON_SHUTDOWN) {
488   Start();
489 }
490 
~Worker()491 SequencedWorkerPool::Worker::~Worker() {
492 }
493 
Run()494 void SequencedWorkerPool::Worker::Run() {
495 #if defined(OS_WIN)
496   win::ScopedCOMInitializer com_initializer;
497 #endif
498 
499   // Store a pointer to the running sequence in thread local storage for
500   // static function access.
501   g_lazy_tls_ptr.Get().Set(&running_sequence_);
502 
503   // Just jump back to the Inner object to run the thread, since it has all the
504   // tracking information and queues. It might be more natural to implement
505   // using DelegateSimpleThread and have Inner implement the Delegate to avoid
506   // having these worker objects at all, but that method lacks the ability to
507   // send thread-specific information easily to the thread loop.
508   worker_pool_->inner_->ThreadLoop(this);
509   // Release our cyclic reference once we're done.
510   worker_pool_ = NULL;
511 }
512 
513 // Inner definitions ---------------------------------------------------------
514 
Inner(SequencedWorkerPool * worker_pool,size_t max_threads,const std::string & thread_name_prefix,TestingObserver * observer)515 SequencedWorkerPool::Inner::Inner(
516     SequencedWorkerPool* worker_pool,
517     size_t max_threads,
518     const std::string& thread_name_prefix,
519     TestingObserver* observer)
520     : worker_pool_(worker_pool),
521       lock_(),
522       has_work_cv_(&lock_),
523       can_shutdown_cv_(&lock_),
524       max_threads_(max_threads),
525       thread_name_prefix_(thread_name_prefix),
526       thread_being_created_(false),
527       waiting_thread_count_(0),
528       blocking_shutdown_thread_count_(0),
529       next_sequence_task_number_(0),
530       blocking_shutdown_pending_task_count_(0),
531       trace_id_(0),
532       shutdown_called_(false),
533       max_blocking_tasks_after_shutdown_(0),
534       cleanup_state_(CLEANUP_DONE),
535       cleanup_idlers_(0),
536       cleanup_cv_(&lock_),
537       testing_observer_(observer) {}
538 
~Inner()539 SequencedWorkerPool::Inner::~Inner() {
540   // You must call Shutdown() before destroying the pool.
541   DCHECK(shutdown_called_);
542 
543   // Need to explicitly join with the threads before they're destroyed or else
544   // they will be running when our object is half torn down.
545   for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
546     it->second->Join();
547   threads_.clear();
548 
549   if (testing_observer_)
550     testing_observer_->OnDestruct();
551 }
552 
553 SequencedWorkerPool::SequenceToken
GetSequenceToken()554 SequencedWorkerPool::Inner::GetSequenceToken() {
555   // Need to add one because StaticAtomicSequenceNumber starts at zero, which
556   // is used as a sentinel value in SequenceTokens.
557   return SequenceToken(g_last_sequence_number_.GetNext() + 1);
558 }
559 
560 SequencedWorkerPool::SequenceToken
GetNamedSequenceToken(const std::string & name)561 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
562   AutoLock lock(lock_);
563   return SequenceToken(LockedGetNamedTokenID(name));
564 }
565 
PostTask(const std::string * optional_token_name,SequenceToken sequence_token,WorkerShutdown shutdown_behavior,const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)566 bool SequencedWorkerPool::Inner::PostTask(
567     const std::string* optional_token_name,
568     SequenceToken sequence_token,
569     WorkerShutdown shutdown_behavior,
570     const tracked_objects::Location& from_here,
571     const Closure& task,
572     TimeDelta delay) {
573   DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN);
574   SequencedTask sequenced(from_here);
575   sequenced.sequence_token_id = sequence_token.id_;
576   sequenced.shutdown_behavior = shutdown_behavior;
577   sequenced.posted_from = from_here;
578   sequenced.task =
579       shutdown_behavior == BLOCK_SHUTDOWN ?
580       base::MakeCriticalClosure(task) : task;
581   sequenced.time_to_run = TimeTicks::Now() + delay;
582 
583   int create_thread_id = 0;
584   {
585     AutoLock lock(lock_);
586     if (shutdown_called_) {
587       if (shutdown_behavior != BLOCK_SHUTDOWN ||
588           LockedCurrentThreadShutdownBehavior() == CONTINUE_ON_SHUTDOWN) {
589         return false;
590       }
591       if (max_blocking_tasks_after_shutdown_ <= 0) {
592         DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
593         return false;
594       }
595       max_blocking_tasks_after_shutdown_ -= 1;
596     }
597 
598     // The trace_id is used for identifying the task in about:tracing.
599     sequenced.trace_id = trace_id_++;
600 
601     TRACE_EVENT_FLOW_BEGIN0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
602         "SequencedWorkerPool::PostTask",
603         TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))));
604 
605     sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
606 
607     // Now that we have the lock, apply the named token rules.
608     if (optional_token_name)
609       sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
610 
611     pending_tasks_.insert(sequenced);
612     if (shutdown_behavior == BLOCK_SHUTDOWN)
613       blocking_shutdown_pending_task_count_++;
614 
615     create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
616   }
617 
618   // Actually start the additional thread or signal an existing one now that
619   // we're outside the lock.
620   if (create_thread_id)
621     FinishStartingAdditionalThread(create_thread_id);
622   else
623     SignalHasWork();
624 
625   return true;
626 }
627 
RunsTasksOnCurrentThread() const628 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
629   AutoLock lock(lock_);
630   return ContainsKey(threads_, PlatformThread::CurrentId());
631 }
632 
IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const633 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
634     SequenceToken sequence_token) const {
635   AutoLock lock(lock_);
636   ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
637   if (found == threads_.end())
638     return false;
639   return sequence_token.Equals(found->second->running_sequence());
640 }
641 
642 // See https://code.google.com/p/chromium/issues/detail?id=168415
CleanupForTesting()643 void SequencedWorkerPool::Inner::CleanupForTesting() {
644   DCHECK(!RunsTasksOnCurrentThread());
645   base::ThreadRestrictions::ScopedAllowWait allow_wait;
646   AutoLock lock(lock_);
647   CHECK_EQ(CLEANUP_DONE, cleanup_state_);
648   if (shutdown_called_)
649     return;
650   if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
651     return;
652   cleanup_state_ = CLEANUP_REQUESTED;
653   cleanup_idlers_ = 0;
654   has_work_cv_.Signal();
655   while (cleanup_state_ != CLEANUP_DONE)
656     cleanup_cv_.Wait();
657 }
658 
SignalHasWorkForTesting()659 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
660   SignalHasWork();
661 }
662 
Shutdown(int max_new_blocking_tasks_after_shutdown)663 void SequencedWorkerPool::Inner::Shutdown(
664     int max_new_blocking_tasks_after_shutdown) {
665   DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
666   {
667     AutoLock lock(lock_);
668     // Cleanup and Shutdown should not be called concurrently.
669     CHECK_EQ(CLEANUP_DONE, cleanup_state_);
670     if (shutdown_called_)
671       return;
672     shutdown_called_ = true;
673     max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
674 
675     // Tickle the threads. This will wake up a waiting one so it will know that
676     // it can exit, which in turn will wake up any other waiting ones.
677     SignalHasWork();
678 
679     // There are no pending or running tasks blocking shutdown, we're done.
680     if (CanShutdown())
681       return;
682   }
683 
684   // If we're here, then something is blocking shutdown.  So wait for
685   // CanShutdown() to go to true.
686 
687   if (testing_observer_)
688     testing_observer_->WillWaitForShutdown();
689 
690 #if !defined(OS_NACL)
691   TimeTicks shutdown_wait_begin = TimeTicks::Now();
692 #endif
693 
694   {
695     base::ThreadRestrictions::ScopedAllowWait allow_wait;
696     AutoLock lock(lock_);
697     while (!CanShutdown())
698       can_shutdown_cv_.Wait();
699   }
700 #if !defined(OS_NACL)
701   UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
702                       TimeTicks::Now() - shutdown_wait_begin);
703 #endif
704 }
705 
IsShutdownInProgress()706 bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
707     AutoLock lock(lock_);
708     return shutdown_called_;
709 }
710 
ThreadLoop(Worker * this_worker)711 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
712   {
713     AutoLock lock(lock_);
714     DCHECK(thread_being_created_);
715     thread_being_created_ = false;
716     std::pair<ThreadMap::iterator, bool> result =
717         threads_.insert(
718             std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
719     DCHECK(result.second);
720 
721     while (true) {
722 #if defined(OS_MACOSX)
723       base::mac::ScopedNSAutoreleasePool autorelease_pool;
724 #endif
725 
726       HandleCleanup();
727 
728       // See GetWork for what delete_these_outside_lock is doing.
729       SequencedTask task;
730       TimeDelta wait_time;
731       std::vector<Closure> delete_these_outside_lock;
732       GetWorkStatus status =
733           GetWork(&task, &wait_time, &delete_these_outside_lock);
734       if (status == GET_WORK_FOUND) {
735         TRACE_EVENT_FLOW_END0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
736             "SequencedWorkerPool::PostTask",
737             TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))));
738         TRACE_EVENT2("toplevel", "SequencedWorkerPool::ThreadLoop",
739                      "src_file", task.posted_from.file_name(),
740                      "src_func", task.posted_from.function_name());
741         int new_thread_id = WillRunWorkerTask(task);
742         {
743           AutoUnlock unlock(lock_);
744           // There may be more work available, so wake up another
745           // worker thread. (Technically not required, since we
746           // already get a signal for each new task, but it doesn't
747           // hurt.)
748           SignalHasWork();
749           delete_these_outside_lock.clear();
750 
751           // Complete thread creation outside the lock if necessary.
752           if (new_thread_id)
753             FinishStartingAdditionalThread(new_thread_id);
754 
755           this_worker->set_running_task_info(
756               SequenceToken(task.sequence_token_id), task.shutdown_behavior);
757 
758           tracked_objects::TrackedTime start_time =
759               tracked_objects::ThreadData::NowForStartOfRun(task.birth_tally);
760 
761           task.task.Run();
762 
763           tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(task,
764               start_time, tracked_objects::ThreadData::NowForEndOfRun());
765 
766           // Make sure our task is erased outside the lock for the
767           // same reason we do this with delete_these_oustide_lock.
768           // Also, do it before calling set_running_task_info() so
769           // that sequence-checking from within the task's destructor
770           // still works.
771           task.task = Closure();
772 
773           this_worker->set_running_task_info(
774               SequenceToken(), CONTINUE_ON_SHUTDOWN);
775         }
776         DidRunWorkerTask(task);  // Must be done inside the lock.
777       } else if (cleanup_state_ == CLEANUP_RUNNING) {
778         switch (status) {
779           case GET_WORK_WAIT: {
780               AutoUnlock unlock(lock_);
781               delete_these_outside_lock.clear();
782             }
783             break;
784           case GET_WORK_NOT_FOUND:
785             CHECK(delete_these_outside_lock.empty());
786             cleanup_state_ = CLEANUP_FINISHING;
787             cleanup_cv_.Broadcast();
788             break;
789           default:
790             NOTREACHED();
791         }
792       } else {
793         // When we're terminating and there's no more work, we can
794         // shut down, other workers can complete any pending or new tasks.
795         // We can get additional tasks posted after shutdown_called_ is set
796         // but only worker threads are allowed to post tasks at that time, and
797         // the workers responsible for posting those tasks will be available
798         // to run them. Also, there may be some tasks stuck behind running
799         // ones with the same sequence token, but additional threads won't
800         // help this case.
801         if (shutdown_called_ &&
802             blocking_shutdown_pending_task_count_ == 0)
803           break;
804         waiting_thread_count_++;
805 
806         switch (status) {
807           case GET_WORK_NOT_FOUND:
808             has_work_cv_.Wait();
809             break;
810           case GET_WORK_WAIT:
811             has_work_cv_.TimedWait(wait_time);
812             break;
813           default:
814             NOTREACHED();
815         }
816         waiting_thread_count_--;
817       }
818     }
819   }  // Release lock_.
820 
821   // We noticed we should exit. Wake up the next worker so it knows it should
822   // exit as well (because the Shutdown() code only signals once).
823   SignalHasWork();
824 
825   // Possibly unblock shutdown.
826   can_shutdown_cv_.Signal();
827 }
828 
HandleCleanup()829 void SequencedWorkerPool::Inner::HandleCleanup() {
830   lock_.AssertAcquired();
831   if (cleanup_state_ == CLEANUP_DONE)
832     return;
833   if (cleanup_state_ == CLEANUP_REQUESTED) {
834     // We win, we get to do the cleanup as soon as the others wise up and idle.
835     cleanup_state_ = CLEANUP_STARTING;
836     while (thread_being_created_ ||
837            cleanup_idlers_ != threads_.size() - 1) {
838       has_work_cv_.Signal();
839       cleanup_cv_.Wait();
840     }
841     cleanup_state_ = CLEANUP_RUNNING;
842     return;
843   }
844   if (cleanup_state_ == CLEANUP_STARTING) {
845     // Another worker thread is cleaning up, we idle here until thats done.
846     ++cleanup_idlers_;
847     cleanup_cv_.Broadcast();
848     while (cleanup_state_ != CLEANUP_FINISHING) {
849       cleanup_cv_.Wait();
850     }
851     --cleanup_idlers_;
852     cleanup_cv_.Broadcast();
853     return;
854   }
855   if (cleanup_state_ == CLEANUP_FINISHING) {
856     // We wait for all idlers to wake up prior to being DONE.
857     while (cleanup_idlers_ != 0) {
858       cleanup_cv_.Broadcast();
859       cleanup_cv_.Wait();
860     }
861     if (cleanup_state_ == CLEANUP_FINISHING) {
862       cleanup_state_ = CLEANUP_DONE;
863       cleanup_cv_.Signal();
864     }
865     return;
866   }
867 }
868 
LockedGetNamedTokenID(const std::string & name)869 int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
870     const std::string& name) {
871   lock_.AssertAcquired();
872   DCHECK(!name.empty());
873 
874   std::map<std::string, int>::const_iterator found =
875       named_sequence_tokens_.find(name);
876   if (found != named_sequence_tokens_.end())
877     return found->second;  // Got an existing one.
878 
879   // Create a new one for this name.
880   SequenceToken result = GetSequenceToken();
881   named_sequence_tokens_.insert(std::make_pair(name, result.id_));
882   return result.id_;
883 }
884 
LockedGetNextSequenceTaskNumber()885 int64 SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
886   lock_.AssertAcquired();
887   // We assume that we never create enough tasks to wrap around.
888   return next_sequence_task_number_++;
889 }
890 
891 SequencedWorkerPool::WorkerShutdown
LockedCurrentThreadShutdownBehavior() const892 SequencedWorkerPool::Inner::LockedCurrentThreadShutdownBehavior() const {
893   lock_.AssertAcquired();
894   ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
895   if (found == threads_.end())
896     return CONTINUE_ON_SHUTDOWN;
897   return found->second->running_shutdown_behavior();
898 }
899 
GetWork(SequencedTask * task,TimeDelta * wait_time,std::vector<Closure> * delete_these_outside_lock)900 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
901     SequencedTask* task,
902     TimeDelta* wait_time,
903     std::vector<Closure>* delete_these_outside_lock) {
904   lock_.AssertAcquired();
905 
906 #if !defined(OS_NACL)
907   UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.TaskCount",
908                            static_cast<int>(pending_tasks_.size()));
909 #endif
910 
911   // Find the next task with a sequence token that's not currently in use.
912   // If the token is in use, that means another thread is running something
913   // in that sequence, and we can't run it without going out-of-order.
914   //
915   // This algorithm is simple and fair, but inefficient in some cases. For
916   // example, say somebody schedules 1000 slow tasks with the same sequence
917   // number. We'll have to go through all those tasks each time we feel like
918   // there might be work to schedule. If this proves to be a problem, we
919   // should make this more efficient.
920   //
921   // One possible enhancement would be to keep a map from sequence ID to a
922   // list of pending but currently blocked SequencedTasks for that ID.
923   // When a worker finishes a task of one sequence token, it can pick up the
924   // next one from that token right away.
925   //
926   // This may lead to starvation if there are sufficient numbers of sequences
927   // in use. To alleviate this, we could add an incrementing priority counter
928   // to each SequencedTask. Then maintain a priority_queue of all runnable
929   // tasks, sorted by priority counter. When a sequenced task is completed
930   // we would pop the head element off of that tasks pending list and add it
931   // to the priority queue. Then we would run the first item in the priority
932   // queue.
933 
934   GetWorkStatus status = GET_WORK_NOT_FOUND;
935   int unrunnable_tasks = 0;
936   PendingTaskSet::iterator i = pending_tasks_.begin();
937   // We assume that the loop below doesn't take too long and so we can just do
938   // a single call to TimeTicks::Now().
939   const TimeTicks current_time = TimeTicks::Now();
940   while (i != pending_tasks_.end()) {
941     if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
942       unrunnable_tasks++;
943       ++i;
944       continue;
945     }
946 
947     if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
948       // We're shutting down and the task we just found isn't blocking
949       // shutdown. Delete it and get more work.
950       //
951       // Note that we do not want to delete unrunnable tasks. Deleting a task
952       // can have side effects (like freeing some objects) and deleting a
953       // task that's supposed to run after one that's currently running could
954       // cause an obscure crash.
955       //
956       // We really want to delete these tasks outside the lock in case the
957       // closures are holding refs to objects that want to post work from
958       // their destructorss (which would deadlock). The closures are
959       // internally refcounted, so we just need to keep a copy of them alive
960       // until the lock is exited. The calling code can just clear() the
961       // vector they passed to us once the lock is exited to make this
962       // happen.
963       delete_these_outside_lock->push_back(i->task);
964       pending_tasks_.erase(i++);
965       continue;
966     }
967 
968     if (i->time_to_run > current_time) {
969       // The time to run has not come yet.
970       *wait_time = i->time_to_run - current_time;
971       status = GET_WORK_WAIT;
972       if (cleanup_state_ == CLEANUP_RUNNING) {
973         // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
974         delete_these_outside_lock->push_back(i->task);
975         pending_tasks_.erase(i);
976       }
977       break;
978     }
979 
980     // Found a runnable task.
981     *task = *i;
982     pending_tasks_.erase(i);
983     if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
984       blocking_shutdown_pending_task_count_--;
985     }
986 
987     status = GET_WORK_FOUND;
988     break;
989   }
990 
991   // Track the number of tasks we had to skip over to see if we should be
992   // making this more efficient. If this number ever becomes large or is
993   // frequently "some", we should consider the optimization above.
994 #if !defined(OS_NACL)
995   UMA_HISTOGRAM_COUNTS_100("SequencedWorkerPool.UnrunnableTaskCount",
996                            unrunnable_tasks);
997 #endif
998   return status;
999 }
1000 
WillRunWorkerTask(const SequencedTask & task)1001 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
1002   lock_.AssertAcquired();
1003 
1004   // Mark the task's sequence number as in use.
1005   if (task.sequence_token_id)
1006     current_sequences_.insert(task.sequence_token_id);
1007 
1008   // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
1009   // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
1010   // completes.
1011   if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
1012     blocking_shutdown_thread_count_++;
1013 
1014   // We just picked up a task. Since StartAdditionalThreadIfHelpful only
1015   // creates a new thread if there is no free one, there is a race when posting
1016   // tasks that many tasks could have been posted before a thread started
1017   // running them, so only one thread would have been created. So we also check
1018   // whether we should create more threads after removing our task from the
1019   // queue, which also has the nice side effect of creating the workers from
1020   // background threads rather than the main thread of the app.
1021   //
1022   // If another thread wasn't created, we want to wake up an existing thread
1023   // if there is one waiting to pick up the next task.
1024   //
1025   // Note that we really need to do this *before* running the task, not
1026   // after. Otherwise, if more than one task is posted, the creation of the
1027   // second thread (since we only create one at a time) will be blocked by
1028   // the execution of the first task, which could be arbitrarily long.
1029   return PrepareToStartAdditionalThreadIfHelpful();
1030 }
1031 
DidRunWorkerTask(const SequencedTask & task)1032 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
1033   lock_.AssertAcquired();
1034 
1035   if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
1036     DCHECK_GT(blocking_shutdown_thread_count_, 0u);
1037     blocking_shutdown_thread_count_--;
1038   }
1039 
1040   if (task.sequence_token_id)
1041     current_sequences_.erase(task.sequence_token_id);
1042 }
1043 
IsSequenceTokenRunnable(int sequence_token_id) const1044 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1045     int sequence_token_id) const {
1046   lock_.AssertAcquired();
1047   return !sequence_token_id ||
1048       current_sequences_.find(sequence_token_id) ==
1049           current_sequences_.end();
1050 }
1051 
PrepareToStartAdditionalThreadIfHelpful()1052 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1053   lock_.AssertAcquired();
1054   // How thread creation works:
1055   //
1056   // We'de like to avoid creating threads with the lock held. However, we
1057   // need to be sure that we have an accurate accounting of the threads for
1058   // proper Joining and deltion on shutdown.
1059   //
1060   // We need to figure out if we need another thread with the lock held, which
1061   // is what this function does. It then marks us as in the process of creating
1062   // a thread. When we do shutdown, we wait until the thread_being_created_
1063   // flag is cleared, which ensures that the new thread is properly added to
1064   // all the data structures and we can't leak it. Once shutdown starts, we'll
1065   // refuse to create more threads or they would be leaked.
1066   //
1067   // Note that this creates a mostly benign race condition on shutdown that
1068   // will cause fewer workers to be created than one would expect. It isn't
1069   // much of an issue in real life, but affects some tests. Since we only spawn
1070   // one worker at a time, the following sequence of events can happen:
1071   //
1072   //  1. Main thread posts a bunch of unrelated tasks that would normally be
1073   //     run on separate threads.
1074   //  2. The first task post causes us to start a worker. Other tasks do not
1075   //     cause a worker to start since one is pending.
1076   //  3. Main thread initiates shutdown.
1077   //  4. No more threads are created since the shutdown_called_ flag is set.
1078   //
1079   // The result is that one may expect that max_threads_ workers to be created
1080   // given the workload, but in reality fewer may be created because the
1081   // sequence of thread creation on the background threads is racing with the
1082   // shutdown call.
1083   if (!shutdown_called_ &&
1084       !thread_being_created_ &&
1085       cleanup_state_ == CLEANUP_DONE &&
1086       threads_.size() < max_threads_ &&
1087       waiting_thread_count_ == 0) {
1088     // We could use an additional thread if there's work to be done.
1089     for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
1090          i != pending_tasks_.end(); ++i) {
1091       if (IsSequenceTokenRunnable(i->sequence_token_id)) {
1092         // Found a runnable task, mark the thread as being started.
1093         thread_being_created_ = true;
1094         return static_cast<int>(threads_.size() + 1);
1095       }
1096     }
1097   }
1098   return 0;
1099 }
1100 
FinishStartingAdditionalThread(int thread_number)1101 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
1102     int thread_number) {
1103   // Called outside of the lock.
1104   DCHECK(thread_number > 0);
1105 
1106   // The worker is assigned to the list when the thread actually starts, which
1107   // will manage the memory of the pointer.
1108   new Worker(worker_pool_, thread_number, thread_name_prefix_);
1109 }
1110 
SignalHasWork()1111 void SequencedWorkerPool::Inner::SignalHasWork() {
1112   has_work_cv_.Signal();
1113   if (testing_observer_) {
1114     testing_observer_->OnHasWork();
1115   }
1116 }
1117 
CanShutdown() const1118 bool SequencedWorkerPool::Inner::CanShutdown() const {
1119   lock_.AssertAcquired();
1120   // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1121   return !thread_being_created_ &&
1122          blocking_shutdown_thread_count_ == 0 &&
1123          blocking_shutdown_pending_task_count_ == 0;
1124 }
1125 
// Out-of-line definition of Inner's static sequence-number counter.
// NOTE(review): presumably the source of the ids handed out by
// GetSequenceToken() -- confirm against the Inner class definition.
base::StaticAtomicSequenceNumber
SequencedWorkerPool::Inner::g_last_sequence_number_;
1128 
1129 // SequencedWorkerPool --------------------------------------------------------
1130 
1131 // static
1132 SequencedWorkerPool::SequenceToken
GetSequenceTokenForCurrentThread()1133 SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1134   // Don't construct lazy instance on check.
1135   if (g_lazy_tls_ptr == NULL)
1136     return SequenceToken();
1137 
1138   SequencedWorkerPool::SequenceToken* token = g_lazy_tls_ptr.Get().Get();
1139   if (!token)
1140     return SequenceToken();
1141   return *token;
1142 }
1143 
// Constructs a pool without a testing observer. Stores
// MessageLoopProxy::current() in constructor_message_loop_ and creates the
// Inner with |max_threads|, |thread_name_prefix| and a NULL observer.
SequencedWorkerPool::SequencedWorkerPool(
    size_t max_threads,
    const std::string& thread_name_prefix)
    : constructor_message_loop_(MessageLoopProxy::current()),
      inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
}
1150 
// Constructs a pool with a testing observer. Stores
// MessageLoopProxy::current() in constructor_message_loop_ and creates the
// Inner with |max_threads|, |thread_name_prefix| and |observer|.
SequencedWorkerPool::SequencedWorkerPool(
    size_t max_threads,
    const std::string& thread_name_prefix,
    TestingObserver* observer)
    : constructor_message_loop_(MessageLoopProxy::current()),
      inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
}
1158 
// The destructor body is intentionally empty; see OnDestruct() for how
// deletion of the pool is triggered.
SequencedWorkerPool::~SequencedWorkerPool() {}
1160 
OnDestruct() const1161 void SequencedWorkerPool::OnDestruct() const {
1162   DCHECK(constructor_message_loop_.get());
1163   // Avoid deleting ourselves on a worker thread (which would
1164   // deadlock).
1165   if (RunsTasksOnCurrentThread()) {
1166     constructor_message_loop_->DeleteSoon(FROM_HERE, this);
1167   } else {
1168     delete this;
1169   }
1170 }
1171 
GetSequenceToken()1172 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
1173   return inner_->GetSequenceToken();
1174 }
1175 
GetNamedSequenceToken(const std::string & name)1176 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
1177     const std::string& name) {
1178   return inner_->GetNamedSequenceToken(name);
1179 }
1180 
GetSequencedTaskRunner(SequenceToken token)1181 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
1182     SequenceToken token) {
1183   return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
1184 }
1185 
1186 scoped_refptr<SequencedTaskRunner>
GetSequencedTaskRunnerWithShutdownBehavior(SequenceToken token,WorkerShutdown shutdown_behavior)1187 SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
1188     SequenceToken token, WorkerShutdown shutdown_behavior) {
1189   return new SequencedWorkerPoolSequencedTaskRunner(
1190       this, token, shutdown_behavior);
1191 }
1192 
1193 scoped_refptr<TaskRunner>
GetTaskRunnerWithShutdownBehavior(WorkerShutdown shutdown_behavior)1194 SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
1195     WorkerShutdown shutdown_behavior) {
1196   return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
1197 }
1198 
PostWorkerTask(const tracked_objects::Location & from_here,const Closure & task)1199 bool SequencedWorkerPool::PostWorkerTask(
1200     const tracked_objects::Location& from_here,
1201     const Closure& task) {
1202   return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
1203                           from_here, task, TimeDelta());
1204 }
1205 
PostDelayedWorkerTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)1206 bool SequencedWorkerPool::PostDelayedWorkerTask(
1207     const tracked_objects::Location& from_here,
1208     const Closure& task,
1209     TimeDelta delay) {
1210   WorkerShutdown shutdown_behavior =
1211       delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1212   return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1213                           from_here, task, delay);
1214 }
1215 
PostWorkerTaskWithShutdownBehavior(const tracked_objects::Location & from_here,const Closure & task,WorkerShutdown shutdown_behavior)1216 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
1217     const tracked_objects::Location& from_here,
1218     const Closure& task,
1219     WorkerShutdown shutdown_behavior) {
1220   return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1221                           from_here, task, TimeDelta());
1222 }
1223 
PostSequencedWorkerTask(SequenceToken sequence_token,const tracked_objects::Location & from_here,const Closure & task)1224 bool SequencedWorkerPool::PostSequencedWorkerTask(
1225     SequenceToken sequence_token,
1226     const tracked_objects::Location& from_here,
1227     const Closure& task) {
1228   return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
1229                           from_here, task, TimeDelta());
1230 }
1231 
PostDelayedSequencedWorkerTask(SequenceToken sequence_token,const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)1232 bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
1233     SequenceToken sequence_token,
1234     const tracked_objects::Location& from_here,
1235     const Closure& task,
1236     TimeDelta delay) {
1237   WorkerShutdown shutdown_behavior =
1238       delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1239   return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1240                           from_here, task, delay);
1241 }
1242 
PostNamedSequencedWorkerTask(const std::string & token_name,const tracked_objects::Location & from_here,const Closure & task)1243 bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
1244     const std::string& token_name,
1245     const tracked_objects::Location& from_here,
1246     const Closure& task) {
1247   DCHECK(!token_name.empty());
1248   return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
1249                           from_here, task, TimeDelta());
1250 }
1251 
PostSequencedWorkerTaskWithShutdownBehavior(SequenceToken sequence_token,const tracked_objects::Location & from_here,const Closure & task,WorkerShutdown shutdown_behavior)1252 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
1253     SequenceToken sequence_token,
1254     const tracked_objects::Location& from_here,
1255     const Closure& task,
1256     WorkerShutdown shutdown_behavior) {
1257   return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1258                           from_here, task, TimeDelta());
1259 }
1260 
PostDelayedTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)1261 bool SequencedWorkerPool::PostDelayedTask(
1262     const tracked_objects::Location& from_here,
1263     const Closure& task,
1264     TimeDelta delay) {
1265   return PostDelayedWorkerTask(from_here, task, delay);
1266 }
1267 
RunsTasksOnCurrentThread() const1268 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1269   return inner_->RunsTasksOnCurrentThread();
1270 }
1271 
IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const1272 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1273     SequenceToken sequence_token) const {
1274   return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1275 }
1276 
// Test-only entry point; forwards to Inner::CleanupForTesting().
void SequencedWorkerPool::FlushForTesting() {
  inner_->CleanupForTesting();
}
1280 
// Test-only entry point; forwards to Inner::SignalHasWorkForTesting().
void SequencedWorkerPool::SignalHasWorkForTesting() {
  inner_->SignalHasWorkForTesting();
}
1284 
// Shuts the pool down by forwarding |max_new_blocking_tasks_after_shutdown|
// to Inner::Shutdown(). DCHECKs that the caller is on the thread that
// constructor_message_loop_ belongs to.
void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
  DCHECK(constructor_message_loop_->BelongsToCurrentThread());
  inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
}
1289 
IsShutdownInProgress()1290 bool SequencedWorkerPool::IsShutdownInProgress() {
1291   return inner_->IsShutdownInProgress();
1292 }
1293 
1294 }  // namespace base
1295