1 /* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef ART_RUNTIME_THREAD_POOL_H_ 18 #define ART_RUNTIME_THREAD_POOL_H_ 19 20 #include <deque> 21 #include <vector> 22 23 #include "barrier.h" 24 #include "base/mutex.h" 25 #include "closure.h" 26 #include "locks.h" 27 28 namespace art { 29 30 class ThreadPool; 31 32 class Task : public Closure { 33 public: 34 // Called when references reaches 0. Finalize()35 virtual void Finalize() { } 36 }; 37 38 class ThreadPoolWorker { 39 public: 40 static const size_t kDefaultStackSize = 1 * MB; 41 GetStackSize()42 size_t GetStackSize() const { 43 return stack_size_; 44 } 45 46 virtual ~ThreadPoolWorker(); 47 48 protected: 49 ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name, size_t stack_size); 50 static void* Callback(void* arg) LOCKS_EXCLUDED(Locks::mutator_lock_); 51 virtual void Run(); 52 53 ThreadPool* const thread_pool_; 54 const std::string name_; 55 const size_t stack_size_; 56 pthread_t pthread_; 57 58 private: 59 friend class ThreadPool; 60 DISALLOW_COPY_AND_ASSIGN(ThreadPoolWorker); 61 }; 62 63 class ThreadPool { 64 public: 65 // Returns the number of threads in the thread pool. GetThreadCount()66 size_t GetThreadCount() const { 67 return threads_.size(); 68 } 69 70 // Broadcast to the workers and tell them to empty out the work queue. 71 void StartWorkers(Thread* self); 72 73 // Do not allow workers to grab any new tasks. 74 void StopWorkers(Thread* self); 75 76 // Add a new task, the first available started worker will process it. Does not delete the task 77 // after running it, it is the caller's responsibility. 78 void AddTask(Thread* self, Task* task); 79 80 explicit ThreadPool(size_t num_threads); 81 virtual ~ThreadPool(); 82 83 // Wait for all tasks currently on queue to get completed. 84 void Wait(Thread* self, bool do_work, bool may_hold_locks); 85 86 size_t GetTaskCount(Thread* self); 87 88 // Returns the total amount of workers waited for tasks. GetWaitTime()89 uint64_t GetWaitTime() const { 90 return total_wait_time_; 91 } 92 93 // Provides a way to bound the maximum number of worker threads, threads must be less the the 94 // thread count of the thread pool. 95 void SetMaxActiveWorkers(size_t threads); 96 97 protected: 98 // Get a task to run, blocks if there are no tasks left 99 virtual Task* GetTask(Thread* self); 100 101 // Try to get a task, returning NULL if there is none available. 102 Task* TryGetTask(Thread* self); 103 Task* TryGetTaskLocked(Thread* self) EXCLUSIVE_LOCKS_REQUIRED(task_queue_lock_); 104 105 // Are we shutting down? IsShuttingDown()106 bool IsShuttingDown() const EXCLUSIVE_LOCKS_REQUIRED(task_queue_lock_) { 107 return shutting_down_; 108 } 109 110 Mutex task_queue_lock_; 111 ConditionVariable task_queue_condition_ GUARDED_BY(task_queue_lock_); 112 ConditionVariable completion_condition_ GUARDED_BY(task_queue_lock_); 113 volatile bool started_ GUARDED_BY(task_queue_lock_); 114 volatile bool shutting_down_ GUARDED_BY(task_queue_lock_); 115 // How many worker threads are waiting on the condition. 116 volatile size_t waiting_count_ GUARDED_BY(task_queue_lock_); 117 std::deque<Task*> tasks_ GUARDED_BY(task_queue_lock_); 118 // TODO: make this immutable/const? 119 std::vector<ThreadPoolWorker*> threads_; 120 // Work balance detection. 121 uint64_t start_time_ GUARDED_BY(task_queue_lock_); 122 uint64_t total_wait_time_; 123 Barrier creation_barier_; 124 size_t max_active_workers_ GUARDED_BY(task_queue_lock_); 125 126 private: 127 friend class ThreadPoolWorker; 128 friend class WorkStealingWorker; 129 DISALLOW_COPY_AND_ASSIGN(ThreadPool); 130 }; 131 132 class WorkStealingTask : public Task { 133 public: WorkStealingTask()134 WorkStealingTask() : ref_count_(0) {} 135 GetRefCount()136 size_t GetRefCount() const { 137 return ref_count_; 138 } 139 140 virtual void StealFrom(Thread* self, WorkStealingTask* source) = 0; 141 142 private: 143 // How many people are referencing this task. 144 size_t ref_count_; 145 146 friend class WorkStealingWorker; 147 }; 148 149 class WorkStealingWorker : public ThreadPoolWorker { 150 public: 151 virtual ~WorkStealingWorker(); 152 IsRunningTask()153 bool IsRunningTask() const { 154 return task_ != NULL; 155 } 156 157 protected: 158 WorkStealingTask* task_; 159 160 WorkStealingWorker(ThreadPool* thread_pool, const std::string& name, size_t stack_size); 161 virtual void Run(); 162 163 private: 164 friend class WorkStealingThreadPool; 165 DISALLOW_COPY_AND_ASSIGN(WorkStealingWorker); 166 }; 167 168 class WorkStealingThreadPool : public ThreadPool { 169 public: 170 explicit WorkStealingThreadPool(size_t num_threads); 171 virtual ~WorkStealingThreadPool(); 172 173 private: 174 Mutex work_steal_lock_; 175 // Which thread we are stealing from (round robin). 176 size_t steal_index_; 177 178 // Find a task to steal from 179 WorkStealingTask* FindTaskToStealFrom(Thread* self) EXCLUSIVE_LOCKS_REQUIRED(work_steal_lock_); 180 181 friend class WorkStealingWorker; 182 }; 183 184 } // namespace art 185 186 #endif // ART_RUNTIME_THREAD_POOL_H_ 187