• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef ART_RUNTIME_THREAD_POOL_H_
18 #define ART_RUNTIME_THREAD_POOL_H_
19 
20 #include <deque>
21 #include <vector>
22 
23 #include "barrier.h"
24 #include "base/mutex.h"
25 #include "mem_map.h"
26 
27 namespace art {
28 
29 class ThreadPool;
30 
31 class Closure {
32  public:
~Closure()33   virtual ~Closure() { }
34   virtual void Run(Thread* self) = 0;
35 };
36 
37 class Task : public Closure {
38  public:
39   // Called after Closure::Run has been called.
Finalize()40   virtual void Finalize() { }
41 };
42 
43 class SelfDeletingTask : public Task {
44  public:
~SelfDeletingTask()45   virtual ~SelfDeletingTask() { }
Finalize()46   virtual void Finalize() {
47     delete this;
48   }
49 };
50 
51 class ThreadPoolWorker {
52  public:
53   static const size_t kDefaultStackSize = 1 * MB;
54 
GetStackSize()55   size_t GetStackSize() const {
56     DCHECK(stack_.get() != nullptr);
57     return stack_->Size();
58   }
59 
60   virtual ~ThreadPoolWorker();
61 
62   // Set the "nice" priorty for this worker.
63   void SetPthreadPriority(int priority);
64 
65  protected:
66   ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name, size_t stack_size);
67   static void* Callback(void* arg) REQUIRES(!Locks::mutator_lock_);
68   virtual void Run();
69 
70   ThreadPool* const thread_pool_;
71   const std::string name_;
72   std::unique_ptr<MemMap> stack_;
73   pthread_t pthread_;
74 
75  private:
76   friend class ThreadPool;
77   DISALLOW_COPY_AND_ASSIGN(ThreadPoolWorker);
78 };
79 
80 class ThreadPool {
81  public:
82   // Returns the number of threads in the thread pool.
GetThreadCount()83   size_t GetThreadCount() const {
84     return threads_.size();
85   }
86 
87   // Broadcast to the workers and tell them to empty out the work queue.
88   void StartWorkers(Thread* self) REQUIRES(!task_queue_lock_);
89 
90   // Do not allow workers to grab any new tasks.
91   void StopWorkers(Thread* self) REQUIRES(!task_queue_lock_);
92 
93   // Add a new task, the first available started worker will process it. Does not delete the task
94   // after running it, it is the caller's responsibility.
95   void AddTask(Thread* self, Task* task) REQUIRES(!task_queue_lock_);
96 
97   // Remove all tasks in the queue.
98   void RemoveAllTasks(Thread* self) REQUIRES(!task_queue_lock_);
99 
100   ThreadPool(const char* name, size_t num_threads);
101   virtual ~ThreadPool();
102 
103   // Wait for all tasks currently on queue to get completed.
104   void Wait(Thread* self, bool do_work, bool may_hold_locks) REQUIRES(!task_queue_lock_);
105 
106   size_t GetTaskCount(Thread* self) REQUIRES(!task_queue_lock_);
107 
108   // Returns the total amount of workers waited for tasks.
GetWaitTime()109   uint64_t GetWaitTime() const {
110     return total_wait_time_;
111   }
112 
113   // Provides a way to bound the maximum number of worker threads, threads must be less the the
114   // thread count of the thread pool.
115   void SetMaxActiveWorkers(size_t threads) REQUIRES(!task_queue_lock_);
116 
117   // Set the "nice" priorty for threads in the pool.
118   void SetPthreadPriority(int priority);
119 
120  protected:
121   // get a task to run, blocks if there are no tasks left
122   virtual Task* GetTask(Thread* self) REQUIRES(!task_queue_lock_);
123 
124   // Try to get a task, returning null if there is none available.
125   Task* TryGetTask(Thread* self) REQUIRES(!task_queue_lock_);
126   Task* TryGetTaskLocked() REQUIRES(task_queue_lock_);
127 
128   // Are we shutting down?
IsShuttingDown()129   bool IsShuttingDown() const REQUIRES(task_queue_lock_) {
130     return shutting_down_;
131   }
132 
133   const std::string name_;
134   Mutex task_queue_lock_;
135   ConditionVariable task_queue_condition_ GUARDED_BY(task_queue_lock_);
136   ConditionVariable completion_condition_ GUARDED_BY(task_queue_lock_);
137   volatile bool started_ GUARDED_BY(task_queue_lock_);
138   volatile bool shutting_down_ GUARDED_BY(task_queue_lock_);
139   // How many worker threads are waiting on the condition.
140   volatile size_t waiting_count_ GUARDED_BY(task_queue_lock_);
141   std::deque<Task*> tasks_ GUARDED_BY(task_queue_lock_);
142   // TODO: make this immutable/const?
143   std::vector<ThreadPoolWorker*> threads_;
144   // Work balance detection.
145   uint64_t start_time_ GUARDED_BY(task_queue_lock_);
146   uint64_t total_wait_time_;
147   Barrier creation_barier_;
148   size_t max_active_workers_ GUARDED_BY(task_queue_lock_);
149 
150  private:
151   friend class ThreadPoolWorker;
152   friend class WorkStealingWorker;
153   DISALLOW_COPY_AND_ASSIGN(ThreadPool);
154 };
155 
156 }  // namespace art
157 
158 #endif  // ART_RUNTIME_THREAD_POOL_H_
159